Skip to content

Commit 09c25bb

Browse files
benjamingrtargos
authored andcommittedJan 14, 2022
stream: add filter method to readable
This continues the work in #40815 to make streams compatible with upcoming ECMAScript language features. It adds an experimental `filter` api to streams and tests/docs for it. See https://github.com/tc39/proposal-iterator-helpers/ Co-Authored-By: Robert Nagy <[email protected]> PR-URL: #41354 Reviewed-By: Robert Nagy <[email protected]> Reviewed-By: Matteo Collina <[email protected]>
1 parent 0d18a8c commit 09c25bb

File tree

3 files changed

+172
-0
lines changed

3 files changed

+172
-0
lines changed
 

‎doc/api/stream.md

+49
Original file line numberDiff line numberDiff line change
@@ -1727,6 +1727,55 @@ for await (const result of dnsResults) {
17271727
}
17281728
```
17291729

1730+
### `readable.filter(fn[, options])`
1731+
1732+
<!-- YAML
1733+
added: REPLACEME
1734+
-->
1735+
1736+
> Stability: 1 - Experimental
1737+
1738+
* `fn` {Function|AsyncFunction} a function to filter items from stream.
1739+
* `data` {any} a chunk of data from the stream.
1740+
* `options` {Object}
1741+
* `signal` {AbortSignal} aborted if the stream is destroyed allowing to
1742+
abort the `fn` call early.
1743+
* `options` {Object}
1744+
* `concurrency` {number} the maximal concurrent invocation of `fn` to call
1745+
on the stream at once. **Default:** `1`.
1746+
* `signal` {AbortSignal} allows destroying the stream if the signal is
1747+
aborted.
1748+
* Returns: {Readable} a stream filtered with the predicate `fn`.
1749+
1750+
This method allows filtering the stream. For each item in the stream the `fn`
1751+
function will be called and if it returns a truthy value, the item will be
1752+
passed to the result stream. If the `fn` function returns a promise - that
1753+
promise will be `await`ed.
1754+
1755+
```mjs
1756+
import { Readable } from 'stream';
1757+
import { Resolver } from 'dns/promises';
1758+
1759+
// With a synchronous predicate.
1760+
for await (const item of Readable.from([1, 2, 3, 4]).filter((x) => x > 2)) {
1761+
console.log(item); // 3, 4
1762+
}
1763+
// With an asynchronous predicate, making at most 2 queries at a time.
1764+
const resolver = new Resolver();
1765+
const dnsResults = await Readable.from([
1766+
'nodejs.org',
1767+
'openjsf.org',
1768+
'www.linuxfoundation.org',
1769+
]).filter(async (domain) => {
1770+
const { address } = await resolver.resolve4(domain, { ttl: true });
1771+
return address.ttl > 60;
1772+
}, { concurrency: 2 });
1773+
for await (const result of dnsResults) {
1774+
// Logs domains with more than 60 seconds on the resolved dns record.
1775+
console.log(result);
1776+
}
1777+
```
1778+
17301779
### Duplex and transform streams
17311780

17321781
#### Class: `stream.Duplex`

‎lib/internal/streams/operators.js

+14
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,20 @@ async function * map(fn, options) {
147147
}
148148
}
149149

150+
async function * filter(fn, options) {
151+
if (typeof fn !== 'function') {
152+
throw (new ERR_INVALID_ARG_TYPE(
153+
'fn', ['Function', 'AsyncFunction'], this));
154+
}
155+
async function filterFn(value, options) {
156+
if (await fn(value, options)) {
157+
return value;
158+
}
159+
return kEmpty;
160+
}
161+
yield* this.map(filterFn, options);
162+
}
150163
module.exports = {
151164
map,
165+
filter
152166
};

‎test/parallel/test-stream-filter.js

+109
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const {
5+
Readable,
6+
} = require('stream');
7+
const assert = require('assert');
8+
const { setTimeout } = require('timers/promises');
9+
10+
{
11+
// Filter works on synchronous streams with a synchronous predicate
12+
const stream = Readable.from([1, 2, 3, 4, 5]).filter((x) => x < 3);
13+
const result = [1, 2];
14+
(async () => {
15+
for await (const item of stream) {
16+
assert.strictEqual(item, result.shift());
17+
}
18+
})().then(common.mustCall());
19+
}
20+
21+
{
22+
// Filter works on synchronous streams with an asynchronous predicate
23+
const stream = Readable.from([1, 2, 3, 4, 5]).filter(async (x) => {
24+
await Promise.resolve();
25+
return x > 3;
26+
});
27+
const result = [4, 5];
28+
(async () => {
29+
for await (const item of stream) {
30+
assert.strictEqual(item, result.shift());
31+
}
32+
})().then(common.mustCall());
33+
}
34+
35+
{
36+
// Map works on asynchronous streams with a asynchronous mapper
37+
const stream = Readable.from([1, 2, 3, 4, 5]).map(async (x) => {
38+
await Promise.resolve();
39+
return x + x;
40+
}).filter((x) => x > 5);
41+
const result = [6, 8, 10];
42+
(async () => {
43+
for await (const item of stream) {
44+
assert.strictEqual(item, result.shift());
45+
}
46+
})().then(common.mustCall());
47+
}
48+
49+
{
50+
// Concurrency + AbortSignal
51+
const ac = new AbortController();
52+
let calls = 0;
53+
const stream = Readable.from([1, 2, 3, 4]).filter(async (_, { signal }) => {
54+
calls++;
55+
await setTimeout(100, { signal });
56+
}, { signal: ac.signal, concurrency: 2 });
57+
// pump
58+
assert.rejects(async () => {
59+
for await (const item of stream) {
60+
// nope
61+
console.log(item);
62+
}
63+
}, {
64+
name: 'AbortError',
65+
}).then(common.mustCall());
66+
67+
setImmediate(() => {
68+
ac.abort();
69+
assert.strictEqual(calls, 2);
70+
});
71+
}
72+
73+
{
74+
// Concurrency result order
75+
const stream = Readable.from([1, 2]).filter(async (item, { signal }) => {
76+
await setTimeout(10 - item, { signal });
77+
return true;
78+
}, { concurrency: 2 });
79+
80+
(async () => {
81+
const expected = [1, 2];
82+
for await (const item of stream) {
83+
assert.strictEqual(item, expected.shift());
84+
}
85+
})().then(common.mustCall());
86+
}
87+
88+
{
89+
// Error cases
90+
assert.rejects(async () => {
91+
// eslint-disable-next-line no-unused-vars
92+
for await (const unused of Readable.from([1]).filter(1));
93+
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
94+
assert.rejects(async () => {
95+
// eslint-disable-next-line no-unused-vars
96+
for await (const _ of Readable.from([1]).filter((x) => x, {
97+
concurrency: 'Foo'
98+
}));
99+
}, /ERR_OUT_OF_RANGE/).then(common.mustCall());
100+
assert.rejects(async () => {
101+
// eslint-disable-next-line no-unused-vars
102+
for await (const _ of Readable.from([1]).filter((x) => x, 1));
103+
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
104+
}
105+
{
106+
// Test result is a Readable
107+
const stream = Readable.from([1, 2, 3, 4, 5]).filter((x) => true);
108+
assert.strictEqual(stream.readable, true);
109+
}

0 commit comments

Comments
 (0)
Please sign in to comment.