Skip to content

Commit 50a0a22

Browse files
benjamingrbengl
authored andcommitted
stream: add more forEach tests
PR-URL: #41937 Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: Robert Nagy <[email protected]> Reviewed-By: Tobias Nießen <[email protected]> Reviewed-By: Mestery <[email protected]> Reviewed-By: James M Snell <[email protected]> Reviewed-By: Antoine du Hamel <[email protected]>
1 parent 6c9a36c commit 50a0a22

File tree

1 file changed

+47
-2
lines changed

1 file changed

+47
-2
lines changed

test/parallel/test-stream-forEach.js

+47-2
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ const {
55
Readable,
66
} = require('stream');
77
const assert = require('assert');
8-
const { setTimeout } = require('timers/promises');
8+
const { once } = require('events');
99

1010
{
1111
// forEach works on synchronous streams with a synchronous predicate
@@ -43,14 +43,59 @@ const { setTimeout } = require('timers/promises');
4343
})().then(common.mustCall());
4444
}
4545

46+
{
47+
// forEach works on an infinite stream
48+
const ac = new AbortController();
49+
const { signal } = ac;
50+
const stream = Readable.from(async function* () {
51+
while (true) yield 1;
52+
}(), { signal });
53+
let i = 0;
54+
assert.rejects(stream.forEach(common.mustCall((x) => {
55+
i++;
56+
if (i === 10) ac.abort();
57+
assert.strictEqual(x, 1);
58+
}, 10)), { name: 'AbortError' }).then(common.mustCall());
59+
}
60+
61+
{
62+
// Emitting an error during `forEach`
63+
const stream = Readable.from([1, 2, 3, 4, 5]);
64+
assert.rejects(stream.forEach(async (x) => {
65+
if (x === 3) {
66+
stream.emit('error', new Error('boom'));
67+
}
68+
}), /boom/).then(common.mustCall());
69+
}
70+
71+
{
72+
// Throwing an error during `forEach` (sync)
73+
const stream = Readable.from([1, 2, 3, 4, 5]);
74+
assert.rejects(stream.forEach((x) => {
75+
if (x === 3) {
76+
throw new Error('boom');
77+
}
78+
}), /boom/).then(common.mustCall());
79+
}
80+
81+
{
82+
// Throwing an error during `forEach` (async)
83+
const stream = Readable.from([1, 2, 3, 4, 5]);
84+
assert.rejects(stream.forEach(async (x) => {
85+
if (x === 3) {
86+
return Promise.reject(new Error('boom'));
87+
}
88+
}), /boom/).then(common.mustCall());
89+
}
90+
4691
{
4792
// Concurrency + AbortSignal
4893
const ac = new AbortController();
4994
let calls = 0;
5095
const forEachPromise =
5196
Readable.from([1, 2, 3, 4]).forEach(async (_, { signal }) => {
5297
calls++;
53-
await setTimeout(100, { signal });
98+
await once(signal, 'abort');
5499
}, { signal: ac.signal, concurrency: 2 });
55100
// pump
56101
assert.rejects(async () => {

0 commit comments

Comments
 (0)