From 5f751f8da4d2e0bfd22a2826f121e46b6cb8134f Mon Sep 17 00:00:00 2001 From: rickyes <ives199511@gmail.com> Date: Sun, 28 Jun 2020 16:29:01 +0800 Subject: [PATCH 1/3] stream: add promises version to utility functions --- doc/api/stream.md | 20 +++--- lib/stream.js | 27 +++++++ lib/stream/promises.js | 39 ++++++++++ node.gyp | 1 + test/parallel/test-stream-promises.js | 100 ++++++++++++++++++++++++++ 5 files changed, 179 insertions(+), 8 deletions(-) create mode 100644 lib/stream/promises.js create mode 100644 test/parallel/test-stream-promises.js diff --git a/doc/api/stream.md b/doc/api/stream.md index ed305ab8a853c3..a4411288ca6020 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -46,6 +46,12 @@ Additionally, this module includes the utility functions [`stream.pipeline()`][], [`stream.finished()`][] and [`stream.Readable.from()`][]. +### Streams Promises API + +The `stream/promises` API provides an alternative set of asynchronous utility +functions for streams that return `Promise` objects rather than using +callbacks. The API is accessible via `require('stream/promises')`. + ### Object mode All streams created by Node.js APIs operate exclusively on strings and `Buffer` @@ -1563,10 +1569,10 @@ Especially useful in error handling scenarios where a stream is destroyed prematurely (like an aborted HTTP request), and will not emit `'end'` or `'finish'`. -The `finished` API is promisify-able as well; +The `finished` API provides promise version: ```js -const finished = util.promisify(stream.finished); +const { finished } = require('stream/promises'); const rs = fs.createReadStream('archive.tar'); @@ -1648,10 +1654,10 @@ pipeline( ); ``` -The `pipeline` API is promisify-able as well: +The `pipeline` API provides promise version: ```js -const pipeline = util.promisify(stream.pipeline); +const { pipeline } = require('stream/promises'); async function run() { await pipeline( @@ -1668,7 +1674,7 @@ run().catch(console.error); The `pipeline` API also supports async generators: ```js -const pipeline = util.promisify(stream.pipeline); +const { pipeline } = require('stream/promises'); const fs = require('fs'); async function run() { @@ -2892,8 +2898,7 @@ the handling of backpressure and backpressure-related errors: ```js const { pipeline } = require('stream'); -const util = require('util'); -const fs = require('fs'); +const { pipeline: pipelinePromise } = require('stream/promises'); const writable = fs.createWriteStream('./file'); @@ -2907,7 +2912,6 @@ pipeline(iterator, writable, (err, value) => { }); // Promise Pattern -const pipelinePromise = util.promisify(pipeline); pipelinePromise(iterator, writable) .then((value) => { console.log(value, 'value returned'); diff --git a/lib/stream.js b/lib/stream.js index 725038ba9c0d1c..8587096bac91b8 100644 --- a/lib/stream.js +++ b/lib/stream.js @@ -21,10 +21,21 @@ 'use strict'; +const { + ObjectDefineProperty, +} = primordials; + +const { + promisify: { custom: customPromisify }, +} = require('internal/util'); + const pipeline = require('internal/streams/pipeline'); const eos = require('internal/streams/end-of-stream'); const internalBuffer = require('internal/buffer'); +// Lazy loaded +let promises = null; + // Note: export Stream before Readable/Writable/Duplex/... // to avoid a cross-reference(require) issues const Stream = module.exports = require('internal/streams/legacy'); @@ -38,6 +49,22 @@ Stream.PassThrough = require('_stream_passthrough'); Stream.pipeline = pipeline; Stream.finished = eos; +ObjectDefineProperty(pipeline, customPromisify, { + enumerable: true, + get() { + if (promises === null) promises = require('stream/promises'); + return promises.pipeline; + } +}); + +ObjectDefineProperty(eos, customPromisify, { + enumerable: true, + get() { + if (promises === null) promises = require('stream/promises'); + return promises.finished; + } +}); + // Backwards-compat with node 0.4.x Stream.Stream = Stream; diff --git a/lib/stream/promises.js b/lib/stream/promises.js new file mode 100644 index 00000000000000..986db2e1f8db8a --- /dev/null +++ b/lib/stream/promises.js @@ -0,0 +1,39 @@ +'use strict'; + +const { + Promise, +} = primordials; + +let pl; +let eos; + +function pipeline(...streams) { + if (!pl) pl = require('internal/streams/pipeline'); + return new Promise((resolve, reject) => { + pl(...streams, (err, value) => { + if (err) { + reject(err); + } else { + resolve(value); + } + }); + }); +} + +function finished(stream, opts) { + if (!eos) eos = require('internal/streams/end-of-stream'); + return new Promise((resolve, reject) => { + eos(stream, opts, (err) => { + if (err) { + reject(err); + } else { + resolve(); + } + }); + }); +} + +module.exports = { + finished, + pipeline, +}; diff --git a/node.gyp b/node.gyp index 6d9e9cb3fc009e..320caafc9a428e 100644 --- a/node.gyp +++ b/node.gyp @@ -77,6 +77,7 @@ 'lib/readline.js', 'lib/repl.js', 'lib/stream.js', + 'lib/stream/promises.js', 'lib/_stream_readable.js', 'lib/_stream_writable.js', 'lib/_stream_duplex.js', diff --git a/test/parallel/test-stream-promises.js b/test/parallel/test-stream-promises.js new file mode 100644 index 00000000000000..1793043bcaef3c --- /dev/null +++ b/test/parallel/test-stream-promises.js @@ -0,0 +1,100 @@ +'use strict'; + +const common = require('../common'); +const stream = require('stream'); +const { + Readable, + Writable, +} = stream; +const { + finished, + pipeline, +} = require('stream/promises'); +const fs = require('fs'); +const assert = require('assert'); +const { promisify } = require('util'); + +assert.strictEqual(pipeline, promisify(stream.pipeline)); +assert.strictEqual(finished, promisify(stream.finished)); + +// pipeline success +{ + let finished = false; + const processed = []; + const expected = [ + Buffer.from('a'), + Buffer.from('b'), + Buffer.from('c') + ]; + + const read = new Readable({ + read() { } + }); + + const write = new Writable({ + write(data, enc, cb) { + processed.push(data); + cb(); + } + }); + + write.on('finish', () => { + finished = true; + }); + + for (let i = 0; i < expected.length; i++) { + read.push(expected[i]); + } + read.push(null); + + pipeline(read, write).then(common.mustCall((value) => { + assert.ok(finished); + assert.deepStrictEqual(processed, expected); + })); +} + +// pipeline error +{ + const read = new Readable({ + read() { } + }); + + const write = new Writable({ + write(data, enc, cb) { + cb(); + } + }); + + read.push('data'); + setImmediate(() => read.destroy()); + + pipeline(read, write).catch(common.mustCall((err) => { + assert.ok(err, 'should have an error'); + })); +} + +// finished success +{ + async function run() { + const rs = fs.createReadStream(__filename); + + let ended = false; + rs.resume(); + rs.on('end', () => { + ended = true; + }); + await finished(rs); + assert(ended); + } + + run().then(common.mustCall()); +} + +// finished error +{ + const rs = fs.createReadStream('file-does-not-exist'); + + assert.rejects(finished(rs), { + code: 'ENOENT' + }).then(common.mustCall()); +} From 504c366fb8868bc542a8c120ab915643a4858837 Mon Sep 17 00:00:00 2001 From: rickyes <ives199511@gmail.com> Date: Sun, 28 Jun 2020 16:42:11 +0800 Subject: [PATCH 2/3] stream: adds support for stream.promises --- doc/api/stream.md | 3 ++- lib/stream.js | 9 +++++++++ test/parallel/test-stream-promises.js | 3 +++ 3 files changed, 14 insertions(+), 1 deletion(-) diff --git a/doc/api/stream.md b/doc/api/stream.md index a4411288ca6020..dfcfc1884a9539 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -50,7 +50,8 @@ Additionally, this module includes the utility functions The `stream/promises` API provides an alternative set of asynchronous utility functions for streams that return `Promise` objects rather than using -callbacks. The API is accessible via `require('stream/promises')`. +callbacks. The API is accessible via `require('stream/promises')` +or `require('stream').promises`. ### Object mode diff --git a/lib/stream.js b/lib/stream.js index 8587096bac91b8..ed6cc19753c806 100644 --- a/lib/stream.js +++ b/lib/stream.js @@ -49,6 +49,15 @@ Stream.PassThrough = require('_stream_passthrough'); Stream.pipeline = pipeline; Stream.finished = eos; +ObjectDefineProperty(Stream, 'promises', { + configurable: true, + enumerable: true, + get() { + if (promises === null) promises = require('stream/promises'); + return promises; + } +}); + ObjectDefineProperty(pipeline, customPromisify, { enumerable: true, get() { diff --git a/test/parallel/test-stream-promises.js b/test/parallel/test-stream-promises.js index 1793043bcaef3c..86c44b279fa4a1 100644 --- a/test/parallel/test-stream-promises.js +++ b/test/parallel/test-stream-promises.js @@ -5,6 +5,7 @@ const stream = require('stream'); const { Readable, Writable, + promises, } = stream; const { finished, @@ -14,6 +15,8 @@ const fs = require('fs'); const assert = require('assert'); const { promisify } = require('util'); +assert.strictEqual(promises.pipeline, pipeline); +assert.strictEqual(promises.finished, finished); assert.strictEqual(pipeline, promisify(stream.pipeline)); assert.strictEqual(finished, promisify(stream.finished)); From e364f641a961b393b8332e80e789e38b391da654 Mon Sep 17 00:00:00 2001 From: rickyes <ives199511@gmail.com> Date: Wed, 8 Jul 2020 14:53:29 +0800 Subject: [PATCH 3/3] fixup --- doc/api/stream.md | 1 + 1 file changed, 1 insertion(+) diff --git a/doc/api/stream.md b/doc/api/stream.md index dfcfc1884a9539..2d45578269e72d 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -2898,6 +2898,7 @@ handling of backpressure and errors. [`stream.pipeline()`][] abstracts away the handling of backpressure and backpressure-related errors: ```js +const fs = require('fs'); const { pipeline } = require('stream'); const { pipeline: pipelinePromise } = require('stream/promises');