Skip to content

Commit 7663895

Browse files
committed
stream: add stream.compose
Refs: nodejs#32020 PR-URL: nodejs#39029 Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: Luigi Pinca <[email protected]> Reviewed-By: Michaël Zasso <[email protected]> Backport-PR-URL: nodejs#39563
1 parent a6d50a1 commit 7663895

File tree

6 files changed

+727
-3
lines changed

6 files changed

+727
-3
lines changed

doc/api/stream.md

+89
Original file line numberDiff line numberDiff line change
@@ -1918,6 +1918,95 @@ run().catch(console.error);
19181918
after the `callback` has been invoked. In the case of reuse of streams after
19191919
failure, this can cause event listener leaks and swallowed errors.
19201920

1921+
### `stream.compose(...streams)`
1922+
<!-- YAML
1923+
added: REPLACEME
1924+
-->
1925+
1926+
> Stability: 1 - `stream.compose` is experimental.
1927+
1928+
* `streams` {Stream[]|Iterable[]|AsyncIterable[]|Function[]}
1929+
* Returns: {stream.Duplex}
1930+
1931+
Combines two or more streams into a `Duplex` stream that writes to the
1932+
first stream and reads from the last. Each provided stream is piped into
1933+
the next, using `stream.pipeline`. If any of the streams error then all
1934+
are destroyed, including the outer `Duplex` stream.
1935+
1936+
Because `stream.compose` returns a new stream that in turn can (and
1937+
should) be piped into other streams, it enables composition. In contrast,
1938+
when passing streams to `stream.pipeline`, typically the first stream is
1939+
a readable stream and the last a writable stream, forming a closed
1940+
circuit.
1941+
1942+
If passed a `Function` it must be a factory method taking a `source`
1943+
`Iterable`.
1944+
1945+
```mjs
1946+
import { compose, Transform } from 'stream';
1947+
1948+
const removeSpaces = new Transform({
1949+
transform(chunk, encoding, callback) {
1950+
callback(null, String(chunk).replace(' ', ''));
1951+
}
1952+
});
1953+
1954+
async function* toUpper(source) {
1955+
for await (const chunk of source) {
1956+
yield String(chunk).toUpperCase();
1957+
}
1958+
}
1959+
1960+
let res = '';
1961+
for await (const buf of compose(removeSpaces, toUpper).end('hello world')) {
1962+
res += buf;
1963+
}
1964+
1965+
console.log(res); // prints 'HELLOWORLD'
1966+
```
1967+
1968+
`stream.compose` can be used to convert async iterables, generators and
1969+
functions into streams.
1970+
1971+
* `AsyncIterable` converts into a readable `Duplex`. Cannot yield
1972+
`null`.
1973+
* `AsyncGeneratorFunction` converts into a readable/writable transform `Duplex`.
1974+
Must take a source `AsyncIterable` as first parameter. Cannot yield
1975+
`null`.
1976+
* `AsyncFunction` converts into a writable `Duplex`. Must return
1977+
either `null` or `undefined`.
1978+
1979+
```mjs
1980+
import { compose } from 'stream';
1981+
import { finished } from 'stream/promises';
1982+
1983+
// Convert AsyncIterable into readable Duplex.
1984+
const s1 = compose(async function*() {
1985+
yield 'Hello';
1986+
yield 'World';
1987+
}());
1988+
1989+
// Convert AsyncGenerator into transform Duplex.
1990+
const s2 = compose(async function*(source) {
1991+
for await (const chunk of source) {
1992+
yield String(chunk).toUpperCase();
1993+
}
1994+
});
1995+
1996+
let res = '';
1997+
1998+
// Convert AsyncFunction into writable Duplex.
1999+
const s3 = compose(async function(source) {
2000+
for await (const chunk of source) {
2001+
res += chunk;
2002+
}
2003+
});
2004+
2005+
await finished(compose(s1, s2, s3));
2006+
2007+
console.log(res); // prints 'HELLOWORLD'
2008+
```
2009+
19212010
### `stream.Readable.from(iterable, [options])`
19222011
<!-- YAML
19232012
added:

lib/internal/streams/compose.js

+208
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,208 @@
1+
'use strict';
2+
3+
const pipeline = require('internal/streams/pipeline');
4+
const Duplex = require('internal/streams/duplex');
5+
const { destroyer } = require('internal/streams/destroy');
6+
const {
7+
isNodeStream,
8+
} = require('internal/streams/utils');
9+
const {
10+
AbortError,
11+
codes: {
12+
ERR_INVALID_ARG_VALUE,
13+
ERR_MISSING_ARGS,
14+
},
15+
} = require('internal/errors');
16+
17+
function isReadable(stream) {
18+
const r = isReadableNodeStream(stream);
19+
if (r === null || typeof stream.readable !== 'boolean') return null;
20+
if (isDestroyed(stream)) return false;
21+
return r && stream.readable && !isReadableFinished(stream);
22+
}
23+
24+
function isWritable(stream) {
25+
const r = isWritableNodeStream(stream);
26+
if (r === null || typeof stream.writable !== 'boolean') return null;
27+
if (isDestroyed(stream)) return false;
28+
return r && stream.writable && !isWritableEnded(stream);
29+
}
30+
31+
// This is needed for pre node 17.
32+
class ComposeDuplex extends Duplex {
33+
constructor(options) {
34+
super(options);
35+
36+
// https://github.com/nodejs/node/pull/34385
37+
38+
if (options?.readable === false) {
39+
this._readableState.readable = false;
40+
this._readableState.ended = true;
41+
this._readableState.endEmitted = true;
42+
}
43+
44+
if (options?.writable === false) {
45+
this._writableState.writable = false;
46+
this._writableState.ending = true;
47+
this._writableState.ended = true;
48+
this._writableState.finished = true;
49+
}
50+
}
51+
}
52+
53+
module.exports = function compose(...streams) {
54+
if (streams.length === 0) {
55+
throw new ERR_MISSING_ARGS('streams');
56+
}
57+
58+
if (streams.length === 1) {
59+
return Duplex.from(streams[0]);
60+
}
61+
62+
const orgStreams = [...streams];
63+
64+
if (typeof streams[0] === 'function') {
65+
streams[0] = Duplex.from(streams[0]);
66+
}
67+
68+
if (typeof streams[streams.length - 1] === 'function') {
69+
const idx = streams.length - 1;
70+
streams[idx] = Duplex.from(streams[idx]);
71+
}
72+
73+
for (let n = 0; n < streams.length; ++n) {
74+
if (!isNodeStream(streams[n])) {
75+
// TODO(ronag): Add checks for non streams.
76+
continue;
77+
}
78+
if (n < streams.length - 1 && !isReadable(streams[n])) {
79+
throw new ERR_INVALID_ARG_VALUE(
80+
`streams[${n}]`,
81+
orgStreams[n],
82+
'must be readable'
83+
);
84+
}
85+
if (n > 0 && !isWritable(streams[n])) {
86+
throw new ERR_INVALID_ARG_VALUE(
87+
`streams[${n}]`,
88+
orgStreams[n],
89+
'must be writable'
90+
);
91+
}
92+
}
93+
94+
let ondrain;
95+
let onfinish;
96+
let onreadable;
97+
let onclose;
98+
let d;
99+
100+
function onfinished(err) {
101+
const cb = onclose;
102+
onclose = null;
103+
104+
if (cb) {
105+
cb(err);
106+
} else if (err) {
107+
d.destroy(err);
108+
} else if (!readable && !writable) {
109+
d.destroy();
110+
}
111+
}
112+
113+
const head = streams[0];
114+
const tail = pipeline(streams, onfinished);
115+
116+
const writable = !!isWritable(head);
117+
const readable = !!isReadable(tail);
118+
119+
// TODO(ronag): Avoid double buffering.
120+
// Implement Writable/Readable/Duplex traits.
121+
// See, https://github.com/nodejs/node/pull/33515.
122+
d = new ComposeDuplex({
123+
// TODO (ronag): highWaterMark?
124+
writableObjectMode: !!head?.writableObjectMode,
125+
readableObjectMode: !!tail?.writableObjectMode,
126+
writable,
127+
readable,
128+
});
129+
130+
if (writable) {
131+
d._write = function(chunk, encoding, callback) {
132+
if (head.write(chunk, encoding)) {
133+
callback();
134+
} else {
135+
ondrain = callback;
136+
}
137+
};
138+
139+
d._final = function(callback) {
140+
head.end();
141+
onfinish = callback;
142+
};
143+
144+
head.on('drain', function() {
145+
if (ondrain) {
146+
const cb = ondrain;
147+
ondrain = null;
148+
cb();
149+
}
150+
});
151+
152+
tail.on('finish', function() {
153+
if (onfinish) {
154+
const cb = onfinish;
155+
onfinish = null;
156+
cb();
157+
}
158+
});
159+
}
160+
161+
if (readable) {
162+
tail.on('readable', function() {
163+
if (onreadable) {
164+
const cb = onreadable;
165+
onreadable = null;
166+
cb();
167+
}
168+
});
169+
170+
tail.on('end', function() {
171+
d.push(null);
172+
});
173+
174+
d._read = function() {
175+
while (true) {
176+
const buf = tail.read();
177+
178+
if (buf === null) {
179+
onreadable = d._read;
180+
return;
181+
}
182+
183+
if (!d.push(buf)) {
184+
return;
185+
}
186+
}
187+
};
188+
}
189+
190+
d._destroy = function(err, callback) {
191+
if (!err && onclose !== null) {
192+
err = new AbortError();
193+
}
194+
195+
onreadable = null;
196+
ondrain = null;
197+
onfinish = null;
198+
199+
if (onclose === null) {
200+
callback(err);
201+
} else {
202+
onclose = callback;
203+
destroyer(tail, err);
204+
}
205+
};
206+
207+
return d;
208+
};

lib/internal/streams/pipeline.js

-3
Original file line numberDiff line numberDiff line change
@@ -291,9 +291,6 @@ function pipeline(...streams) {
291291
}
292292
}
293293

294-
// TODO(ronag): Consider returning a Duplex proxy if the first argument
295-
// is a writable. Would improve composability.
296-
// See, https://github.com/nodejs/node/issues/32020
297294
return ret;
298295
}
299296

lib/stream.js

+4
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ const {
3030
} = require('internal/util');
3131

3232
const pipeline = require('internal/streams/pipeline');
33+
const compose = require('internal/streams/compose');
34+
const { destroyer } = require('internal/streams/destroy');
3335
const eos = require('internal/streams/end-of-stream');
3436
const internalBuffer = require('internal/buffer');
3537

@@ -46,6 +48,8 @@ Stream.pipeline = pipeline;
4648
const { addAbortSignal } = require('internal/streams/add-abort-signal');
4749
Stream.addAbortSignal = addAbortSignal;
4850
Stream.finished = eos;
51+
Stream.destroy = destroyer;
52+
Stream.compose = compose;
4953

5054
ObjectDefineProperty(Stream, 'promises', {
5155
configurable: true,

test/parallel/test-bootstrap-modules.js

+1
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ const expectedModules = new Set([
101101
'NativeModule internal/stream_base_commons',
102102
'NativeModule internal/streams/add-abort-signal',
103103
'NativeModule internal/streams/buffer_list',
104+
'NativeModule internal/streams/compose',
104105
'NativeModule internal/streams/destroy',
105106
'NativeModule internal/streams/duplex',
106107
'NativeModule internal/streams/end-of-stream',

0 commit comments

Comments
 (0)