Skip to content

Commit 24105a7

Browse files
WilliamConnatsercodebytere
authored andcommitted
doc: piping from async generators using pipeline()
PR-URL: #33992 Reviewed-By: Anna Henningsen <[email protected]> Reviewed-By: Luigi Pinca <[email protected]> Reviewed-By: Robert Nagy <[email protected]> Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: Trivikram Kamat <[email protected]> Reviewed-By: James M Snell <[email protected]>
1 parent 70398db commit 24105a7

File tree

1 file changed

+20
-67
lines changed

1 file changed

+20
-67
lines changed

doc/api/stream.md

+20-67
Original file line numberDiff line numberDiff line change
@@ -2731,80 +2731,33 @@ readable.on('data', (chunk) => {
27312731

27322732
#### Piping to Writable Streams from Async Iterators
27332733

2734-
In the scenario of writing to a writable stream from an async iterator, ensure
2735-
the correct handling of backpressure and errors.
2734+
When writing to a writable stream from an async iterator, ensure correct
2735+
handling of backpressure and errors. [`stream.pipeline()`][] abstracts away
2736+
the handling of backpressure and backpressure-related errors:
27362737

27372738
```js
2738-
const { once } = require('events');
2739-
const finished = util.promisify(stream.finished);
2739+
const { pipeline } = require('stream');
2740+
const util = require('util');
2741+
const fs = require('fs');
27402742

27412743
const writable = fs.createWriteStream('./file');
27422744

2743-
function drain(writable) {
2744-
if (writable.destroyed) {
2745-
return Promise.reject(new Error('premature close'));
2746-
}
2747-
return Promise.race([
2748-
once(writable, 'drain'),
2749-
once(writable, 'close')
2750-
.then(() => Promise.reject(new Error('premature close')))
2751-
]);
2752-
}
2753-
2754-
async function pump(iterable, writable) {
2755-
for await (const chunk of iterable) {
2756-
// Handle backpressure on write().
2757-
if (!writable.write(chunk)) {
2758-
await drain(writable);
2759-
}
2745+
// Callback Pattern
2746+
pipeline(iterator, writable, (err, value) => {
2747+
if (err) {
2748+
console.error(err);
2749+
} else {
2750+
console.log(value, 'value returned');
27602751
}
2761-
writable.end();
2762-
}
2763-
2764-
(async function() {
2765-
// Ensure completion without errors.
2766-
await Promise.all([
2767-
pump(iterable, writable),
2768-
finished(writable)
2769-
]);
2770-
})();
2771-
```
2772-
2773-
In the above, errors on `write()` would be caught and thrown by the
2774-
`once()` listener for the `'drain'` event, since `once()` will also handle the
2775-
`'error'` event. To ensure completion of the write stream without errors,
2776-
it is safer to use the `finished()` method as above, instead of using the
2777-
`once()` listener for the `'finish'` event. Under certain cases, an `'error'`
2778-
event could be emitted by the writable stream after `'finish'` and as `once()`
2779-
will release the `'error'` handler on handling the `'finish'` event, it could
2780-
result in an unhandled error.
2781-
2782-
Alternatively, the readable stream could be wrapped with `Readable.from()` and
2783-
then piped via `.pipe()`:
2784-
2785-
```js
2786-
const finished = util.promisify(stream.finished);
2787-
2788-
const writable = fs.createWriteStream('./file');
2789-
2790-
(async function() {
2791-
const readable = Readable.from(iterable);
2792-
readable.pipe(writable);
2793-
// Ensure completion without errors.
2794-
await finished(writable);
2795-
})();
2796-
```
2797-
2798-
Or, using `stream.pipeline()` to pipe streams:
2799-
2800-
```js
2801-
const pipeline = util.promisify(stream.pipeline);
2802-
2803-
const writable = fs.createWriteStream('./file');
2752+
});
28042753

2805-
(async function() {
2806-
await pipeline(iterable, writable);
2807-
})();
2754+
// Promise Pattern
2755+
const pipelinePromise = util.promisify(pipeline);
2756+
pipelinePromise(iterator, writable)
2757+
.then((value) => {
2758+
console.log(value, 'value returned');
2759+
})
2760+
.catch(console.error);
28082761
```
28092762

28102763
<!--type=misc-->

0 commit comments

Comments
 (0)