Skip to content

Commit 5badf46

Browse files
benjamingrronag
andcommitted
stream: support some and every
This continues on the iterator-helpers work by adding `.some` and `.every` to readable streams. Co-Authored-By: Robert Nagy <[email protected]> PR-URL: #41573 Reviewed-By: Robert Nagy <[email protected]> Reviewed-By: Matteo Collina <[email protected]>
1 parent ef35175 commit 5badf46

File tree

3 files changed

+235
-1
lines changed

3 files changed

+235
-1
lines changed

doc/api/stream.md

+99-1
Original file line numberDiff line numberDiff line change
@@ -1918,7 +1918,7 @@ import { Resolver } from 'dns/promises';
19181918
await Readable.from([1, 2, 3, 4]).toArray(); // [1, 2, 3, 4]
19191919

19201920
// Make dns queries concurrently using .map and collect
1921-
// the results into an aray using toArray
1921+
// the results into an array using toArray
19221922
const dnsResults = await Readable.from([
19231923
'nodejs.org',
19241924
'openjsf.org',
@@ -1929,6 +1929,104 @@ const dnsResults = await Readable.from([
19291929
}, { concurrency: 2 }).toArray();
19301930
```
19311931

1932+
### `readable.some(fn[, options])`
1933+
1934+
<!-- YAML
1935+
added: REPLACEME
1936+
-->
1937+
1938+
> Stability: 1 - Experimental
1939+
1940+
* `fn` {Function|AsyncFunction} a function to call on each item of the stream.
1941+
* `data` {any} a chunk of data from the stream.
1942+
* `options` {Object}
1943+
* `signal` {AbortSignal} aborted if the stream is destroyed allowing to
1944+
abort the `fn` call early.
1945+
* `options` {Object}
1946+
* `concurrency` {number} the maximum concurrent invocation of `fn` to call
1947+
on the stream at once. **Default:** `1`.
1948+
* `signal` {AbortSignal} allows destroying the stream if the signal is
1949+
aborted.
1950+
* Returns: {Promise} a promise evaluating to `true` if `fn` returned a truthy
1951+
value for at least one of the chunks.
1952+
1953+
This method is similar to `Array.prototype.some` and calls `fn` on each chunk
1954+
in the stream until the awaited return value is `true` (or any truthy value).
1955+
Once an `fn` call on a chunk awaited return value is truthy, the stream is
1956+
destroyed and the promise is fulfilled with `true`. If none of the `fn`
1957+
calls on the chunks return a truthy value, the promise is fulfilled with
1958+
`false`.
1959+
1960+
```mjs
1961+
import { Readable } from 'stream';
1962+
import { stat } from 'fs/promises';
1963+
1964+
// With a synchronous predicate.
1965+
await Readable.from([1, 2, 3, 4]).some((x) => x > 2); // true
1966+
await Readable.from([1, 2, 3, 4]).some((x) => x < 0); // false
1967+
1968+
// With an asynchronous predicate, making at most 2 file checks at a time.
1969+
const anyBigFile = await Readable.from([
1970+
'file1',
1971+
'file2',
1972+
'file3',
1973+
]).some(async (fileName) => {
1974+
const stats = await stat(fileName);
1975+
return stat.size > 1024 * 1024;
1976+
}, { concurrency: 2 });
1977+
console.log(anyBigFile); // `true` if any file in the list is bigger than 1MB
1978+
console.log('done'); // Stream has finished
1979+
```
1980+
1981+
### `readable.every(fn[, options])`
1982+
1983+
<!-- YAML
1984+
added: REPLACEME
1985+
-->
1986+
1987+
> Stability: 1 - Experimental
1988+
1989+
* `fn` {Function|AsyncFunction} a function to call on each item of the stream.
1990+
* `data` {any} a chunk of data from the stream.
1991+
* `options` {Object}
1992+
* `signal` {AbortSignal} aborted if the stream is destroyed allowing to
1993+
abort the `fn` call early.
1994+
* `options` {Object}
1995+
* `concurrency` {number} the maximum concurrent invocation of `fn` to call
1996+
on the stream at once. **Default:** `1`.
1997+
* `signal` {AbortSignal} allows destroying the stream if the signal is
1998+
aborted.
1999+
* Returns: {Promise} a promise evaluating to `true` if `fn` returned a truthy
2000+
value for all of the chunks.
2001+
2002+
This method is similar to `Array.prototype.every` and calls `fn` on each chunk
2003+
in the stream to check if all awaited return values are truthy value for `fn`.
2004+
Once an `fn` call on a chunk awaited return value is falsy, the stream is
2005+
destroyed and the promise is fulfilled with `false`. If all of the `fn` calls
2006+
on the chunks return a truthy value, the promise is fulfilled with `true`.
2007+
2008+
```mjs
2009+
import { Readable } from 'stream';
2010+
import { stat } from 'fs/promises';
2011+
2012+
// With a synchronous predicate.
2013+
await Readable.from([1, 2, 3, 4]).every((x) => x > 2); // false
2014+
await Readable.from([1, 2, 3, 4]).every((x) => x > 0); // true
2015+
2016+
// With an asynchronous predicate, making at most 2 file checks at a time.
2017+
const allBigFiles = await Readable.from([
2018+
'file1',
2019+
'file2',
2020+
'file3',
2021+
]).every(async (fileName) => {
2022+
const stats = await stat(fileName);
2023+
return stat.size > 1024 * 1024;
2024+
}, { concurrency: 2 });
2025+
// `true` if all files in the list are bigger than 1MiB
2026+
console.log(allBigFiles);
2027+
console.log('done'); // Stream has finished
2028+
```
2029+
19322030
### Duplex and transform streams
19332031

19342032
#### Class: `stream.Duplex`

lib/internal/streams/operators.js

+41
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ const {
1010
AbortError,
1111
} = require('internal/errors');
1212
const { validateInteger } = require('internal/validators');
13+
const { kWeakHandler } = require('internal/event_target');
1314

1415
const {
1516
ArrayPrototypePush,
@@ -47,6 +48,10 @@ async function * map(fn, options) {
4748
const signalOpt = { signal };
4849

4950
const abort = () => ac.abort();
51+
if (options?.signal?.aborted) {
52+
abort();
53+
}
54+
5055
options?.signal?.addEventListener('abort', abort);
5156

5257
let next;
@@ -150,6 +155,40 @@ async function * map(fn, options) {
150155
}
151156
}
152157

158+
async function some(fn, options) {
159+
// https://tc39.es/proposal-iterator-helpers/#sec-iteratorprototype.some
160+
// Note that some does short circuit but also closes the iterator if it does
161+
const ac = new AbortController();
162+
if (options?.signal) {
163+
if (options.signal.aborted) {
164+
ac.abort();
165+
}
166+
options.signal.addEventListener('abort', () => ac.abort(), {
167+
[kWeakHandler]: this,
168+
once: true,
169+
});
170+
}
171+
const mapped = this.map(fn, { ...options, signal: ac.signal });
172+
for await (const result of mapped) {
173+
if (result) {
174+
ac.abort();
175+
return true;
176+
}
177+
}
178+
return false;
179+
}
180+
181+
async function every(fn, options) {
182+
if (typeof fn !== 'function') {
183+
throw new ERR_INVALID_ARG_TYPE(
184+
'fn', ['Function', 'AsyncFunction'], fn);
185+
}
186+
// https://en.wikipedia.org/wiki/De_Morgan%27s_laws
187+
return !(await some.call(this, async (x) => {
188+
return !(await fn(x));
189+
}, options));
190+
}
191+
153192
async function forEach(fn, options) {
154193
if (typeof fn !== 'function') {
155194
throw new ERR_INVALID_ARG_TYPE(
@@ -196,6 +235,8 @@ module.exports.streamReturningOperators = {
196235
};
197236

198237
module.exports.promiseReturningOperators = {
238+
every,
199239
forEach,
200240
toArray,
241+
some,
201242
};
+95
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const {
5+
Readable,
6+
} = require('stream');
7+
const assert = require('assert');
8+
9+
function oneTo5() {
10+
return Readable.from([1, 2, 3, 4, 5]);
11+
}
12+
13+
function oneTo5Async() {
14+
return oneTo5().map(async (x) => {
15+
await Promise.resolve();
16+
return x;
17+
});
18+
}
19+
{
20+
// Some and every work with a synchronous stream and predicate
21+
(async () => {
22+
assert.strictEqual(await oneTo5().some((x) => x > 3), true);
23+
assert.strictEqual(await oneTo5().every((x) => x > 3), false);
24+
assert.strictEqual(await oneTo5().some((x) => x > 6), false);
25+
assert.strictEqual(await oneTo5().every((x) => x < 6), true);
26+
assert.strictEqual(await Readable.from([]).some((x) => true), false);
27+
assert.strictEqual(await Readable.from([]).every((x) => true), true);
28+
})().then(common.mustCall());
29+
}
30+
31+
{
32+
// Some and every work with an asynchronous stream and synchronous predicate
33+
(async () => {
34+
assert.strictEqual(await oneTo5Async().some((x) => x > 3), true);
35+
assert.strictEqual(await oneTo5Async().every((x) => x > 3), false);
36+
assert.strictEqual(await oneTo5Async().some((x) => x > 6), false);
37+
assert.strictEqual(await oneTo5Async().every((x) => x < 6), true);
38+
})().then(common.mustCall());
39+
}
40+
41+
{
42+
// Some and every work on asynchronous streams with an asynchronous predicate
43+
(async () => {
44+
assert.strictEqual(await oneTo5().some(async (x) => x > 3), true);
45+
assert.strictEqual(await oneTo5().every(async (x) => x > 3), false);
46+
assert.strictEqual(await oneTo5().some(async (x) => x > 6), false);
47+
assert.strictEqual(await oneTo5().every(async (x) => x < 6), true);
48+
})().then(common.mustCall());
49+
}
50+
51+
{
52+
// Some and every short circuit
53+
(async () => {
54+
await oneTo5().some(common.mustCall((x) => x > 2, 3));
55+
await oneTo5().every(common.mustCall((x) => x < 3, 3));
56+
// When short circuit isn't possible the whole stream is iterated
57+
await oneTo5().some(common.mustCall((x) => x > 6, 5));
58+
// The stream is destroyed afterwards
59+
const stream = oneTo5();
60+
await stream.some(common.mustCall((x) => x > 2, 3));
61+
assert.strictEqual(stream.destroyed, true);
62+
})().then(common.mustCall());
63+
}
64+
65+
{
66+
// Support for AbortSignal
67+
const ac = new AbortController();
68+
assert.rejects(Readable.from([1, 2, 3]).some(
69+
() => new Promise(() => {}),
70+
{ signal: ac.signal }
71+
), {
72+
name: 'AbortError',
73+
}).then(common.mustCall());
74+
ac.abort();
75+
}
76+
{
77+
// Support for pre-aborted AbortSignal
78+
assert.rejects(Readable.from([1, 2, 3]).some(
79+
() => new Promise(() => {}),
80+
{ signal: AbortSignal.abort() }
81+
), {
82+
name: 'AbortError',
83+
}).then(common.mustCall());
84+
}
85+
{
86+
// Error cases
87+
assert.rejects(async () => {
88+
await Readable.from([1]).every(1);
89+
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
90+
assert.rejects(async () => {
91+
await Readable.from([1]).every((x) => x, {
92+
concurrency: 'Foo'
93+
});
94+
}, /ERR_OUT_OF_RANGE/).then(common.mustCall());
95+
}

0 commit comments

Comments
 (0)