Skip to content

Commit 6c9a36c

Browse files
benjamingrbengl
authored andcommitted
stream: add more filter tests
PR-URL: #41936 Reviewed-By: Robert Nagy <[email protected]> Reviewed-By: James M Snell <[email protected]> Reviewed-By: Matteo Collina <[email protected]>
1 parent 88cbceb commit 6c9a36c

File tree

1 file changed

+69
-1
lines changed

1 file changed

+69
-1
lines changed

test/parallel/test-stream-filter.js

+69-1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ const {
55
Readable,
66
} = require('stream');
77
const assert = require('assert');
8+
const { once } = require('events');
89
const { setTimeout } = require('timers/promises');
910

1011
{
@@ -46,13 +47,80 @@ const { setTimeout } = require('timers/promises');
4647
})().then(common.mustCall());
4748
}
4849

50+
{
51+
// Filter works on an infinite stream
52+
const stream = Readable.from(async function* () {
53+
while (true) yield 1;
54+
}()).filter(common.mustCall(async (x) => {
55+
return x < 3;
56+
}, 5));
57+
(async () => {
58+
let i = 1;
59+
for await (const item of stream) {
60+
assert.strictEqual(item, 1);
61+
if (++i === 5) break;
62+
}
63+
})().then(common.mustCall());
64+
}
65+
66+
{
67+
// Filter works on constructor created streams
68+
let i = 0;
69+
const stream = new Readable({
70+
read() {
71+
if (i === 10) {
72+
this.push(null);
73+
return;
74+
}
75+
this.push(Uint8Array.from([i]));
76+
i++;
77+
},
78+
highWaterMark: 0,
79+
}).filter(common.mustCall(async ([x]) => {
80+
return x !== 5;
81+
}, 10));
82+
(async () => {
83+
const result = (await stream.toArray()).map((x) => x[0]);
84+
const expected = [...Array(10).keys()].filter((x) => x !== 5);
85+
assert.deepStrictEqual(result, expected);
86+
})().then(common.mustCall());
87+
}
88+
89+
{
90+
// Throwing an error during `filter` (sync)
91+
const stream = Readable.from([1, 2, 3, 4, 5]).filter((x) => {
92+
if (x === 3) {
93+
throw new Error('boom');
94+
}
95+
return true;
96+
});
97+
assert.rejects(
98+
stream.map((x) => x + x).toArray(),
99+
/boom/,
100+
).then(common.mustCall());
101+
}
102+
103+
{
104+
// Throwing an error during `filter` (async)
105+
const stream = Readable.from([1, 2, 3, 4, 5]).filter(async (x) => {
106+
if (x === 3) {
107+
throw new Error('boom');
108+
}
109+
return true;
110+
});
111+
assert.rejects(
112+
stream.filter(() => true).toArray(),
113+
/boom/,
114+
).then(common.mustCall());
115+
}
116+
49117
{
50118
// Concurrency + AbortSignal
51119
const ac = new AbortController();
52120
let calls = 0;
53121
const stream = Readable.from([1, 2, 3, 4]).filter(async (_, { signal }) => {
54122
calls++;
55-
await setTimeout(100, { signal });
123+
await once(signal, 'abort');
56124
}, { signal: ac.signal, concurrency: 2 });
57125
// pump
58126
assert.rejects(async () => {

0 commit comments

Comments
 (0)