Skip to content

Commit f12cf6d

Browse files
benjamingrdanielleadams
authored andcommitted
stream: add reduce
PR-URL: #41669 Reviewed-By: Matteo Collina <[email protected]>
1 parent 4b63439 commit f12cf6d

File tree

5 files changed

+246
-16
lines changed

5 files changed

+246
-16
lines changed

doc/api/stream.md

+44
Original file line numberDiff line numberDiff line change
@@ -2089,6 +2089,49 @@ const pairs = await Readable.from(['a', 'b', 'c']).asIndexedPairs().toArray();
20892089
console.log(pairs); // [[0, 'a'], [1, 'b'], [2, 'c']]
20902090
```
20912091

2092+
### `readable.reduce(fn[, initial[, options]])`
2093+
2094+
<!-- YAML
2095+
added: REPLACEME
2096+
-->
2097+
2098+
> Stability: 1 - Experimental
2099+
2100+
* `fn` {Function|AsyncFunction} a reducer function to call over every chunk
2101+
in the stream.
2102+
* `previous` {any} the value obtained from the last call to `fn` or the
2103+
`initial` value if specified or the first chunk of the stream otherwise.
2104+
* `data` {any} a chunk of data from the stream.
2105+
* `options` {Object}
2106+
* `signal` {AbortSignal} aborted if the stream is destroyed allowing to
2107+
abort the `fn` call early.
2108+
* `initial` {any} the initial value to use in the reduction.
2109+
* `options` {Object}
2110+
* `signal` {AbortSignal} allows destroying the stream if the signal is
2111+
aborted.
2112+
* Returns: {Promise} a promise for the final value of the reduction.
2113+
2114+
This method calls `fn` on each chunk of the stream in order, passing it the
2115+
result from the calculation on the previous element. It returns a promise for
2116+
the final value of the reduction.
2117+
2118+
The reducer function iterates the stream element-by-element which means that
2119+
there is no `concurrency` parameter or parallism. To perform a `reduce`
2120+
concurrently, it can be chained to the [`readable.map`][] method.
2121+
2122+
If no `initial` value is supplied the first chunk of the stream is used as the
2123+
initial value. If the stream is empty, the promise is rejected with a
2124+
`TypeError` with the `ERR_INVALID_ARGS` code property.
2125+
2126+
```mjs
2127+
import { Readable } from 'stream';
2128+
2129+
const ten = await Readable.from([1, 2, 3, 4]).reduce((previous, data) => {
2130+
return previous + data;
2131+
});
2132+
console.log(ten); // 10
2133+
```
2134+
20922135
### Duplex and transform streams
20932136

20942137
#### Class: `stream.Duplex`
@@ -4137,6 +4180,7 @@ contain multi-byte characters.
41374180
[`process.stdin`]: process.md#processstdin
41384181
[`process.stdout`]: process.md#processstdout
41394182
[`readable._read()`]: #readable_readsize
4183+
[`readable.map`]: #readablemapfn-options
41404184
[`readable.push('')`]: #readablepush
41414185
[`readable.setEncoding()`]: #readablesetencodingencoding
41424186
[`stream.Readable.from()`]: #streamreadablefromiterable-options

lib/internal/streams/end-of-stream.js

+15
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ const {
1717
validateObject,
1818
} = require('internal/validators');
1919

20+
const { Promise } = primordials;
21+
2022
function isRequest(stream) {
2123
return stream.setHeader && typeof stream.abort === 'function';
2224
}
@@ -232,4 +234,17 @@ function eos(stream, options, callback) {
232234
return cleanup;
233235
}
234236

237+
function finished(stream, opts) {
238+
return new Promise((resolve, reject) => {
239+
eos(stream, opts, (err) => {
240+
if (err) {
241+
reject(err);
242+
} else {
243+
resolve();
244+
}
245+
});
246+
});
247+
}
248+
235249
module.exports = eos;
250+
module.exports.finished = finished;

lib/internal/streams/operators.js

+56-3
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,14 @@ const { Buffer } = require('buffer');
66
const {
77
codes: {
88
ERR_INVALID_ARG_TYPE,
9+
ERR_MISSING_ARGS,
910
ERR_OUT_OF_RANGE,
1011
},
1112
AbortError,
1213
} = require('internal/errors');
1314
const { validateInteger } = require('internal/validators');
1415
const { kWeakHandler } = require('internal/event_target');
16+
const { finished } = require('internal/streams/end-of-stream');
1517

1618
const {
1719
ArrayPrototypePush,
@@ -199,8 +201,8 @@ async function every(fn, options) {
199201
'fn', ['Function', 'AsyncFunction'], fn);
200202
}
201203
// https://en.wikipedia.org/wiki/De_Morgan%27s_laws
202-
return !(await some.call(this, async (x) => {
203-
return !(await fn(x));
204+
return !(await some.call(this, async (...args) => {
205+
return !(await fn(...args));
204206
}, options));
205207
}
206208

@@ -231,11 +233,61 @@ function filter(fn, options) {
231233
return this.map(filterFn, options);
232234
}
233235

236+
// Specific to provide better error to reduce since the argument is only
237+
// missing if the stream has no items in it - but the code is still appropriate
238+
class ReduceAwareErrMissingArgs extends ERR_MISSING_ARGS {
239+
constructor() {
240+
super('reduce');
241+
this.message = 'Reduce of an empty stream requires an initial value';
242+
}
243+
}
244+
245+
async function reduce(reducer, initialValue, options) {
246+
if (typeof reducer !== 'function') {
247+
throw new ERR_INVALID_ARG_TYPE(
248+
'reducer', ['Function', 'AsyncFunction'], reducer);
249+
}
250+
let hasInitialValue = arguments.length > 1;
251+
if (options?.signal?.aborted) {
252+
const err = new AbortError(undefined, { cause: options.signal.reason });
253+
this.once('error', () => {}); // The error is already propagated
254+
await finished(this.destroy(err));
255+
throw err;
256+
}
257+
const ac = new AbortController();
258+
const signal = ac.signal;
259+
if (options?.signal) {
260+
const opts = { once: true, [kWeakHandler]: this };
261+
options.signal.addEventListener('abort', () => ac.abort(), opts);
262+
}
263+
let gotAnyItemFromStream = false;
264+
try {
265+
for await (const value of this) {
266+
gotAnyItemFromStream = true;
267+
if (options?.signal?.aborted) {
268+
throw new AbortError();
269+
}
270+
if (!hasInitialValue) {
271+
initialValue = value;
272+
hasInitialValue = true;
273+
} else {
274+
initialValue = await reducer(initialValue, value, { signal });
275+
}
276+
}
277+
if (!gotAnyItemFromStream && !hasInitialValue) {
278+
throw new ReduceAwareErrMissingArgs();
279+
}
280+
} finally {
281+
ac.abort();
282+
}
283+
return initialValue;
284+
}
285+
234286
async function toArray(options) {
235287
const result = [];
236288
for await (const val of this) {
237289
if (options?.signal?.aborted) {
238-
throw new AbortError({ cause: options.signal.reason });
290+
throw new AbortError(undefined, { cause: options.signal.reason });
239291
}
240292
ArrayPrototypePush(result, val);
241293
}
@@ -316,6 +368,7 @@ module.exports.streamReturningOperators = {
316368
module.exports.promiseReturningOperators = {
317369
every,
318370
forEach,
371+
reduce,
319372
toArray,
320373
some,
321374
};

lib/stream/promises.js

+1-13
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ const {
1111
} = require('internal/streams/utils');
1212

1313
const { pipelineImpl: pl } = require('internal/streams/pipeline');
14-
const eos = require('internal/streams/end-of-stream');
14+
const { finished } = require('internal/streams/end-of-stream');
1515

1616
function pipeline(...streams) {
1717
return new Promise((resolve, reject) => {
@@ -35,18 +35,6 @@ function pipeline(...streams) {
3535
});
3636
}
3737

38-
function finished(stream, opts) {
39-
return new Promise((resolve, reject) => {
40-
eos(stream, opts, (err) => {
41-
if (err) {
42-
reject(err);
43-
} else {
44-
resolve();
45-
}
46-
});
47-
});
48-
}
49-
5038
module.exports = {
5139
finished,
5240
pipeline,

test/parallel/test-stream-reduce.js

+130
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const {
5+
Readable,
6+
} = require('stream');
7+
const assert = require('assert');
8+
9+
function sum(p, c) {
10+
return p + c;
11+
}
12+
13+
{
14+
// Does the same thing as `(await stream.toArray()).reduce(...)`
15+
(async () => {
16+
const tests = [
17+
[[], sum, 0],
18+
[[1], sum, 0],
19+
[[1, 2, 3, 4, 5], sum, 0],
20+
[[...Array(100).keys()], sum, 0],
21+
[['a', 'b', 'c'], sum, ''],
22+
[[1, 2], sum],
23+
[[1, 2, 3], (x, y) => y],
24+
];
25+
for (const [values, fn, initial] of tests) {
26+
const streamReduce = await Readable.from(values)
27+
.reduce(fn, initial);
28+
const arrayReduce = values.reduce(fn, initial);
29+
assert.deepStrictEqual(streamReduce, arrayReduce);
30+
}
31+
// Does the same thing as `(await stream.toArray()).reduce(...)` with an
32+
// asynchronous reducer
33+
for (const [values, fn, initial] of tests) {
34+
const streamReduce = await Readable.from(values)
35+
.map(async (x) => x)
36+
.reduce(fn, initial);
37+
const arrayReduce = values.reduce(fn, initial);
38+
assert.deepStrictEqual(streamReduce, arrayReduce);
39+
}
40+
})().then(common.mustCall());
41+
}
42+
{
43+
// Works with an async reducer, with or without initial value
44+
(async () => {
45+
const six = await Readable.from([1, 2, 3]).reduce(async (p, c) => p + c, 0);
46+
assert.strictEqual(six, 6);
47+
})().then(common.mustCall());
48+
(async () => {
49+
const six = await Readable.from([1, 2, 3]).reduce(async (p, c) => p + c);
50+
assert.strictEqual(six, 6);
51+
})().then(common.mustCall());
52+
}
53+
{
54+
// Works lazily
55+
assert.rejects(Readable.from([1, 2, 3, 4, 5, 6])
56+
.map(common.mustCall((x) => {
57+
return x;
58+
}, 3)) // Two consumed and one buffered by `map` due to default concurrency
59+
.reduce(async (p, c) => {
60+
if (p === 1) {
61+
throw new Error('boom');
62+
}
63+
return c;
64+
}, 0)
65+
, /boom/).then(common.mustCall());
66+
}
67+
68+
{
69+
// Support for AbortSignal
70+
const ac = new AbortController();
71+
assert.rejects(async () => {
72+
await Readable.from([1, 2, 3]).reduce(async (p, c) => {
73+
if (c === 3) {
74+
await new Promise(() => {}); // Explicitly do not pass signal here
75+
}
76+
return Promise.resolve();
77+
}, 0, { signal: ac.signal });
78+
}, {
79+
name: 'AbortError',
80+
}).then(common.mustCall());
81+
ac.abort();
82+
}
83+
84+
85+
{
86+
// Support for AbortSignal - pre aborted
87+
const stream = Readable.from([1, 2, 3]);
88+
assert.rejects(async () => {
89+
await stream.reduce(async (p, c) => {
90+
if (c === 3) {
91+
await new Promise(() => {}); // Explicitly do not pass signal here
92+
}
93+
return Promise.resolve();
94+
}, 0, { signal: AbortSignal.abort() });
95+
}, {
96+
name: 'AbortError',
97+
}).then(common.mustCall(() => {
98+
assert.strictEqual(stream.destroyed, true);
99+
}));
100+
}
101+
102+
{
103+
// Support for AbortSignal - deep
104+
const stream = Readable.from([1, 2, 3]);
105+
assert.rejects(async () => {
106+
await stream.reduce(async (p, c, { signal }) => {
107+
signal.addEventListener('abort', common.mustCall(), { once: true });
108+
if (c === 3) {
109+
await new Promise(() => {}); // Explicitly do not pass signal here
110+
}
111+
return Promise.resolve();
112+
}, 0, { signal: AbortSignal.abort() });
113+
}, {
114+
name: 'AbortError',
115+
}).then(common.mustCall(() => {
116+
assert.strictEqual(stream.destroyed, true);
117+
}));
118+
}
119+
120+
{
121+
// Error cases
122+
assert.rejects(() => Readable.from([]).reduce(1), /TypeError/);
123+
assert.rejects(() => Readable.from([]).reduce('5'), /TypeError/);
124+
}
125+
126+
{
127+
// Test result is a Promise
128+
const result = Readable.from([1, 2, 3, 4, 5]).reduce(sum, 0);
129+
assert.ok(result instanceof Promise);
130+
}

0 commit comments

Comments
 (0)