Skip to content

Commit dcc5831

Browse files
benjamingrronag
authored andcommitted
stream: add forEach method
Add a `forEach` method to readable streams to enable concurrent iteration and align with the iterator-helpers proposal. Co-Authored-By: Robert Nagy <[email protected]> PR-URL: #41445 Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: Robert Nagy <[email protected]>
1 parent 647805a commit dcc5831

File tree

4 files changed

+182
-10
lines changed

4 files changed

+182
-10
lines changed

doc/api/stream.md

+61-2
Original file line numberDiff line numberDiff line change
@@ -1696,7 +1696,7 @@ added: v16.14.0
16961696
* `signal` {AbortSignal} aborted if the stream is destroyed allowing to
16971697
abort the `fn` call early.
16981698
* `options` {Object}
1699-
* `concurrency` {number} the maximal concurrent invocation of `fn` to call
1699+
* `concurrency` {number} the maximum concurrent invocation of `fn` to call
17001700
on the stream at once. **Default:** `1`.
17011701
* `signal` {AbortSignal} allows destroying the stream if the signal is
17021702
aborted.
@@ -1740,7 +1740,7 @@ added: v16.14.0
17401740
* `signal` {AbortSignal} aborted if the stream is destroyed allowing to
17411741
abort the `fn` call early.
17421742
* `options` {Object}
1743-
* `concurrency` {number} the maximal concurrent invocation of `fn` to call
1743+
* `concurrency` {number} the maximum concurrent invocation of `fn` to call
17441744
on the stream at once. **Default:** `1`.
17451745
* `signal` {AbortSignal} allows destroying the stream if the signal is
17461746
aborted.
@@ -1775,6 +1775,65 @@ for await (const result of dnsResults) {
17751775
}
17761776
```
17771777

1778+
### `readable.forEach(fn[, options])`
1779+
1780+
<!-- YAML
1781+
added: REPLACEME
1782+
-->
1783+
1784+
> Stability: 1 - Experimental
1785+
1786+
* `fn` {Function|AsyncFunction} a function to call on each item of the stream.
1787+
* `data` {any} a chunk of data from the stream.
1788+
* `options` {Object}
1789+
* `signal` {AbortSignal} aborted if the stream is destroyed allowing to
1790+
abort the `fn` call early.
1791+
* `options` {Object}
1792+
* `concurrency` {number} the maximum concurrent invocation of `fn` to call
1793+
on the stream at once. **Default:** `1`.
1794+
* `signal` {AbortSignal} allows destroying the stream if the signal is
1795+
aborted.
1796+
* Returns: {Promise} a promise for when the stream has finished.
1797+
1798+
This method allows iterating a stream. For each item in the stream the
1799+
`fn` function will be called. If the `fn` function returns a promise - that
1800+
promise will be `await`ed.
1801+
1802+
This method is different from `for await...of` loops in that it can optionally
1803+
process items concurrently. In addition, a `forEach` iteration can only be
1804+
stopped by having passed a `signal` option and aborting the related
1805+
`AbortController` while `for await...of` can be stopped with `break` or
1806+
`return`. In either case the stream will be destroyed.
1807+
1808+
This method is different from listening to the [`'data'`][] event in that it
1809+
uses the [`readable`][] event in the underlying machinary and can limit the
1810+
number of concurrent `fn` calls.
1811+
1812+
```mjs
1813+
import { Readable } from 'stream';
1814+
import { Resolver } from 'dns/promises';
1815+
1816+
// With a synchronous predicate.
1817+
for await (const item of Readable.from([1, 2, 3, 4]).filter((x) => x > 2)) {
1818+
console.log(item); // 3, 4
1819+
}
1820+
// With an asynchronous predicate, making at most 2 queries at a time.
1821+
const resolver = new Resolver();
1822+
const dnsResults = await Readable.from([
1823+
'nodejs.org',
1824+
'openjsf.org',
1825+
'www.linuxfoundation.org',
1826+
]).map(async (domain) => {
1827+
const { address } = await resolver.resolve4(domain, { ttl: true });
1828+
return address;
1829+
}, { concurrency: 2 });
1830+
await dnsResults.forEach((result) => {
1831+
// Logs result, similar to `for await (const result of dnsResults)`
1832+
console.log(result);
1833+
});
1834+
console.log('done'); // Stream has finished
1835+
```
1836+
17781837
### Duplex and transform streams
17791838

17801839
#### Class: `stream.Duplex`

lib/internal/streams/operators.js

+23-5
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ const kEof = Symbol('kEof');
2323
async function * map(fn, options) {
2424
if (typeof fn !== 'function') {
2525
throw new ERR_INVALID_ARG_TYPE(
26-
'fn', ['Function', 'AsyncFunction'], this);
26+
'fn', ['Function', 'AsyncFunction'], fn);
2727
}
2828

2929
if (options != null && typeof options !== 'object') {
@@ -147,10 +147,23 @@ async function * map(fn, options) {
147147
}
148148
}
149149

150+
async function forEach(fn, options) {
151+
if (typeof fn !== 'function') {
152+
throw new ERR_INVALID_ARG_TYPE(
153+
'fn', ['Function', 'AsyncFunction'], fn);
154+
}
155+
async function forEachFn(value, options) {
156+
await fn(value, options);
157+
return kEmpty;
158+
}
159+
// eslint-disable-next-line no-unused-vars
160+
for await (const unused of this.map(forEachFn, options));
161+
}
162+
150163
async function * filter(fn, options) {
151164
if (typeof fn !== 'function') {
152-
throw (new ERR_INVALID_ARG_TYPE(
153-
'fn', ['Function', 'AsyncFunction'], this));
165+
throw new ERR_INVALID_ARG_TYPE(
166+
'fn', ['Function', 'AsyncFunction'], fn);
154167
}
155168
async function filterFn(value, options) {
156169
if (await fn(value, options)) {
@@ -160,7 +173,12 @@ async function * filter(fn, options) {
160173
}
161174
yield* this.map(filterFn, options);
162175
}
163-
module.exports = {
176+
177+
module.exports.streamReturningOperators = {
178+
filter,
164179
map,
165-
filter
180+
};
181+
182+
module.exports.promiseReturningOperators = {
183+
forEach,
166184
};

lib/stream.js

+12-3
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,10 @@ const {
3131
promisify: { custom: customPromisify },
3232
} = require('internal/util');
3333

34-
const operators = require('internal/streams/operators');
34+
const {
35+
streamReturningOperators,
36+
promiseReturningOperators,
37+
} = require('internal/streams/operators');
3538
const compose = require('internal/streams/compose');
3639
const { pipeline } = require('internal/streams/pipeline');
3740
const { destroyer } = require('internal/streams/destroy');
@@ -46,12 +49,18 @@ Stream.isDisturbed = utils.isDisturbed;
4649
Stream.isErrored = utils.isErrored;
4750
Stream.isReadable = utils.isReadable;
4851
Stream.Readable = require('internal/streams/readable');
49-
for (const key of ObjectKeys(operators)) {
50-
const op = operators[key];
52+
for (const key of ObjectKeys(streamReturningOperators)) {
53+
const op = streamReturningOperators[key];
5154
Stream.Readable.prototype[key] = function(...args) {
5255
return Stream.Readable.from(ReflectApply(op, this, args));
5356
};
5457
}
58+
for (const key of ObjectKeys(promiseReturningOperators)) {
59+
const op = promiseReturningOperators[key];
60+
Stream.Readable.prototype[key] = function(...args) {
61+
return ReflectApply(op, this, args);
62+
};
63+
}
5564
Stream.Writable = require('internal/streams/writable');
5665
Stream.Duplex = require('internal/streams/duplex');
5766
Stream.Transform = require('internal/streams/transform');

test/parallel/test-stream-forEach.js

+86
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const {
5+
Readable,
6+
} = require('stream');
7+
const assert = require('assert');
8+
const { setTimeout } = require('timers/promises');
9+
10+
{
11+
// forEach works on synchronous streams with a synchronous predicate
12+
const stream = Readable.from([1, 2, 3]);
13+
const result = [1, 2, 3];
14+
(async () => {
15+
await stream.forEach((value) => assert.strictEqual(value, result.shift()));
16+
})().then(common.mustCall());
17+
}
18+
19+
{
20+
// forEach works an asynchronous streams
21+
const stream = Readable.from([1, 2, 3]).filter(async (x) => {
22+
await Promise.resolve();
23+
return true;
24+
});
25+
const result = [1, 2, 3];
26+
(async () => {
27+
await stream.forEach((value) => assert.strictEqual(value, result.shift()));
28+
})().then(common.mustCall());
29+
}
30+
31+
{
32+
// forEach works on asynchronous streams with a asynchronous forEach fn
33+
const stream = Readable.from([1, 2, 3]).filter(async (x) => {
34+
await Promise.resolve();
35+
return true;
36+
});
37+
const result = [1, 2, 3];
38+
(async () => {
39+
await stream.forEach(async (value) => {
40+
await Promise.resolve();
41+
assert.strictEqual(value, result.shift());
42+
});
43+
})().then(common.mustCall());
44+
}
45+
46+
{
47+
// Concurrency + AbortSignal
48+
const ac = new AbortController();
49+
let calls = 0;
50+
const forEachPromise =
51+
Readable.from([1, 2, 3, 4]).forEach(async (_, { signal }) => {
52+
calls++;
53+
await setTimeout(100, { signal });
54+
}, { signal: ac.signal, concurrency: 2 });
55+
// pump
56+
assert.rejects(async () => {
57+
await forEachPromise;
58+
}, {
59+
name: 'AbortError',
60+
}).then(common.mustCall());
61+
62+
setImmediate(() => {
63+
ac.abort();
64+
assert.strictEqual(calls, 2);
65+
});
66+
}
67+
68+
{
69+
// Error cases
70+
assert.rejects(async () => {
71+
await Readable.from([1]).forEach(1);
72+
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
73+
assert.rejects(async () => {
74+
await Readable.from([1]).forEach((x) => x, {
75+
concurrency: 'Foo'
76+
});
77+
}, /ERR_OUT_OF_RANGE/).then(common.mustCall());
78+
assert.rejects(async () => {
79+
await Readable.from([1]).forEach((x) => x, 1);
80+
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
81+
}
82+
{
83+
// Test result is a Promise
84+
const stream = Readable.from([1, 2, 3, 4, 5]).forEach((_) => true);
85+
assert.strictEqual(typeof stream.then, 'function');
86+
}

0 commit comments

Comments
 (0)