Commit 8af1387

stream: support passing generator functions into pipeline()
Backport-PR-URL: nodejs#31975
PR-URL: nodejs#31223
Reviewed-By: Matteo Collina <[email protected]>
Reviewed-By: Benjamin Gruenbaum <[email protected]>
Reviewed-By: Rich Trott <[email protected]>
Reviewed-By: James M Snell <[email protected]>
Reviewed-By: Anna Henningsen <[email protected]>
1 parent de0c6a0 commit 8af1387
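
For context, here is a minimal sketch of the calling convention this commit enables: an async generator function can now be passed as a pipeline stage between a source and a destination stream. It is illustrative only; the file names and the uppercasing transform are hypothetical and not part of the commit.

```js
'use strict';
const { pipeline } = require('stream');
const fs = require('fs');

pipeline(
  fs.createReadStream('input.txt'),       // source stream (hypothetical file)
  async function* (source) {              // generator transform, new in this commit
    for await (const chunk of source) {
      yield String(chunk).toUpperCase();  // transform each chunk
    }
  },
  fs.createWriteStream('output.txt'),     // destination stream (hypothetical file)
  (err) => {                              // completion callback
    if (err) {
      console.error('Pipeline failed.', err);
    } else {
      console.log('Pipeline succeeded.');
    }
  }
);
```

Errors raised in any stage, including the generator, are forwarded to the callback and the streams are cleaned up, as described in the doc change below.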

File tree: 4 files changed, +633 -31 lines changed

doc/api/stream.md (+43 -9)
@@ -1555,17 +1555,30 @@ const cleanup = finished(rs, (err) => {
 });
 ```
 
-### `stream.pipeline(...streams, callback)`
+### `stream.pipeline(source, ...transforms, destination, callback)`
 <!-- YAML
 added: v10.0.0
--->
-
-* `...streams` {Stream} Two or more streams to pipe between.
+changes:
+  - version: REPLACEME
+    pr-url: https://github.com/nodejs/node/pull/31223
+    description: Add support for async generators.
+-->
+
+* `source` {Stream|Iterable|AsyncIterable|Function}
+  * Returns: {Iterable|AsyncIterable}
+* `...transforms` {Stream|Function}
+  * `source` {AsyncIterable}
+  * Returns: {AsyncIterable}
+* `destination` {Stream|Function}
+  * `source` {AsyncIterable}
+  * Returns: {AsyncIterable|Promise}
 * `callback` {Function} Called when the pipeline is fully done.
   * `err` {Error}
+  * `val` Resolved value of `Promise` returned by `destination`.
+* Returns: {Stream}
 
-A module method to pipe between streams forwarding errors and properly cleaning
-up and provide a callback when the pipeline is complete.
+A module method to pipe between streams and generators, forwarding errors and
+properly cleaning up, and providing a callback when the pipeline is complete.
 
 ```js
 const { pipeline } = require('stream');
@@ -1608,6 +1621,28 @@ async function run() {
 run().catch(console.error);
 ```
 
+The `pipeline` API also supports async generators:
+
+```js
+const pipeline = util.promisify(stream.pipeline);
+const fs = require('fs');
+
+async function run() {
+  await pipeline(
+    fs.createReadStream('lowercase.txt'),
+    async function* (source) {
+      for await (const chunk of source) {
+        yield String(chunk).toUpperCase();
+      }
+    },
+    fs.createWriteStream('uppercase.txt')
+  );
+  console.log('Pipeline succeeded.');
+}
+
+run().catch(console.error);
+```
+
 `stream.pipeline()` will call `stream.destroy(err)` on all streams except:
 * `Readable` streams which have emitted `'end'` or `'close'`.
 * `Writable` streams which have emitted `'finish'` or `'close'`.
@@ -2707,8 +2742,7 @@ const pipeline = util.promisify(stream.pipeline);
 const writable = fs.createWriteStream('./file');
 
 (async function() {
-  const readable = Readable.from(iterable);
-  await pipeline(readable, writable);
+  await pipeline(iterable, writable);
 })();
 ```
 
@@ -2843,7 +2877,7 @@ contain multi-byte characters.
 [`stream.cork()`]: #stream_writable_cork
 [`stream.finished()`]: #stream_stream_finished_stream_options_callback
 [`stream.pipe()`]: #stream_readable_pipe_destination_options
-[`stream.pipeline()`]: #stream_stream_pipeline_streams_callback
+[`stream.pipeline()`]: #stream_stream_pipeline_source_transforms_destination_callback
 [`stream.uncork()`]: #stream_writable_uncork
 [`stream.unpipe()`]: #stream_readable_unpipe_destination
 [`stream.wrap()`]: #stream_readable_wrap_stream
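
The documentation above also adds a `val` argument to the completion callback: when the destination is a function whose return value is a `Promise`, the resolved value is passed through to the callback. A minimal sketch under that documented behavior (the string-collecting destination and the literal chunks are illustrative, not from the commit):

```js
'use strict';
const { pipeline } = require('stream');

pipeline(
  async function* () {          // source: a generator function producing chunks
    yield 'hello';
    yield ' ';
    yield 'world';
  },
  async function (source) {     // destination: consumes the async iterable...
    let result = '';
    for await (const chunk of source) {
      result += chunk;
    }
    return result;              // ...and its resolved value becomes `val`
  },
  (err, val) => {
    if (err) throw err;
    console.log(val);           // 'hello world'
  }
);
```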

lib/internal/streams/pipeline.js (+191 -22)
@@ -5,21 +5,37 @@
 
 const {
   ArrayIsArray,
+  SymbolAsyncIterator,
+  SymbolIterator
 } = primordials;
 
 let eos;
 
 const { once } = require('internal/util');
 const {
+  ERR_INVALID_ARG_TYPE,
+  ERR_INVALID_RETURN_VALUE,
   ERR_INVALID_CALLBACK,
   ERR_MISSING_ARGS,
   ERR_STREAM_DESTROYED
 } = require('internal/errors').codes;
 
+let EE;
+let PassThrough;
+let createReadableStreamAsyncIterator;
+
 function isRequest(stream) {
   return stream && stream.setHeader && typeof stream.abort === 'function';
 }
 
+function destroyStream(stream, err) {
+  // request.destroy just do .end - .abort is what we want
+  if (isRequest(stream)) return stream.abort();
+  if (isRequest(stream.req)) return stream.req.abort();
+  if (typeof stream.destroy === 'function') return stream.destroy(err);
+  if (typeof stream.close === 'function') return stream.close();
+}
+
 function destroyer(stream, reading, writing, callback) {
   callback = once(callback);
 
@@ -41,19 +57,12 @@ function destroyer(stream, reading, writing, callback) {
     if (destroyed) return;
     destroyed = true;
 
-    // request.destroy just do .end - .abort is what we want
-    if (isRequest(stream)) return stream.abort();
-    if (isRequest(stream.req)) return stream.req.abort();
-    if (typeof stream.destroy === 'function') return stream.destroy(err);
+    destroyStream(stream, err);
 
     callback(err || new ERR_STREAM_DESTROYED('pipe'));
   };
 }
 
-function pipe(from, to) {
-  return from.pipe(to);
-}
-
 function popCallback(streams) {
   // Streams should never be an empty array. It should always contain at least
   // a single stream. Therefore optimize for the average case instead of
@@ -63,8 +72,89 @@ function popCallback(streams) {
   return streams.pop();
 }
 
+function isPromise(obj) {
+  return !!(obj && typeof obj.then === 'function');
+}
+
+function isReadable(obj) {
+  return !!(obj && typeof obj.pipe === 'function');
+}
+
+function isWritable(obj) {
+  return !!(obj && typeof obj.write === 'function');
+}
+
+function isStream(obj) {
+  return isReadable(obj) || isWritable(obj);
+}
+
+function isIterable(obj, isAsync) {
+  if (!obj) return false;
+  if (isAsync === true) return typeof obj[SymbolAsyncIterator] === 'function';
+  if (isAsync === false) return typeof obj[SymbolIterator] === 'function';
+  return typeof obj[SymbolAsyncIterator] === 'function' ||
+    typeof obj[SymbolIterator] === 'function';
+}
+
+function makeAsyncIterable(val) {
+  if (isIterable(val)) {
+    return val;
+  } else if (isReadable(val)) {
+    // Legacy streams are not Iterable.
+    return _fromReadable(val);
+  } else {
+    throw new ERR_INVALID_ARG_TYPE(
+      'val', ['Readable', 'Iterable', 'AsyncIterable'], val);
+  }
+}
+
+async function* _fromReadable(val) {
+  if (!createReadableStreamAsyncIterator) {
+    createReadableStreamAsyncIterator =
+      require('internal/streams/async_iterator');
+  }
+
+  try {
+    if (typeof val.read !== 'function') {
+      // createReadableStreamAsyncIterator does not support
+      // v1 streams. Convert it into a v2 stream.
+
+      if (!PassThrough) {
+        PassThrough = require('_stream_passthrough');
+      }
+
+      const pt = new PassThrough();
+      val
+        .on('error', (err) => pt.destroy(err))
+        .pipe(pt);
+      yield* createReadableStreamAsyncIterator(pt);
+    } else {
+      yield* createReadableStreamAsyncIterator(val);
+    }
+  } finally {
+    destroyStream(val);
+  }
+}
+
+async function pump(iterable, writable, finish) {
+  if (!EE) {
+    EE = require('events');
+  }
+  try {
+    for await (const chunk of iterable) {
+      if (!writable.write(chunk)) {
+        if (writable.destroyed) return;
+        await EE.once(writable, 'drain');
+      }
+    }
+    writable.end();
+  } catch (err) {
+    finish(err);
+  }
+}
+
 function pipeline(...streams) {
-  const callback = popCallback(streams);
+  const callback = once(popCallback(streams));
 
   if (ArrayIsArray(streams[0])) streams = streams[0];
 
@@ -73,25 +163,104 @@ function pipeline(...streams) {
   }
 
   let error;
-  const destroys = streams.map(function(stream, i) {
+  const destroys = [];
+
+  function finish(err, val, final) {
+    if (!error && err) {
+      error = err;
+    }
+
+    if (error || final) {
+      for (const destroy of destroys) {
+        destroy(error);
+      }
+    }
+
+    if (final) {
+      callback(error, val);
+    }
+  }
+
+  function wrap(stream, reading, writing, final) {
+    destroys.push(destroyer(stream, reading, writing, (err) => {
+      finish(err, null, final);
+    }));
+  }
+
+  let ret;
+  for (let i = 0; i < streams.length; i++) {
+    const stream = streams[i];
     const reading = i < streams.length - 1;
     const writing = i > 0;
-    return destroyer(stream, reading, writing, function(err) {
-      if (!error) error = err;
-      if (err) {
-        for (const destroy of destroys) {
-          destroy(err);
+
+    if (isStream(stream)) {
+      wrap(stream, reading, writing, !reading);
+    }
+
+    if (i === 0) {
+      if (typeof stream === 'function') {
+        ret = stream();
+        if (!isIterable(ret)) {
+          throw new ERR_INVALID_RETURN_VALUE(
+            'Iterable, AsyncIterable or Stream', 'source', ret);
         }
+      } else if (isIterable(stream) || isReadable(stream)) {
+        ret = stream;
+      } else {
+        throw new ERR_INVALID_ARG_TYPE(
+          'source', ['Stream', 'Iterable', 'AsyncIterable', 'Function'],
+          stream);
       }
-      if (reading) return;
-      for (const destroy of destroys) {
-        destroy();
+    } else if (typeof stream === 'function') {
+      ret = makeAsyncIterable(ret);
+      ret = stream(ret);
+
+      if (reading) {
+        if (!isIterable(ret, true)) {
+          throw new ERR_INVALID_RETURN_VALUE(
+            'AsyncIterable', `transform[${i - 1}]`, ret);
+        }
+      } else {
+        if (!PassThrough) {
+          PassThrough = require('_stream_passthrough');
+        }
+
+        const pt = new PassThrough();
+        if (isPromise(ret)) {
+          ret
+            .then((val) => {
+              pt.end(val);
+              finish(null, val, true);
+            })
+            .catch((err) => {
+              finish(err, null, true);
+            });
+        } else if (isIterable(ret, true)) {
+          pump(ret, pt, finish);
+        } else {
+          throw new ERR_INVALID_RETURN_VALUE(
+            'AsyncIterable or Promise', 'destination', ret);
+        }
+
+        ret = pt;
+        wrap(ret, true, false, true);
       }
-      callback(error);
-    });
-  });
+    } else if (isStream(stream)) {
+      if (isReadable(ret)) {
+        ret.pipe(stream);
+      } else {
+        ret = makeAsyncIterable(ret);
+        pump(ret, stream, finish);
+      }
+      ret = stream;
+    } else {
+      const name = reading ? `transform[${i - 1}]` : 'destination';
+      throw new ERR_INVALID_ARG_TYPE(
+        name, ['Stream', 'Function'], ret);
+    }
+  }
 
-  return streams.reduce(pipe);
+  return ret;
 }
 
 module.exports = pipeline;
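
The new `pump()` helper above is what connects a generator stage to a `Writable` while honoring backpressure: it stops pulling from the iterable when `write()` returns `false` and resumes on `'drain'`. A standalone userland sketch of the same pattern (assumes Node.js 11.13 or later for `events.once`; `writeIterable` is a hypothetical name, not part of this commit):

```js
'use strict';
const { once } = require('events');

async function writeIterable(iterable, writable) {
  for await (const chunk of iterable) {
    if (!writable.write(chunk)) {
      // The destination's internal buffer is full: stop pulling from the
      // iterable until the stream signals 'drain' (or has been destroyed).
      if (writable.destroyed) return;
      await once(writable, 'drain');
    }
  }
  writable.end();
}
```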
