Skip to content

Commit b74b28f

Browse files
committed
stream: add stream.compose
Refs: nodejs#32020 PR-URL: nodejs#39029 Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: Luigi Pinca <[email protected]> Reviewed-By: Michaël Zasso <[email protected]> Backport-PR-URL: nodejs#39563
1 parent e53ec68 commit b74b28f

File tree

7 files changed

+851
-4
lines changed

7 files changed

+851
-4
lines changed

doc/api/stream.md

+42
Original file line numberDiff line numberDiff line change
@@ -1857,6 +1857,48 @@ run().catch(console.error);
18571857
after the `callback` has been invoked. In the case of reuse of streams after
18581858
failure, this can cause event listener leaks and swallowed errors.
18591859

1860+
### `stream.compose(...streams)`
1861+
<!-- YAML
1862+
added: REPLACEME
1863+
-->
1864+
1865+
* `streams` {Stream[]}
1866+
* Returns: {stream.Duplex}
1867+
1868+
Combines two or more streams into a `Duplex` stream that writes to the
1869+
first stream and reads from the last. Each provided stream is piped into
1870+
the next, using `stream.pipeline`. If any of the streams error then all
1871+
are destroyed, including the outer `Duplex` stream.
1872+
1873+
Because `stream.compose` returns a new stream that in turn can (and
1874+
should) be piped into other streams, it enables composition. In contrast,
1875+
when passing streams to `stream.pipeline`, typically the first stream is
1876+
a readable stream and the last a writable stream, forming a closed
1877+
circuit.
1878+
1879+
```mjs
1880+
import { compose, Transform } from 'stream';
1881+
1882+
const removeSpaces = new Transform({
1883+
transform(chunk, encoding, callback) {
1884+
callback(null, String(chunk).replace(' ', ''));
1885+
}
1886+
});
1887+
1888+
const toUpper = new Transform({
1889+
transform(chunk, encoding, callback) {
1890+
callback(null, String(chunk).toUpperCase());
1891+
}
1892+
});
1893+
1894+
let res = '';
1895+
for await (const buf of compose(removeSpaces, toUpper).end('hello world')) {
1896+
res += buf;
1897+
}
1898+
1899+
console.log(res); // prints 'HELLOWORLD'
1900+
```
1901+
18601902
### `stream.Readable.from(iterable, [options])`
18611903
<!-- YAML
18621904
added:

lib/internal/streams/compose.js

+300
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,300 @@
1+
'use strict';
2+
3+
const pipeline = require('internal/streams/pipeline');
4+
const Duplex = require('internal/streams/duplex');
5+
const { createDeferredPromise } = require('internal/util');
6+
const { destroyer } = require('internal/streams/destroy');
7+
const from = require('internal/streams/from');
8+
const {
9+
isIterable,
10+
isReadableNodeStream,
11+
isWritableNodeStream,
12+
isDestroyed,
13+
isReadableFinished,
14+
isWritableEnded,
15+
} = require('internal/streams/utils');
16+
const {
17+
PromiseResolve,
18+
} = primordials;
19+
const {
20+
AbortError,
21+
codes: {
22+
ERR_INVALID_ARG_TYPE,
23+
ERR_INVALID_ARG_VALUE,
24+
ERR_INVALID_RETURN_VALUE,
25+
ERR_MISSING_ARGS,
26+
},
27+
} = require('internal/errors');
28+
const assert = require('internal/assert');
29+
30+
function isReadable(stream) {
31+
const r = isReadableNodeStream(stream);
32+
if (r === null || typeof stream.readable !== 'boolean') return null;
33+
if (isDestroyed(stream)) return false;
34+
return r && stream.readable && !isReadableFinished(stream);
35+
}
36+
37+
function isWritable(stream) {
38+
const r = isWritableNodeStream(stream);
39+
if (r === null || typeof stream.writable !== 'boolean') return null;
40+
if (isDestroyed(stream)) return false;
41+
return r && stream.writable && !isWritableEnded(stream);
42+
}
43+
44+
// This is needed for pre node 17.
45+
class ComposeDuplex extends Duplex {
46+
constructor(options) {
47+
super(options);
48+
49+
// https://github.com/nodejs/node/pull/34385
50+
51+
if (options?.readable === false) {
52+
this._readableState.readable = false;
53+
this._readableState.ended = true;
54+
this._readableState.endEmitted = true;
55+
}
56+
57+
if (options?.writable === false) {
58+
this._writableState.writable = false;
59+
this._writableState.ending = true;
60+
this._writableState.ended = true;
61+
this._writableState.finished = true;
62+
}
63+
}
64+
}
65+
66+
module.exports = function compose(...streams) {
67+
if (streams.length === 0) {
68+
throw new ERR_MISSING_ARGS('streams');
69+
}
70+
71+
if (streams.length === 1) {
72+
return makeDuplex(streams[0], 'streams[0]');
73+
}
74+
75+
const orgStreams = [...streams];
76+
77+
if (typeof streams[0] === 'function') {
78+
streams[0] = makeDuplex(streams[0], 'streams[0]');
79+
}
80+
81+
if (typeof streams[streams.length - 1] === 'function') {
82+
const idx = streams.length - 1;
83+
streams[idx] = makeDuplex(streams[idx], `streams[${idx}]`);
84+
}
85+
86+
for (let n = 0; n < streams.length; ++n) {
87+
if (!isNodeStream(streams[n])) {
88+
// TODO(ronag): Add checks for non streams.
89+
continue;
90+
}
91+
if (n < streams.length - 1 && !isReadable(streams[n])) {
92+
throw new ERR_INVALID_ARG_VALUE(
93+
`streams[${n}]`,
94+
orgStreams[n],
95+
'must be readable'
96+
);
97+
}
98+
if (n > 0 && !isWritable(streams[n])) {
99+
throw new ERR_INVALID_ARG_VALUE(
100+
`streams[${n}]`,
101+
orgStreams[n],
102+
'must be writable'
103+
);
104+
}
105+
}
106+
107+
let ondrain;
108+
let onfinish;
109+
let onreadable;
110+
let onclose;
111+
let d;
112+
113+
function onfinished(err) {
114+
const cb = onclose;
115+
onclose = null;
116+
117+
if (cb) {
118+
cb(err);
119+
} else if (err) {
120+
d.destroy(err);
121+
} else if (!readable && !writable) {
122+
d.destroy();
123+
}
124+
}
125+
126+
const head = streams[0];
127+
const tail = pipeline(streams, onfinished);
128+
129+
const writable = !!isWritable(head);
130+
const readable = !!isReadable(tail);
131+
132+
// TODO(ronag): Avoid double buffering.
133+
// Implement Writable/Readable/Duplex traits.
134+
// See, https://github.com/nodejs/node/pull/33515.
135+
d = new ComposeDuplex({
136+
highWaterMark: 1,
137+
writableObjectMode: !!head?.writableObjectMode,
138+
readableObjectMode: !!tail?.writableObjectMode,
139+
writable,
140+
readable,
141+
});
142+
143+
if (writable) {
144+
d._write = function(chunk, encoding, callback) {
145+
if (head.write(chunk, encoding)) {
146+
callback();
147+
} else {
148+
ondrain = callback;
149+
}
150+
};
151+
152+
d._final = function(callback) {
153+
head.end();
154+
onfinish = callback;
155+
};
156+
157+
head.on('drain', function() {
158+
if (ondrain) {
159+
const cb = ondrain;
160+
ondrain = null;
161+
cb();
162+
}
163+
});
164+
165+
tail.on('finish', function() {
166+
if (onfinish) {
167+
const cb = onfinish;
168+
onfinish = null;
169+
cb();
170+
}
171+
});
172+
}
173+
174+
if (readable) {
175+
tail.on('readable', function() {
176+
if (onreadable) {
177+
const cb = onreadable;
178+
onreadable = null;
179+
cb();
180+
}
181+
});
182+
183+
tail.on('end', function() {
184+
d.push(null);
185+
});
186+
187+
d._read = function() {
188+
while (true) {
189+
const buf = tail.read();
190+
191+
if (buf === null) {
192+
onreadable = d._read;
193+
return;
194+
}
195+
196+
if (!d.push(buf)) {
197+
return;
198+
}
199+
}
200+
};
201+
}
202+
203+
d._destroy = function(err, callback) {
204+
if (!err && onclose !== null) {
205+
err = new AbortError();
206+
}
207+
208+
onreadable = null;
209+
ondrain = null;
210+
onfinish = null;
211+
212+
if (onclose === null) {
213+
callback(err);
214+
} else {
215+
onclose = callback;
216+
destroyer(tail, err);
217+
}
218+
};
219+
220+
return d;
221+
};
222+
223+
function makeDuplex(stream, name) {
224+
let ret;
225+
if (typeof stream === 'function') {
226+
assert(stream.length > 0);
227+
228+
const { value, write, final } = fromAsyncGen(stream);
229+
230+
if (isIterable(value)) {
231+
ret = from(ComposeDuplex, value, {
232+
objectMode: true,
233+
highWaterMark: 1,
234+
write,
235+
final
236+
});
237+
} else if (typeof value?.then === 'function') {
238+
const promise = PromiseResolve(value)
239+
.then((val) => {
240+
if (val != null) {
241+
throw new ERR_INVALID_RETURN_VALUE('nully', name, val);
242+
}
243+
})
244+
.catch((err) => {
245+
destroyer(ret, err);
246+
});
247+
248+
ret = new ComposeDuplex({
249+
objectMode: true,
250+
highWaterMark: 1,
251+
readable: false,
252+
write,
253+
final(cb) {
254+
final(() => promise.then(cb, cb));
255+
}
256+
});
257+
} else {
258+
throw new ERR_INVALID_RETURN_VALUE(
259+
'Iterable, AsyncIterable or AsyncFunction', name, value);
260+
}
261+
} else if (isNodeStream(stream)) {
262+
ret = stream;
263+
} else if (isIterable(stream)) {
264+
ret = from(ComposeDuplex, stream, {
265+
objectMode: true,
266+
highWaterMark: 1,
267+
writable: false
268+
});
269+
} else {
270+
throw new ERR_INVALID_ARG_TYPE(
271+
name,
272+
['Stream', 'Iterable', 'AsyncIterable', 'Function'],
273+
stream)
274+
;
275+
}
276+
return ret;
277+
}
278+
279+
function fromAsyncGen(fn) {
280+
let { promise, resolve } = createDeferredPromise();
281+
const value = fn(async function*() {
282+
while (true) {
283+
const { chunk, done, cb } = await promise;
284+
process.nextTick(cb);
285+
if (done) return;
286+
yield chunk;
287+
({ promise, resolve } = createDeferredPromise());
288+
}
289+
}());
290+
291+
return {
292+
value,
293+
write(chunk, encoding, cb) {
294+
resolve({ chunk, done: false, cb });
295+
},
296+
final(cb) {
297+
resolve({ done: true, cb });
298+
}
299+
};
300+
}

lib/internal/streams/pipeline.js

-3
Original file line numberDiff line numberDiff line change
@@ -295,9 +295,6 @@ function pipeline(...streams) {
295295
}
296296
}
297297

298-
// TODO(ronag): Consider returning a Duplex proxy if the first argument
299-
// is a writable. Would improve composability.
300-
// See, https://github.com/nodejs/node/issues/32020
301298
return ret;
302299
}
303300

0 commit comments

Comments
 (0)