diff --git a/lib/_stream_wrap.js b/lib/_stream_wrap.js deleted file mode 100644 index c3dcfe51b6c9b5..00000000000000 --- a/lib/_stream_wrap.js +++ /dev/null @@ -1,118 +0,0 @@ -const util = require('util'); -const Socket = require('net').Socket; -const JSStream = process.binding('js_stream').JSStream; -const uv = process.binding('uv'); - -function StreamWrap(stream) { - var handle = new JSStream(); - - this.stream = stream; - - var self = this; - handle.close = function(cb) { - cb(); - }; - handle.isAlive = function() { - return self.isAlive(); - }; - handle.isClosing = function() { - return self.isClosing(); - }; - handle.onreadstart = function() { - return self.readStart(); - }; - handle.onreadstop = function() { - return self.readStop(); - }; - handle.onshutdown = function(req) { - return self.shutdown(req); - }; - handle.onwrite = function(req, bufs) { - return self.write(req, bufs); - }; - - this.stream.pause(); - this.stream.on('data', function(chunk) { - self._handle.readBuffer(chunk); - }); - this.stream.once('end', function() { - self._handle.emitEOF(); - }); - this.stream.on('error', function(err) { - self.emit('error', err); - }); - - Socket.call(this, { - handle: handle - }); -} -util.inherits(StreamWrap, Socket); -module.exports = StreamWrap; - -// require('_stream_wrap').StreamWrap -StreamWrap.StreamWrap = StreamWrap; - -StreamWrap.prototype.isAlive = function isAlive() { - return this.readable && this.writable; -}; - -StreamWrap.prototype.isClosing = function isClosing() { - return !this.isAlive(); -}; - -StreamWrap.prototype.readStart = function readStart() { - this.stream.resume(); - return 0; -}; - -StreamWrap.prototype.readStop = function readStop() { - this.stream.pause(); - return 0; -}; - -StreamWrap.prototype.shutdown = function shutdown(req) { - var self = this; - - this.stream.end(function() { - // Ensure that write was dispatched - setImmediate(function() { - self._handle.finishShutdown(req, 0); - }); - }); - return 0; -}; - -StreamWrap.prototype.write = function write(req, bufs) { - var pending = bufs.length; - var self = this; - - self.stream.cork(); - bufs.forEach(function(buf) { - self.stream.write(buf, done); - }); - self.stream.uncork(); - - function done(err) { - if (!err && --pending !== 0) - return; - - // Ensure that this is called once in case of error - pending = 0; - - // Ensure that write was dispatched - setImmediate(function() { - var errCode = 0; - if (err) { - if (err.code && uv['UV_' + err.code]) - errCode = uv['UV_' + err.code]; - else - errCode = uv.UV_EPIPE; - } - - self._handle.doAfterWrite(req); - self._handle.finishWrite(req, errCode); - }); - } - - return 0; -}; diff --git a/lib/_tls_legacy.js b/lib/_tls_legacy.js index fc0d115aee2e45..4148085503fc64 100644 --- a/lib/_tls_legacy.js +++ b/lib/_tls_legacy.js @@ -92,11 +92,11 @@ function onCryptoStreamFinish() { // Generate close notify // NOTE: first call checks if client has sent us shutdown, // second call enqueues shutdown into the BIO. - if (this.pair.ssl.shutdownSSL() !== 1) { + if (this.pair.ssl.shutdown() !== 1) { if (this.pair.ssl && this.pair.ssl.error) return this.pair.error(); - this.pair.ssl.shutdownSSL(); + this.pair.ssl.shutdown(); } if (this.pair.ssl && this.pair.ssl.error) diff --git a/lib/_tls_wrap.js b/lib/_tls_wrap.js index fcc216bf289dfb..fb63667581c54f 100644 --- a/lib/_tls_wrap.js +++ b/lib/_tls_wrap.js @@ -7,29 +7,18 @@ const tls = require('tls'); const util = require('util'); const listenerCount = require('events').listenerCount; const common = require('_tls_common'); -const StreamWrap = require('_stream_wrap').StreamWrap; -const Duplex = require('stream').Duplex; const debug = util.debuglog('tls'); const Timer = process.binding('timer_wrap').Timer; const tls_wrap = process.binding('tls_wrap'); -// constructor for lazy loading -function createTCP() { - var TCP = process.binding('tcp_wrap').TCP; - return new TCP(); -} - -// constructor for lazy loading -function createPipe() { - var Pipe = process.binding('pipe_wrap').Pipe; - return new Pipe(); -} +// Lazy load +var tls_legacy; function onhandshakestart() { debug('onhandshakestart'); var self = this; - var ssl = self._handle; + var ssl = self.ssl; var now = Timer.now(); assert(now >= ssl.lastHandshakeTime); @@ -74,7 +63,7 @@ function loadSession(self, hello, cb) { // NOTE: That we have disabled OpenSSL's internal session storage in // `node_crypto.cc` and hence its safe to rely on getting servername only // from clienthello or this place. - var ret = self._handle.loadSession(session); + var ret = self.ssl.loadSession(session); cb(null, ret); } @@ -103,9 +92,9 @@ function loadSNI(self, servername, cb) { // TODO(indutny): eventually disallow raw `SecureContext` if (context) - self._handle.sni_context = context.context || context; + self.ssl.sni_context = context.context || context; - cb(null, self._handle.sni_context); + cb(null, self.ssl.sni_context); }); } @@ -138,7 +127,7 @@ function requestOCSP(self, hello, ctx, cb) { return cb(err); if (response) - self._handle.setOCSPResponse(response); + self.ssl.setOCSPResponse(response); cb(null); } } @@ -172,7 +161,7 @@ function onclienthello(hello) { if (err) return self.destroy(err); - self._handle.endParser(); + self.ssl.endParser(); }); }); }); @@ -195,7 +184,7 @@ function onnewsession(key, session) { return; once = true; - self._handle.newSessionDone(); + self.ssl.newSessionDone(); self._newSessionPending = false; if (self._securePending) @@ -215,43 +204,47 @@ function onocspresponse(resp) { */ function TLSSocket(socket, options) { + // Disallow wrapping TLSSocket in TLSSocket + assert(!(socket instanceof TLSSocket)); + + net.Socket.call(this, { + handle: socket && socket._handle, + allowHalfOpen: socket && socket.allowHalfOpen, + readable: false, + writable: false + }); + if (socket) { + this._parent = socket; + + // To prevent assertion in afterConnect() + this._connecting = socket._connecting; + } + this._tlsOptions = options; this._secureEstablished = false; this._securePending = false; this._newSessionPending = false; this._controlReleased = false; this._SNICallback = null; + this.ssl = null; this.servername = null; this.npnProtocol = null; this.authorized = false; this.authorizationError = null; - // Wrap plain JS Stream into StreamWrap - var wrap; - if (!(socket instanceof net.Socket) && socket instanceof Duplex) - wrap = new StreamWrap(socket); - else if ((socket instanceof net.Socket) && !socket._handle) - wrap = new StreamWrap(socket); - else - wrap = socket; - // Just a documented property to make secure sockets // distinguishable from regular ones. this.encrypted = true; - net.Socket.call(this, { - handle: this._wrapHandle(wrap && wrap._handle), - allowHalfOpen: socket && socket.allowHalfOpen, - readable: false, - writable: false - }); - - // Proxy for API compatibility - this.ssl = this._handle; - this.on('error', this._tlsError); - this._init(socket, wrap); + if (!this._handle) { + this.once('connect', function() { + this._init(null); + }); + } else { + this._init(socket); + } // Make sure to setup all required properties like: `_connecting` before // starting the flow of the data @@ -262,63 +255,23 @@ function TLSSocket(socket, options) { util.inherits(TLSSocket, net.Socket); exports.TLSSocket = TLSSocket; -var proxiedMethods = [ - 'close', 'ref', 'unref', 'open', 'bind', 'listen', 'connect', 'bind6', - 'connect6', 'getsockname', 'getpeername', 'setNoDelay', 'setKeepAlive', - 'setSimultaneousAccepts', 'setBlocking', - - // PipeWrap - 'setPendingInstances' -]; +TLSSocket.prototype._init = function(socket) { + assert(this._handle); -// Proxy HandleWrap, PipeWrap and TCPWrap methods -proxiedMethods.forEach(function(name) { - tls_wrap.TLSWrap.prototype[name] = function methodProxy() { - if (this._parent[name]) - return this._parent[name].apply(this._parent, arguments); - }; -}); - -TLSSocket.prototype._wrapHandle = function(handle) { - var res; + // lib/net.js expect this value to be non-zero if write hasn't been flushed + // immediately + // TODO(indutny): rewise this solution, it might be 1 before handshake and + // represent real writeQueueSize during regular writes. + this._handle.writeQueueSize = 1; + var self = this; var options = this._tlsOptions; - if (!handle) { - handle = options.pipe ? createPipe() : createTCP(); - handle.owner = this; - } // Wrap socket's handle var context = options.secureContext || options.credentials || tls.createSecureContext(); - res = tls_wrap.wrap(handle, context.context, options.isServer); - res._parent = handle; - res._secureContext = context; - res.reading = handle.reading; - Object.defineProperty(handle, 'reading', { - get: function readingGetter() { - return res.reading; - }, - set: function readingSetter(value) { - res.reading = value; - } - }); - - return res; -}; - -TLSSocket.prototype._init = function(socket, wrap) { - var self = this; - var options = this._tlsOptions; - var ssl = this._handle; - - // lib/net.js expect this value to be non-zero if write hasn't been flushed - // immediately - // TODO(indutny): rewise this solution, it might be 1 before handshake and - // represent real writeQueueSize during regular writes. - ssl.writeQueueSize = 1; - + this.ssl = tls_wrap.wrap(this._handle, context.context, options.isServer); this.server = options.server || null; // For clients, we will always have either a given ca list or be using @@ -329,32 +282,32 @@ TLSSocket.prototype._init = function(socket, wrap) { this._requestCert = requestCert; this._rejectUnauthorized = rejectUnauthorized; if (requestCert || rejectUnauthorized) - ssl.setVerifyMode(requestCert, rejectUnauthorized); + this.ssl.setVerifyMode(requestCert, rejectUnauthorized); if (options.isServer) { - ssl.onhandshakestart = onhandshakestart.bind(this); - ssl.onhandshakedone = onhandshakedone.bind(this); - ssl.onclienthello = onclienthello.bind(this); - ssl.onnewsession = onnewsession.bind(this); - ssl.lastHandshakeTime = 0; - ssl.handshakes = 0; + this.ssl.onhandshakestart = onhandshakestart.bind(this); + this.ssl.onhandshakedone = onhandshakedone.bind(this); + this.ssl.onclienthello = onclienthello.bind(this); + this.ssl.onnewsession = onnewsession.bind(this); + this.ssl.lastHandshakeTime = 0; + this.ssl.handshakes = 0; if (this.server && (listenerCount(this.server, 'resumeSession') > 0 || listenerCount(this.server, 'newSession') > 0 || listenerCount(this.server, 'OCSPRequest') > 0)) { - ssl.enableSessionCallbacks(); + this.ssl.enableSessionCallbacks(); } } else { - ssl.onhandshakestart = function() {}; - ssl.onhandshakedone = this._finishInit.bind(this); - ssl.onocspresponse = onocspresponse.bind(this); + this.ssl.onhandshakestart = function() {}; + this.ssl.onhandshakedone = this._finishInit.bind(this); + this.ssl.onocspresponse = onocspresponse.bind(this); if (options.session) - ssl.setSession(options.session); + this.ssl.setSession(options.session); } - ssl.onerror = function(err) { + this.ssl.onerror = function(err) { if (self._writableState.errorEmitted) return; self._writableState.errorEmitted = true; @@ -384,41 +337,20 @@ TLSSocket.prototype._init = function(socket, wrap) { options.server._contexts.length)) { assert(typeof options.SNICallback === 'function'); this._SNICallback = options.SNICallback; - ssl.enableHelloParser(); + this.ssl.enableHelloParser(); } if (process.features.tls_npn && options.NPNProtocols) - ssl.setNPNProtocols(options.NPNProtocols); + this.ssl.setNPNProtocols(options.NPNProtocols); if (options.handshakeTimeout > 0) this.setTimeout(options.handshakeTimeout, this._handleTimeout); // Socket already has some buffered data - emulate receiving it - if (socket && socket._readableState && socket._readableState.length) { + if (socket && socket._readableState.length) { var buf; while ((buf = socket.read()) !== null) - ssl.receive(buf); - } - - if (socket instanceof net.Socket) { - this._parent = socket; - - // To prevent assertion in afterConnect() and properly kick off readStart - this._connecting = socket._connecting || !socket._handle; - socket.once('connect', function() { - self._connecting = false; - self.emit('connect'); - }); - } - - // Assume `tls.connect()` - if (wrap) { - wrap.on('error', function(err) { - self._tlsError(err); - }); - } else { - assert(!socket); - this._connecting = true; + this.ssl.receive(buf); } }; @@ -433,11 +365,11 @@ TLSSocket.prototype.renegotiate = function(options, callback) { if (requestCert !== this._requestCert || rejectUnauthorized !== this._rejectUnauthorized) { - this._handle.setVerifyMode(requestCert, rejectUnauthorized); + this.ssl.setVerifyMode(requestCert, rejectUnauthorized); this._requestCert = requestCert; this._rejectUnauthorized = rejectUnauthorized; } - if (!this._handle.renegotiate()) { + if (!this.ssl.renegotiate()) { if (callback) { process.nextTick(function() { callback(new Error('Failed to renegotiate')); @@ -459,11 +391,11 @@ TLSSocket.prototype.renegotiate = function(options, callback) { }; TLSSocket.prototype.setMaxSendFragment = function setMaxSendFragment(size) { - return this._handle.setMaxSendFragment(size) == 1; + return this.ssl.setMaxSendFragment(size) == 1; }; TLSSocket.prototype.getTLSTicket = function getTLSTicket() { - return this._handle.getTLSTicket(); + return this.ssl.getTLSTicket(); }; TLSSocket.prototype._handleTimeout = function() { @@ -492,11 +424,11 @@ TLSSocket.prototype._finishInit = function() { } if (process.features.tls_npn) { - this.npnProtocol = this._handle.getNegotiatedProtocol(); + this.npnProtocol = this.ssl.getNegotiatedProtocol(); } if (process.features.tls_sni && this._tlsOptions.isServer) { - this.servername = this._handle.getServername(); + this.servername = this.ssl.getServername(); } debug('secure established'); @@ -507,57 +439,49 @@ TLSSocket.prototype._finishInit = function() { }; TLSSocket.prototype._start = function() { - if (this._connecting) { - this.once('connect', function() { - this._start(); - }); - return; - } - - debug('start'); if (this._tlsOptions.requestOCSP) - this._handle.requestOCSP(); - this._handle.start(); + this.ssl.requestOCSP(); + this.ssl.start(); }; TLSSocket.prototype.setServername = function(name) { - this._handle.setServername(name); + this.ssl.setServername(name); }; TLSSocket.prototype.setSession = function(session) { if (typeof session === 'string') session = new Buffer(session, 'binary'); - this._handle.setSession(session); + this.ssl.setSession(session); }; TLSSocket.prototype.getPeerCertificate = function(detailed) { - if (this._handle) { + if (this.ssl) { return common.translatePeerCertificate( - this._handle.getPeerCertificate(detailed)); + this.ssl.getPeerCertificate(detailed)); } return null; }; TLSSocket.prototype.getSession = function() { - if (this._handle) { - return this._handle.getSession(); + if (this.ssl) { + return this.ssl.getSession(); } return null; }; TLSSocket.prototype.isSessionReused = function() { - if (this._handle) { - return this._handle.isSessionReused(); + if (this.ssl) { + return this.ssl.isSessionReused(); } return null; }; TLSSocket.prototype.getCipher = function(err) { - if (this._handle) { - return this._handle.getCurrentCipher(); + if (this.ssl) { + return this.ssl.getCurrentCipher(); } else { return null; } @@ -696,7 +620,7 @@ function Server(/* [options], listener */) { socket.on('secure', function() { if (socket._requestCert) { - var verifyError = socket._handle.verifyError(); + var verifyError = socket.ssl.verifyError(); if (verifyError) { socket.authorizationError = verifyError.code; @@ -851,6 +775,28 @@ function normalizeConnectArgs(listArgs) { return (cb) ? [options, cb] : [options]; } +function legacyConnect(hostname, options, NPN, context) { + assert(options.socket); + if (!tls_legacy) + tls_legacy = require('_tls_legacy'); + + var pair = tls_legacy.createSecurePair(context, + false, + true, + !!options.rejectUnauthorized, + { + NPNProtocols: NPN.NPNProtocols, + servername: hostname + }); + tls_legacy.pipe(pair, options.socket); + pair.cleartext._controlReleased = true; + pair.on('error', function(err) { + pair.cleartext.emit('error', err); + }); + + return pair; +} + exports.connect = function(/* [port, host], options, cb */) { var args = normalizeConnectArgs(arguments); var options = args[0]; @@ -873,21 +819,51 @@ exports.connect = function(/* [port, host], options, cb */) { context = tls.createSecureContext(options); tls.convertNPNProtocols(options.NPNProtocols, NPN); - var socket = new TLSSocket(options.socket, { - pipe: options.path && !options.port, - secureContext: context, - isServer: false, - requestCert: true, - rejectUnauthorized: options.rejectUnauthorized, - session: options.session, - NPNProtocols: NPN.NPNProtocols, - requestOCSP: options.requestOCSP - }); + // Wrapping TLS socket inside another TLS socket was requested - + // create legacy secure pair + var socket; + var legacy; + var result; + if (options.socket instanceof TLSSocket) { + debug('legacy connect'); + legacy = true; + socket = legacyConnect(hostname, options, NPN, context); + result = socket.cleartext; + } else { + legacy = false; + socket = new TLSSocket(options.socket, { + secureContext: context, + isServer: false, + requestCert: true, + rejectUnauthorized: options.rejectUnauthorized, + session: options.session, + NPNProtocols: NPN.NPNProtocols, + requestOCSP: options.requestOCSP + }); + result = socket; + } + + if (socket._handle && !socket._connecting) { + onHandle(); + } else { + // Not even started connecting yet (or probably resolving dns address), + // catch socket errors and assign handle. + if (!legacy && options.socket) { + options.socket.once('connect', function() { + assert(options.socket._handle); + socket._handle = options.socket._handle; + socket._handle.owner = socket; + socket.emit('connect'); + }); + } + socket.once('connect', onHandle); + } if (cb) - socket.once('secureConnect', cb); + result.once('secureConnect', cb); if (!options.socket) { + assert(!legacy); var connect_opt; if (options.path && !options.port) { connect_opt = { path: options.path }; @@ -898,62 +874,63 @@ exports.connect = function(/* [port, host], options, cb */) { localAddress: options.localAddress }; } - socket.connect(connect_opt, function() { - socket._start(); - }); + socket.connect(connect_opt); } - socket._releaseControl(); - - if (options.session) - socket.setSession(options.session); + return result; - if (options.servername) - socket.setServername(options.servername); + function onHandle() { + if (!legacy) + socket._releaseControl(); - if (options.socket) - socket._start(); + if (options.session) + socket.setSession(options.session); - socket.on('secure', function() { - var verifyError = socket._handle.verifyError(); + if (!legacy) { + if (options.servername) + socket.setServername(options.servername); - // Verify that server's identity matches it's certificate's names - if (!verifyError) { - var cert = socket.getPeerCertificate(); - verifyError = options.checkServerIdentity(hostname, cert); + socket._start(); } + socket.on('secure', function() { + var verifyError = socket.ssl.verifyError(); - if (verifyError) { - socket.authorized = false; - socket.authorizationError = verifyError.code || verifyError.message; + // Verify that server's identity matches it's certificate's names + if (!verifyError) { + var cert = result.getPeerCertificate(); + verifyError = options.checkServerIdentity(hostname, cert); + } - if (options.rejectUnauthorized) { - socket.emit('error', verifyError); - socket.destroy(); - return; + if (verifyError) { + result.authorized = false; + result.authorizationError = verifyError.code || verifyError.message; + + if (options.rejectUnauthorized) { + result.emit('error', verifyError); + result.destroy(); + return; + } else { + result.emit('secureConnect'); + } } else { - socket.emit('secureConnect'); + result.authorized = true; + result.emit('secureConnect'); } - } else { - socket.authorized = true; - socket.emit('secureConnect'); - } - // Uncork incoming data - socket.removeListener('end', onHangUp); - }); + // Uncork incoming data + result.removeListener('end', onHangUp); + }); - function onHangUp() { - // NOTE: This logic is shared with _http_client.js - if (!socket._hadError) { - socket._hadError = true; - var error = new Error('socket hang up'); - error.code = 'ECONNRESET'; - socket.destroy(); - socket.emit('error', error); + function onHangUp() { + // NOTE: This logic is shared with _http_client.js + if (!socket._hadError) { + socket._hadError = true; + var error = new Error('socket hang up'); + error.code = 'ECONNRESET'; + socket.destroy(); + socket.emit('error', error); + } } + result.once('end', onHangUp); } - socket.once('end', onHangUp); - - return socket; }; diff --git a/lib/net.js b/lib/net.js index aafb707ca45654..c4ac794cc41e2f 100644 --- a/lib/net.js +++ b/lib/net.js @@ -976,9 +976,7 @@ function afterConnect(status, handle, req, readable, writable) { return; } - // Update handle if it was wrapped - // TODO(indutny): assert that the handle is actually an ancestor of old one - handle = self._handle; + assert(handle === self._handle, 'handle != self._handle'); debug('afterConnect'); diff --git a/node.gyp b/node.gyp index cae5340b273a48..4963cf6e1318a5 100644 --- a/node.gyp +++ b/node.gyp @@ -55,7 +55,6 @@ 'lib/_stream_duplex.js', 'lib/_stream_transform.js', 'lib/_stream_passthrough.js', - 'lib/_stream_wrap.js', 'lib/string_decoder.js', 'lib/sys.js', 'lib/timers.js', @@ -95,7 +94,6 @@ 'src/fs_event_wrap.cc', 'src/cares_wrap.cc', 'src/handle_wrap.cc', - 'src/js_stream.cc', 'src/node.cc', 'src/node_buffer.cc', 'src/node_constants.cc', @@ -116,7 +114,6 @@ 'src/smalloc.cc', 'src/spawn_sync.cc', 'src/string_bytes.cc', - 'src/stream_base.cc', 'src/stream_wrap.cc', 'src/tcp_wrap.cc', 'src/timer_wrap.cc', @@ -133,7 +130,6 @@ 'src/env.h', 'src/env-inl.h', 'src/handle_wrap.h', - 'src/js_stream.h', 'src/node.h', 'src/node_buffer.h', 'src/node_constants.h', @@ -154,8 +150,6 @@ 'src/req-wrap.h', 'src/req-wrap-inl.h', 'src/string_bytes.h', - 'src/stream_base.h', - 'src/stream_base-inl.h', 'src/stream_wrap.h', 'src/tree.h', 'src/util.h', diff --git a/src/async-wrap.h b/src/async-wrap.h index 5e898fe4c24534..86748a5fefd89b 100644 --- a/src/async-wrap.h +++ b/src/async-wrap.h @@ -17,7 +17,6 @@ namespace node { V(FSREQWRAP) \ V(GETADDRINFOREQWRAP) \ V(GETNAMEINFOREQWRAP) \ - V(JSSTREAM) \ V(PIPEWRAP) \ V(PROCESSWRAP) \ V(QUERYWRAP) \ diff --git a/src/env.h b/src/env.h index 73940ad0d483fc..d779279351efe9 100644 --- a/src/env.h +++ b/src/env.h @@ -108,8 +108,6 @@ namespace node { V(ipv4_string, "IPv4") \ V(ipv6_lc_string, "ipv6") \ V(ipv6_string, "IPv6") \ - V(isalive_string, "isAlive") \ - V(isclosing_string, "isClosing") \ V(issuer_string, "issuer") \ V(issuercert_string, "issuerCertificate") \ V(kill_signal_string, "killSignal") \ @@ -144,13 +142,9 @@ namespace node { V(onnewsessiondone_string, "onnewsessiondone") \ V(onocspresponse_string, "onocspresponse") \ V(onread_string, "onread") \ - V(onreadstart_string, "onreadstart") \ - V(onreadstop_string, "onreadstop") \ V(onselect_string, "onselect") \ - V(onshutdown_string, "onshutdown") \ V(onsignal_string, "onsignal") \ V(onstop_string, "onstop") \ - V(onwrite_string, "onwrite") \ V(output_string, "output") \ V(order_string, "order") \ V(owner_string, "owner") \ @@ -232,7 +226,6 @@ namespace node { V(context, v8::Context) \ V(domain_array, v8::Array) \ V(fs_stats_constructor_function, v8::Function) \ - V(jsstream_constructor_template, v8::FunctionTemplate) \ V(module_load_list_array, v8::Array) \ V(pipe_constructor_template, v8::FunctionTemplate) \ V(process_object, v8::Object) \ @@ -243,10 +236,8 @@ namespace node { V(tcp_constructor_template, v8::FunctionTemplate) \ V(tick_callback_function, v8::Function) \ V(tls_wrap_constructor_function, v8::Function) \ - V(tls_wrap_constructor_template, v8::FunctionTemplate) \ V(tty_constructor_template, v8::FunctionTemplate) \ V(udp_constructor_function, v8::Function) \ - V(write_wrap_constructor_function, v8::Function) \ class Environment; diff --git a/src/js_stream.cc b/src/js_stream.cc deleted file mode 100644 index 7fcdfd9a94d9fd..00000000000000 --- a/src/js_stream.cc +++ /dev/null @@ -1,222 +0,0 @@ -#include "js_stream.h" - -#include "async-wrap.h" -#include "env.h" -#include "env-inl.h" -#include "node_buffer.h" -#include "stream_base.h" -#include "stream_base-inl.h" -#include "v8.h" - -namespace node { - -using v8::Array; -using v8::Context; -using v8::External; -using v8::FunctionCallbackInfo; -using v8::FunctionTemplate; -using v8::Handle; -using v8::HandleScope; -using v8::Local; -using v8::Object; -using v8::Value; - - -JSStream::JSStream(Environment* env, Handle obj, AsyncWrap* parent) - : StreamBase(env), - AsyncWrap(env, obj, AsyncWrap::PROVIDER_JSSTREAM, parent) { - node::Wrap(obj, this); - MakeWeak(this); -} - - -JSStream::~JSStream() { -} - - -void* JSStream::Cast() { - return static_cast(this); -} - - -AsyncWrap* JSStream::GetAsyncWrap() { - return static_cast(this); -} - - -bool JSStream::IsAlive() { - return MakeCallback(env()->isalive_string(), 0, nullptr)->IsTrue(); -} - - -bool JSStream::IsClosing() { - return MakeCallback(env()->isclosing_string(), 0, nullptr)->IsTrue(); -} - - -int JSStream::ReadStart() { - return MakeCallback(env()->onreadstart_string(), 0, nullptr)->Int32Value(); -} - - -int JSStream::ReadStop() { - return MakeCallback(env()->onreadstop_string(), 0, nullptr)->Int32Value(); -} - - -int JSStream::DoShutdown(ShutdownWrap* req_wrap) { - HandleScope scope(env()->isolate()); - - Local argv[] = { - req_wrap->object() - }; - - Local res = - MakeCallback(env()->onshutdown_string(), ARRAY_SIZE(argv), argv); - - return res->Int32Value(); -} - - -int JSStream::DoWrite(WriteWrap* w, - uv_buf_t* bufs, - size_t count, - uv_stream_t* send_handle) { - CHECK_EQ(send_handle, nullptr); - - HandleScope scope(env()->isolate()); - - Local bufs_arr = Array::New(env()->isolate(), count); - for (size_t i = 0; i < count; i++) - bufs_arr->Set(i, Buffer::New(env(), bufs[0].base, bufs[0].len)); - - Local argv[] = { - w->object(), - bufs_arr - }; - - Local res = - MakeCallback(env()->onwrite_string(), ARRAY_SIZE(argv), argv); - - return res->Int32Value(); -} - - -void JSStream::New(const FunctionCallbackInfo& args) { - // This constructor should not be exposed to public javascript. - // Therefore we assert that we are not trying to call this as a - // normal function. - CHECK(args.IsConstructCall()); - Environment* env = Environment::GetCurrent(args); - JSStream* wrap; - - if (args.Length() == 0) { - wrap = new JSStream(env, args.This(), nullptr); - } else if (args[0]->IsExternal()) { - void* ptr = args[0].As()->Value(); - wrap = new JSStream(env, args.This(), static_cast(ptr)); - } else { - UNREACHABLE(); - } - CHECK(wrap); -} - - -static void FreeCallback(char* data, void* hint) { - // Intentional no-op -} - - -void JSStream::DoAlloc(const FunctionCallbackInfo& args) { - JSStream* wrap = Unwrap(args.Holder()); - - uv_buf_t buf; - wrap->OnAlloc(args[0]->Int32Value(), &buf); - args.GetReturnValue().Set(Buffer::New(wrap->env(), - buf.base, - buf.len, - FreeCallback, - nullptr)); -} - - -void JSStream::DoRead(const FunctionCallbackInfo& args) { - JSStream* wrap = Unwrap(args.Holder()); - - CHECK(Buffer::HasInstance(args[1])); - uv_buf_t buf = uv_buf_init(Buffer::Data(args[1]), Buffer::Length(args[1])); - wrap->OnRead(args[0]->Int32Value(), &buf); -} - - -void JSStream::DoAfterWrite(const FunctionCallbackInfo& args) { - JSStream* wrap = Unwrap(args.Holder()); - WriteWrap* w = Unwrap(args[0].As()); - - wrap->OnAfterWrite(w); -} - - -template -void JSStream::Finish(const FunctionCallbackInfo& args) { - Wrap* w = Unwrap(args[0].As()); - - w->Done(args[0]->Int32Value()); -} - - -void JSStream::ReadBuffer(const FunctionCallbackInfo& args) { - JSStream* wrap = Unwrap(args.Holder()); - - CHECK(Buffer::HasInstance(args[0])); - char* data = Buffer::Data(args[0]); - int len = Buffer::Length(args[0]); - - do { - uv_buf_t buf; - ssize_t avail = len; - wrap->OnAlloc(len, &buf); - if (static_cast(buf.len) < avail) - avail = buf.len; - - memcpy(buf.base, data, avail); - data += avail; - len -= avail; - wrap->OnRead(avail, &buf); - } while (len != 0); -} - - -void JSStream::EmitEOF(const FunctionCallbackInfo& args) { - JSStream* wrap = Unwrap(args.Holder()); - - wrap->OnRead(UV_EOF, nullptr); -} - - -void JSStream::Initialize(Handle target, - Handle unused, - Handle context) { - Environment* env = Environment::GetCurrent(context); - - Local t = env->NewFunctionTemplate(New); - t->SetClassName(FIXED_ONE_BYTE_STRING(env->isolate(), "JSStream")); - t->InstanceTemplate()->SetInternalFieldCount(1); - - env->SetProtoMethod(t, "doAlloc", DoAlloc); - env->SetProtoMethod(t, "doRead", DoRead); - env->SetProtoMethod(t, "doAfterWrite", DoAfterWrite); - env->SetProtoMethod(t, "finishWrite", Finish); - env->SetProtoMethod(t, "finishShutdown", Finish); - env->SetProtoMethod(t, "readBuffer", ReadBuffer); - env->SetProtoMethod(t, "emitEOF", EmitEOF); - - StreamBase::AddMethods(env, t, StreamBase::kFlagHasWritev); - target->Set(FIXED_ONE_BYTE_STRING(env->isolate(), "JSStream"), - t->GetFunction()); - env->set_jsstream_constructor_template(t); -} - -} // namespace node - -NODE_MODULE_CONTEXT_AWARE_BUILTIN(js_stream, node::JSStream::Initialize) diff --git a/src/js_stream.h b/src/js_stream.h deleted file mode 100644 index 6bc763b36e2bd1..00000000000000 --- a/src/js_stream.h +++ /dev/null @@ -1,49 +0,0 @@ -#ifndef SRC_JS_STREAM_H_ -#define SRC_JS_STREAM_H_ - -#include "async-wrap.h" -#include "env.h" -#include "stream_base.h" -#include "v8.h" - -namespace node { - -class JSStream : public StreamBase, public AsyncWrap { - public: - static void Initialize(v8::Handle target, - v8::Handle unused, - v8::Handle context); - - ~JSStream(); - - void* Cast() override; - bool IsAlive() override; - bool IsClosing() override; - int ReadStart() override; - int ReadStop() override; - - int DoShutdown(ShutdownWrap* req_wrap) override; - int DoWrite(WriteWrap* w, - uv_buf_t* bufs, - size_t count, - uv_stream_t* send_handle) override; - - protected: - JSStream(Environment* env, v8::Handle obj, AsyncWrap* parent); - - AsyncWrap* GetAsyncWrap() override; - - static void New(const v8::FunctionCallbackInfo& args); - static void DoAlloc(const v8::FunctionCallbackInfo& args); - static void DoRead(const v8::FunctionCallbackInfo& args); - static void DoAfterWrite(const v8::FunctionCallbackInfo& args); - static void ReadBuffer(const v8::FunctionCallbackInfo& args); - static void EmitEOF(const v8::FunctionCallbackInfo& args); - - template - static void Finish(const v8::FunctionCallbackInfo& args); -}; - -} // namespace node - -#endif // SRC_JS_STREAM_H_ diff --git a/src/node_crypto.cc b/src/node_crypto.cc index a650d4fef70cbd..9ce0aadf17d2f7 100644 --- a/src/node_crypto.cc +++ b/src/node_crypto.cc @@ -3,7 +3,7 @@ #include "node_crypto.h" #include "node_crypto_bio.h" #include "node_crypto_groups.h" -#include "tls_wrap.h" // TLSWrap +#include "tls_wrap.h" // TLSCallbacks #include "async-wrap.h" #include "async-wrap-inl.h" @@ -98,28 +98,28 @@ const char* const root_certs[] = { X509_STORE* root_cert_store; // Just to generate static methods -template class SSLWrap; -template void SSLWrap::AddMethods(Environment* env, - Handle t); -template void SSLWrap::InitNPN(SecureContext* sc); -template SSL_SESSION* SSLWrap::GetSessionCallback( +template class SSLWrap; +template void SSLWrap::AddMethods(Environment* env, + Handle t); +template void SSLWrap::InitNPN(SecureContext* sc); +template SSL_SESSION* SSLWrap::GetSessionCallback( SSL* s, unsigned char* key, int len, int* copy); -template int SSLWrap::NewSessionCallback(SSL* s, - SSL_SESSION* sess); -template void SSLWrap::OnClientHello( +template int SSLWrap::NewSessionCallback(SSL* s, + SSL_SESSION* sess); +template void SSLWrap::OnClientHello( void* arg, const ClientHelloParser::ClientHello& hello); #ifdef OPENSSL_NPN_NEGOTIATED -template int SSLWrap::AdvertiseNextProtoCallback( +template int SSLWrap::AdvertiseNextProtoCallback( SSL* s, const unsigned char** data, unsigned int* len, void* arg); -template int SSLWrap::SelectNextProtoCallback( +template int SSLWrap::SelectNextProtoCallback( SSL* s, unsigned char** out, unsigned char* outlen, @@ -127,7 +127,7 @@ template int SSLWrap::SelectNextProtoCallback( unsigned int inlen, void* arg); #endif -template int SSLWrap::TLSExtStatusCallback(SSL* s, void* arg); +template int SSLWrap::TLSExtStatusCallback(SSL* s, void* arg); static void crypto_threadid_cb(CRYPTO_THREADID* tid) { @@ -973,7 +973,7 @@ void SSLWrap::AddMethods(Environment* env, Handle t) { env->SetProtoMethod(t, "getCurrentCipher", GetCurrentCipher); env->SetProtoMethod(t, "endParser", EndParser); env->SetProtoMethod(t, "renegotiate", Renegotiate); - env->SetProtoMethod(t, "shutdownSSL", Shutdown); + env->SetProtoMethod(t, "shutdown", Shutdown); env->SetProtoMethod(t, "getTLSTicket", GetTLSTicket); env->SetProtoMethod(t, "newSessionDone", NewSessionDone); env->SetProtoMethod(t, "setOCSPResponse", SetOCSPResponse); diff --git a/src/node_wrap.h b/src/node_wrap.h index 58b042a63b475a..80d679606e9169 100644 --- a/src/node_wrap.h +++ b/src/node_wrap.h @@ -3,7 +3,6 @@ #include "env.h" #include "env-inl.h" -#include "js_stream.h" #include "pipe_wrap.h" #include "tcp_wrap.h" #include "tty_wrap.h" @@ -15,7 +14,7 @@ namespace node { -#define WITH_GENERIC_UV_STREAM(env, obj, BODY, ELSE) \ +#define WITH_GENERIC_STREAM(env, obj, BODY) \ do { \ if (env->tcp_constructor_template().IsEmpty() == false && \ env->tcp_constructor_template()->HasInstance(obj)) { \ @@ -29,33 +28,16 @@ namespace node { env->pipe_constructor_template()->HasInstance(obj)) { \ PipeWrap* const wrap = Unwrap(obj); \ BODY \ - } else { \ - ELSE \ } \ } while (0) -#define WITH_GENERIC_STREAM(env, obj, BODY) \ - do { \ - WITH_GENERIC_UV_STREAM(env, obj, BODY, { \ - if (env->tls_wrap_constructor_template().IsEmpty() == false && \ - env->tls_wrap_constructor_template()->HasInstance(obj)) { \ - TLSWrap* const wrap = Unwrap(obj); \ - BODY \ - } else if (env->jsstream_constructor_template().IsEmpty() == false && \ - env->jsstream_constructor_template()->HasInstance(obj)) { \ - JSStream* const wrap = Unwrap(obj); \ - BODY \ - } \ - }); \ - } while (0) - inline uv_stream_t* HandleToStream(Environment* env, v8::Local obj) { v8::HandleScope scope(env->isolate()); - WITH_GENERIC_UV_STREAM(env, obj, { + WITH_GENERIC_STREAM(env, obj, { return reinterpret_cast(wrap->UVHandle()); - }, {}); + }); return nullptr; } diff --git a/src/pipe_wrap.cc b/src/pipe_wrap.cc index 08fed68741f614..55d5f84ff49858 100644 --- a/src/pipe_wrap.cc +++ b/src/pipe_wrap.cc @@ -77,11 +77,30 @@ void PipeWrap::Initialize(Handle target, t->SetClassName(FIXED_ONE_BYTE_STRING(env->isolate(), "Pipe")); t->InstanceTemplate()->SetInternalFieldCount(1); + enum PropertyAttribute attributes = + static_cast(v8::ReadOnly | v8::DontDelete); + t->InstanceTemplate()->SetAccessor(env->fd_string(), + StreamWrap::GetFD, + nullptr, + Handle(), + v8::DEFAULT, + attributes); + env->SetProtoMethod(t, "close", HandleWrap::Close); env->SetProtoMethod(t, "unref", HandleWrap::Unref); env->SetProtoMethod(t, "ref", HandleWrap::Ref); - StreamWrap::AddMethods(env, t); + env->SetProtoMethod(t, "setBlocking", StreamWrap::SetBlocking); + + env->SetProtoMethod(t, "readStart", StreamWrap::ReadStart); + env->SetProtoMethod(t, "readStop", StreamWrap::ReadStop); + env->SetProtoMethod(t, "shutdown", StreamWrap::Shutdown); + + env->SetProtoMethod(t, "writeBuffer", StreamWrap::WriteBuffer); + env->SetProtoMethod(t, "writeAsciiString", StreamWrap::WriteAsciiString); + env->SetProtoMethod(t, "writeUtf8String", StreamWrap::WriteUtf8String); + env->SetProtoMethod(t, "writeUcs2String", StreamWrap::WriteUcs2String); + env->SetProtoMethod(t, "writeBinaryString", StreamWrap::WriteBinaryString); env->SetProtoMethod(t, "bind", Bind); env->SetProtoMethod(t, "listen", Listen); diff --git a/src/stream_base-inl.h b/src/stream_base-inl.h deleted file mode 100644 index 8f7f5fea413393..00000000000000 --- a/src/stream_base-inl.h +++ /dev/null @@ -1,113 +0,0 @@ -#ifndef SRC_STREAM_BASE_INL_H_ -#define SRC_STREAM_BASE_INL_H_ - -#include "stream_base.h" - -#include "node.h" -#include "env.h" -#include "env-inl.h" -#include "v8.h" - -namespace node { - -using v8::FunctionCallbackInfo; -using v8::FunctionTemplate; -using v8::Handle; -using v8::HandleScope; -using v8::Local; -using v8::Object; -using v8::PropertyAttribute; -using v8::PropertyCallbackInfo; -using v8::String; -using v8::Value; - -template -void StreamBase::AddMethods(Environment* env, - Handle t, - int flags) { - HandleScope scope(env->isolate()); - - enum PropertyAttribute attributes = - static_cast(v8::ReadOnly | v8::DontDelete); - t->InstanceTemplate()->SetAccessor(env->fd_string(), - GetFD, - nullptr, - Handle(), - v8::DEFAULT, - attributes); - - env->SetProtoMethod(t, "readStart", JSMethod); - env->SetProtoMethod(t, "readStop", JSMethod); - if ((flags & kFlagNoShutdown) == 0) - env->SetProtoMethod(t, "shutdown", JSMethod); - if ((flags & kFlagHasWritev) != 0) - env->SetProtoMethod(t, "writev", JSMethod); - env->SetProtoMethod(t, - "writeBuffer", - JSMethod); - env->SetProtoMethod(t, - "writeAsciiString", - JSMethod >); - env->SetProtoMethod(t, - "writeUtf8String", - JSMethod >); - env->SetProtoMethod(t, - "writeUcs2String", - JSMethod >); - env->SetProtoMethod(t, - "writeBinaryString", - JSMethod >); -} - - -template -void StreamBase::GetFD(Local key, - const PropertyCallbackInfo& args) { - StreamBase* wrap = Unwrap(args.Holder()); - - if (!wrap->IsAlive()) - return args.GetReturnValue().Set(UV_EINVAL); - - args.GetReturnValue().Set(wrap->GetFD()); -} - - -template & args)> -void StreamBase::JSMethod(const FunctionCallbackInfo& args) { - StreamBase* wrap = Unwrap(args.Holder()); - - if (!wrap->IsAlive()) - return args.GetReturnValue().Set(UV_EINVAL); - - args.GetReturnValue().Set((wrap->*Method)(args)); -} - - -WriteWrap* WriteWrap::New(Environment* env, - Local obj, - StreamBase* wrap, - DoneCb cb, - size_t extra) { - size_t storage_size = ROUND_UP(sizeof(WriteWrap), kAlignSize) + extra; - char* storage = new char[storage_size]; - - return new(storage) WriteWrap(env, obj, wrap, cb); -} - - -void WriteWrap::Dispose() { - this->~WriteWrap(); - delete[] reinterpret_cast(this); -} - - -char* WriteWrap::Extra(size_t offset) { - return reinterpret_cast(this) + - ROUND_UP(sizeof(*this), kAlignSize) + - offset; -} - -} // namespace node - -#endif // SRC_STREAM_BASE_INL_H_ diff --git a/src/stream_base.cc b/src/stream_base.cc deleted file mode 100644 index 3a9f30f279bd2c..00000000000000 --- a/src/stream_base.cc +++ /dev/null @@ -1,450 +0,0 @@ -#include "stream_base.h" -#include "stream_base-inl.h" -#include "stream_wrap.h" - -#include "node.h" -#include "node_buffer.h" -#include "env.h" -#include "env-inl.h" -#include "js_stream.h" -#include "string_bytes.h" -#include "util.h" -#include "util-inl.h" -#include "v8.h" - -#include // INT_MAX - -namespace node { - -using v8::Array; -using v8::Context; -using v8::FunctionCallbackInfo; -using v8::Handle; -using v8::HandleScope; -using v8::Integer; -using v8::Local; -using v8::Number; -using v8::Object; -using v8::String; -using v8::Value; - -template int StreamBase::WriteString( - const FunctionCallbackInfo& args); -template int StreamBase::WriteString( - const FunctionCallbackInfo& args); -template int StreamBase::WriteString( - const FunctionCallbackInfo& args); -template int StreamBase::WriteString( - const FunctionCallbackInfo& args); - - -int StreamBase::ReadStart(const FunctionCallbackInfo& args) { - return ReadStart(); -} - - -int StreamBase::ReadStop(const FunctionCallbackInfo& args) { - return ReadStop(); -} - - -int StreamBase::Shutdown(const FunctionCallbackInfo& args) { - Environment* env = Environment::GetCurrent(args); - - CHECK(args[0]->IsObject()); - Local req_wrap_obj = args[0].As(); - - ShutdownWrap* req_wrap = new ShutdownWrap(env, - req_wrap_obj, - this, - AfterShutdown); - - int err = DoShutdown(req_wrap); - req_wrap->Dispatched(); - if (err) - delete req_wrap; - return err; -} - - -void StreamBase::AfterShutdown(ShutdownWrap* req_wrap, int status) { - StreamBase* wrap = req_wrap->wrap(); - Environment* env = req_wrap->env(); - - // The wrap and request objects should still be there. - CHECK_EQ(req_wrap->persistent().IsEmpty(), false); - CHECK_EQ(wrap->GetAsyncWrap()->persistent().IsEmpty(), false); - - HandleScope handle_scope(env->isolate()); - Context::Scope context_scope(env->context()); - - Local req_wrap_obj = req_wrap->object(); - Local argv[3] = { - Integer::New(env->isolate(), status), - wrap->GetAsyncWrap()->object(), - req_wrap_obj - }; - - if (req_wrap->object()->Has(env->oncomplete_string())) - req_wrap->MakeCallback(env->oncomplete_string(), ARRAY_SIZE(argv), argv); - - delete req_wrap; -} - - -int StreamBase::Writev(const FunctionCallbackInfo& args) { - Environment* env = Environment::GetCurrent(args); - - CHECK(args[0]->IsObject()); - CHECK(args[1]->IsArray()); - - Local req_wrap_obj = args[0].As(); - Local chunks = args[1].As(); - - size_t count = chunks->Length() >> 1; - - uv_buf_t bufs_[16]; - uv_buf_t* bufs = bufs_; - - // Determine storage size first - size_t storage_size = 0; - for (size_t i = 0; i < count; i++) { - storage_size = ROUND_UP(storage_size, WriteWrap::kAlignSize); - - Handle chunk = chunks->Get(i * 2); - - if (Buffer::HasInstance(chunk)) - continue; - // Buffer chunk, no additional storage required - - // String chunk - Handle string = chunk->ToString(env->isolate()); - enum encoding encoding = ParseEncoding(env->isolate(), - chunks->Get(i * 2 + 1)); - size_t chunk_size; - if (encoding == UTF8 && string->Length() > 65535) - chunk_size = StringBytes::Size(env->isolate(), string, encoding); - else - chunk_size = StringBytes::StorageSize(env->isolate(), string, encoding); - - storage_size += chunk_size; - } - - if (storage_size > INT_MAX) - return UV_ENOBUFS; - - if (ARRAY_SIZE(bufs_) < count) - bufs = new uv_buf_t[count]; - - WriteWrap* req_wrap = WriteWrap::New(env, - req_wrap_obj, - this, - AfterWrite, - storage_size); - - uint32_t bytes = 0; - size_t offset = 0; - for (size_t i = 0; i < count; i++) { - Handle chunk = chunks->Get(i * 2); - - // Write buffer - if (Buffer::HasInstance(chunk)) { - bufs[i].base = Buffer::Data(chunk); - bufs[i].len = Buffer::Length(chunk); - bytes += bufs[i].len; - continue; - } - - // Write string - offset = ROUND_UP(offset, WriteWrap::kAlignSize); - CHECK_LT(offset, storage_size); - char* str_storage = req_wrap->Extra(offset); - size_t str_size = storage_size - offset; - - Handle string = chunk->ToString(env->isolate()); - enum encoding encoding = ParseEncoding(env->isolate(), - chunks->Get(i * 2 + 1)); - str_size = StringBytes::Write(env->isolate(), - str_storage, - str_size, - string, - encoding); - bufs[i].base = str_storage; - bufs[i].len = str_size; - offset += str_size; - bytes += str_size; - } - - int err = DoWrite(req_wrap, bufs, count, nullptr); - - // Deallocate space - if (bufs != bufs_) - delete[] bufs; - - req_wrap->Dispatched(); - req_wrap->object()->Set(env->async(), True(env->isolate())); - req_wrap->object()->Set(env->bytes_string(), - Number::New(env->isolate(), bytes)); - const char* msg = Error(); - if (msg != nullptr) { - req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg)); - ClearError(); - } - - if (err) - req_wrap->Dispose(); - - return err; -} - - - - -int StreamBase::WriteBuffer(const FunctionCallbackInfo& args) { - CHECK(args[0]->IsObject()); - CHECK(Buffer::HasInstance(args[1])); - Environment* env = Environment::GetCurrent(args); - - Local req_wrap_obj = args[0].As(); - const char* data = Buffer::Data(args[1]); - size_t length = Buffer::Length(args[1]); - - WriteWrap* req_wrap; - uv_buf_t buf; - buf.base = const_cast(data); - buf.len = length; - - // Try writing immediately without allocation - uv_buf_t* bufs = &buf; - size_t count = 1; - int err = DoTryWrite(&bufs, &count); - if (err != 0) - goto done; - if (count == 0) - goto done; - CHECK_EQ(count, 1); - - // Allocate, or write rest - req_wrap = WriteWrap::New(env, req_wrap_obj, this, AfterWrite); - - err = DoWrite(req_wrap, bufs, count, nullptr); - req_wrap->Dispatched(); - req_wrap_obj->Set(env->async(), True(env->isolate())); - - if (err) - req_wrap->Dispose(); - - done: - const char* msg = Error(); - if (msg != nullptr) { - req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg)); - ClearError(); - } - req_wrap_obj->Set(env->bytes_string(), - Integer::NewFromUnsigned(env->isolate(), length)); - return err; -} - - -template -int StreamBase::WriteString(const FunctionCallbackInfo& args) { - Environment* env = Environment::GetCurrent(args); - CHECK(args[0]->IsObject()); - CHECK(args[1]->IsString()); - - Local req_wrap_obj = args[0].As(); - Local string = args[1].As(); - Local send_handle_obj; - if (args[2]->IsObject()) - send_handle_obj = args[2].As(); - - int err; - - // Compute the size of the storage that the string will be flattened into. - // For UTF8 strings that are very long, go ahead and take the hit for - // computing their actual size, rather than tripling the storage. - size_t storage_size; - if (enc == UTF8 && string->Length() > 65535) - storage_size = StringBytes::Size(env->isolate(), string, enc); - else - storage_size = StringBytes::StorageSize(env->isolate(), string, enc); - - if (storage_size > INT_MAX) - return UV_ENOBUFS; - - // Try writing immediately if write size isn't too big - WriteWrap* req_wrap; - char* data; - char stack_storage[16384]; // 16kb - size_t data_size; - uv_buf_t buf; - - bool try_write = storage_size <= sizeof(stack_storage) && - (!IsIPCPipe() || send_handle_obj.IsEmpty()); - if (try_write) { - data_size = StringBytes::Write(env->isolate(), - stack_storage, - storage_size, - string, - enc); - buf = uv_buf_init(stack_storage, data_size); - - uv_buf_t* bufs = &buf; - size_t count = 1; - err = DoTryWrite(&bufs, &count); - - // Failure - if (err != 0) - goto done; - - // Success - if (count == 0) - goto done; - - // Partial write - CHECK_EQ(count, 1); - } - - req_wrap = WriteWrap::New(env, req_wrap_obj, this, AfterWrite, storage_size); - - data = req_wrap->Extra(); - - if (try_write) { - // Copy partial data - memcpy(data, buf.base, buf.len); - data_size = buf.len; - } else { - // Write it - data_size = StringBytes::Write(env->isolate(), - data, - storage_size, - string, - enc); - } - - CHECK_LE(data_size, storage_size); - - buf = uv_buf_init(data, data_size); - - if (!IsIPCPipe()) { - err = DoWrite(req_wrap, &buf, 1, nullptr); - } else { - uv_handle_t* send_handle = nullptr; - - if (!send_handle_obj.IsEmpty()) { - HandleWrap* wrap = Unwrap(send_handle_obj); - send_handle = wrap->GetHandle(); - // Reference StreamWrap instance to prevent it from being garbage - // collected before `AfterWrite` is called. - CHECK_EQ(false, req_wrap->persistent().IsEmpty()); - req_wrap->object()->Set(env->handle_string(), send_handle_obj); - } - - err = DoWrite( - req_wrap, - &buf, - 1, - reinterpret_cast(send_handle)); - } - - req_wrap->Dispatched(); - req_wrap->object()->Set(env->async(), True(env->isolate())); - - if (err) - req_wrap->Dispose(); - - done: - const char* msg = Error(); - if (msg != nullptr) { - req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg)); - ClearError(); - } - req_wrap_obj->Set(env->bytes_string(), - Integer::NewFromUnsigned(env->isolate(), data_size)); - return err; -} - - -void StreamBase::AfterWrite(WriteWrap* req_wrap, int status) { - StreamBase* wrap = req_wrap->wrap(); - Environment* env = req_wrap->env(); - - HandleScope handle_scope(env->isolate()); - Context::Scope context_scope(env->context()); - - // The wrap and request objects should still be there. - CHECK_EQ(req_wrap->persistent().IsEmpty(), false); - CHECK_EQ(wrap->GetAsyncWrap()->persistent().IsEmpty(), false); - - // Unref handle property - Local req_wrap_obj = req_wrap->object(); - req_wrap_obj->Delete(env->handle_string()); - wrap->OnAfterWrite(req_wrap); - - Local argv[] = { - Integer::New(env->isolate(), status), - wrap->GetAsyncWrap()->object(), - req_wrap_obj, - Undefined(env->isolate()) - }; - - const char* msg = wrap->Error(); - if (msg != nullptr) { - argv[3] = OneByteString(env->isolate(), msg); - wrap->ClearError(); - } - - if (req_wrap->object()->Has(env->oncomplete_string())) - req_wrap->MakeCallback(env->oncomplete_string(), ARRAY_SIZE(argv), argv); - - req_wrap->Dispose(); -} - - -void StreamBase::EmitData(ssize_t nread, - Local buf, - Local handle) { - Environment* env = env_; - - Local argv[] = { - Integer::New(env->isolate(), nread), - buf, - handle - }; - - if (argv[1].IsEmpty()) - argv[1] = Undefined(env->isolate()); - - if (argv[2].IsEmpty()) - argv[2] = Undefined(env->isolate()); - - GetAsyncWrap()->MakeCallback(env->onread_string(), ARRAY_SIZE(argv), argv); -} - - -bool StreamBase::IsIPCPipe() { - return false; -} - - -int StreamBase::GetFD() { - return -1; -} - - -int StreamResource::DoTryWrite(uv_buf_t** bufs, size_t* count) { - // No TryWrite by default - return 0; -} - - -const char* StreamResource::Error() const { - return nullptr; -} - - -void StreamResource::ClearError() { - // No-op -} - -} // namespace node diff --git a/src/stream_base.h b/src/stream_base.h deleted file mode 100644 index dfb0d31c66776d..00000000000000 --- a/src/stream_base.h +++ /dev/null @@ -1,239 +0,0 @@ -#ifndef SRC_STREAM_BASE_H_ -#define SRC_STREAM_BASE_H_ - -#include "env.h" -#include "async-wrap.h" -#include "req-wrap.h" -#include "req-wrap-inl.h" -#include "node.h" - -#include "v8.h" - -namespace node { - -// Forward declarations -class StreamBase; - -template -class StreamReq { - public: - typedef void (*DoneCb)(Req* req, int status); - - explicit StreamReq(DoneCb cb) : cb_(cb) { - } - - inline void Done(int status) { - cb_(static_cast(this), status); - } - - private: - DoneCb cb_; -}; - -class ShutdownWrap : public ReqWrap, - public StreamReq { - public: - ShutdownWrap(Environment* env, - v8::Local req_wrap_obj, - StreamBase* wrap, - DoneCb cb) - : ReqWrap(env, req_wrap_obj, AsyncWrap::PROVIDER_SHUTDOWNWRAP), - StreamReq(cb), - wrap_(wrap) { - Wrap(req_wrap_obj, this); - } - - static void NewShutdownWrap(const v8::FunctionCallbackInfo& args) { - CHECK(args.IsConstructCall()); - } - - inline StreamBase* wrap() const { return wrap_; } - - private: - StreamBase* const wrap_; -}; - -class WriteWrap: public ReqWrap, - public StreamReq { - public: - static inline WriteWrap* New(Environment* env, - v8::Local obj, - StreamBase* wrap, - DoneCb cb, - size_t extra = 0); - inline void Dispose(); - inline char* Extra(size_t offset = 0); - - inline StreamBase* wrap() const { return wrap_; } - - static void NewWriteWrap(const v8::FunctionCallbackInfo& args) { - CHECK(args.IsConstructCall()); - } - - static const size_t kAlignSize = 16; - - protected: - WriteWrap(Environment* env, - v8::Local obj, - StreamBase* wrap, - DoneCb cb) - : ReqWrap(env, obj, AsyncWrap::PROVIDER_WRITEWRAP), - StreamReq(cb), - wrap_(wrap) { - Wrap(obj, this); - } - - void* operator new(size_t size) = delete; - void* operator new(size_t size, char* storage) { return storage; } - - // This is just to keep the compiler happy. It should never be called, since - // we don't use exceptions in node. - void operator delete(void* ptr, char* storage) { UNREACHABLE(); } - - private: - // People should not be using the non-placement new and delete operator on a - // WriteWrap. Ensure this never happens. - void operator delete(void* ptr) { UNREACHABLE(); } - - StreamBase* const wrap_; -}; - -class StreamResource { - public: - typedef void (*AfterWriteCb)(WriteWrap* w, void* ctx); - typedef void (*AllocCb)(size_t size, uv_buf_t* buf, void* ctx); - typedef void (*ReadCb)(ssize_t nread, - const uv_buf_t* buf, - uv_handle_type pending, - void* ctx); - - StreamResource() : after_write_cb_(nullptr), - alloc_cb_(nullptr), - read_cb_(nullptr) { - } - - virtual ~StreamResource() = default; - - virtual int DoShutdown(ShutdownWrap* req_wrap) = 0; - virtual int DoTryWrite(uv_buf_t** bufs, size_t* count); - virtual int DoWrite(WriteWrap* w, - uv_buf_t* bufs, - size_t count, - uv_stream_t* send_handle) = 0; - virtual const char* Error() const; - virtual void ClearError(); - - // Events - inline void OnAfterWrite(WriteWrap* w) { - if (after_write_cb_ != nullptr) - after_write_cb_(w, after_write_ctx_); - } - - inline void OnAlloc(size_t size, uv_buf_t* buf) { - if (alloc_cb_ != nullptr) - alloc_cb_(size, buf, alloc_ctx_); - } - - inline void OnRead(size_t nread, - const uv_buf_t* buf, - uv_handle_type pending = UV_UNKNOWN_HANDLE) { - if (read_cb_ != nullptr) - read_cb_(nread, buf, pending, read_ctx_); - } - - inline void set_after_write_cb(AfterWriteCb cb, void* ctx) { - after_write_ctx_ = ctx; - after_write_cb_ = cb; - } - - inline void set_alloc_cb(AllocCb cb, void* ctx) { - alloc_cb_ = cb; - alloc_ctx_ = ctx; - } - - inline void set_read_cb(ReadCb cb, void* ctx) { - read_cb_ = cb; - read_ctx_ = ctx; - } - - private: - AfterWriteCb after_write_cb_; - void* after_write_ctx_; - AllocCb alloc_cb_; - void* alloc_ctx_; - ReadCb read_cb_; - void* read_ctx_; -}; - -class StreamBase : public StreamResource { - public: - enum Flags { - kFlagNone = 0x0, - kFlagHasWritev = 0x1, - kFlagNoShutdown = 0x2 - }; - - template - static inline void AddMethods(Environment* env, - v8::Handle target, - int flags = kFlagNone); - - virtual void* Cast() = 0; - virtual bool IsAlive() = 0; - virtual bool IsClosing() = 0; - virtual bool IsIPCPipe(); - virtual int GetFD(); - - virtual int ReadStart() = 0; - virtual int ReadStop() = 0; - - inline void Consume() { - CHECK_EQ(consumed_, false); - consumed_ = true; - } - - template - inline Outer* Cast() { return static_cast(Cast()); } - - void EmitData(ssize_t nread, - v8::Local buf, - v8::Local handle); - - protected: - explicit StreamBase(Environment* env) : env_(env), consumed_(false) { - } - - virtual ~StreamBase() = default; - - virtual AsyncWrap* GetAsyncWrap() = 0; - - // Libuv callbacks - static void AfterShutdown(ShutdownWrap* req, int status); - static void AfterWrite(WriteWrap* req, int status); - - // JS Methods - int ReadStart(const v8::FunctionCallbackInfo& args); - int ReadStop(const v8::FunctionCallbackInfo& args); - int Shutdown(const v8::FunctionCallbackInfo& args); - int Writev(const v8::FunctionCallbackInfo& args); - int WriteBuffer(const v8::FunctionCallbackInfo& args); - template - int WriteString(const v8::FunctionCallbackInfo& args); - - template - static void GetFD(v8::Local, - const v8::PropertyCallbackInfo&); - - template & args)> - static void JSMethod(const v8::FunctionCallbackInfo& args); - - private: - Environment* env_; - bool consumed_; -}; - -} // namespace node - -#endif // SRC_STREAM_BASE_H_ diff --git a/src/stream_wrap.cc b/src/stream_wrap.cc index be5757d4a60bd1..a9f89e47bb9813 100644 --- a/src/stream_wrap.cc +++ b/src/stream_wrap.cc @@ -1,7 +1,4 @@ #include "stream_wrap.h" -#include "stream_base.h" -#include "stream_base-inl.h" - #include "env-inl.h" #include "env.h" #include "handle_wrap.h" @@ -41,8 +38,8 @@ using v8::Value; void StreamWrap::Initialize(Handle target, - Handle unused, - Handle context) { + Handle unused, + Handle context) { Environment* env = Environment::GetCurrent(context); Local sw = @@ -58,7 +55,6 @@ void StreamWrap::Initialize(Handle target, ww->SetClassName(FIXED_ONE_BYTE_STRING(env->isolate(), "WriteWrap")); target->Set(FIXED_ONE_BYTE_STRING(env->isolate(), "WriteWrap"), ww->GetFunction()); - env->set_write_wrap_constructor_function(ww->GetFunction()); } @@ -72,54 +68,23 @@ StreamWrap::StreamWrap(Environment* env, reinterpret_cast(stream), provider, parent), - StreamBase(env), - stream_(stream) { - set_after_write_cb(OnAfterWriteImpl, this); - set_alloc_cb(OnAllocImpl, this); - set_read_cb(OnReadImpl, this); -} - - -void StreamWrap::AddMethods(Environment* env, - v8::Handle target, - int flags) { - env->SetProtoMethod(target, "setBlocking", SetBlocking); - StreamBase::AddMethods(env, target, flags); + stream_(stream), + default_callbacks_(this), + callbacks_(&default_callbacks_), + callbacks_gc_(false) { } -int StreamWrap::GetFD() { - int fd = -1; +void StreamWrap::GetFD(Local, const PropertyCallbackInfo& args) { #if !defined(_WIN32) - if (stream() != nullptr) - fd = stream()->io_watcher.fd; + HandleScope scope(args.GetIsolate()); + StreamWrap* wrap = Unwrap(args.Holder()); + int fd = -1; + if (wrap != nullptr && wrap->stream() != nullptr) { + fd = wrap->stream()->io_watcher.fd; + } + args.GetReturnValue().Set(fd); #endif - return fd; -} - - -bool StreamWrap::IsAlive() { - return HandleWrap::IsAlive(this); -} - - -bool StreamWrap::IsClosing() { - return uv_is_closing(reinterpret_cast(stream())); -} - - -void* StreamWrap::Cast() { - return reinterpret_cast(this); -} - - -AsyncWrap* StreamWrap::GetAsyncWrap() { - return static_cast(this); -} - - -bool StreamWrap::IsIPCPipe() { - return is_named_pipe_ipc(); } @@ -131,13 +96,22 @@ void StreamWrap::UpdateWriteQueueSize() { } -int StreamWrap::ReadStart() { - return uv_read_start(stream(), OnAlloc, OnRead); +void StreamWrap::ReadStart(const FunctionCallbackInfo& args) { + StreamWrap* wrap = Unwrap(args.Holder()); + if (!IsAlive(wrap)) + return args.GetReturnValue().Set(UV_EINVAL); + + int err = uv_read_start(wrap->stream(), OnAlloc, OnRead); + args.GetReturnValue().Set(err); } -int StreamWrap::ReadStop() { - return uv_read_stop(stream()); +void StreamWrap::ReadStop(const FunctionCallbackInfo& args) { + StreamWrap* wrap = Unwrap(args.Holder()); + if (!IsAlive(wrap)) + return args.GetReturnValue().Set(UV_EINVAL); + int err = uv_read_stop(wrap->stream()); + args.GetReturnValue().Set(err); } @@ -145,29 +119,15 @@ void StreamWrap::OnAlloc(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { StreamWrap* wrap = static_cast(handle->data); - HandleScope scope(wrap->env()->isolate()); - Context::Scope context_scope(wrap->env()->context()); - CHECK_EQ(wrap->stream(), reinterpret_cast(handle)); - - return static_cast(wrap)->OnAlloc(suggested_size, buf); -} - - -void StreamWrap::OnAllocImpl(size_t size, uv_buf_t* buf, void* ctx) { - buf->base = static_cast(malloc(size)); - buf->len = size; - - if (buf->base == nullptr && size > 0) { - FatalError( - "node::StreamWrap::DoAlloc(size_t, uv_buf_t*, void*)", - "Out Of Memory"); - } + wrap->callbacks()->DoAlloc(handle, suggested_size, buf); } template -static Local AcceptHandle(Environment* env, StreamWrap* parent) { +static Local AcceptHandle(Environment* env, + uv_stream_t* pipe, + AsyncWrap* parent) { EscapableHandleScope scope(env->isolate()); Local wrap_obj; UVType* handle; @@ -179,61 +139,18 @@ static Local AcceptHandle(Environment* env, StreamWrap* parent) { WrapType* wrap = Unwrap(wrap_obj); handle = wrap->UVHandle(); - if (uv_accept(parent->stream(), reinterpret_cast(handle))) + if (uv_accept(pipe, reinterpret_cast(handle))) abort(); return scope.Escape(wrap_obj); } -void StreamWrap::OnReadImpl(ssize_t nread, - const uv_buf_t* buf, - uv_handle_type pending, - void* ctx) { - StreamWrap* wrap = static_cast(ctx); - Environment* env = wrap->env(); - HandleScope handle_scope(env->isolate()); - Context::Scope context_scope(env->context()); - - Local pending_obj; - - if (nread < 0) { - if (buf->base != nullptr) - free(buf->base); - wrap->EmitData(nread, Local(), pending_obj); - return; - } - - if (nread == 0) { - if (buf->base != nullptr) - free(buf->base); - return; - } - - char* base = static_cast(realloc(buf->base, nread)); - CHECK_LE(static_cast(nread), buf->len); - - if (pending == UV_TCP) { - pending_obj = AcceptHandle(env, wrap); - } else if (pending == UV_NAMED_PIPE) { - pending_obj = AcceptHandle(env, wrap); - } else if (pending == UV_UDP) { - pending_obj = AcceptHandle(env, wrap); - } else { - CHECK_EQ(pending, UV_UNKNOWN_HANDLE); - } - - wrap->EmitData(nread, Buffer::Use(env, base, nread), pending_obj); -} - - void StreamWrap::OnReadCommon(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf, uv_handle_type pending) { StreamWrap* wrap = static_cast(handle->data); - HandleScope scope(wrap->env()->isolate()); - Context::Scope context_scope(wrap->env()->context()); // We should not be getting this callback if someone as already called // uv_close() on the handle. @@ -247,7 +164,7 @@ void StreamWrap::OnReadCommon(uv_stream_t* handle, } } - static_cast(wrap)->OnRead(nread, buf, pending); + wrap->callbacks()->DoRead(handle, nread, buf, pending); } @@ -266,28 +183,437 @@ void StreamWrap::OnRead(uv_stream_t* handle, } -void StreamWrap::SetBlocking(const FunctionCallbackInfo& args) { +size_t StreamWrap::WriteBuffer(Handle val, uv_buf_t* buf) { + CHECK(Buffer::HasInstance(val)); + + // Simple non-writev case + buf->base = Buffer::Data(val); + buf->len = Buffer::Length(val); + + return buf->len; +} + + +void StreamWrap::WriteBuffer(const FunctionCallbackInfo& args) { + Environment* env = Environment::GetCurrent(args); + StreamWrap* wrap = Unwrap(args.Holder()); + if (!IsAlive(wrap)) + return args.GetReturnValue().Set(UV_EINVAL); - CHECK_GT(args.Length(), 0); - if (!wrap->IsAlive()) + CHECK(args[0]->IsObject()); + CHECK(Buffer::HasInstance(args[1])); + + Local req_wrap_obj = args[0].As(); + Local buf_obj = args[1].As(); + + size_t length = Buffer::Length(buf_obj); + + char* storage; + WriteWrap* req_wrap; + uv_buf_t buf; + WriteBuffer(buf_obj, &buf); + + // Try writing immediately without allocation + uv_buf_t* bufs = &buf; + size_t count = 1; + int err = wrap->callbacks()->TryWrite(&bufs, &count); + if (err != 0) + goto done; + if (count == 0) + goto done; + CHECK_EQ(count, 1); + + // Allocate, or write rest + storage = new char[sizeof(WriteWrap)]; + req_wrap = new(storage) WriteWrap(env, req_wrap_obj, wrap); + + err = wrap->callbacks()->DoWrite(req_wrap, + bufs, + count, + nullptr, + StreamWrap::AfterWrite); + req_wrap->Dispatched(); + req_wrap_obj->Set(env->async(), True(env->isolate())); + + if (err) { + req_wrap->~WriteWrap(); + delete[] storage; + } + + done: + const char* msg = wrap->callbacks()->Error(); + if (msg != nullptr) { + req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg)); + wrap->callbacks()->ClearError(); + } + req_wrap_obj->Set(env->bytes_string(), + Integer::NewFromUnsigned(env->isolate(), length)); + args.GetReturnValue().Set(err); +} + + +template +void StreamWrap::WriteStringImpl(const FunctionCallbackInfo& args) { + Environment* env = Environment::GetCurrent(args); + int err; + + StreamWrap* wrap = Unwrap(args.Holder()); + if (!IsAlive(wrap)) + return args.GetReturnValue().Set(UV_EINVAL); + + CHECK(args[0]->IsObject()); + CHECK(args[1]->IsString()); + + Local req_wrap_obj = args[0].As(); + Local string = args[1].As(); + + // Compute the size of the storage that the string will be flattened into. + // For UTF8 strings that are very long, go ahead and take the hit for + // computing their actual size, rather than tripling the storage. + size_t storage_size; + if (encoding == UTF8 && string->Length() > 65535) + storage_size = StringBytes::Size(env->isolate(), string, encoding); + else + storage_size = StringBytes::StorageSize(env->isolate(), string, encoding); + + if (storage_size > INT_MAX) { + args.GetReturnValue().Set(UV_ENOBUFS); + return; + } + + // Try writing immediately if write size isn't too big + char* storage; + WriteWrap* req_wrap; + char* data; + char stack_storage[16384]; // 16kb + size_t data_size; + uv_buf_t buf; + + bool try_write = storage_size + 15 <= sizeof(stack_storage) && + (!wrap->is_named_pipe_ipc() || !args[2]->IsObject()); + if (try_write) { + data_size = StringBytes::Write(env->isolate(), + stack_storage, + storage_size, + string, + encoding); + buf = uv_buf_init(stack_storage, data_size); + + uv_buf_t* bufs = &buf; + size_t count = 1; + err = wrap->callbacks()->TryWrite(&bufs, &count); + + // Failure + if (err != 0) + goto done; + + // Success + if (count == 0) + goto done; + + // Partial write + CHECK_EQ(count, 1); + } + + storage = new char[sizeof(WriteWrap) + storage_size + 15]; + req_wrap = new(storage) WriteWrap(env, req_wrap_obj, wrap); + + data = reinterpret_cast(ROUND_UP( + reinterpret_cast(storage) + sizeof(WriteWrap), 16)); + + if (try_write) { + // Copy partial data + memcpy(data, buf.base, buf.len); + data_size = buf.len; + } else { + // Write it + data_size = StringBytes::Write(env->isolate(), + data, + storage_size, + string, + encoding); + } + + CHECK_LE(data_size, storage_size); + + buf = uv_buf_init(data, data_size); + + if (!wrap->is_named_pipe_ipc()) { + err = wrap->callbacks()->DoWrite(req_wrap, + &buf, + 1, + nullptr, + StreamWrap::AfterWrite); + } else { + uv_handle_t* send_handle = nullptr; + + if (args[2]->IsObject()) { + Local send_handle_obj = args[2].As(); + HandleWrap* wrap = Unwrap(send_handle_obj); + send_handle = wrap->GetHandle(); + // Reference StreamWrap instance to prevent it from being garbage + // collected before `AfterWrite` is called. + CHECK_EQ(false, req_wrap->persistent().IsEmpty()); + req_wrap->object()->Set(env->handle_string(), send_handle_obj); + } + + err = wrap->callbacks()->DoWrite( + req_wrap, + &buf, + 1, + reinterpret_cast(send_handle), + StreamWrap::AfterWrite); + } + + req_wrap->Dispatched(); + req_wrap->object()->Set(env->async(), True(env->isolate())); + + if (err) { + req_wrap->~WriteWrap(); + delete[] storage; + } + + done: + const char* msg = wrap->callbacks()->Error(); + if (msg != nullptr) { + req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg)); + wrap->callbacks()->ClearError(); + } + req_wrap_obj->Set(env->bytes_string(), + Integer::NewFromUnsigned(env->isolate(), data_size)); + args.GetReturnValue().Set(err); +} + + +void StreamWrap::Writev(const FunctionCallbackInfo& args) { + Environment* env = Environment::GetCurrent(args); + + StreamWrap* wrap = Unwrap(args.Holder()); + if (!IsAlive(wrap)) + return args.GetReturnValue().Set(UV_EINVAL); + + CHECK(args[0]->IsObject()); + CHECK(args[1]->IsArray()); + + Local req_wrap_obj = args[0].As(); + Local chunks = args[1].As(); + size_t count = chunks->Length() >> 1; + + uv_buf_t bufs_[16]; + uv_buf_t* bufs = bufs_; + + // Determine storage size first + size_t storage_size = 0; + for (size_t i = 0; i < count; i++) { + Handle chunk = chunks->Get(i * 2); + + if (Buffer::HasInstance(chunk)) + continue; + // Buffer chunk, no additional storage required + + // String chunk + Handle string = chunk->ToString(env->isolate()); + enum encoding encoding = ParseEncoding(env->isolate(), + chunks->Get(i * 2 + 1)); + size_t chunk_size; + if (encoding == UTF8 && string->Length() > 65535) + chunk_size = StringBytes::Size(env->isolate(), string, encoding); + else + chunk_size = StringBytes::StorageSize(env->isolate(), string, encoding); + + storage_size += chunk_size + 15; + } + + if (storage_size > INT_MAX) { + args.GetReturnValue().Set(UV_ENOBUFS); + return; + } + + if (ARRAY_SIZE(bufs_) < count) + bufs = new uv_buf_t[count]; + + storage_size += sizeof(WriteWrap); + char* storage = new char[storage_size]; + WriteWrap* req_wrap = + new(storage) WriteWrap(env, req_wrap_obj, wrap); + + uint32_t bytes = 0; + size_t offset = sizeof(WriteWrap); + for (size_t i = 0; i < count; i++) { + Handle chunk = chunks->Get(i * 2); + + // Write buffer + if (Buffer::HasInstance(chunk)) { + bufs[i].base = Buffer::Data(chunk); + bufs[i].len = Buffer::Length(chunk); + bytes += bufs[i].len; + continue; + } + + // Write string + offset = ROUND_UP(offset, 16); + CHECK_LT(offset, storage_size); + char* str_storage = storage + offset; + size_t str_size = storage_size - offset; + + Handle string = chunk->ToString(env->isolate()); + enum encoding encoding = ParseEncoding(env->isolate(), + chunks->Get(i * 2 + 1)); + str_size = StringBytes::Write(env->isolate(), + str_storage, + str_size, + string, + encoding); + bufs[i].base = str_storage; + bufs[i].len = str_size; + offset += str_size; + bytes += str_size; + } + + int err = wrap->callbacks()->DoWrite(req_wrap, + bufs, + count, + nullptr, + StreamWrap::AfterWrite); + + // Deallocate space + if (bufs != bufs_) + delete[] bufs; + + req_wrap->Dispatched(); + req_wrap->object()->Set(env->async(), True(env->isolate())); + req_wrap->object()->Set(env->bytes_string(), + Number::New(env->isolate(), bytes)); + const char* msg = wrap->callbacks()->Error(); + if (msg != nullptr) { + req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg)); + wrap->callbacks()->ClearError(); + } + + if (err) { + req_wrap->~WriteWrap(); + delete[] storage; + } + + args.GetReturnValue().Set(err); +} + + +void StreamWrap::WriteAsciiString(const FunctionCallbackInfo& args) { + WriteStringImpl(args); +} + + +void StreamWrap::WriteUtf8String(const FunctionCallbackInfo& args) { + WriteStringImpl(args); +} + + +void StreamWrap::WriteUcs2String(const FunctionCallbackInfo& args) { + WriteStringImpl(args); +} + +void StreamWrap::WriteBinaryString(const FunctionCallbackInfo& args) { + WriteStringImpl(args); +} + +void StreamWrap::SetBlocking(const FunctionCallbackInfo& args) { + StreamWrap* wrap = Unwrap(args.Holder()); + if (!IsAlive(wrap)) return args.GetReturnValue().Set(UV_EINVAL); + CHECK_GT(args.Length(), 0); + int err = uv_stream_set_blocking(wrap->stream(), args[0]->IsTrue()); + args.GetReturnValue().Set(err); +} - bool enable = args[0]->IsTrue(); - args.GetReturnValue().Set(uv_stream_set_blocking(wrap->stream(), enable)); +void StreamWrap::AfterWrite(uv_write_t* req, int status) { + WriteWrap* req_wrap = ContainerOf(&WriteWrap::req_, req); + StreamWrap* wrap = req_wrap->wrap(); + Environment* env = wrap->env(); + + HandleScope handle_scope(env->isolate()); + Context::Scope context_scope(env->context()); + + // The wrap and request objects should still be there. + CHECK_EQ(req_wrap->persistent().IsEmpty(), false); + CHECK_EQ(wrap->persistent().IsEmpty(), false); + + // Unref handle property + Local req_wrap_obj = req_wrap->object(); + req_wrap_obj->Delete(env->handle_string()); + wrap->callbacks()->AfterWrite(req_wrap); + + Local argv[] = { + Integer::New(env->isolate(), status), + wrap->object(), + req_wrap_obj, + Undefined(env->isolate()) + }; + + const char* msg = wrap->callbacks()->Error(); + if (msg != nullptr) { + argv[3] = OneByteString(env->isolate(), msg); + wrap->callbacks()->ClearError(); + } + + req_wrap->MakeCallback(env->oncomplete_string(), ARRAY_SIZE(argv), argv); + + req_wrap->~WriteWrap(); + delete[] reinterpret_cast(req_wrap); } -int StreamWrap::DoShutdown(ShutdownWrap* req_wrap) { - return uv_shutdown(&req_wrap->req_, stream(), AfterShutdown); +void StreamWrap::Shutdown(const FunctionCallbackInfo& args) { + Environment* env = Environment::GetCurrent(args); + + StreamWrap* wrap = Unwrap(args.Holder()); + if (!IsAlive(wrap)) + return args.GetReturnValue().Set(UV_EINVAL); + + CHECK(args[0]->IsObject()); + Local req_wrap_obj = args[0].As(); + + ShutdownWrap* req_wrap = new ShutdownWrap(env, req_wrap_obj); + int err = wrap->callbacks()->DoShutdown(req_wrap, AfterShutdown); + req_wrap->Dispatched(); + if (err) + delete req_wrap; + args.GetReturnValue().Set(err); } void StreamWrap::AfterShutdown(uv_shutdown_t* req, int status) { - ShutdownWrap* req_wrap = ContainerOf(&ShutdownWrap::req_, req); - HandleScope scope(req_wrap->env()->isolate()); - Context::Scope context_scope(req_wrap->env()->context()); - req_wrap->Done(status); + ShutdownWrap* req_wrap = static_cast(req->data); + StreamWrap* wrap = static_cast(req->handle->data); + Environment* env = wrap->env(); + + // The wrap and request objects should still be there. + CHECK_EQ(req_wrap->persistent().IsEmpty(), false); + CHECK_EQ(wrap->persistent().IsEmpty(), false); + + HandleScope handle_scope(env->isolate()); + Context::Scope context_scope(env->context()); + + Local req_wrap_obj = req_wrap->object(); + Local argv[3] = { + Integer::New(env->isolate(), status), + wrap->object(), + req_wrap_obj + }; + + req_wrap->MakeCallback(env->oncomplete_string(), ARRAY_SIZE(argv), argv); + + delete req_wrap; +} + + +const char* StreamWrapCallbacks::Error() const { + return nullptr; +} + + +void StreamWrapCallbacks::ClearError() { } @@ -295,13 +621,13 @@ void StreamWrap::AfterShutdown(uv_shutdown_t* req, int status) { // values, shifting their base and decrementing their length. This is // required in order to skip the data that was successfully written via // uv_try_write(). -int StreamWrap::DoTryWrite(uv_buf_t** bufs, size_t* count) { +int StreamWrapCallbacks::TryWrite(uv_buf_t** bufs, size_t* count) { int err; size_t written; uv_buf_t* vbufs = *bufs; size_t vcount = *count; - err = uv_try_write(stream(), vbufs, vcount); + err = uv_try_write(wrap()->stream(), vbufs, vcount); if (err == UV_ENOSYS || err == UV_EAGAIN) return 0; if (err < 0) @@ -331,45 +657,106 @@ int StreamWrap::DoTryWrite(uv_buf_t** bufs, size_t* count) { } -int StreamWrap::DoWrite(WriteWrap* w, - uv_buf_t* bufs, - size_t count, - uv_stream_t* send_handle) { +int StreamWrapCallbacks::DoWrite(WriteWrap* w, + uv_buf_t* bufs, + size_t count, + uv_stream_t* send_handle, + uv_write_cb cb) { int r; if (send_handle == nullptr) { - r = uv_write(&w->req_, stream(), bufs, count, AfterWrite); + r = uv_write(&w->req_, wrap()->stream(), bufs, count, cb); } else { - r = uv_write2(&w->req_, stream(), bufs, count, send_handle, AfterWrite); + r = uv_write2(&w->req_, wrap()->stream(), bufs, count, send_handle, cb); } if (!r) { size_t bytes = 0; for (size_t i = 0; i < count; i++) bytes += bufs[i].len; - if (stream()->type == UV_TCP) { + if (wrap()->stream()->type == UV_TCP) { NODE_COUNT_NET_BYTES_SENT(bytes); - } else if (stream()->type == UV_NAMED_PIPE) { + } else if (wrap()->stream()->type == UV_NAMED_PIPE) { NODE_COUNT_PIPE_BYTES_SENT(bytes); } } - UpdateWriteQueueSize(); + wrap()->UpdateWriteQueueSize(); return r; } -void StreamWrap::AfterWrite(uv_write_t* req, int status) { - WriteWrap* req_wrap = ContainerOf(&WriteWrap::req_, req); - HandleScope scope(req_wrap->env()->isolate()); - Context::Scope context_scope(req_wrap->env()->context()); - req_wrap->Done(status); +void StreamWrapCallbacks::AfterWrite(WriteWrap* w) { + wrap()->UpdateWriteQueueSize(); +} + + +void StreamWrapCallbacks::DoAlloc(uv_handle_t* handle, + size_t suggested_size, + uv_buf_t* buf) { + buf->base = static_cast(malloc(suggested_size)); + buf->len = suggested_size; + + if (buf->base == nullptr && suggested_size > 0) { + FatalError( + "node::StreamWrapCallbacks::DoAlloc(uv_handle_t*, size_t, uv_buf_t*)", + "Out Of Memory"); + } +} + + +void StreamWrapCallbacks::DoRead(uv_stream_t* handle, + ssize_t nread, + const uv_buf_t* buf, + uv_handle_type pending) { + Environment* env = wrap()->env(); + HandleScope handle_scope(env->isolate()); + Context::Scope context_scope(env->context()); + + Local argv[] = { + Integer::New(env->isolate(), nread), + Undefined(env->isolate()), + Undefined(env->isolate()) + }; + + if (nread < 0) { + if (buf->base != nullptr) + free(buf->base); + wrap()->MakeCallback(env->onread_string(), ARRAY_SIZE(argv), argv); + return; + } + + if (nread == 0) { + if (buf->base != nullptr) + free(buf->base); + return; + } + + char* base = static_cast(realloc(buf->base, nread)); + CHECK_LE(static_cast(nread), buf->len); + argv[1] = Buffer::Use(env, base, nread); + + Local pending_obj; + if (pending == UV_TCP) { + pending_obj = AcceptHandle(env, handle, wrap()); + } else if (pending == UV_NAMED_PIPE) { + pending_obj = AcceptHandle(env, handle, wrap()); + } else if (pending == UV_UDP) { + pending_obj = AcceptHandle(env, handle, wrap()); + } else { + CHECK_EQ(pending, UV_UNKNOWN_HANDLE); + } + + if (!pending_obj.IsEmpty()) { + argv[2] = pending_obj; + } + + wrap()->MakeCallback(env->onread_string(), ARRAY_SIZE(argv), argv); } -void StreamWrap::OnAfterWriteImpl(WriteWrap* w, void* ctx) { - StreamWrap* wrap = static_cast(ctx); - wrap->UpdateWriteQueueSize(); +int StreamWrapCallbacks::DoShutdown(ShutdownWrap* req_wrap, uv_shutdown_cb cb) { + return uv_shutdown(&req_wrap->req_, wrap()->stream(), cb); } } // namespace node diff --git a/src/stream_wrap.h b/src/stream_wrap.h index 8e4cdf20be962e..5148228112eb1e 100644 --- a/src/stream_wrap.h +++ b/src/stream_wrap.h @@ -1,10 +1,10 @@ #ifndef SRC_STREAM_WRAP_H_ #define SRC_STREAM_WRAP_H_ -#include "stream_base.h" - #include "env.h" #include "handle_wrap.h" +#include "req-wrap.h" +#include "req-wrap-inl.h" #include "string_bytes.h" #include "v8.h" @@ -13,29 +13,126 @@ namespace node { // Forward declaration class StreamWrap; -class StreamWrap : public HandleWrap, public StreamBase { +class ShutdownWrap : public ReqWrap { + public: + ShutdownWrap(Environment* env, v8::Local req_wrap_obj) + : ReqWrap(env, req_wrap_obj, AsyncWrap::PROVIDER_SHUTDOWNWRAP) { + Wrap(req_wrap_obj, this); + } + + static void NewShutdownWrap(const v8::FunctionCallbackInfo& args) { + CHECK(args.IsConstructCall()); + } +}; + +class WriteWrap: public ReqWrap { + public: + // TODO(trevnorris): WrapWrap inherits from ReqWrap, which I've globbed + // into the same provider. How should these be broken apart? + WriteWrap(Environment* env, v8::Local obj, StreamWrap* wrap) + : ReqWrap(env, obj, AsyncWrap::PROVIDER_WRITEWRAP), + wrap_(wrap) { + Wrap(obj, this); + } + + void* operator new(size_t size, char* storage) { return storage; } + + // This is just to keep the compiler happy. It should never be called, since + // we don't use exceptions in node. + void operator delete(void* ptr, char* storage) { UNREACHABLE(); } + + inline StreamWrap* wrap() const { + return wrap_; + } + + static void NewWriteWrap(const v8::FunctionCallbackInfo& args) { + CHECK(args.IsConstructCall()); + } + + private: + // People should not be using the non-placement new and delete operator on a + // WriteWrap. Ensure this never happens. + void* operator new(size_t size) { UNREACHABLE(); } + void operator delete(void* ptr) { UNREACHABLE(); } + + StreamWrap* const wrap_; +}; + +// Overridable callbacks' types +class StreamWrapCallbacks { + public: + explicit StreamWrapCallbacks(StreamWrap* wrap) : wrap_(wrap) { + } + + explicit StreamWrapCallbacks(StreamWrapCallbacks* old) : wrap_(old->wrap()) { + } + + virtual ~StreamWrapCallbacks() = default; + + virtual const char* Error() const; + virtual void ClearError(); + + virtual int TryWrite(uv_buf_t** bufs, size_t* count); + + virtual int DoWrite(WriteWrap* w, + uv_buf_t* bufs, + size_t count, + uv_stream_t* send_handle, + uv_write_cb cb); + virtual void AfterWrite(WriteWrap* w); + virtual void DoAlloc(uv_handle_t* handle, + size_t suggested_size, + uv_buf_t* buf); + virtual void DoRead(uv_stream_t* handle, + ssize_t nread, + const uv_buf_t* buf, + uv_handle_type pending); + virtual int DoShutdown(ShutdownWrap* req_wrap, uv_shutdown_cb cb); + + protected: + inline StreamWrap* wrap() const { + return wrap_; + } + + private: + StreamWrap* const wrap_; +}; + +class StreamWrap : public HandleWrap { public: static void Initialize(v8::Handle target, v8::Handle unused, v8::Handle context); - int GetFD() override; - void* Cast() override; - bool IsAlive() override; - bool IsClosing() override; - bool IsIPCPipe() override; + void OverrideCallbacks(StreamWrapCallbacks* callbacks, bool gc) { + StreamWrapCallbacks* old = callbacks_; + callbacks_ = callbacks; + callbacks_gc_ = gc; + if (old != &default_callbacks_) + delete old; + } + + static void GetFD(v8::Local, + const v8::PropertyCallbackInfo&); // JavaScript functions - int ReadStart() override; - int ReadStop() override; + static void ReadStart(const v8::FunctionCallbackInfo& args); + static void ReadStop(const v8::FunctionCallbackInfo& args); + static void Shutdown(const v8::FunctionCallbackInfo& args); + + static void Writev(const v8::FunctionCallbackInfo& args); + static void WriteBuffer(const v8::FunctionCallbackInfo& args); + static void WriteAsciiString(const v8::FunctionCallbackInfo& args); + static void WriteUtf8String(const v8::FunctionCallbackInfo& args); + static void WriteUcs2String(const v8::FunctionCallbackInfo& args); + static void WriteBinaryString( + const v8::FunctionCallbackInfo& args); - // Resource implementation - int DoShutdown(ShutdownWrap* req_wrap) override; - int DoTryWrite(uv_buf_t** bufs, size_t* count) override; - int DoWrite(WriteWrap* w, - uv_buf_t* bufs, - size_t count, - uv_stream_t* send_handle) override; + static void SetBlocking(const v8::FunctionCallbackInfo& args); + + inline StreamWrapCallbacks* callbacks() const { + return callbacks_; + } inline uv_stream_t* stream() const { return stream_; @@ -55,6 +152,8 @@ class StreamWrap : public HandleWrap, public StreamBase { } protected: + static size_t WriteBuffer(v8::Handle val, uv_buf_t* buf); + StreamWrap(Environment* env, v8::Local object, uv_stream_t* stream, @@ -62,22 +161,22 @@ class StreamWrap : public HandleWrap, public StreamBase { AsyncWrap* parent = nullptr); ~StreamWrap() { + if (!callbacks_gc_ && callbacks_ != &default_callbacks_) { + delete callbacks_; + } + callbacks_ = nullptr; } - AsyncWrap* GetAsyncWrap() override; + void StateChange() { } void UpdateWriteQueueSize(); - static void AddMethods(Environment* env, - v8::Handle target, - int flags = StreamBase::kFlagNone); - private: - static void SetBlocking(const v8::FunctionCallbackInfo& args); - // Callbacks for libuv + static void AfterWrite(uv_write_t* req, int status); static void OnAlloc(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); + static void AfterShutdown(uv_shutdown_t* req, int status); static void OnRead(uv_stream_t* handle, ssize_t nread, @@ -86,18 +185,16 @@ class StreamWrap : public HandleWrap, public StreamBase { ssize_t nread, const uv_buf_t* buf, uv_handle_type pending); - static void AfterWrite(uv_write_t* req, int status); - static void AfterShutdown(uv_shutdown_t* req, int status); - // Resource interface implementation - static void OnAfterWriteImpl(WriteWrap* w, void* ctx); - static void OnAllocImpl(size_t size, uv_buf_t* buf, void* ctx); - static void OnReadImpl(ssize_t nread, - const uv_buf_t* buf, - uv_handle_type pending, - void* ctx); + template + static void WriteStringImpl(const v8::FunctionCallbackInfo& args); uv_stream_t* const stream_; + StreamWrapCallbacks default_callbacks_; + StreamWrapCallbacks* callbacks_; // Overridable callbacks + bool callbacks_gc_; + + friend class StreamWrapCallbacks; }; diff --git a/src/tcp_wrap.cc b/src/tcp_wrap.cc index e16e14058332aa..84400032f2b207 100644 --- a/src/tcp_wrap.cc +++ b/src/tcp_wrap.cc @@ -72,6 +72,15 @@ void TCPWrap::Initialize(Handle target, t->SetClassName(FIXED_ONE_BYTE_STRING(env->isolate(), "TCP")); t->InstanceTemplate()->SetInternalFieldCount(1); + enum PropertyAttribute attributes = + static_cast(v8::ReadOnly | v8::DontDelete); + t->InstanceTemplate()->SetAccessor(env->fd_string(), + StreamWrap::GetFD, + nullptr, + Handle(), + v8::DEFAULT, + attributes); + // Init properties t->InstanceTemplate()->Set(String::NewFromUtf8(env->isolate(), "reading"), Boolean::New(env->isolate(), false)); @@ -89,7 +98,16 @@ void TCPWrap::Initialize(Handle target, env->SetProtoMethod(t, "ref", HandleWrap::Ref); env->SetProtoMethod(t, "unref", HandleWrap::Unref); - StreamWrap::AddMethods(env, t, StreamBase::kFlagHasWritev); + env->SetProtoMethod(t, "readStart", StreamWrap::ReadStart); + env->SetProtoMethod(t, "readStop", StreamWrap::ReadStop); + env->SetProtoMethod(t, "shutdown", StreamWrap::Shutdown); + + env->SetProtoMethod(t, "writeBuffer", StreamWrap::WriteBuffer); + env->SetProtoMethod(t, "writeAsciiString", StreamWrap::WriteAsciiString); + env->SetProtoMethod(t, "writeUtf8String", StreamWrap::WriteUtf8String); + env->SetProtoMethod(t, "writeUcs2String", StreamWrap::WriteUcs2String); + env->SetProtoMethod(t, "writeBinaryString", StreamWrap::WriteBinaryString); + env->SetProtoMethod(t, "writev", StreamWrap::Writev); env->SetProtoMethod(t, "open", Open); env->SetProtoMethod(t, "bind", Bind); diff --git a/src/tls_wrap.cc b/src/tls_wrap.cc index 49523bc3b84955..a22bc250f2d3be 100644 --- a/src/tls_wrap.cc +++ b/src/tls_wrap.cc @@ -9,8 +9,6 @@ #include "node_wrap.h" // WithGenericStream #include "node_counters.h" #include "node_internals.h" -#include "stream_base.h" -#include "stream_base-inl.h" #include "util.h" #include "util-inl.h" @@ -26,6 +24,7 @@ using v8::Function; using v8::FunctionCallbackInfo; using v8::FunctionTemplate; using v8::Handle; +using v8::HandleScope; using v8::Integer; using v8::Local; using v8::Null; @@ -34,17 +33,17 @@ using v8::String; using v8::Value; -TLSWrap::TLSWrap(Environment* env, - Kind kind, - StreamBase* stream, - SecureContext* sc) - : SSLWrap(env, sc, kind), - StreamBase(env), +TLSCallbacks::TLSCallbacks(Environment* env, + Kind kind, + Handle sc, + StreamWrapCallbacks* old) + : SSLWrap(env, Unwrap(sc), kind), + StreamWrapCallbacks(old), AsyncWrap(env, env->tls_wrap_constructor_function()->NewInstance(), AsyncWrap::PROVIDER_TLSWRAP), - sc_(sc), - stream_(stream), + sc_(Unwrap(sc)), + sc_handle_(env->isolate(), sc), enc_in_(nullptr), enc_out_(nullptr), clear_in_(nullptr), @@ -59,28 +58,22 @@ TLSWrap::TLSWrap(Environment* env, MakeWeak(this); // We've our own session callbacks - SSL_CTX_sess_set_get_cb(sc_->ctx_, SSLWrap::GetSessionCallback); - SSL_CTX_sess_set_new_cb(sc_->ctx_, SSLWrap::NewSessionCallback); - - stream_->Consume(); - stream_->set_after_write_cb(OnAfterWriteImpl, this); - stream_->set_alloc_cb(OnAllocImpl, this); - stream_->set_read_cb(OnReadImpl, this); - - set_alloc_cb(OnAllocSelf, this); - set_read_cb(OnReadSelf, this); + SSL_CTX_sess_set_get_cb(sc_->ctx_, SSLWrap::GetSessionCallback); + SSL_CTX_sess_set_new_cb(sc_->ctx_, SSLWrap::NewSessionCallback); InitSSL(); } -TLSWrap::~TLSWrap() { +TLSCallbacks::~TLSCallbacks() { enc_in_ = nullptr; enc_out_ = nullptr; delete clear_in_; clear_in_ = nullptr; sc_ = nullptr; + sc_handle_.Reset(); + persistent().Reset(); #ifdef SSL_CTRL_SET_TLSEXT_SERVERNAME_CB sni_context_.Reset(); @@ -97,12 +90,12 @@ TLSWrap::~TLSWrap() { } -void TLSWrap::MakePending() { +void TLSCallbacks::MakePending() { write_item_queue_.MoveBack(&pending_write_items_); } -bool TLSWrap::InvokeQueued(int status) { +bool TLSCallbacks::InvokeQueued(int status) { if (pending_write_items_.IsEmpty()) return false; @@ -110,7 +103,7 @@ bool TLSWrap::InvokeQueued(int status) { WriteItemList queue; pending_write_items_.MoveBack(&queue); while (WriteItem* wi = queue.PopFront()) { - wi->w_->Done(status); + wi->cb_(&wi->w_->req_, status); delete wi; } @@ -118,12 +111,12 @@ bool TLSWrap::InvokeQueued(int status) { } -void TLSWrap::NewSessionDoneCb() { +void TLSCallbacks::NewSessionDoneCb() { Cycle(); } -void TLSWrap::InitSSL() { +void TLSCallbacks::InitSSL() { // Initialize SSL enc_in_ = NodeBIO::New(); enc_out_ = NodeBIO::New(); @@ -168,7 +161,7 @@ void TLSWrap::InitSSL() { } -void TLSWrap::Wrap(const FunctionCallbackInfo& args) { +void TLSCallbacks::Wrap(const FunctionCallbackInfo& args) { Environment* env = Environment::GetCurrent(args); if (args.Length() < 1 || !args[0]->IsObject()) { @@ -182,39 +175,42 @@ void TLSWrap::Wrap(const FunctionCallbackInfo& args) { if (args.Length() < 3 || !args[2]->IsBoolean()) return env->ThrowTypeError("Third argument should be boolean"); - Local stream_obj = args[0].As(); + Local stream = args[0].As(); Local sc = args[1].As(); - Kind kind = args[2]->IsTrue() ? SSLWrap::kServer : - SSLWrap::kClient; + Kind kind = args[2]->IsTrue() ? SSLWrap::kServer : + SSLWrap::kClient; - StreamBase* stream = nullptr; - WITH_GENERIC_STREAM(env, stream_obj, { - stream = wrap; + TLSCallbacks* callbacks = nullptr; + WITH_GENERIC_STREAM(env, stream, { + callbacks = new TLSCallbacks(env, kind, sc, wrap->callbacks()); + wrap->OverrideCallbacks(callbacks, true); }); - CHECK_NE(stream, nullptr); - TLSWrap* res = new TLSWrap(env, kind, stream, Unwrap(sc)); + if (callbacks == nullptr) { + return args.GetReturnValue().SetNull(); + } - args.GetReturnValue().Set(res->object()); + args.GetReturnValue().Set(callbacks->persistent()); } -void TLSWrap::Receive(const FunctionCallbackInfo& args) { - TLSWrap* wrap = Unwrap(args.Holder()); +void TLSCallbacks::Receive(const FunctionCallbackInfo& args) { + TLSCallbacks* wrap = Unwrap(args.Holder()); CHECK(Buffer::HasInstance(args[0])); char* data = Buffer::Data(args[0]); size_t len = Buffer::Length(args[0]); uv_buf_t buf; + uv_stream_t* stream = wrap->wrap()->stream(); // Copy given buffer entirely or partiall if handle becomes closed - while (len > 0 && !wrap->IsClosing()) { - wrap->stream_->OnAlloc(len, &buf); + while (len > 0 && !uv_is_closing(reinterpret_cast(stream))) { + wrap->DoAlloc(reinterpret_cast(stream), len, &buf); size_t copy = buf.len > len ? len : buf.len; memcpy(buf.base, data, copy); buf.len = copy; - wrap->stream_->OnRead(buf.len, &buf); + wrap->DoRead(stream, buf.len, &buf, UV_UNKNOWN_HANDLE); data += copy; len -= copy; @@ -222,10 +218,10 @@ void TLSWrap::Receive(const FunctionCallbackInfo& args) { } -void TLSWrap::Start(const FunctionCallbackInfo& args) { +void TLSCallbacks::Start(const FunctionCallbackInfo& args) { Environment* env = Environment::GetCurrent(args); - TLSWrap* wrap = Unwrap(args.Holder()); + TLSCallbacks* wrap = Unwrap(args.Holder()); if (wrap->started_) return env->ThrowError("Already started."); @@ -238,15 +234,17 @@ void TLSWrap::Start(const FunctionCallbackInfo& args) { } -void TLSWrap::SSLInfoCallback(const SSL* ssl_, int where, int ret) { +void TLSCallbacks::SSLInfoCallback(const SSL* ssl_, int where, int ret) { if (!(where & (SSL_CB_HANDSHAKE_START | SSL_CB_HANDSHAKE_DONE))) return; // Be compatible with older versions of OpenSSL. SSL_get_app_data() wants // a non-const SSL* in OpenSSL <= 0.9.7e. SSL* ssl = const_cast(ssl_); - TLSWrap* c = static_cast(SSL_get_app_data(ssl)); + TLSCallbacks* c = static_cast(SSL_get_app_data(ssl)); Environment* env = c->env(); + HandleScope handle_scope(env->isolate()); + Context::Scope context_scope(env->context()); Local object = c->object(); if (where & SSL_CB_HANDSHAKE_START) { @@ -266,7 +264,7 @@ void TLSWrap::SSLInfoCallback(const SSL* ssl_, int where, int ret) { } -void TLSWrap::EncOut() { +void TLSCallbacks::EncOut() { // Ignore cycling data if ClientHello wasn't yet parsed if (!hello_parser_.IsEnded()) return; @@ -296,50 +294,47 @@ void TLSWrap::EncOut() { write_size_ = NodeBIO::FromBIO(enc_out_)->PeekMultiple(data, size, &count); CHECK(write_size_ != 0 && count != 0); - Local req_wrap_obj = - env()->write_wrap_constructor_function()->NewInstance(); - WriteWrap* write_req = WriteWrap::New(env(), - req_wrap_obj, - this, - EncOutCb); - + write_req_.data = this; uv_buf_t buf[ARRAY_SIZE(data)]; for (size_t i = 0; i < count; i++) buf[i] = uv_buf_init(data[i], size[i]); - int r = stream_->DoWrite(write_req, buf, count, nullptr); - write_req->Dispatched(); + int r = uv_write(&write_req_, wrap()->stream(), buf, count, EncOutCb); // Ignore errors, this should be already handled in js - if (!r) - NODE_COUNT_NET_BYTES_SENT(write_size_); + if (!r) { + if (wrap()->is_tcp()) { + NODE_COUNT_NET_BYTES_SENT(write_size_); + } else if (wrap()->is_named_pipe()) { + NODE_COUNT_PIPE_BYTES_SENT(write_size_); + } + } } -void TLSWrap::EncOutCb(WriteWrap* req_wrap, int status) { - TLSWrap* wrap = req_wrap->wrap()->Cast(); - req_wrap->Dispose(); +void TLSCallbacks::EncOutCb(uv_write_t* req, int status) { + TLSCallbacks* callbacks = static_cast(req->data); // Handle error if (status) { // Ignore errors after shutdown - if (wrap->shutdown_) + if (callbacks->shutdown_) return; // Notify about error - wrap->InvokeQueued(status); + callbacks->InvokeQueued(status); return; } // Commit - NodeBIO::FromBIO(wrap->enc_out_)->Read(nullptr, wrap->write_size_); + NodeBIO::FromBIO(callbacks->enc_out_)->Read(nullptr, callbacks->write_size_); // Try writing more data - wrap->write_size_ = 0; - wrap->EncOut(); + callbacks->write_size_ = 0; + callbacks->EncOut(); } -Local TLSWrap::GetSSLError(int status, int* err, const char** msg) { +Local TLSCallbacks::GetSSLError(int status, int* err, const char** msg) { EscapableHandleScope scope(env()->isolate()); *err = SSL_get_error(ssl_, status); @@ -381,7 +376,7 @@ Local TLSWrap::GetSSLError(int status, int* err, const char** msg) { } -void TLSWrap::ClearOut() { +void TLSCallbacks::ClearOut() { // Ignore cycling data if ClientHello wasn't yet parsed if (!hello_parser_.IsEnded()) return; @@ -390,34 +385,29 @@ void TLSWrap::ClearOut() { if (eof_) return; + HandleScope handle_scope(env()->isolate()); + Context::Scope context_scope(env()->context()); + CHECK_NE(ssl_, nullptr); char out[kClearOutChunkSize]; int read; - for (;;) { + do { read = SSL_read(ssl_, out, sizeof(out)); - - if (read <= 0) - break; - - while (read > 0) { - int avail = read; - - uv_buf_t buf; - OnAlloc(avail, &buf); - if (static_cast(buf.len) < avail) - avail = buf.len; - memcpy(buf.base, out, avail); - OnRead(avail, &buf); - - read -= avail; + if (read > 0) { + Local argv[] = { + Integer::New(env()->isolate(), read), + Buffer::New(env(), out, read) + }; + wrap()->MakeCallback(env()->onread_string(), ARRAY_SIZE(argv), argv); } - } + } while (read > 0); int flags = SSL_get_shutdown(ssl_); if (!eof_ && flags & SSL_RECEIVED_SHUTDOWN) { eof_ = true; - OnRead(UV_EOF, nullptr); + Local arg = Integer::New(env()->isolate(), UV_EOF); + wrap()->MakeCallback(env()->onread_string(), 1, &arg); } if (read == -1) { @@ -440,7 +430,7 @@ void TLSWrap::ClearOut() { } -bool TLSWrap::ClearIn() { +bool TLSCallbacks::ClearIn() { // Ignore cycling data if ClientHello wasn't yet parsed if (!hello_parser_.IsEnded()) return false; @@ -462,6 +452,9 @@ bool TLSWrap::ClearIn() { return true; } + HandleScope handle_scope(env()->isolate()); + Context::Scope context_scope(env()->context()); + // Error or partial write int err; Local arg = GetSSLError(written, &err, &error_); @@ -476,61 +469,28 @@ bool TLSWrap::ClearIn() { } -void* TLSWrap::Cast() { - return reinterpret_cast(this); -} - - -AsyncWrap* TLSWrap::GetAsyncWrap() { - return static_cast(this); -} - - -bool TLSWrap::IsIPCPipe() { - return stream_->IsIPCPipe(); -} - - -int TLSWrap::GetFD() { - return stream_->GetFD(); -} - - -bool TLSWrap::IsAlive() { - return stream_->IsAlive(); -} - - -bool TLSWrap::IsClosing() { - return stream_->IsClosing(); -} - - -int TLSWrap::ReadStart() { - return stream_->ReadStart(); -} - - -int TLSWrap::ReadStop() { - return stream_->ReadStop(); -} - - -const char* TLSWrap::Error() const { +const char* TLSCallbacks::Error() const { return error_; } -void TLSWrap::ClearError() { +void TLSCallbacks::ClearError() { delete[] error_; error_ = nullptr; } -int TLSWrap::DoWrite(WriteWrap* w, - uv_buf_t* bufs, - size_t count, - uv_stream_t* send_handle) { +int TLSCallbacks::TryWrite(uv_buf_t** bufs, size_t* count) { + // TODO(indutny): Support it + return 0; +} + + +int TLSCallbacks::DoWrite(WriteWrap* w, + uv_buf_t* bufs, + size_t count, + uv_stream_t* send_handle, + uv_write_cb cb) { CHECK_EQ(send_handle, nullptr); bool empty = true; @@ -547,11 +507,11 @@ int TLSWrap::DoWrite(WriteWrap* w, // However if there any data that should be written to socket, // callback should not be invoked immediately if (BIO_pending(enc_out_) == 0) - return stream_->DoWrite(w, bufs, count, send_handle); + return uv_write(&w->req_, wrap()->stream(), bufs, count, cb); } // Queue callback to execute it on next tick - write_item_queue_.PushBack(new WriteItem(w)); + write_item_queue_.PushBack(new WriteItem(w, cb)); // Write queued data if (empty) { @@ -577,6 +537,8 @@ int TLSWrap::DoWrite(WriteWrap* w, if (i != count) { int err; + HandleScope handle_scope(env()->isolate()); + Context::Scope context_scope(env()->context()); Local arg = GetSSLError(written, &err, &error_); if (!arg.IsEmpty()) return UV_EPROTO; @@ -593,51 +555,24 @@ int TLSWrap::DoWrite(WriteWrap* w, } -void TLSWrap::OnAfterWriteImpl(WriteWrap* w, void* ctx) { +void TLSCallbacks::AfterWrite(WriteWrap* w) { // Intentionally empty } -void TLSWrap::OnAllocImpl(size_t suggested_size, uv_buf_t* buf, void* ctx) { - TLSWrap* wrap = static_cast(ctx); - +void TLSCallbacks::DoAlloc(uv_handle_t* handle, + size_t suggested_size, + uv_buf_t* buf) { size_t size = 0; - buf->base = NodeBIO::FromBIO(wrap->enc_in_)->PeekWritable(&size); + buf->base = NodeBIO::FromBIO(enc_in_)->PeekWritable(&size); buf->len = size; } -void TLSWrap::OnReadImpl(ssize_t nread, - const uv_buf_t* buf, - uv_handle_type pending, - void* ctx) { - TLSWrap* wrap = static_cast(ctx); - wrap->DoRead(nread, buf, pending); -} - - -void TLSWrap::OnAllocSelf(size_t suggested_size, uv_buf_t* buf, void* ctx) { - buf->base = static_cast(malloc(suggested_size)); - CHECK_NE(buf->base, nullptr); - buf->len = suggested_size; -} - - -void TLSWrap::OnReadSelf(ssize_t nread, - const uv_buf_t* buf, - uv_handle_type pending, - void* ctx) { - TLSWrap* wrap = static_cast(ctx); - Local buf_obj; - if (buf != nullptr) - buf_obj = Buffer::Use(wrap->env(), buf->base, buf->len); - wrap->EmitData(nread, buf_obj, Local()); -} - - -void TLSWrap::DoRead(ssize_t nread, - const uv_buf_t* buf, - uv_handle_type pending) { +void TLSCallbacks::DoRead(uv_stream_t* handle, + ssize_t nread, + const uv_buf_t* buf, + uv_handle_type pending) { if (nread < 0) { // Error should be emitted only after all data was read ClearOut(); @@ -649,7 +584,10 @@ void TLSWrap::DoRead(ssize_t nread, eof_ = true; } - OnRead(nread, nullptr); + HandleScope handle_scope(env()->isolate()); + Context::Scope context_scope(env()->context()); + Local arg = Integer::New(env()->isolate(), nread); + wrap()->MakeCallback(env()->onread_string(), 1, &arg); return; } @@ -673,19 +611,19 @@ void TLSWrap::DoRead(ssize_t nread, } -int TLSWrap::DoShutdown(ShutdownWrap* req_wrap) { +int TLSCallbacks::DoShutdown(ShutdownWrap* req_wrap, uv_shutdown_cb cb) { if (SSL_shutdown(ssl_) == 0) SSL_shutdown(ssl_); shutdown_ = true; EncOut(); - return stream_->DoShutdown(req_wrap); + return StreamWrapCallbacks::DoShutdown(req_wrap, cb); } -void TLSWrap::SetVerifyMode(const FunctionCallbackInfo& args) { +void TLSCallbacks::SetVerifyMode(const FunctionCallbackInfo& args) { Environment* env = Environment::GetCurrent(args); - TLSWrap* wrap = Unwrap(args.Holder()); + TLSCallbacks* wrap = Unwrap(args.Holder()); if (args.Length() < 2 || !args[0]->IsBoolean() || !args[1]->IsBoolean()) return env->ThrowTypeError("Bad arguments, expected two booleans"); @@ -712,34 +650,34 @@ void TLSWrap::SetVerifyMode(const FunctionCallbackInfo& args) { } -void TLSWrap::EnableSessionCallbacks( +void TLSCallbacks::EnableSessionCallbacks( const FunctionCallbackInfo& args) { - TLSWrap* wrap = Unwrap(args.Holder()); + TLSCallbacks* wrap = Unwrap(args.Holder()); wrap->enable_session_callbacks(); EnableHelloParser(args); } -void TLSWrap::EnableHelloParser(const FunctionCallbackInfo& args) { - TLSWrap* wrap = Unwrap(args.Holder()); +void TLSCallbacks::EnableHelloParser(const FunctionCallbackInfo& args) { + TLSCallbacks* wrap = Unwrap(args.Holder()); NodeBIO::FromBIO(wrap->enc_in_)->set_initial(kMaxHelloLength); - wrap->hello_parser_.Start(SSLWrap::OnClientHello, + wrap->hello_parser_.Start(SSLWrap::OnClientHello, OnClientHelloParseEnd, wrap); } -void TLSWrap::OnClientHelloParseEnd(void* arg) { - TLSWrap* c = static_cast(arg); +void TLSCallbacks::OnClientHelloParseEnd(void* arg) { + TLSCallbacks* c = static_cast(arg); c->Cycle(); } #ifdef SSL_CTRL_SET_TLSEXT_SERVERNAME_CB -void TLSWrap::GetServername(const FunctionCallbackInfo& args) { +void TLSCallbacks::GetServername(const FunctionCallbackInfo& args) { Environment* env = Environment::GetCurrent(args); - TLSWrap* wrap = Unwrap(args.Holder()); + TLSCallbacks* wrap = Unwrap(args.Holder()); const char* servername = SSL_get_servername(wrap->ssl_, TLSEXT_NAMETYPE_host_name); @@ -751,10 +689,10 @@ void TLSWrap::GetServername(const FunctionCallbackInfo& args) { } -void TLSWrap::SetServername(const FunctionCallbackInfo& args) { +void TLSCallbacks::SetServername(const FunctionCallbackInfo& args) { Environment* env = Environment::GetCurrent(args); - TLSWrap* wrap = Unwrap(args.Holder()); + TLSCallbacks* wrap = Unwrap(args.Holder()); if (args.Length() < 1 || !args[0]->IsString()) return env->ThrowTypeError("First argument should be a string"); @@ -772,8 +710,8 @@ void TLSWrap::SetServername(const FunctionCallbackInfo& args) { } -int TLSWrap::SelectSNIContextCallback(SSL* s, int* ad, void* arg) { - TLSWrap* p = static_cast(SSL_get_app_data(s)); +int TLSCallbacks::SelectSNIContextCallback(SSL* s, int* ad, void* arg) { + TLSCallbacks* p = static_cast(SSL_get_app_data(s)); Environment* env = p->env(); const char* servername = SSL_get_servername(s, TLSEXT_NAMETYPE_host_name); @@ -781,6 +719,7 @@ int TLSWrap::SelectSNIContextCallback(SSL* s, int* ad, void* arg) { if (servername == nullptr) return SSL_TLSEXT_ERR_OK; + HandleScope scope(env->isolate()); // Call the SNI callback and use its return value as context Local object = p->object(); Local ctx = object->Get(env->sni_context_string()); @@ -808,12 +747,12 @@ int TLSWrap::SelectSNIContextCallback(SSL* s, int* ad, void* arg) { #endif // SSL_CTRL_SET_TLSEXT_SERVERNAME_CB -void TLSWrap::Initialize(Handle target, - Handle unused, - Handle context) { +void TLSCallbacks::Initialize(Handle target, + Handle unused, + Handle context) { Environment* env = Environment::GetCurrent(context); - env->SetMethod(target, "wrap", TLSWrap::Wrap); + env->SetMethod(target, "wrap", TLSCallbacks::Wrap); Local t = FunctionTemplate::New(env->isolate()); t->InstanceTemplate()->SetInternalFieldCount(1); @@ -825,21 +764,16 @@ void TLSWrap::Initialize(Handle target, env->SetProtoMethod(t, "enableSessionCallbacks", EnableSessionCallbacks); env->SetProtoMethod(t, "enableHelloParser", EnableHelloParser); - StreamBase::AddMethods(env, t); - SSLWrap::AddMethods(env, t); + SSLWrap::AddMethods(env, t); #ifdef SSL_CTRL_SET_TLSEXT_SERVERNAME_CB env->SetProtoMethod(t, "getServername", GetServername); env->SetProtoMethod(t, "setServername", SetServername); #endif // SSL_CRT_SET_TLSEXT_SERVERNAME_CB - env->set_tls_wrap_constructor_template(t); env->set_tls_wrap_constructor_function(t->GetFunction()); - - target->Set(FIXED_ONE_BYTE_STRING(env->isolate(), "TLSWrap"), - t->GetFunction()); } } // namespace node -NODE_MODULE_CONTEXT_AWARE_BUILTIN(tls_wrap, node::TLSWrap::Initialize) +NODE_MODULE_CONTEXT_AWARE_BUILTIN(tls_wrap, node::TLSCallbacks::Initialize) diff --git a/src/tls_wrap.h b/src/tls_wrap.h index 9f095355bb58bd..3815878d586c15 100644 --- a/src/tls_wrap.h +++ b/src/tls_wrap.h @@ -21,32 +21,33 @@ namespace crypto { class SecureContext; } -class TLSWrap : public crypto::SSLWrap, - public StreamBase, - public AsyncWrap { +class TLSCallbacks : public crypto::SSLWrap, + public StreamWrapCallbacks, + public AsyncWrap { public: - ~TLSWrap() override; + ~TLSCallbacks() override; static void Initialize(v8::Handle target, v8::Handle unused, v8::Handle context); - void* Cast() override; - int GetFD() override; - bool IsAlive() override; - bool IsClosing() override; - - // JavaScript functions - int ReadStart() override; - int ReadStop() override; - - int DoShutdown(ShutdownWrap* req_wrap) override; + const char* Error() const override; + void ClearError() override; + int TryWrite(uv_buf_t** bufs, size_t* count) override; int DoWrite(WriteWrap* w, uv_buf_t* bufs, size_t count, - uv_stream_t* send_handle) override; - const char* Error() const override; - void ClearError() override; + uv_stream_t* send_handle, + uv_write_cb cb) override; + void AfterWrite(WriteWrap* w) override; + void DoAlloc(uv_handle_t* handle, + size_t suggested_size, + uv_buf_t* buf) override; + void DoRead(uv_stream_t* handle, + ssize_t nread, + const uv_buf_t* buf, + uv_handle_type pending) override; + int DoShutdown(ShutdownWrap* req_wrap, uv_shutdown_cb cb) override; void NewSessionDoneCb(); @@ -65,25 +66,27 @@ class TLSWrap : public crypto::SSLWrap, // Write callback queue's item class WriteItem { public: - explicit WriteItem(WriteWrap* w) : w_(w) { + WriteItem(WriteWrap* w, uv_write_cb cb) : w_(w), cb_(cb) { } ~WriteItem() { w_ = nullptr; + cb_ = nullptr; } WriteWrap* w_; + uv_write_cb cb_; ListNode member_; }; - TLSWrap(Environment* env, - Kind kind, - StreamBase* stream, - crypto::SecureContext* sc); + TLSCallbacks(Environment* env, + Kind kind, + v8::Handle sc, + StreamWrapCallbacks* old); static void SSLInfoCallback(const SSL* ssl_, int where, int ret); void InitSSL(); void EncOut(); - static void EncOutCb(WriteWrap* req_wrap, int status); + static void EncOutCb(uv_write_t* req, int status); bool ClearIn(); void ClearOut(); void MakePending(); @@ -101,25 +104,6 @@ class TLSWrap : public crypto::SSLWrap, } } - AsyncWrap* GetAsyncWrap() override; - bool IsIPCPipe() override; - - // Resource implementation - static void OnAfterWriteImpl(WriteWrap* w, void* ctx); - static void OnAllocImpl(size_t size, uv_buf_t* buf, void* ctx); - static void OnReadImpl(ssize_t nread, - const uv_buf_t* buf, - uv_handle_type pending, - void* ctx); - static void OnAfterWriteSelf(WriteWrap* w, void* ctx); - static void OnAllocSelf(size_t size, uv_buf_t* buf, void* ctx); - static void OnReadSelf(ssize_t nread, - const uv_buf_t* buf, - uv_handle_type pending, - void* ctx); - - void DoRead(ssize_t nread, const uv_buf_t* buf, uv_handle_type pending); - // If |msg| is not nullptr, caller is responsible for calling `delete[] *msg`. v8::Local GetSSLError(int status, int* err, const char** msg); @@ -140,10 +124,11 @@ class TLSWrap : public crypto::SSLWrap, #endif // SSL_CTRL_SET_TLSEXT_SERVERNAME_CB crypto::SecureContext* sc_; - StreamBase* stream_; + v8::Persistent sc_handle_; BIO* enc_in_; BIO* enc_out_; NodeBIO* clear_in_; + uv_write_t write_req_; size_t write_size_; size_t write_queue_size_; typedef ListHead WriteItemList; diff --git a/src/tty_wrap.cc b/src/tty_wrap.cc index eaec271937530e..08c50d911f7482 100644 --- a/src/tty_wrap.cc +++ b/src/tty_wrap.cc @@ -36,10 +36,26 @@ void TTYWrap::Initialize(Handle target, t->SetClassName(FIXED_ONE_BYTE_STRING(env->isolate(), "TTY")); t->InstanceTemplate()->SetInternalFieldCount(1); + enum PropertyAttribute attributes = + static_cast(v8::ReadOnly | v8::DontDelete); + t->InstanceTemplate()->SetAccessor(env->fd_string(), + StreamWrap::GetFD, + nullptr, + Handle(), + v8::DEFAULT, + attributes); + env->SetProtoMethod(t, "close", HandleWrap::Close); env->SetProtoMethod(t, "unref", HandleWrap::Unref); - StreamWrap::AddMethods(env, t, StreamBase::kFlagNoShutdown); + env->SetProtoMethod(t, "readStart", StreamWrap::ReadStart); + env->SetProtoMethod(t, "readStop", StreamWrap::ReadStop); + + env->SetProtoMethod(t, "writeBuffer", StreamWrap::WriteBuffer); + env->SetProtoMethod(t, "writeAsciiString", StreamWrap::WriteAsciiString); + env->SetProtoMethod(t, "writeUtf8String", StreamWrap::WriteUtf8String); + env->SetProtoMethod(t, "writeUcs2String", StreamWrap::WriteUcs2String); + env->SetProtoMethod(t, "writeBinaryString", StreamWrap::WriteBinaryString); env->SetProtoMethod(t, "getWindowSize", TTYWrap::GetWindowSize); env->SetProtoMethod(t, "setRawMode", SetRawMode); diff --git a/test/parallel/test-regress-GH-io-1068.js b/test/parallel/test-regress-GH-io-1068.js deleted file mode 100644 index e769e6b81a880d..00000000000000 --- a/test/parallel/test-regress-GH-io-1068.js +++ /dev/null @@ -1 +0,0 @@ -process.stdin.emit('end'); diff --git a/test/parallel/test-tls-client-default-ciphers.js b/test/parallel/test-tls-client-default-ciphers.js index e1422bac6f3662..ad3e7c7a607a5e 100644 --- a/test/parallel/test-tls-client-default-ciphers.js +++ b/test/parallel/test-tls-client-default-ciphers.js @@ -7,21 +7,13 @@ if (!common.hasCrypto) { } var tls = require('tls'); -function Done() {} - function test1() { var ciphers = ''; - tls.createSecureContext = function(options) { - ciphers = options.ciphers; - throw new Done(); - } - - try { - var s = tls.connect(common.PORT); - } catch (e) { - assert(e instanceof Done); + ciphers = options.ciphers } + var s = tls.connect(common.PORT); + s.destroy(); assert.equal(ciphers, tls.DEFAULT_CIPHERS); } test1(); diff --git a/test/parallel/test-tls-close-notify.js b/test/parallel/test-tls-close-notify.js index 3960a62694042e..1f74182cb00232 100644 --- a/test/parallel/test-tls-close-notify.js +++ b/test/parallel/test-tls-close-notify.js @@ -17,8 +17,8 @@ var server = tls.createServer({ cert: fs.readFileSync(common.fixturesDir + '/keys/agent1-cert.pem') }, function(c) { // Send close-notify without shutting down TCP socket - if (c._handle.shutdownSSL() !== 1) - c._handle.shutdownSSL(); + if (c.ssl.shutdown() !== 1) + c.ssl.shutdown(); }).listen(common.PORT, function() { var c = tls.connect(common.PORT, { rejectUnauthorized: false diff --git a/test/parallel/test-tls-js-stream.js b/test/parallel/test-tls-js-stream.js deleted file mode 100644 index 12c3381cb4fd60..00000000000000 --- a/test/parallel/test-tls-js-stream.js +++ /dev/null @@ -1,74 +0,0 @@ -var common = require('../common'); -var assert = require('assert'); - -if (!common.hasCrypto) { - console.log('1..0 # Skipped: missing crypto'); - process.exit(); -} -var tls = require('tls'); - -var stream = require('stream'); -var fs = require('fs'); -var net = require('net'); - -var connected = { - client: 0, - server: 0 -}; - -var server = tls.createServer({ - key: fs.readFileSync(common.fixturesDir + '/keys/agent1-key.pem'), - cert: fs.readFileSync(common.fixturesDir + '/keys/agent1-cert.pem') -}, function(c) { - console.log('new client'); - connected.server++; - c.end('ohai'); -}).listen(common.PORT, function() { - var raw = net.connect(common.PORT); - - var pending = false; - raw.on('readable', function() { - if (pending) - p._read(); - }); - - var p = new stream.Duplex({ - read: function read() { - pending = false; - - var chunk = raw.read(); - if (chunk) { - console.log('read', chunk); - this.push(chunk); - } else { - pending = true; - } - }, - write: function write(data, enc, cb) { - console.log('write', data, enc); - raw.write(data, enc, cb); - } - }); - - var socket = tls.connect({ - socket: p, - rejectUnauthorized: false - }, function() { - console.log('client secure'); - - connected.client++; - - socket.end('hello'); - socket.resume(); - }); - - socket.once('close', function() { - console.log('client close'); - server.close(); - }); -}); - -process.once('exit', function() { - assert.equal(connected.client, 1); - assert.equal(connected.server, 1); -}); diff --git a/test/parallel/test-tls-multi-key.js b/test/parallel/test-tls-multi-key.js index 85ff5e808cfe16..ac9556d14fd91a 100644 --- a/test/parallel/test-tls-multi-key.js +++ b/test/parallel/test-tls-multi-key.js @@ -28,14 +28,15 @@ var server = tls.createServer(options, function(conn) { ciphers: 'ECDHE-ECDSA-AES256-GCM-SHA384', rejectUnauthorized: false }, function() { - ciphers.push(ecdsa.getCipher()); var rsa = tls.connect(common.PORT, { ciphers: 'ECDHE-RSA-AES256-GCM-SHA384', rejectUnauthorized: false }, function() { - ciphers.push(rsa.getCipher()); ecdsa.destroy(); rsa.destroy(); + + ciphers.push(ecdsa.getCipher()); + ciphers.push(rsa.getCipher()); server.close(); }); }); diff --git a/test/parallel/test-tls-on-empty-socket.js b/test/parallel/test-tls-on-empty-socket.js deleted file mode 100644 index f4866701e23eb5..00000000000000 --- a/test/parallel/test-tls-on-empty-socket.js +++ /dev/null @@ -1,41 +0,0 @@ -var common = require('../common'); -var assert = require('assert'); - -if (!common.hasCrypto) { - console.log('1..0 # Skipped: missing crypto'); - process.exit(); -} -var tls = require('tls'); - -var fs = require('fs'); -var net = require('net'); - -var out = ''; - -var server = tls.createServer({ - key: fs.readFileSync(common.fixturesDir + '/keys/agent1-key.pem'), - cert: fs.readFileSync(common.fixturesDir + '/keys/agent1-cert.pem') -}, function(c) { - c.end('hello'); -}).listen(common.PORT, function() { - var socket = new net.Socket(); - - var s = tls.connect({ - socket: socket, - rejectUnauthorized: false - }, function() { - s.on('data', function(chunk) { - out += chunk; - }); - s.on('end', function() { - s.destroy(); - server.close(); - }); - }); - - socket.connect(common.PORT); -}); - -process.on('exit', function() { - assert.equal(out, 'hello'); -});