Skip to content

Commit ccb1c1e

Browse files
rluvatonRafaelGSS
authored andcommitted
stream: add compose operator
PR-URL: #44937 Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: Benjamin Gruenbaum <[email protected]> Reviewed-By: Robert Nagy <[email protected]>
1 parent d7470ad commit ccb1c1e

File tree

4 files changed

+210
-15
lines changed

4 files changed

+210
-15
lines changed

doc/api/stream.md

+39
Original file line numberDiff line numberDiff line change
@@ -1681,6 +1681,41 @@ option. In the code example above, data will be in a single chunk if the file
16811681
has less then 64 KiB of data because no `highWaterMark` option is provided to
16821682
[`fs.createReadStream()`][].
16831683

1684+
##### `readable.compose(stream[, options])`
1685+
1686+
<!-- YAML
1687+
added: REPLACEME
1688+
-->
1689+
1690+
> Stability: 1 - Experimental
1691+
1692+
* `stream` {Stream|Iterable|AsyncIterable|Function}
1693+
* `options` {Object}
1694+
* `signal` {AbortSignal} allows destroying the stream if the signal is
1695+
aborted.
1696+
* Returns: {Duplex} a stream composed with the stream `stream`.
1697+
1698+
```mjs
1699+
import { Readable } from 'node:stream';
1700+
1701+
async function* splitToWords(source) {
1702+
for await (const chunk of source) {
1703+
const words = String(chunk).split(' ');
1704+
1705+
for (const word of words) {
1706+
yield word;
1707+
}
1708+
}
1709+
}
1710+
1711+
const wordsStream = Readable.from(['this is', 'compose as operator']).compose(splitToWords);
1712+
const words = await wordsStream.toArray();
1713+
1714+
console.log(words); // prints ['this', 'is', 'compose', 'as', 'operator']
1715+
```
1716+
1717+
See [`stream.compose`][] for more information.
1718+
16841719
##### `readable.iterator([options])`
16851720

16861721
<!-- YAML
@@ -2720,6 +2755,8 @@ await finished(compose(s1, s2, s3));
27202755
console.log(res); // prints 'HELLOWORLD'
27212756
```
27222757

2758+
See [`readable.compose(stream)`][] for `stream.compose` as operator.
2759+
27232760
### `stream.Readable.from(iterable[, options])`
27242761

27252762
<!-- YAML
@@ -4487,11 +4524,13 @@ contain multi-byte characters.
44874524
[`process.stdin`]: process.md#processstdin
44884525
[`process.stdout`]: process.md#processstdout
44894526
[`readable._read()`]: #readable_readsize
4527+
[`readable.compose(stream)`]: #readablecomposestream-options
44904528
[`readable.map`]: #readablemapfn-options
44914529
[`readable.push('')`]: #readablepush
44924530
[`readable.setEncoding()`]: #readablesetencodingencoding
44934531
[`stream.Readable.from()`]: #streamreadablefromiterable-options
44944532
[`stream.addAbortSignal()`]: #streamaddabortsignalsignal-stream
4533+
[`stream.compose`]: #streamcomposestreams
44954534
[`stream.cork()`]: #writablecork
44964535
[`stream.finished()`]: #streamfinishedstream-options-callback
44974536
[`stream.pipe()`]: #readablepipedestination-options

lib/internal/streams/operators.js

+32
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ const { AbortController } = require('internal/abort_controller');
44

55
const {
66
codes: {
7+
ERR_INVALID_ARG_VALUE,
78
ERR_INVALID_ARG_TYPE,
89
ERR_MISSING_ARGS,
910
ERR_OUT_OF_RANGE,
@@ -17,6 +18,11 @@ const {
1718
} = require('internal/validators');
1819
const { kWeakHandler } = require('internal/event_target');
1920
const { finished } = require('internal/streams/end-of-stream');
21+
const staticCompose = require('internal/streams/compose');
22+
const {
23+
addAbortSignalNoValidate,
24+
} = require('internal/streams/add-abort-signal');
25+
const { isWritable, isNodeStream } = require('internal/streams/utils');
2026

2127
const {
2228
ArrayPrototypePush,
@@ -32,6 +38,31 @@ const {
3238
const kEmpty = Symbol('kEmpty');
3339
const kEof = Symbol('kEof');
3440

41+
function compose(stream, options) {
42+
if (options != null) {
43+
validateObject(options, 'options');
44+
}
45+
if (options?.signal != null) {
46+
validateAbortSignal(options.signal, 'options.signal');
47+
}
48+
49+
if (isNodeStream(stream) && !isWritable(stream)) {
50+
throw new ERR_INVALID_ARG_VALUE('stream', stream, 'must be writable');
51+
}
52+
53+
const composedStream = staticCompose(this, stream);
54+
55+
if (options?.signal) {
56+
// Not validating as we already validated before
57+
addAbortSignalNoValidate(
58+
options.signal,
59+
composedStream
60+
);
61+
}
62+
63+
return composedStream;
64+
}
65+
3566
function map(fn, options) {
3667
if (typeof fn !== 'function') {
3768
throw new ERR_INVALID_ARG_TYPE(
@@ -392,6 +423,7 @@ module.exports.streamReturningOperators = {
392423
flatMap,
393424
map,
394425
take,
426+
compose,
395427
};
396428

397429
module.exports.promiseReturningOperators = {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const {
5+
Readable, Transform,
6+
} = require('stream');
7+
const assert = require('assert');
8+
9+
{
10+
// with async generator
11+
const stream = Readable.from(['a', 'b', 'c', 'd']).compose(async function *(stream) {
12+
let str = '';
13+
for await (const chunk of stream) {
14+
str += chunk;
15+
16+
if (str.length === 2) {
17+
yield str;
18+
str = '';
19+
}
20+
}
21+
});
22+
const result = ['ab', 'cd'];
23+
(async () => {
24+
for await (const item of stream) {
25+
assert.strictEqual(item, result.shift());
26+
}
27+
})().then(common.mustCall());
28+
}
29+
30+
{
31+
// With Transformer
32+
const stream = Readable.from(['a', 'b', 'c', 'd']).compose(new Transform({
33+
objectMode: true,
34+
transform: common.mustCall((chunk, encoding, callback) => {
35+
callback(null, chunk);
36+
}, 4)
37+
}));
38+
const result = ['a', 'b', 'c', 'd'];
39+
(async () => {
40+
for await (const item of stream) {
41+
assert.strictEqual(item, result.shift());
42+
}
43+
})().then(common.mustCall());
44+
}
45+
46+
{
47+
// Throwing an error during `compose` (before waiting for data)
48+
const stream = Readable.from([1, 2, 3, 4, 5]).compose(async function *(stream) { // eslint-disable-line require-yield
49+
50+
throw new Error('boom');
51+
});
52+
53+
assert.rejects(async () => {
54+
for await (const item of stream) {
55+
assert.fail('should not reach here, got ' + item);
56+
}
57+
}, /boom/).then(common.mustCall());
58+
}
59+
60+
{
61+
// Throwing an error during `compose` (when waiting for data)
62+
const stream = Readable.from([1, 2, 3, 4, 5]).compose(async function *(stream) {
63+
for await (const chunk of stream) {
64+
if (chunk === 3) {
65+
throw new Error('boom');
66+
}
67+
yield chunk;
68+
}
69+
});
70+
71+
assert.rejects(
72+
stream.toArray(),
73+
/boom/,
74+
).then(common.mustCall());
75+
}
76+
77+
{
78+
// Throwing an error during `compose` (after finishing all readable data)
79+
const stream = Readable.from([1, 2, 3, 4, 5]).compose(async function *(stream) { // eslint-disable-line require-yield
80+
81+
// eslint-disable-next-line no-unused-vars,no-empty
82+
for await (const chunk of stream) {
83+
}
84+
85+
throw new Error('boom');
86+
});
87+
assert.rejects(
88+
stream.toArray(),
89+
/boom/,
90+
).then(common.mustCall());
91+
}
92+
93+
{
94+
// AbortSignal
95+
const ac = new AbortController();
96+
const stream = Readable.from([1, 2, 3, 4, 5])
97+
.compose(async function *(source) {
98+
// Should not reach here
99+
for await (const chunk of source) {
100+
yield chunk;
101+
}
102+
}, { signal: ac.signal });
103+
104+
ac.abort();
105+
106+
assert.rejects(async () => {
107+
for await (const item of stream) {
108+
assert.fail('should not reach here, got ' + item);
109+
}
110+
}, {
111+
name: 'AbortError',
112+
}).then(common.mustCall());
113+
}
114+
115+
{
116+
assert.throws(
117+
() => Readable.from(['a']).compose(Readable.from(['b'])),
118+
{ code: 'ERR_INVALID_ARG_VALUE' }
119+
);
120+
}
121+
122+
{
123+
assert.throws(
124+
() => Readable.from(['a']).compose(),
125+
{ code: 'ERR_INVALID_ARG_TYPE' }
126+
);
127+
}

test/parallel/test-stream-compose.js

+12-15
Original file line numberDiff line numberDiff line change
@@ -358,27 +358,24 @@ const assert = require('assert');
358358
}
359359

360360
{
361-
try {
362-
compose();
363-
} catch (err) {
364-
assert.strictEqual(err.code, 'ERR_MISSING_ARGS');
365-
}
361+
assert.throws(
362+
() => compose(),
363+
{ code: 'ERR_MISSING_ARGS' }
364+
);
366365
}
367366

368367
{
369-
try {
370-
compose(new Writable(), new PassThrough());
371-
} catch (err) {
372-
assert.strictEqual(err.code, 'ERR_INVALID_ARG_VALUE');
373-
}
368+
assert.throws(
369+
() => compose(new Writable(), new PassThrough()),
370+
{ code: 'ERR_INVALID_ARG_VALUE' }
371+
);
374372
}
375373

376374
{
377-
try {
378-
compose(new PassThrough(), new Readable({ read() {} }), new PassThrough());
379-
} catch (err) {
380-
assert.strictEqual(err.code, 'ERR_INVALID_ARG_VALUE');
381-
}
375+
assert.throws(
376+
() => compose(new PassThrough(), new Readable({ read() {} }), new PassThrough()),
377+
{ code: 'ERR_INVALID_ARG_VALUE' }
378+
);
382379
}
383380

384381
{

0 commit comments

Comments
 (0)