Skip to content

Commit cb44781

Browse files
ronagtargos
authored andcommitted
stream: add stream.compose
Refs: #32020 PR-URL: #39029 Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: Luigi Pinca <[email protected]> Reviewed-By: Michaël Zasso <[email protected]> Backport-PR-URL: #39563
1 parent 9e782eb commit cb44781

File tree

10 files changed

+829
-44
lines changed

10 files changed

+829
-44
lines changed

doc/api/stream.md

+89
Original file line numberDiff line numberDiff line change
@@ -1933,6 +1933,95 @@ run().catch(console.error);
19331933
after the `callback` has been invoked. In the case of reuse of streams after
19341934
failure, this can cause event listener leaks and swallowed errors.
19351935

1936+
### `stream.compose(...streams)`
1937+
<!-- YAML
1938+
added: REPLACEME
1939+
-->
1940+
1941+
> Stability: 1 - `stream.compose` is experimental.
1942+
1943+
* `streams` {Stream[]|Iterable[]|AsyncIterable[]|Function[]}
1944+
* Returns: {stream.Duplex}
1945+
1946+
Combines two or more streams into a `Duplex` stream that writes to the
1947+
first stream and reads from the last. Each provided stream is piped into
1948+
the next, using `stream.pipeline`. If any of the streams error then all
1949+
are destroyed, including the outer `Duplex` stream.
1950+
1951+
Because `stream.compose` returns a new stream that in turn can (and
1952+
should) be piped into other streams, it enables composition. In contrast,
1953+
when passing streams to `stream.pipeline`, typically the first stream is
1954+
a readable stream and the last a writable stream, forming a closed
1955+
circuit.
1956+
1957+
If passed a `Function` it must be a factory method taking a `source`
1958+
`Iterable`.
1959+
1960+
```mjs
1961+
import { compose, Transform } from 'stream';
1962+
1963+
const removeSpaces = new Transform({
1964+
transform(chunk, encoding, callback) {
1965+
callback(null, String(chunk).replace(' ', ''));
1966+
}
1967+
});
1968+
1969+
async function* toUpper(source) {
1970+
for await (const chunk of source) {
1971+
yield String(chunk).toUpperCase();
1972+
}
1973+
}
1974+
1975+
let res = '';
1976+
for await (const buf of compose(removeSpaces, toUpper).end('hello world')) {
1977+
res += buf;
1978+
}
1979+
1980+
console.log(res); // prints 'HELLOWORLD'
1981+
```
1982+
1983+
`stream.compose` can be used to convert async iterables, generators and
1984+
functions into streams.
1985+
1986+
* `AsyncIterable` converts into a readable `Duplex`. Cannot yield
1987+
`null`.
1988+
* `AsyncGeneratorFunction` converts into a readable/writable transform `Duplex`.
1989+
Must take a source `AsyncIterable` as first parameter. Cannot yield
1990+
`null`.
1991+
* `AsyncFunction` converts into a writable `Duplex`. Must return
1992+
either `null` or `undefined`.
1993+
1994+
```mjs
1995+
import { compose } from 'stream';
1996+
import { finished } from 'stream/promises';
1997+
1998+
// Convert AsyncIterable into readable Duplex.
1999+
const s1 = compose(async function*() {
2000+
yield 'Hello';
2001+
yield 'World';
2002+
}());
2003+
2004+
// Convert AsyncGenerator into transform Duplex.
2005+
const s2 = compose(async function*(source) {
2006+
for await (const chunk of source) {
2007+
yield String(chunk).toUpperCase();
2008+
}
2009+
});
2010+
2011+
let res = '';
2012+
2013+
// Convert AsyncFunction into writable Duplex.
2014+
const s3 = compose(async function(source) {
2015+
for await (const chunk of source) {
2016+
res += chunk;
2017+
}
2018+
});
2019+
2020+
await finished(compose(s1, s2, s3));
2021+
2022+
console.log(res); // prints 'HELLOWORLD'
2023+
```
2024+
19362025
### `stream.Readable.from(iterable, [options])`
19372026
<!-- YAML
19382027
added:

lib/internal/streams/compose.js

+196
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,196 @@
1+
'use strict';
2+
3+
const pipeline = require('internal/streams/pipeline');
4+
const Duplex = require('internal/streams/duplex');
5+
const { destroyer } = require('internal/streams/destroy');
6+
const {
7+
isNodeStream,
8+
isReadable,
9+
isWritable,
10+
} = require('internal/streams/utils');
11+
const {
12+
AbortError,
13+
codes: {
14+
ERR_INVALID_ARG_VALUE,
15+
ERR_MISSING_ARGS,
16+
},
17+
} = require('internal/errors');
18+
19+
// This is needed for pre node 17.
20+
class ComposeDuplex extends Duplex {
21+
constructor(options) {
22+
super(options);
23+
24+
// https://github.com/nodejs/node/pull/34385
25+
26+
if (options?.readable === false) {
27+
this._readableState.readable = false;
28+
this._readableState.ended = true;
29+
this._readableState.endEmitted = true;
30+
}
31+
32+
if (options?.writable === false) {
33+
this._writableState.writable = false;
34+
this._writableState.ending = true;
35+
this._writableState.ended = true;
36+
this._writableState.finished = true;
37+
}
38+
}
39+
}
40+
41+
module.exports = function compose(...streams) {
42+
if (streams.length === 0) {
43+
throw new ERR_MISSING_ARGS('streams');
44+
}
45+
46+
if (streams.length === 1) {
47+
return Duplex.from(streams[0]);
48+
}
49+
50+
const orgStreams = [...streams];
51+
52+
if (typeof streams[0] === 'function') {
53+
streams[0] = Duplex.from(streams[0]);
54+
}
55+
56+
if (typeof streams[streams.length - 1] === 'function') {
57+
const idx = streams.length - 1;
58+
streams[idx] = Duplex.from(streams[idx]);
59+
}
60+
61+
for (let n = 0; n < streams.length; ++n) {
62+
if (!isNodeStream(streams[n])) {
63+
// TODO(ronag): Add checks for non streams.
64+
continue;
65+
}
66+
if (n < streams.length - 1 && !isReadable(streams[n])) {
67+
throw new ERR_INVALID_ARG_VALUE(
68+
`streams[${n}]`,
69+
orgStreams[n],
70+
'must be readable'
71+
);
72+
}
73+
if (n > 0 && !isWritable(streams[n])) {
74+
throw new ERR_INVALID_ARG_VALUE(
75+
`streams[${n}]`,
76+
orgStreams[n],
77+
'must be writable'
78+
);
79+
}
80+
}
81+
82+
let ondrain;
83+
let onfinish;
84+
let onreadable;
85+
let onclose;
86+
let d;
87+
88+
function onfinished(err) {
89+
const cb = onclose;
90+
onclose = null;
91+
92+
if (cb) {
93+
cb(err);
94+
} else if (err) {
95+
d.destroy(err);
96+
} else if (!readable && !writable) {
97+
d.destroy();
98+
}
99+
}
100+
101+
const head = streams[0];
102+
const tail = pipeline(streams, onfinished);
103+
104+
const writable = !!isWritable(head);
105+
const readable = !!isReadable(tail);
106+
107+
// TODO(ronag): Avoid double buffering.
108+
// Implement Writable/Readable/Duplex traits.
109+
// See, https://github.com/nodejs/node/pull/33515.
110+
d = new ComposeDuplex({
111+
// TODO (ronag): highWaterMark?
112+
writableObjectMode: !!head?.writableObjectMode,
113+
readableObjectMode: !!tail?.writableObjectMode,
114+
writable,
115+
readable,
116+
});
117+
118+
if (writable) {
119+
d._write = function(chunk, encoding, callback) {
120+
if (head.write(chunk, encoding)) {
121+
callback();
122+
} else {
123+
ondrain = callback;
124+
}
125+
};
126+
127+
d._final = function(callback) {
128+
head.end();
129+
onfinish = callback;
130+
};
131+
132+
head.on('drain', function() {
133+
if (ondrain) {
134+
const cb = ondrain;
135+
ondrain = null;
136+
cb();
137+
}
138+
});
139+
140+
tail.on('finish', function() {
141+
if (onfinish) {
142+
const cb = onfinish;
143+
onfinish = null;
144+
cb();
145+
}
146+
});
147+
}
148+
149+
if (readable) {
150+
tail.on('readable', function() {
151+
if (onreadable) {
152+
const cb = onreadable;
153+
onreadable = null;
154+
cb();
155+
}
156+
});
157+
158+
tail.on('end', function() {
159+
d.push(null);
160+
});
161+
162+
d._read = function() {
163+
while (true) {
164+
const buf = tail.read();
165+
166+
if (buf === null) {
167+
onreadable = d._read;
168+
return;
169+
}
170+
171+
if (!d.push(buf)) {
172+
return;
173+
}
174+
}
175+
};
176+
}
177+
178+
d._destroy = function(err, callback) {
179+
if (!err && onclose !== null) {
180+
err = new AbortError();
181+
}
182+
183+
onreadable = null;
184+
ondrain = null;
185+
onfinish = null;
186+
187+
if (onclose === null) {
188+
callback(err);
189+
} else {
190+
onclose = callback;
191+
destroyer(tail, err);
192+
}
193+
};
194+
195+
return d;
196+
};

lib/internal/streams/duplexify.js

+2-17
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,11 @@
33
const {
44
isIterable,
55
isNodeStream,
6-
isDestroyed,
6+
isReadable,
77
isReadableNodeStream,
8+
isWritable,
89
isWritableNodeStream,
910
isDuplexNodeStream,
10-
isReadableFinished,
11-
isWritableEnded,
1211
} = require('internal/streams/utils');
1312
const eos = require('internal/streams/end-of-stream');
1413
const {
@@ -32,20 +31,6 @@ const {
3231
FunctionPrototypeCall
3332
} = primordials;
3433

35-
function isReadable(stream) {
36-
const r = isReadableNodeStream(stream);
37-
if (r === null || typeof stream?.readable !== 'boolean') return null;
38-
if (isDestroyed(stream)) return false;
39-
return r && stream.readable && !isReadableFinished(stream);
40-
}
41-
42-
function isWritable(stream) {
43-
const r = isWritableNodeStream(stream);
44-
if (r === null || typeof stream?.writable !== 'boolean') return null;
45-
if (isDestroyed(stream)) return false;
46-
return r && stream.writable && !isWritableEnded(stream);
47-
}
48-
4934
// This is needed for pre node 17.
5035
class Duplexify extends Duplex {
5136
constructor(options) {

lib/internal/streams/end-of-stream.js

+2
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,8 @@ function eos(stream, options, callback) {
176176
(rState && rState.errorEmitted) ||
177177
(rState && stream.req && stream.aborted) ||
178178
(
179+
(!wState || !willEmitClose || typeof wState.closed !== 'boolean') &&
180+
(!rState || !willEmitClose || typeof rState.closed !== 'boolean') &&
179181
(!writable || (wState && wState.finished)) &&
180182
(!readable || (rState && rState.endEmitted))
181183
)

lib/internal/streams/pipeline.js

+7-10
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@ const { validateCallback } = require('internal/validators');
2727

2828
const {
2929
isIterable,
30-
isReadable,
31-
isStream,
30+
isReadableNodeStream,
31+
isNodeStream,
3232
} = require('internal/streams/utils');
3333

3434
let PassThrough;
@@ -87,7 +87,7 @@ function popCallback(streams) {
8787
function makeAsyncIterable(val) {
8888
if (isIterable(val)) {
8989
return val;
90-
} else if (isReadable(val)) {
90+
} else if (isReadableNodeStream(val)) {
9191
// Legacy streams are not Iterable.
9292
return fromReadable(val);
9393
}
@@ -204,7 +204,7 @@ function pipeline(...streams) {
204204
const reading = i < streams.length - 1;
205205
const writing = i > 0;
206206

207-
if (isStream(stream)) {
207+
if (isNodeStream(stream)) {
208208
finishCount++;
209209
destroys.push(destroyer(stream, reading, writing, finish));
210210
}
@@ -216,7 +216,7 @@ function pipeline(...streams) {
216216
throw new ERR_INVALID_RETURN_VALUE(
217217
'Iterable, AsyncIterable or Stream', 'source', ret);
218218
}
219-
} else if (isIterable(stream) || isReadable(stream)) {
219+
} else if (isIterable(stream) || isReadableNodeStream(stream)) {
220220
ret = stream;
221221
} else {
222222
ret = Duplex.from(stream);
@@ -269,8 +269,8 @@ function pipeline(...streams) {
269269
finishCount++;
270270
destroys.push(destroyer(ret, false, true, finish));
271271
}
272-
} else if (isStream(stream)) {
273-
if (isReadable(ret)) {
272+
} else if (isNodeStream(stream)) {
273+
if (isReadableNodeStream(ret)) {
274274
ret.pipe(stream);
275275

276276
// Compat. Before node v10.12.0 stdio used to throw an error so
@@ -291,9 +291,6 @@ function pipeline(...streams) {
291291
}
292292
}
293293

294-
// TODO(ronag): Consider returning a Duplex proxy if the first argument
295-
// is a writable. Would improve composability.
296-
// See, https://github.com/nodejs/node/issues/32020
297294
return ret;
298295
}
299296

0 commit comments

Comments
 (0)