From c5639771896196c70b3fc4ea4f205d430697cd1a Mon Sep 17 00:00:00 2001
From: Robert Nagy <ronagy@icloud.com>
Date: Sun, 28 Jun 2020 17:44:07 +0200
Subject: [PATCH 1/2] stream: fix writable.end callback behavior

Changes so that the end() callback behaves the same way in relation
to _final as write() does to _write/_writev.
---
 doc/api/stream.md                             |  8 ++-
 lib/_stream_writable.js                       | 53 +++++++++----------
 .../test-stream-transform-final-sync.js       |  4 +-
 test/parallel/test-stream-transform-final.js  |  4 +-
 test/parallel/test-stream-writable-destroy.js |  2 +-
 .../test-stream-writable-end-cb-error.js      |  4 +-
 .../test-stream-writable-end-cb-uncaught.js   |  2 +-
 test/parallel/test-stream-write-destroy.js    |  2 +-
 8 files changed, 37 insertions(+), 42 deletions(-)

diff --git a/doc/api/stream.md b/doc/api/stream.md
index 588e7bdfd3a7a2..0827143065ff44 100644
--- a/doc/api/stream.md
+++ b/doc/api/stream.md
@@ -428,15 +428,13 @@ changes:
   `Uint8Array`. For object mode streams, `chunk` may be any JavaScript value
   other than `null`.
 * `encoding` {string} The encoding if `chunk` is a string
-* `callback` {Function} Optional callback for when the stream finishes
-  or errors
+* `callback` {Function} Callback for when the stream is finished.
 * Returns: {this}
 
 Calling the `writable.end()` method signals that no more data will be written
 to the [`Writable`][]. The optional `chunk` and `encoding` arguments allow one
 final additional chunk of data to be written immediately before closing the
-stream. If provided, the optional `callback` function is attached as a listener
-for the [`'finish'`][] and the `'error'` event.
+stream.
 
 Calling the [`stream.write()`][stream-write] method after calling
 [`stream.end()`][stream-end] will raise an error.
@@ -592,7 +590,7 @@ changes:
   `Uint8Array`. For object mode streams, `chunk` may be any JavaScript value
   other than `null`.
 * `encoding` {string} The encoding, if `chunk` is a string. **Default:** `'utf8'`
-* `callback` {Function} Callback for when this chunk of data is flushed
+* `callback` {Function} Callback for when this chunk of data is flushed.
 * Returns: {boolean} `false` if the stream wishes for the calling code to
   wait for the `'drain'` event to be emitted before continuing to write
   additional data; otherwise `true`.
diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js
index 916e9b87d9c17a..65a7fe9f9f0984 100644
--- a/lib/_stream_writable.js
+++ b/lib/_stream_writable.js
@@ -64,6 +64,8 @@ ObjectSetPrototypeOf(Writable, Stream);
 
 function nop() {}
 
+const kOnFinished = Symbol('kOnFinished');
+
 function WritableState(options, stream, isDuplex) {
   // Duplex streams are both readable and writable, but share
   // the same options object.
@@ -185,6 +187,8 @@ function WritableState(options, stream, isDuplex) {
   // True if close has been emitted or would have been emitted
   // depending on emitClose.
   this.closeEmitted = false;
+
+  this[kOnFinished] = [];
 }
 
 function resetBuffer(state) {
@@ -411,7 +415,7 @@ function onwriteError(stream, state, er, cb) {
   // not enabled. Passing `er` here doesn't make sense since
   // it's related to one specific write, not to the buffered
   // writes.
-  errorBuffer(state, new ERR_STREAM_DESTROYED('write'));
+  errorBuffer(state);
   // This can emit error, but error must always follow cb.
   errorOrDestroy(stream, er);
 }
@@ -487,14 +491,14 @@ function afterWrite(stream, state, count, cb) {
   }
 
   if (state.destroyed) {
-    errorBuffer(state, new ERR_STREAM_DESTROYED('write'));
+    errorBuffer(state);
   }
 
   finishMaybe(stream, state);
 }
 
 // If there's something in the buffer waiting, then invoke callbacks.
-function errorBuffer(state, err) {
+function errorBuffer(state) {
   if (state.writing) {
     return;
   }
@@ -503,7 +507,11 @@ function errorBuffer(state, err) {
     const { chunk, callback } = state.buffered[n];
     const len = state.objectMode ? 1 : chunk.length;
     state.length -= len;
-    callback(err);
+    callback(new ERR_STREAM_DESTROYED('write'));
+  }
+
+  for (const callback of state[kOnFinished].splice(0)) {
+    callback(new ERR_STREAM_DESTROYED('end'));
   }
 
   resetBuffer(state);
@@ -611,10 +619,11 @@ Writable.prototype.end = function(chunk, encoding, cb) {
   }
 
   if (typeof cb === 'function') {
-    if (err || state.finished)
+    if (err || state.finished) {
       process.nextTick(cb, err);
-    else
-      onFinished(this, cb);
+    } else {
+      state[kOnFinished].push(cb);
+    }
   }
 
   return this;
@@ -636,6 +645,9 @@ function callFinal(stream, state) {
   stream._final((err) => {
     state.pendingcb--;
     if (err) {
+      for (const callback of state[kOnFinished].splice(0)) {
+        callback(err);
+      }
       errorOrDestroy(stream, err, state.sync);
     } else if (needFinish(state)) {
       state.prefinished = true;
@@ -683,6 +695,11 @@ function finish(stream, state) {
     return;
 
   state.finished = true;
+
+  for (const callback of state[kOnFinished].splice(0)) {
+    callback();
+  }
+
   stream.emit('finish');
 
   if (state.autoDestroy) {
@@ -701,26 +718,6 @@ function finish(stream, state) {
   }
 }
 
-// TODO(ronag): Avoid using events to implement internal logic.
-function onFinished(stream, cb) {
-  function onerror(err) {
-    stream.removeListener('finish', onfinish);
-    stream.removeListener('error', onerror);
-    cb(err);
-    if (stream.listenerCount('error') === 0) {
-      stream.emit('error', err);
-    }
-  }
-
-  function onfinish() {
-    stream.removeListener('finish', onfinish);
-    stream.removeListener('error', onerror);
-    cb();
-  }
-  stream.on('finish', onfinish);
-  stream.prependListener('error', onerror);
-}
-
 ObjectDefineProperties(Writable.prototype, {
 
   destroyed: {
@@ -800,7 +797,7 @@ const destroy = destroyImpl.destroy;
 Writable.prototype.destroy = function(err, cb) {
   const state = this._writableState;
   if (!state.destroyed) {
-    process.nextTick(errorBuffer, state, new ERR_STREAM_DESTROYED('write'));
+    process.nextTick(errorBuffer, state);
   }
   destroy.call(this, err, cb);
   return this;
diff --git a/test/parallel/test-stream-transform-final-sync.js b/test/parallel/test-stream-transform-final-sync.js
index 4faf1b067627ad..f8465c8929631c 100644
--- a/test/parallel/test-stream-transform-final-sync.js
+++ b/test/parallel/test-stream-transform-final-sync.js
@@ -90,7 +90,7 @@ const t = new stream.Transform({
 t.on('finish', common.mustCall(function() {
   state++;
   // finishListener
-  assert.strictEqual(state, 14);
+  assert.strictEqual(state, 15);
 }, 1));
 t.on('end', common.mustCall(function() {
   state++;
@@ -106,5 +106,5 @@ t.write(4);
 t.end(7, common.mustCall(function() {
   state++;
   // endMethodCallback
-  assert.strictEqual(state, 15);
+  assert.strictEqual(state, 14);
 }, 1));
diff --git a/test/parallel/test-stream-transform-final.js b/test/parallel/test-stream-transform-final.js
index 19af744a6bb33e..dd6cc3b427d6b7 100644
--- a/test/parallel/test-stream-transform-final.js
+++ b/test/parallel/test-stream-transform-final.js
@@ -92,7 +92,7 @@ const t = new stream.Transform({
 t.on('finish', common.mustCall(function() {
   state++;
   // finishListener
-  assert.strictEqual(state, 14);
+  assert.strictEqual(state, 15);
 }, 1));
 t.on('end', common.mustCall(function() {
   state++;
@@ -108,5 +108,5 @@ t.write(4);
 t.end(7, common.mustCall(function() {
   state++;
   // endMethodCallback
-  assert.strictEqual(state, 15);
+  assert.strictEqual(state, 14);
 }, 1));
diff --git a/test/parallel/test-stream-writable-destroy.js b/test/parallel/test-stream-writable-destroy.js
index 706847a8582d0c..80b51a50c52555 100644
--- a/test/parallel/test-stream-writable-destroy.js
+++ b/test/parallel/test-stream-writable-destroy.js
@@ -354,7 +354,7 @@ const assert = require('assert');
     assert.strictEqual(err.message, 'asd');
   }));
   write.end('asd', common.mustCall((err) => {
-    assert.strictEqual(err.message, 'asd');
+    assert.strictEqual(err.code, 'ERR_STREAM_DESTROYED');
   }));
   write.destroy(new Error('asd'));
 }
diff --git a/test/parallel/test-stream-writable-end-cb-error.js b/test/parallel/test-stream-writable-end-cb-error.js
index 24989a6d06a111..8f6d209954436f 100644
--- a/test/parallel/test-stream-writable-end-cb-error.js
+++ b/test/parallel/test-stream-writable-end-cb-error.js
@@ -17,10 +17,10 @@ const stream = require('stream');
   }));
   writable.write('asd');
   writable.end(common.mustCall((err) => {
-    assert.strictEqual(err.message, 'kaboom');
+    assert.strictEqual(err.code, 'ERR_STREAM_DESTROYED');
   }));
   writable.end(common.mustCall((err) => {
-    assert.strictEqual(err.message, 'kaboom');
+    assert.strictEqual(err.code, 'ERR_STREAM_DESTROYED');
   }));
 }
 
diff --git a/test/parallel/test-stream-writable-end-cb-uncaught.js b/test/parallel/test-stream-writable-end-cb-uncaught.js
index ab25cac81b0bee..5c1753aa067342 100644
--- a/test/parallel/test-stream-writable-end-cb-uncaught.js
+++ b/test/parallel/test-stream-writable-end-cb-uncaught.js
@@ -19,5 +19,5 @@ writable._final = (cb) => {
 
 writable.write('asd');
 writable.end(common.mustCall((err) => {
-  assert.strictEqual(err.message, 'kaboom');
+  assert.strictEqual(err.code, 'ERR_STREAM_DESTROYED');
 }));
diff --git a/test/parallel/test-stream-write-destroy.js b/test/parallel/test-stream-write-destroy.js
index 297217eb4accc6..1acf45a9ab2781 100644
--- a/test/parallel/test-stream-write-destroy.js
+++ b/test/parallel/test-stream-write-destroy.js
@@ -57,7 +57,7 @@ for (const withPendingData of [ false, true ]) {
     w.destroy();
     assert.strictEqual(chunksWritten, 1);
     callbacks.shift()();
-    assert.strictEqual(chunksWritten, 2);
+    assert.strictEqual(chunksWritten, useEnd && !withPendingData ? 1 : 2);
     assert.strictEqual(callbacks.length, 0);
     assert.strictEqual(drains, 1);
 

From 5d901cba6268a83ed41915dca6f45e127891ded6 Mon Sep 17 00:00:00 2001
From: Robert Nagy <ronagy@icloud.com>
Date: Mon, 29 Jun 2020 23:34:07 +0200
Subject: [PATCH 2/2] fixup: add changes

---
 doc/api/stream.md | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/doc/api/stream.md b/doc/api/stream.md
index 0827143065ff44..003389502aeac1 100644
--- a/doc/api/stream.md
+++ b/doc/api/stream.md
@@ -412,6 +412,9 @@ Is `true` after [`writable.destroy()`][writable-destroy] has been called.
 <!-- YAML
 added: v0.9.4
 changes:
+  - version: REPLACEME
+    pr-url: https://github.com/nodejs/node/pull/34101
+    description: The `callback` is invoked before 'finish' or on error.
   - version: v14.0.0
     pr-url: https://github.com/nodejs/node/pull/29747
     description: The `callback` is invoked if 'finish' or 'error' is emitted.