From 0fb8a8a7d6cb42a1eba755423e225f94c1c04429 Mon Sep 17 00:00:00 2001 From: Mathias Buus Date: Mon, 29 Jan 2018 19:32:34 +0100 Subject: [PATCH 01/15] updated streams error handling --- lib/_stream_duplex.js | 7 ------- lib/_stream_readable.js | 9 +++++++-- lib/_stream_transform.js | 3 ++- lib/_stream_writable.js | 10 ++++++++-- lib/fs.js | 7 +++++++ lib/internal/errors.js | 1 + lib/internal/http2/core.js | 5 +++++ lib/internal/streams/destroy.js | 15 +++++++++++++++ lib/net.js | 4 ++++ 9 files changed, 49 insertions(+), 12 deletions(-) diff --git a/lib/_stream_duplex.js b/lib/_stream_duplex.js index 59ce83292789b5..1ccb931260ddbd 100644 --- a/lib/_stream_duplex.js +++ b/lib/_stream_duplex.js @@ -135,10 +135,3 @@ Object.defineProperty(Duplex.prototype, 'destroyed', { this._writableState.destroyed = value; } }); - -Duplex.prototype._destroy = function(err, cb) { - this.push(null); - this.end(); - - process.nextTick(cb, err); -}; diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index ba231ccda903c1..16abd893e32347 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -106,6 +106,9 @@ function ReadableState(options, stream) { this.readableListening = false; this.resumeScheduled = false; + // True if the close was already emitted and should not be emitted again + this.closeEmitted = false; + // has it been destroyed this.destroyed = false; @@ -177,7 +180,6 @@ Object.defineProperty(Readable.prototype, 'destroyed', { Readable.prototype.destroy = destroyImpl.destroy; Readable.prototype._undestroy = destroyImpl.undestroy; Readable.prototype._destroy = function(err, cb) { - this.push(null); cb(err); }; @@ -236,6 +238,8 @@ function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) { addChunk(stream, state, chunk, true); } else if (state.ended) { stream.emit('error', new errors.Error('ERR_STREAM_PUSH_AFTER_EOF')); + } else if (state.destroyed) { + stream.emit('error', new errors.Error('ERR_STREAM_PUSH_AFTER_DESTROY')); } else { state.reading = false; if (state.decoder && !encoding) { @@ -439,7 +443,8 @@ Readable.prototype.read = function(n) { if (state.length === 0) state.needReadable = true; // call internal read method - this._read(state.highWaterMark); + if (!state.destroyed) + this._read(state.highWaterMark); state.sync = false; // If _read pushed data synchronously, then `reading` will be false, // and we need to re-evaluate how much data we can return to the user. diff --git a/lib/_stream_transform.js b/lib/_stream_transform.js index a9fcddda2d9c83..7a47ac3ab4406d 100644 --- a/lib/_stream_transform.js +++ b/lib/_stream_transform.js @@ -132,6 +132,8 @@ function Transform(options) { } function prefinish() { + if (this._readableState.destroyed) + return; if (typeof this._flush === 'function') { this._flush((er, data) => { done(this, er, data); @@ -194,7 +196,6 @@ Transform.prototype._read = function(n) { Transform.prototype._destroy = function(err, cb) { Duplex.prototype._destroy.call(this, err, (err2) => { cb(err2); - this.emit('close'); }); }; diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index 2b7658813599e5..a0bfb2dda15d59 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -134,6 +134,9 @@ function WritableState(options, stream) { // True if the error was already emitted and should not be thrown again this.errorEmitted = false; + // True if the close was already emitted and should not be emitted again + this.closeEmitted = false; + // count buffered requests this.bufferedRequestCount = 0; @@ -390,7 +393,9 @@ function doWrite(stream, state, writev, len, chunk, encoding, cb) { state.writecb = cb; state.writing = true; state.sync = true; - if (writev) + if (state.destroyed) + state.onwrite(new errors.Error('ERR_STREAM_DESTROYED')); + else if (writev) stream._writev(chunk, state.onwrite); else stream._write(chunk, encoding, state.onwrite); @@ -592,6 +597,8 @@ function needFinish(state) { !state.writing); } function callFinal(stream, state) { + if (state.destroyed) + return; stream._final((err) => { state.pendingcb--; if (err) { @@ -681,6 +688,5 @@ Object.defineProperty(Writable.prototype, 'destroyed', { Writable.prototype.destroy = destroyImpl.destroy; Writable.prototype._undestroy = destroyImpl.undestroy; Writable.prototype._destroy = function(err, cb) { - this.end(); cb(err); }; diff --git a/lib/fs.js b/lib/fs.js index 3771efad10d762..5c9ba4d83ff058 100644 --- a/lib/fs.js +++ b/lib/fs.js @@ -2048,6 +2048,13 @@ ReadStream.prototype._read = function(n) { }; ReadStream.prototype._destroy = function(err, cb) { + // Do not emit close on stream destruction + // as we do it below to preserve backward compat. + if (this._readableState) + this._readableState.closeEmitted = true; + if (this._writableState) + this._writableState.closeEmitted = true; + const isOpen = typeof this.fd !== 'number'; if (isOpen) { this.once('open', closeFsStream.bind(null, this, cb, err)); diff --git a/lib/internal/errors.js b/lib/internal/errors.js index a4a79d671e4938..01b5fbbabbf7e3 100644 --- a/lib/internal/errors.js +++ b/lib/internal/errors.js @@ -843,6 +843,7 @@ E('ERR_SOCKET_DGRAM_NOT_RUNNING', 'Not running', Error); E('ERR_STDERR_CLOSE', 'process.stderr cannot be closed', Error); E('ERR_STDOUT_CLOSE', 'process.stdout cannot be closed', Error); E('ERR_STREAM_CANNOT_PIPE', 'Cannot pipe, not readable', Error); +E('ERR_STREAM_DESTROYED', 'stream.destroy() was called'); E('ERR_STREAM_NULL_VALUES', 'May not write null values to stream', TypeError); E('ERR_STREAM_PUSH_AFTER_EOF', 'stream.push() after EOF', Error); E('ERR_STREAM_READ_NOT_IMPLEMENTED', '_read() is not implemented', Error); diff --git a/lib/internal/http2/core.js b/lib/internal/http2/core.js index 71bb55ee23c89f..f9fff02de61962 100644 --- a/lib/internal/http2/core.js +++ b/lib/internal/http2/core.js @@ -1495,6 +1495,11 @@ class Http2Stream extends Duplex { this.on('resume', streamOnResume); this.on('pause', streamOnPause); + + // Set close emitted, so the stream destruction does not + // emit them + this._readableState.closeEmitted = true; + this._writableState.closeEmitted = true; } [kUpdateTimer]() { diff --git a/lib/internal/streams/destroy.js b/lib/internal/streams/destroy.js index 985332ac4607a8..5a9c6b26b9219c 100644 --- a/lib/internal/streams/destroy.js +++ b/lib/internal/streams/destroy.js @@ -30,6 +30,7 @@ function destroy(err, cb) { } this._destroy(err || null, (err) => { + process.nextTick(emitCloseNT, this); if (!cb && err) { process.nextTick(emitErrorNT, this, err); if (this._writableState) { @@ -43,6 +44,20 @@ function destroy(err, cb) { return this; } +function emitCloseNT(self) { + if (self._writableState) { + if (self._writableState.closeEmitted) + return; + self._writableState.closeEmitted = true; + } + if (self._readableState) { + if (self._readableState.closeEmitted) + return; + self._readableState.closeEmitted = true; + } + self.emit('close'); +} + function undestroy() { if (this._readableState) { this._readableState.destroyed = false; diff --git a/lib/net.js b/lib/net.js index 7583fcb27d1064..7083c92b3dbf54 100644 --- a/lib/net.js +++ b/lib/net.js @@ -552,6 +552,10 @@ Socket.prototype.destroySoon = function() { Socket.prototype._destroy = function(exception, cb) { debug('destroy'); + // Do not emit close on stream destruction + // as we do it here below to preserve backwards compat. + this._readableState.closeEmitted = true; + this.connecting = false; this.readable = this.writable = false; From ee661a3b34c8b88c71b8523bb5a3b7bb3ddd74d6 Mon Sep 17 00:00:00 2001 From: Mathias Buus Date: Mon, 29 Jan 2018 19:32:02 +0100 Subject: [PATCH 02/15] update stream.destroy() tests --- test/parallel/test-net-socket-destroy-send.js | 8 ++++---- test/parallel/test-stream-duplex-destroy.js | 14 ++++++++------ test/parallel/test-stream-readable-destroy.js | 9 +++++++-- test/parallel/test-stream-transform-destroy.js | 16 ++++++++-------- test/parallel/test-stream-writable-destroy.js | 13 ++++++++++--- 5 files changed, 37 insertions(+), 23 deletions(-) diff --git a/test/parallel/test-net-socket-destroy-send.js b/test/parallel/test-net-socket-destroy-send.js index a602b89253887d..881793e5e49a5e 100644 --- a/test/parallel/test-net-socket-destroy-send.js +++ b/test/parallel/test-net-socket-destroy-send.js @@ -13,14 +13,14 @@ server.listen(0, common.mustCall(function() { // Test destroy returns this, even on multiple calls when it short-circuits. assert.strictEqual(conn, conn.destroy().destroy()); conn.on('error', common.expectsError({ - code: 'ERR_SOCKET_CLOSED', - message: 'Socket is closed', + code: 'ERR_STREAM_DESTROYED', + message: 'stream.destroy() was called', type: Error })); conn.write(Buffer.from('kaboom'), common.expectsError({ - code: 'ERR_SOCKET_CLOSED', - message: 'Socket is closed', + code: 'ERR_STREAM_DESTROYED', + message: 'stream.destroy() was called', type: Error })); server.close(); diff --git a/test/parallel/test-stream-duplex-destroy.js b/test/parallel/test-stream-duplex-destroy.js index 00e334d64b5693..854d29ffc13049 100644 --- a/test/parallel/test-stream-duplex-destroy.js +++ b/test/parallel/test-stream-duplex-destroy.js @@ -13,8 +13,9 @@ const { inherits } = require('util'); duplex.resume(); - duplex.on('end', common.mustCall()); - duplex.on('finish', common.mustCall()); + duplex.on('end', common.mustNotCall()); + duplex.on('finish', common.mustNotCall()); + duplex.on('close', common.mustCall()); duplex.destroy(); assert.strictEqual(duplex.destroyed, true); @@ -29,8 +30,8 @@ const { inherits } = require('util'); const expected = new Error('kaboom'); - duplex.on('end', common.mustCall()); - duplex.on('finish', common.mustCall()); + duplex.on('end', common.mustNotCall()); + duplex.on('finish', common.mustNotCall()); duplex.on('error', common.mustCall((err) => { assert.strictEqual(err, expected); })); @@ -78,6 +79,7 @@ const { inherits } = require('util'); // error is swallowed by the custom _destroy duplex.on('error', common.mustNotCall('no error event')); + duplex.on('close', common.mustCall()); duplex.destroy(expected); assert.strictEqual(duplex.destroyed, true); @@ -159,8 +161,8 @@ const { inherits } = require('util'); }); duplex.resume(); - duplex.on('finish', common.mustCall()); - duplex.on('end', common.mustCall()); + duplex.on('finish', common.mustNotCall()); + duplex.on('end', common.mustNotCall()); duplex.destroy(); assert.strictEqual(duplex.destroyed, true); diff --git a/test/parallel/test-stream-readable-destroy.js b/test/parallel/test-stream-readable-destroy.js index def20d26c34080..4e11e76ece01ac 100644 --- a/test/parallel/test-stream-readable-destroy.js +++ b/test/parallel/test-stream-readable-destroy.js @@ -11,7 +11,7 @@ const { inherits } = require('util'); }); read.resume(); - read.on('end', common.mustCall()); + read.on('close', common.mustCall()); read.destroy(); assert.strictEqual(read.destroyed, true); @@ -25,7 +25,8 @@ const { inherits } = require('util'); const expected = new Error('kaboom'); - read.on('end', common.mustCall()); + read.on('end', common.mustNotCall('no end event')); + read.on('close', common.mustCall()); read.on('error', common.mustCall((err) => { assert.strictEqual(err, expected); })); @@ -47,6 +48,7 @@ const { inherits } = require('util'); const expected = new Error('kaboom'); read.on('end', common.mustNotCall('no end event')); + read.on('close', common.mustCall()); read.on('error', common.mustCall((err) => { assert.strictEqual(err, expected); })); @@ -70,6 +72,7 @@ const { inherits } = require('util'); // error is swallowed by the custom _destroy read.on('error', common.mustNotCall('no error event')); + read.on('close', common.mustCall()); read.destroy(expected); assert.strictEqual(read.destroyed, true); @@ -106,6 +109,7 @@ const { inherits } = require('util'); const fail = common.mustNotCall('no end event'); read.on('end', fail); + read.on('close', common.mustCall()); read.destroy(); @@ -170,6 +174,7 @@ const { inherits } = require('util'); const expected = new Error('kaboom'); + read.on('close', common.mustCall()); read.destroy(expected, common.mustCall(function(err) { assert.strictEqual(expected, err); })); diff --git a/test/parallel/test-stream-transform-destroy.js b/test/parallel/test-stream-transform-destroy.js index c42fe1d6f96d08..47cce87264b5c1 100644 --- a/test/parallel/test-stream-transform-destroy.js +++ b/test/parallel/test-stream-transform-destroy.js @@ -11,9 +11,9 @@ const assert = require('assert'); transform.resume(); - transform.on('end', common.mustCall()); + transform.on('end', common.mustNotCall()); transform.on('close', common.mustCall()); - transform.on('finish', common.mustCall()); + transform.on('finish', common.mustNotCall()); transform.destroy(); } @@ -26,8 +26,8 @@ const assert = require('assert'); const expected = new Error('kaboom'); - transform.on('end', common.mustCall()); - transform.on('finish', common.mustCall()); + transform.on('end', common.mustNotCall()); + transform.on('finish', common.mustNotCall()); transform.on('close', common.mustCall()); transform.on('error', common.mustCall((err) => { assert.strictEqual(err, expected); @@ -49,7 +49,7 @@ const assert = require('assert'); const expected = new Error('kaboom'); transform.on('finish', common.mustNotCall('no finish event')); - transform.on('close', common.mustNotCall('no close event')); + transform.on('close', common.mustCall()); transform.on('error', common.mustCall((err) => { assert.strictEqual(err, expected); })); @@ -69,7 +69,7 @@ const assert = require('assert'); transform.resume(); transform.on('end', common.mustNotCall('no end event')); - transform.on('close', common.mustNotCall('no close event')); + transform.on('close', common.mustCall()); transform.on('finish', common.mustNotCall('no finish event')); // error is swallowed by the custom _destroy @@ -110,7 +110,7 @@ const assert = require('assert'); transform.on('finish', fail); transform.on('end', fail); - transform.on('close', fail); + transform.on('close', common.mustCall()); transform.destroy(); @@ -132,7 +132,7 @@ const assert = require('assert'); cb(expected); }, 1); - transform.on('close', common.mustNotCall('no close event')); + transform.on('close', common.mustCall()); transform.on('finish', common.mustNotCall('no finish event')); transform.on('end', common.mustNotCall('no end event')); transform.on('error', common.mustCall((err) => { diff --git a/test/parallel/test-stream-writable-destroy.js b/test/parallel/test-stream-writable-destroy.js index 46c48511177813..565a5564e2bc29 100644 --- a/test/parallel/test-stream-writable-destroy.js +++ b/test/parallel/test-stream-writable-destroy.js @@ -10,7 +10,8 @@ const { inherits } = require('util'); write(chunk, enc, cb) { cb(); } }); - write.on('finish', common.mustCall()); + write.on('finish', common.mustNotCall()); + write.on('close', common.mustCall()); write.destroy(); assert.strictEqual(write.destroyed, true); @@ -23,7 +24,8 @@ const { inherits } = require('util'); const expected = new Error('kaboom'); - write.on('finish', common.mustCall()); + write.on('finish', common.mustNotCall()); + write.on('close', common.mustCall()); write.on('error', common.mustCall((err) => { assert.strictEqual(err, expected); })); @@ -45,6 +47,7 @@ const { inherits } = require('util'); const expected = new Error('kaboom'); write.on('finish', common.mustNotCall('no finish event')); + write.on('close', common.mustCall()); write.on('error', common.mustCall((err) => { assert.strictEqual(err, expected); })); @@ -65,6 +68,7 @@ const { inherits } = require('util'); const expected = new Error('kaboom'); write.on('finish', common.mustNotCall('no finish event')); + write.on('close', common.mustCall()); // error is swallowed by the custom _destroy write.on('error', common.mustNotCall('no error event')); @@ -103,6 +107,7 @@ const { inherits } = require('util'); const fail = common.mustNotCall('no finish event'); write.on('finish', fail); + write.on('close', common.mustCall()); write.destroy(); @@ -123,6 +128,7 @@ const { inherits } = require('util'); cb(expected); }); + write.on('close', common.mustCall()); write.on('finish', common.mustNotCall('no finish event')); write.on('error', common.mustCall((err) => { assert.strictEqual(err, expected); @@ -138,6 +144,7 @@ const { inherits } = require('util'); write(chunk, enc, cb) { cb(); } }); + write.on('close', common.mustCall()); write.on('error', common.mustCall()); write.destroy(new Error('kaboom 1')); @@ -155,7 +162,7 @@ const { inherits } = require('util'); assert.strictEqual(write.destroyed, true); // the internal destroy() mechanism should not be triggered - write.on('finish', common.mustNotCall()); + write.on('close', common.mustNotCall()); write.destroy(); } From e0bb28e2a3bca90b07a5f8694f51ec520102471c Mon Sep 17 00:00:00 2001 From: Mathias Buus Date: Mon, 29 Jan 2018 19:32:57 +0100 Subject: [PATCH 03/15] document updated error handling --- doc/api/errors.md | 6 ++++++ doc/api/stream.md | 11 +++++++---- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/doc/api/errors.md b/doc/api/errors.md index b9c52ad0d9af89..035f1d1a32ad37 100644 --- a/doc/api/errors.md +++ b/doc/api/errors.md @@ -1449,6 +1449,12 @@ An unspecified or non-specific system error has occurred within the Node.js process. The error object will have an `err.info` object property with additional details. + +### ERR_STREAM_DESTROYED + +A stream method was called that cannot complete cause the stream was been +destroyed using `stream.destroy()`. + ### ERR_TLS_CERT_ALTNAME_INVALID diff --git a/doc/api/stream.md b/doc/api/stream.md index 5db990d4d2cc3d..06e823ef3b4649 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -543,8 +543,10 @@ added: v8.0.0 * Returns: {this} -Destroy the stream, and emit the passed error. After this call, the -writable stream has ended. Implementors should not override this method, +Destroy the stream, and emit the passed error and a close event. +After this call, the writable stream has ended and subsequent calls +to `write` / `end` will give an `ERR_STREAM_DESTROYED` error. +Implementors should not override this method, but instead implement [`writable._destroy`][writable-_destroy]. ### Readable Streams @@ -1167,8 +1169,9 @@ myReader.on('readable', () => { added: v8.0.0 --> -Destroy the stream, and emit `'error'`. After this call, the -readable stream will release any internal resources. +Destroy the stream, and emit `'error'` and `close`. After this call, the +readable stream will release any internal resources and subsequent calls +to `read` will result in an `ERR_STREAM_DESTROYED` error. Implementors should not override this method, but instead implement [`readable._destroy`][readable-_destroy]. From fe7886e1a85eb41e39ed53d1d0cbb92d652a5629 Mon Sep 17 00:00:00 2001 From: Mathias Buus Date: Tue, 30 Jan 2018 13:32:35 +0100 Subject: [PATCH 04/15] use .destroy() in zlib to emit close --- lib/zlib.js | 6 +----- test/parallel/test-zlib-write-after-close.js | 4 ++-- 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/lib/zlib.js b/lib/zlib.js index 93f878712add08..55d93bb4d3f192 100644 --- a/lib/zlib.js +++ b/lib/zlib.js @@ -392,7 +392,7 @@ Zlib.prototype.flush = function flush(kind, callback) { Zlib.prototype.close = function close(callback) { _close(this, callback); - process.nextTick(emitCloseNT, this); + this.destroy(); }; Zlib.prototype._transform = function _transform(chunk, encoding, cb) { @@ -603,10 +603,6 @@ function _close(engine, callback) { engine._handle = null; } -function emitCloseNT(self) { - self.emit('close'); -} - // generic zlib // minimal 2-byte header function Deflate(opts) { diff --git a/test/parallel/test-zlib-write-after-close.js b/test/parallel/test-zlib-write-after-close.js index 88d6643da8b994..ac633f97a2620f 100644 --- a/test/parallel/test-zlib-write-after-close.js +++ b/test/parallel/test-zlib-write-after-close.js @@ -29,9 +29,9 @@ zlib.gzip('hello', common.mustCall(function(err, out) { common.expectsError( () => unzip.write(out), { - code: 'ERR_ZLIB_BINDING_CLOSED', + code: 'ERR_STREAM_DESTROYED', type: Error, - message: 'zlib binding closed' + message: 'stream.destroy() was called' } ); })); From d65c0ae0ec149bc50fe3618fb892abd5d888ba13 Mon Sep 17 00:00:00 2001 From: Mathias Buus Date: Tue, 30 Jan 2018 14:22:16 +0100 Subject: [PATCH 05/15] comments by @mcollina --- lib/_stream_readable.js | 9 ++++----- lib/_stream_transform.js | 4 +--- lib/_stream_writable.js | 8 +++----- lib/fs.js | 13 ++++++------- lib/internal/http2/core.js | 6 +----- lib/internal/streams/destroy.js | 14 ++++---------- lib/net.js | 16 ++++++++++++---- 7 files changed, 31 insertions(+), 39 deletions(-) diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index 16abd893e32347..542a2eb368c171 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -106,8 +106,8 @@ function ReadableState(options, stream) { this.readableListening = false; this.resumeScheduled = false; - // True if the close was already emitted and should not be emitted again - this.closeEmitted = false; + // Should close be emitted on destroy. Defaults to true. + this.emitClose = options.emitClose !== false; // has it been destroyed this.destroyed = false; @@ -432,7 +432,7 @@ Readable.prototype.read = function(n) { // however, if we've ended, then there's no point, and if we're already // reading, then it's unnecessary. - if (state.ended || state.reading) { + if (state.ended || state.reading || state.destroyed) { doRead = false; debug('reading or ended', doRead); } else if (doRead) { @@ -443,8 +443,7 @@ Readable.prototype.read = function(n) { if (state.length === 0) state.needReadable = true; // call internal read method - if (!state.destroyed) - this._read(state.highWaterMark); + this._read(state.highWaterMark); state.sync = false; // If _read pushed data synchronously, then `reading` will be false, // and we need to re-evaluate how much data we can return to the user. diff --git a/lib/_stream_transform.js b/lib/_stream_transform.js index 7a47ac3ab4406d..b82114ecaecd1d 100644 --- a/lib/_stream_transform.js +++ b/lib/_stream_transform.js @@ -132,9 +132,7 @@ function Transform(options) { } function prefinish() { - if (this._readableState.destroyed) - return; - if (typeof this._flush === 'function') { + if (typeof this._flush === 'function' && !this._readableState.destroyed) { this._flush((er, data) => { done(this, er, data); }); diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index a0bfb2dda15d59..4e908a89145f7a 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -134,8 +134,8 @@ function WritableState(options, stream) { // True if the error was already emitted and should not be thrown again this.errorEmitted = false; - // True if the close was already emitted and should not be emitted again - this.closeEmitted = false; + // Should close be emitted on destroy. Defaults to true. + this.emitClose = options.emitClose !== false; // count buffered requests this.bufferedRequestCount = 0; @@ -597,8 +597,6 @@ function needFinish(state) { !state.writing); } function callFinal(stream, state) { - if (state.destroyed) - return; stream._final((err) => { state.pendingcb--; if (err) { @@ -611,7 +609,7 @@ function callFinal(stream, state) { } function prefinish(stream, state) { if (!state.prefinished && !state.finalCalled) { - if (typeof stream._final === 'function') { + if (typeof stream._final === 'function' && !state.destroyed) { state.pendingcb++; state.finalCalled = true; process.nextTick(callFinal, stream, state); diff --git a/lib/fs.js b/lib/fs.js index 5c9ba4d83ff058..917c3eb3a9f640 100644 --- a/lib/fs.js +++ b/lib/fs.js @@ -1929,6 +1929,9 @@ function ReadStream(path, options) { if (options.highWaterMark === undefined) options.highWaterMark = 64 * 1024; + // for backwards compat do not emit close on destroy. + options.emitClose = false; + Readable.call(this, options); // path will be ignored when fd is specified, so it can be falsy @@ -2048,13 +2051,6 @@ ReadStream.prototype._read = function(n) { }; ReadStream.prototype._destroy = function(err, cb) { - // Do not emit close on stream destruction - // as we do it below to preserve backward compat. - if (this._readableState) - this._readableState.closeEmitted = true; - if (this._writableState) - this._writableState.closeEmitted = true; - const isOpen = typeof this.fd !== 'number'; if (isOpen) { this.once('open', closeFsStream.bind(null, this, cb, err)); @@ -2091,6 +2087,9 @@ function WriteStream(path, options) { options = copyObject(getOptions(options, {})); + // for backwards compat do not emit close on destroy. + options.emitClose = false; + Writable.call(this, options); // path will be ignored when fd is specified, so it can be falsy diff --git a/lib/internal/http2/core.js b/lib/internal/http2/core.js index f9fff02de61962..f60c6388af6cec 100644 --- a/lib/internal/http2/core.js +++ b/lib/internal/http2/core.js @@ -1475,6 +1475,7 @@ class Http2Stream extends Duplex { constructor(session, options) { options.allowHalfOpen = true; options.decodeStrings = false; + options.emitClose = false; super(options); this[async_id_symbol] = -1; @@ -1495,11 +1496,6 @@ class Http2Stream extends Duplex { this.on('resume', streamOnResume); this.on('pause', streamOnPause); - - // Set close emitted, so the stream destruction does not - // emit them - this._readableState.closeEmitted = true; - this._writableState.closeEmitted = true; } [kUpdateTimer]() { diff --git a/lib/internal/streams/destroy.js b/lib/internal/streams/destroy.js index 5a9c6b26b9219c..5d29e182041cdc 100644 --- a/lib/internal/streams/destroy.js +++ b/lib/internal/streams/destroy.js @@ -45,16 +45,10 @@ function destroy(err, cb) { } function emitCloseNT(self) { - if (self._writableState) { - if (self._writableState.closeEmitted) - return; - self._writableState.closeEmitted = true; - } - if (self._readableState) { - if (self._readableState.closeEmitted) - return; - self._readableState.closeEmitted = true; - } + if (self._writableState && !self._writableState.emitClose) + return; + if (self._readableState && !self._readableState.emitClose) + return; self.emit('close'); } diff --git a/lib/net.js b/lib/net.js index 7083c92b3dbf54..55af1b05ccada9 100644 --- a/lib/net.js +++ b/lib/net.js @@ -84,6 +84,13 @@ const { function noop() {} +function copyObject(source) { + var target = {}; + for (var key in source) + target[key] = source[key]; + return target; +} + function createHandle(fd, is_server) { const type = TTYWrap.guessHandleType(fd); if (type === 'PIPE') { @@ -232,6 +239,11 @@ function Socket(options) { options = { fd: options }; // Legacy interface. else if (options === undefined) options = {}; + else + options = copyObject(options); + + // for backwards compat do not emit close on destroy. + options.emitClose = false; stream.Duplex.call(this, options); @@ -552,10 +564,6 @@ Socket.prototype.destroySoon = function() { Socket.prototype._destroy = function(exception, cb) { debug('destroy'); - // Do not emit close on stream destruction - // as we do it here below to preserve backwards compat. - this._readableState.closeEmitted = true; - this.connecting = false; this.readable = this.writable = false; From 048429d739f5c048855132f617f97bdc2b535ea2 Mon Sep 17 00:00:00 2001 From: Mathias Buus Date: Tue, 30 Jan 2018 20:35:01 +0100 Subject: [PATCH 06/15] fix nit --- lib/net.js | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/lib/net.js b/lib/net.js index 55af1b05ccada9..bd6d72aefe4a3b 100644 --- a/lib/net.js +++ b/lib/net.js @@ -84,13 +84,6 @@ const { function noop() {} -function copyObject(source) { - var target = {}; - for (var key in source) - target[key] = source[key]; - return target; -} - function createHandle(fd, is_server) { const type = TTYWrap.guessHandleType(fd); if (type === 'PIPE') { @@ -240,7 +233,7 @@ function Socket(options) { else if (options === undefined) options = {}; else - options = copyObject(options); + options = Object.assign({}, options); // for backwards compat do not emit close on destroy. options.emitClose = false; From a241c9e624a38c135a6e14e6fd2ee825dca08689 Mon Sep 17 00:00:00 2001 From: Mathias Buus Date: Wed, 31 Jan 2018 10:34:09 +0100 Subject: [PATCH 07/15] document emitClose --- doc/api/stream.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/doc/api/stream.md b/doc/api/stream.md index 06e823ef3b4649..40e999e4d84914 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -1398,6 +1398,8 @@ constructor and implement the `writable._write()` method. The it becomes possible to write JavaScript values other than string, `Buffer` or `Uint8Array` if supported by the stream implementation. Defaults to `false` + * `emitClose` {boolean} Whether or not the stream should emit `close` + after it has been destroyed. Defaults to `true` * `write` {Function} Implementation for the [`stream._write()`][stream-_write] method. * `writev` {Function} Implementation for the From 3b543fbdaf23c3357115093840845188956ecc75 Mon Sep 17 00:00:00 2001 From: Mathias Buus Date: Wed, 31 Jan 2018 21:59:37 +0100 Subject: [PATCH 08/15] fix errors docs --- doc/api/errors.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/api/errors.md b/doc/api/errors.md index 035f1d1a32ad37..bc4f8b9653a4b8 100644 --- a/doc/api/errors.md +++ b/doc/api/errors.md @@ -1452,7 +1452,7 @@ additional details. ### ERR_STREAM_DESTROYED -A stream method was called that cannot complete cause the stream was been +A stream method was called that cannot complete because the stream was destroyed using `stream.destroy()`. From 05f46f3ddb58ae49a7c657a0b42c95733517066b Mon Sep 17 00:00:00 2001 From: Mathias Buus Date: Wed, 14 Feb 2018 15:40:46 +0100 Subject: [PATCH 09/15] comments --- doc/api/errors.md | 5 ---- doc/api/stream.md | 10 +++++-- lib/_stream_readable.js | 2 +- lib/_stream_writable.js | 2 +- lib/internal/errors.js | 30 ++++++++++++++++++- lib/net.js | 4 +-- lib/zlib.js | 2 +- test/parallel/test-net-socket-destroy-send.js | 4 +-- test/parallel/test-stream-readable-destroy.js | 13 ++++++++ test/parallel/test-zlib-write-after-close.js | 2 +- 10 files changed, 58 insertions(+), 16 deletions(-) diff --git a/doc/api/errors.md b/doc/api/errors.md index bc4f8b9653a4b8..70ac01de610d6a 100644 --- a/doc/api/errors.md +++ b/doc/api/errors.md @@ -1621,11 +1621,6 @@ The fulfilled value of a linking promise is not a `vm.Module` object. The current module's status does not allow for this operation. The specific meaning of the error depends on the specific function. - -### ERR_ZLIB_BINDING_CLOSED - -An attempt was made to use a `zlib` object after it has already been closed. - ### ERR_ZLIB_INITIALIZATION_FAILED diff --git a/doc/api/stream.md b/doc/api/stream.md index 40e999e4d84914..428f323fa2f6ce 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -543,8 +543,8 @@ added: v8.0.0 * Returns: {this} -Destroy the stream, and emit the passed error and a close event. -After this call, the writable stream has ended and subsequent calls +Destroy the stream, and emit the passed `error` and a `close` event. +After this call, the writable stream has ended and subsequent calls to `write` / `end` will give an `ERR_STREAM_DESTROYED` error. Implementors should not override this method, but instead implement [`writable._destroy`][writable-_destroy]. @@ -1385,6 +1385,12 @@ constructor and implement the `writable._write()` method. The `writable._writev()` method *may* also be implemented. #### Constructor: new stream.Writable([options]) + * `options` {Object} * `highWaterMark` {number} Buffer level when diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index 542a2eb368c171..3f80b256e09e86 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -239,7 +239,7 @@ function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) { } else if (state.ended) { stream.emit('error', new errors.Error('ERR_STREAM_PUSH_AFTER_EOF')); } else if (state.destroyed) { - stream.emit('error', new errors.Error('ERR_STREAM_PUSH_AFTER_DESTROY')); + stream.emit('error', new errors.Error('ERR_STREAM_DESTROYED', 'push')); } else { state.reading = false; if (state.decoder && !encoding) { diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index 4e908a89145f7a..d5cfe07f171324 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -394,7 +394,7 @@ function doWrite(stream, state, writev, len, chunk, encoding, cb) { state.writing = true; state.sync = true; if (state.destroyed) - state.onwrite(new errors.Error('ERR_STREAM_DESTROYED')); + state.onwrite(new errors.Error('ERR_STREAM_DESTROYED', 'write')); else if (writev) stream._writev(chunk, state.onwrite); else diff --git a/lib/internal/errors.js b/lib/internal/errors.js index 01b5fbbabbf7e3..597d54774b1160 100644 --- a/lib/internal/errors.js +++ b/lib/internal/errors.js @@ -843,7 +843,7 @@ E('ERR_SOCKET_DGRAM_NOT_RUNNING', 'Not running', Error); E('ERR_STDERR_CLOSE', 'process.stderr cannot be closed', Error); E('ERR_STDOUT_CLOSE', 'process.stdout cannot be closed', Error); E('ERR_STREAM_CANNOT_PIPE', 'Cannot pipe, not readable', Error); -E('ERR_STREAM_DESTROYED', 'stream.destroy() was called'); +E('ERR_STREAM_DESTROYED', 'Cannot call %s after a stream was destroyed'); E('ERR_STREAM_NULL_VALUES', 'May not write null values to stream', TypeError); E('ERR_STREAM_PUSH_AFTER_EOF', 'stream.push() after EOF', Error); E('ERR_STREAM_READ_NOT_IMPLEMENTED', '_read() is not implemented', Error); @@ -905,6 +905,7 @@ E('ERR_VM_MODULE_DIFFERENT_CONTEXT', E('ERR_VM_MODULE_LINKING_ERRORED', 'Linking has already failed for the provided module', Error); E('ERR_VM_MODULE_NOT_LINKED', +<<<<<<< HEAD 'Module must be linked before it can be instantiated', Error); E('ERR_VM_MODULE_NOT_MODULE', 'Provided module is not an instance of Module', Error); @@ -927,6 +928,33 @@ function sysError(code, syscall, path, dest, message += ` => ${dest}`; } return message; +======= + 'Module must be linked before it can be instantiated'); +E('ERR_VM_MODULE_NOT_MODULE', 'Provided module is not an instance of Module'); +E('ERR_VM_MODULE_STATUS', 'Module status %s'); +E('ERR_ZLIB_INITIALIZATION_FAILED', 'Initialization failed'); + +function sysError(defaultMessage) { + return function(code, + syscall, + path, + dest, + message = defaultMessage) { + if (code !== undefined) + message += `: ${code}`; + if (syscall !== undefined) { + if (code === undefined) + message += ':'; + message += ` [${syscall}]`; + } + if (path !== undefined) { + message += `: ${path}`; + if (dest !== undefined) + message += ` => ${dest}`; + } + return message; + }; +>>>>>>> comments } function invalidArgType(name, expected, actual) { diff --git a/lib/net.js b/lib/net.js index bd6d72aefe4a3b..f2cb423f3003ea 100644 --- a/lib/net.js +++ b/lib/net.js @@ -233,9 +233,9 @@ function Socket(options) { else if (options === undefined) options = {}; else - options = Object.assign({}, options); + options = util._extend({}, options); - // for backwards compat do not emit close on destroy. + // For backwards compat do not emit close on destroy. options.emitClose = false; stream.Duplex.call(this, options); diff --git a/lib/zlib.js b/lib/zlib.js index 55d93bb4d3f192..907fdd52d11b61 100644 --- a/lib/zlib.js +++ b/lib/zlib.js @@ -510,7 +510,7 @@ function processChunkSync(self, chunk, flushFlag) { function processChunk(self, chunk, flushFlag, cb) { var handle = self._handle; if (!handle) - return cb(new ERR_ZLIB_BINDING_CLOSED()); + assert(false, 'zlib binding closed'); handle.buffer = chunk; handle.cb = cb; diff --git a/test/parallel/test-net-socket-destroy-send.js b/test/parallel/test-net-socket-destroy-send.js index 881793e5e49a5e..aa587fc2e16896 100644 --- a/test/parallel/test-net-socket-destroy-send.js +++ b/test/parallel/test-net-socket-destroy-send.js @@ -14,13 +14,13 @@ server.listen(0, common.mustCall(function() { assert.strictEqual(conn, conn.destroy().destroy()); conn.on('error', common.expectsError({ code: 'ERR_STREAM_DESTROYED', - message: 'stream.destroy() was called', + message: 'Cannot call write after a stream was destroyed', type: Error })); conn.write(Buffer.from('kaboom'), common.expectsError({ code: 'ERR_STREAM_DESTROYED', - message: 'stream.destroy() was called', + message: 'Cannot call write after a stream was destroyed', type: Error })); server.close(); diff --git a/test/parallel/test-stream-readable-destroy.js b/test/parallel/test-stream-readable-destroy.js index 4e11e76ece01ac..09e4e57650f647 100644 --- a/test/parallel/test-stream-readable-destroy.js +++ b/test/parallel/test-stream-readable-destroy.js @@ -179,3 +179,16 @@ const { inherits } = require('util'); assert.strictEqual(expected, err); })); } + +{ + const read = new Readable(); + + read.on('error', common.expectsError({ + code: 'ERR_STREAM_DESTROYED', + message: 'Cannot call push after a stream was destroyed', + type: Error + })); + + read.destroy(); + read.push('hi'); +} diff --git a/test/parallel/test-zlib-write-after-close.js b/test/parallel/test-zlib-write-after-close.js index ac633f97a2620f..160971b16bc30c 100644 --- a/test/parallel/test-zlib-write-after-close.js +++ b/test/parallel/test-zlib-write-after-close.js @@ -31,7 +31,7 @@ zlib.gzip('hello', common.mustCall(function(err, out) { { code: 'ERR_STREAM_DESTROYED', type: Error, - message: 'stream.destroy() was called' + message: 'Cannot call write after a stream was destroyed' } ); })); From da1daf362b4e85d7a9222f87cd9c8a84010949e4 Mon Sep 17 00:00:00 2001 From: Mathias Buus Date: Wed, 14 Feb 2018 17:16:14 +0100 Subject: [PATCH 10/15] allow .read() to function after .destroy() --- lib/_stream_readable.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index 3f80b256e09e86..000075ae3b0413 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -432,7 +432,7 @@ Readable.prototype.read = function(n) { // however, if we've ended, then there's no point, and if we're already // reading, then it's unnecessary. - if (state.ended || state.reading || state.destroyed) { + if (state.ended || state.reading) { doRead = false; debug('reading or ended', doRead); } else if (doRead) { From c9752ce07d7a5691c453363f36c1367b9d50fd83 Mon Sep 17 00:00:00 2001 From: Mathias Buus Date: Wed, 14 Feb 2018 17:17:09 +0100 Subject: [PATCH 11/15] fix docs typo --- doc/api/stream.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/api/stream.md b/doc/api/stream.md index 428f323fa2f6ce..a7dba149b371dd 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -1171,7 +1171,7 @@ added: v8.0.0 Destroy the stream, and emit `'error'` and `close`. After this call, the readable stream will release any internal resources and subsequent calls -to `read` will result in an `ERR_STREAM_DESTROYED` error. +to `push` will result in an `ERR_STREAM_DESTROYED` error. Implementors should not override this method, but instead implement [`readable._destroy`][readable-_destroy]. From 599c9b7ea84dd1b6ca707c55e5f6fa243863a821 Mon Sep 17 00:00:00 2001 From: Mathias Buus Date: Mon, 5 Mar 2018 17:38:45 +0100 Subject: [PATCH 12/15] change .push to be a noop when destroyed --- lib/_stream_readable.js | 2 +- lib/internal/errors.js | 29 ------------------- test/parallel/test-stream-readable-destroy.js | 11 +++---- 3 files changed, 5 insertions(+), 37 deletions(-) diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index 000075ae3b0413..5781dfd471e72d 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -239,7 +239,7 @@ function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) { } else if (state.ended) { stream.emit('error', new errors.Error('ERR_STREAM_PUSH_AFTER_EOF')); } else if (state.destroyed) { - stream.emit('error', new errors.Error('ERR_STREAM_DESTROYED', 'push')); + return false; } else { state.reading = false; if (state.decoder && !encoding) { diff --git a/lib/internal/errors.js b/lib/internal/errors.js index 597d54774b1160..11f32ccdc17dc9 100644 --- a/lib/internal/errors.js +++ b/lib/internal/errors.js @@ -905,12 +905,10 @@ E('ERR_VM_MODULE_DIFFERENT_CONTEXT', E('ERR_VM_MODULE_LINKING_ERRORED', 'Linking has already failed for the provided module', Error); E('ERR_VM_MODULE_NOT_LINKED', -<<<<<<< HEAD 'Module must be linked before it can be instantiated', Error); E('ERR_VM_MODULE_NOT_MODULE', 'Provided module is not an instance of Module', Error); E('ERR_VM_MODULE_STATUS', 'Module status %s', Error); -E('ERR_ZLIB_BINDING_CLOSED', 'zlib binding closed', Error); E('ERR_ZLIB_INITIALIZATION_FAILED', 'Initialization failed', Error); function sysError(code, syscall, path, dest, @@ -928,33 +926,6 @@ function sysError(code, syscall, path, dest, message += ` => ${dest}`; } return message; -======= - 'Module must be linked before it can be instantiated'); -E('ERR_VM_MODULE_NOT_MODULE', 'Provided module is not an instance of Module'); -E('ERR_VM_MODULE_STATUS', 'Module status %s'); -E('ERR_ZLIB_INITIALIZATION_FAILED', 'Initialization failed'); - -function sysError(defaultMessage) { - return function(code, - syscall, - path, - dest, - message = defaultMessage) { - if (code !== undefined) - message += `: ${code}`; - if (syscall !== undefined) { - if (code === undefined) - message += ':'; - message += ` [${syscall}]`; - } - if (path !== undefined) { - message += `: ${path}`; - if (dest !== undefined) - message += ` => ${dest}`; - } - return message; - }; ->>>>>>> comments } function invalidArgType(name, expected, actual) { diff --git a/test/parallel/test-stream-readable-destroy.js b/test/parallel/test-stream-readable-destroy.js index 09e4e57650f647..026aa8ca1603b8 100644 --- a/test/parallel/test-stream-readable-destroy.js +++ b/test/parallel/test-stream-readable-destroy.js @@ -181,14 +181,11 @@ const { inherits } = require('util'); } { - const read = new Readable(); - - read.on('error', common.expectsError({ - code: 'ERR_STREAM_DESTROYED', - message: 'Cannot call push after a stream was destroyed', - type: Error - })); + const read = new Readable({ + read() {} + }); read.destroy(); read.push('hi'); + read.on('data', common.mustNotCall()); } From 80add3045b246d11f1fc6c0c89159bbb704451bd Mon Sep 17 00:00:00 2001 From: Mathias Buus Date: Mon, 5 Mar 2018 18:09:45 +0100 Subject: [PATCH 13/15] update docs --- doc/api/stream.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/api/stream.md b/doc/api/stream.md index a7dba149b371dd..b65739bda7c0e9 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -1171,7 +1171,7 @@ added: v8.0.0 Destroy the stream, and emit `'error'` and `close`. After this call, the readable stream will release any internal resources and subsequent calls -to `push` will result in an `ERR_STREAM_DESTROYED` error. +to `push` will be ignored. Implementors should not override this method, but instead implement [`readable._destroy`][readable-_destroy]. From 25a0b5a9889cdd192ad14b55ff0819649ab21743 Mon Sep 17 00:00:00 2001 From: Mathias Buus Date: Tue, 6 Mar 2018 11:50:41 +0100 Subject: [PATCH 14/15] @addaleax nit --- doc/api/stream.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/api/stream.md b/doc/api/stream.md index b65739bda7c0e9..32e368f05f1875 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -1389,7 +1389,7 @@ constructor and implement the `writable._write()` method. The changes: - version: REPLACEME pr-url: https://github.com/nodejs/node/pull/18438 - description: add emitClose option to specify if close is emitted on destroy + description: Add `emitClose` option to specify if `close` is emitted on destroy --> * `options` {Object} From 1689baf94cb8022d8467a644343f3b2de8c7f0b9 Mon Sep 17 00:00:00 2001 From: Mathias Buus Date: Tue, 6 Mar 2018 12:14:17 +0100 Subject: [PATCH 15/15] rebase master --- lib/zlib.js | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/zlib.js b/lib/zlib.js index 907fdd52d11b61..4adfd1ffa289fb 100644 --- a/lib/zlib.js +++ b/lib/zlib.js @@ -25,7 +25,6 @@ const { ERR_BUFFER_TOO_LARGE, ERR_INVALID_ARG_TYPE, ERR_OUT_OF_RANGE, - ERR_ZLIB_BINDING_CLOSED, ERR_ZLIB_INITIALIZATION_FAILED } = require('internal/errors').codes; const Transform = require('_stream_transform');