From c38d883336c3e91912bb98f5afb27f8d164df744 Mon Sep 17 00:00:00 2001 From: Kosta Date: Mon, 22 Jun 2020 18:09:32 +0200 Subject: [PATCH] fix drain() function for "Piping to writable streams from async iterators" The sample code for ["Piping to writable streams from async iterators"](https://nodejs.org/docs/latest-v14.x/api/stream.html#stream_piping_to_writable_streams_from_async_iterators) is containing an event listener leak, since it registers 2 event listeners, but don't remove them again. The `drain()` function has been changed to remove all the listeners that have been registered within that function in any case again. --- doc/api/stream.md | 26 +++++++++++++++++++------- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/doc/api/stream.md b/doc/api/stream.md index b195458cec9b4f..25d8880430c176 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -2890,20 +2890,32 @@ In the scenario of writing to a writable stream from an async iterator, ensure the correct handling of backpressure and errors. ```js -const { once } = require('events'); const finished = util.promisify(stream.finished); const writable = fs.createWriteStream('./file'); function drain(writable) { + const resolve = () => Promise.resolve(); + const rejectError = (err) => Promise.reject(err); + const rejectPrematureClose = () => Promise.reject(new Error('premature close of writable stream')); + if (writable.destroyed) { - return Promise.reject(new Error('premature close')); + return rejectPrematureClose(); + } + + try { + // register event listeners and wait for whatever event gets emitted first + await Promise.race([ + writable.once('drain', resolve), + writable.once('error', rejectError), + writable.once('close', rejectPrematureClose), + ]); + } finally { + // ensure that all the registered event listeners get removed again + writable.removeListener('drain', resolve); + writable.removeListener('error', rejectError); + writable.removeListener('close', rejectPrematureClose); } - return Promise.race([ - once(writable, 'drain'), - once(writable, 'close') - .then(() => Promise.reject(new Error('premature close'))) - ]); } async function pump(iterable, writable) {