From bb0fff5f6636c59df506c45ff604f637c23e29a6 Mon Sep 17 00:00:00 2001 From: Nitzan Uziely <linkgoron@gmail.com> Date: Fri, 4 Feb 2022 14:24:01 +0200 Subject: [PATCH 1/2] stream: add iterator helper find Continue iterator-helpers work by adding `find` to readable streams. --- doc/api/stream.md | 50 +++++++ lib/internal/streams/operators.js | 42 ++---- test/parallel/test-stream-some-every.js | 166 +++++++++++++++++------- 3 files changed, 182 insertions(+), 76 deletions(-) diff --git a/doc/api/stream.md b/doc/api/stream.md index 265e3a4ce4657c..9defb46ecb5d3f 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -1976,6 +1976,56 @@ console.log(anyBigFile); // `true` if any file in the list is bigger than 1MB console.log('done'); // Stream has finished ``` +### `readable.find(fn[, options])` + +<!-- YAML +added: REPLACEME +--> + +> Stability: 1 - Experimental + +* `fn` {Function|AsyncFunction} a function to call on each item of the stream. + * `data` {any} a chunk of data from the stream. + * `options` {Object} + * `signal` {AbortSignal} aborted if the stream is destroyed allowing to + abort the `fn` call early. +* `options` {Object} + * `concurrency` {number} the maximum concurrent invocation of `fn` to call + on the stream at once. **Default:** `1`. + * `signal` {AbortSignal} allows destroying the stream if the signal is + aborted. +* Returns: {Promise} a promise evaluating to the first chunk for which `fn` + evaluated with a truthy value, or `undefined` if no element was found. + +This method is similar to `Array.prototype.find` and calls `fn` on each chunk +in the stream to find a chunk with a truthy value for `fn`. Once an `fn` call's +awaited return value is truthy, the stream is destroyed and the promise is +fulfilled with value for which the value of `fn` was truthy for. If all of the +`fn` calls on the chunks return a falsy value, the promise is fulfilled with +`undefined`. + +```mjs +import { Readable } from 'stream'; +import { stat } from 'fs/promises'; + +// With a synchronous predicate. +await Readable.from([1, 2, 3, 4]).find((x) => x > 2); // 3 +await Readable.from([1, 2, 3, 4]).find((x) => x > 0); // 1 +await Readable.from([1, 2, 3, 4]).find((x) => x > 10); // undefined + +// With an asynchronous predicate, making at most 2 file checks at a time. +const foundBigFile = await Readable.from([ + 'file1', + 'file2', + 'file3', +]).find(async (fileName) => { + const stats = await stat(fileName); + return stat.size > 1024 * 1024; +}, { concurrency: 2 }); +console.log(foundBigFile); // File name of large file, if any file in the list is bigger than 1MB +console.log('done'); // Stream has finished +``` + ### `readable.every(fn[, options])` <!-- YAML diff --git a/lib/internal/streams/operators.js b/lib/internal/streams/operators.js index a6efa92afb7025..fe77bf91e94885 100644 --- a/lib/internal/streams/operators.js +++ b/lib/internal/streams/operators.js @@ -186,31 +186,9 @@ function asIndexedPairs(options = undefined) { } async function some(fn, options) { - if (options != null && typeof options !== 'object') { - throw new ERR_INVALID_ARG_TYPE('options', ['Object']); - } - if (options?.signal != null) { - validateAbortSignal(options.signal, 'options.signal'); - } - - // https://tc39.es/proposal-iterator-helpers/#sec-iteratorprototype.some - // Note that some does short circuit but also closes the iterator if it does - const ac = new AbortController(); - if (options?.signal) { - if (options.signal.aborted) { - ac.abort(); - } - options.signal.addEventListener('abort', () => ac.abort(), { - [kWeakHandler]: this, - once: true, - }); - } - const mapped = this.map(fn, { ...options, signal: ac.signal }); - for await (const result of mapped) { - if (result) { - ac.abort(); - return true; - } + // eslint-disable-next-line no-unused-vars + for await (const unused of filter.call(this, fn, options)) { + return true; } return false; } @@ -226,6 +204,13 @@ async function every(fn, options) { }, options)); } +async function find(fn, options) { + for await (const result of filter.call(this, fn, options)) { + return result; + } + return undefined; +} + async function forEach(fn, options) { if (typeof fn !== 'function') { throw new ERR_INVALID_ARG_TYPE( @@ -236,7 +221,7 @@ async function forEach(fn, options) { return kEmpty; } // eslint-disable-next-line no-unused-vars - for await (const unused of this.map(forEachFn, options)); + for await (const unused of map.call(this, forEachFn, options)); } function filter(fn, options) { @@ -250,7 +235,7 @@ function filter(fn, options) { } return kEmpty; } - return this.map(filterFn, options); + return map.call(this, filterFn, options); } // Specific to provide better error to reduce since the argument is only @@ -329,7 +314,7 @@ async function toArray(options) { } function flatMap(fn, options) { - const values = this.map(fn, options); + const values = map.call(this, fn, options); return async function* flatMap() { for await (const val of values) { yield* val; @@ -415,4 +400,5 @@ module.exports.promiseReturningOperators = { reduce, toArray, some, + find, }; diff --git a/test/parallel/test-stream-some-every.js b/test/parallel/test-stream-some-every.js index 91733613049121..c63d5c561af31d 100644 --- a/test/parallel/test-stream-some-every.js +++ b/test/parallel/test-stream-some-every.js @@ -1,6 +1,7 @@ 'use strict'; const common = require('../common'); +const { setTimeout } = require('timers/promises'); const { Readable, } = require('stream'); @@ -17,90 +18,159 @@ function oneTo5Async() { }); } { - // Some and every work with a synchronous stream and predicate + // Some, find, and every work with a synchronous stream and predicate (async () => { assert.strictEqual(await oneTo5().some((x) => x > 3), true); assert.strictEqual(await oneTo5().every((x) => x > 3), false); + assert.strictEqual(await oneTo5().find((x) => x > 3), 4); assert.strictEqual(await oneTo5().some((x) => x > 6), false); assert.strictEqual(await oneTo5().every((x) => x < 6), true); - assert.strictEqual(await Readable.from([]).some((x) => true), false); - assert.strictEqual(await Readable.from([]).every((x) => true), true); + assert.strictEqual(await oneTo5().find((x) => x > 6), undefined); + assert.strictEqual(await Readable.from([]).some(() => true), false); + assert.strictEqual(await Readable.from([]).every(() => true), true); + assert.strictEqual(await Readable.from([]).find(() => true), undefined); })().then(common.mustCall()); } { - // Some and every work with an asynchronous stream and synchronous predicate + // Some, find, and every work with an asynchronous stream and synchronous predicate (async () => { assert.strictEqual(await oneTo5Async().some((x) => x > 3), true); assert.strictEqual(await oneTo5Async().every((x) => x > 3), false); + assert.strictEqual(await oneTo5Async().find((x) => x > 3), 4); assert.strictEqual(await oneTo5Async().some((x) => x > 6), false); assert.strictEqual(await oneTo5Async().every((x) => x < 6), true); + assert.strictEqual(await oneTo5Async().find((x) => x > 6), undefined); })().then(common.mustCall()); } { - // Some and every work on asynchronous streams with an asynchronous predicate + // Some, find, and every work on synchronous streams with an asynchronous predicate (async () => { assert.strictEqual(await oneTo5().some(async (x) => x > 3), true); assert.strictEqual(await oneTo5().every(async (x) => x > 3), false); + assert.strictEqual(await oneTo5().find(async (x) => x > 3), 4); assert.strictEqual(await oneTo5().some(async (x) => x > 6), false); assert.strictEqual(await oneTo5().every(async (x) => x < 6), true); + assert.strictEqual(await oneTo5().find(async (x) => x > 6), undefined); })().then(common.mustCall()); } { - // Some and every short circuit + // Some, find, and every work on asynchronous streams with an asynchronous predicate (async () => { - await oneTo5().some(common.mustCall((x) => x > 2, 3)); - await oneTo5().every(common.mustCall((x) => x < 3, 3)); - // When short circuit isn't possible the whole stream is iterated - await oneTo5().some(common.mustCall((x) => x > 6, 5)); - // The stream is destroyed afterwards - const stream = oneTo5(); - await stream.some(common.mustCall((x) => x > 2, 3)); + assert.strictEqual(await oneTo5Async().some(async (x) => x > 3), true); + assert.strictEqual(await oneTo5Async().every(async (x) => x > 3), false); + assert.strictEqual(await oneTo5Async().find(async (x) => x > 3), 4); + assert.strictEqual(await oneTo5Async().some(async (x) => x > 6), false); + assert.strictEqual(await oneTo5Async().every(async (x) => x < 6), true); + assert.strictEqual(await oneTo5Async().find(async (x) => x > 6), undefined); + })().then(common.mustCall()); +} + +{ + async function checkDestroyed(stream) { + // eslint-disable-next-line no-restricted-syntax + await setTimeout(); assert.strictEqual(stream.destroyed, true); + } + // Some, find, and every short circuit + (async () => { + const someStream = oneTo5(); + await someStream.some(common.mustCall((x) => x > 2, 3)); + await checkDestroyed(someStream); + + const everyStream = oneTo5(); + await everyStream.every(common.mustCall((x) => x < 3, 3)); + await checkDestroyed(everyStream); + + const findStream = oneTo5(); + await findStream.find(common.mustCall((x) => x > 1, 2)); + await checkDestroyed(findStream); + + // When short circuit isn't possible the whole stream is iterated + await oneTo5().some(common.mustCall(() => false, 5)); + await oneTo5().every(common.mustCall(() => true, 5)); + await oneTo5().find(common.mustCall(() => false, 5)); + })().then(common.mustCall()); + + // Some, find, and every short circuit async stream/predicate + (async () => { + const someStream = oneTo5Async(); + await someStream.some(common.mustCall(async (x) => x > 2, 3)); + await checkDestroyed(someStream); + + const everyStream = oneTo5Async(); + await everyStream.every(common.mustCall(async (x) => x < 3, 3)); + await checkDestroyed(everyStream); + + const findStream = oneTo5Async(); + await findStream.find(common.mustCall(async (x) => x > 1, 2)); + await checkDestroyed(findStream); + + // When short circuit isn't possible the whole stream is iterated + await oneTo5Async().some(common.mustCall(async () => false, 5)); + await oneTo5Async().every(common.mustCall(async () => true, 5)); + await oneTo5Async().find(common.mustCall(async () => false, 5)); })().then(common.mustCall()); } { - // Support for AbortSignal - const ac = new AbortController(); - assert.rejects(Readable.from([1, 2, 3]).some( - () => new Promise(() => {}), - { signal: ac.signal } - ), { - name: 'AbortError', - }).then(common.mustCall()); - ac.abort(); + // Concurrency doesn't affect which value is found. + (async () => { + const found = await Readable.from([1, 2]).find(async (val) => { + if (val === 1) { + // eslint-disable-next-line no-restricted-syntax + await setTimeout(100); + } + return true; + }, { concurrency: 2 }); + assert.strictEqual(found, 1); + })().then(common.mustCall()); } + { - // Support for pre-aborted AbortSignal - assert.rejects(Readable.from([1, 2, 3]).some( - () => new Promise(() => {}), - { signal: AbortSignal.abort() } - ), { - name: 'AbortError', - }).then(common.mustCall()); + // Support for AbortSignal + for (const op of ['some', 'every', 'find']) { + { + const ac = new AbortController(); + assert.rejects(Readable.from([1, 2, 3])[op]( + () => new Promise(() => { }), + { signal: ac.signal } + ), { + name: 'AbortError', + }, `${op} should abort correctly with sync abort`).then(common.mustCall()); + ac.abort(); + } + { + // Support for pre-aborted AbortSignal + assert.rejects(Readable.from([1, 2, 3])[op]( + () => new Promise(() => { }), + { signal: AbortSignal.abort() } + ), { + name: 'AbortError', + }, `${op} should abort with pre-aborted abort controller`).then(common.mustCall()); + } + } } { // Error cases - assert.rejects(async () => { - await Readable.from([1]).every(1); - }, /ERR_INVALID_ARG_TYPE/).then(common.mustCall()); - - assert.rejects(async () => { - await Readable.from([1]).every((x) => x, 1); - }, /ERR_INVALID_ARG_TYPE/).then(common.mustCall()); - - assert.rejects(async () => { - await Readable.from([1]).every((x) => x, { - signal: true - }); - }, /ERR_INVALID_ARG_TYPE/).then(common.mustCall()); - - assert.rejects(async () => { - await Readable.from([1]).every((x) => x, { - concurrency: 'Foo' - }); - }, /ERR_OUT_OF_RANGE/).then(common.mustCall()); + for (const op of ['some', 'every', 'find']) { + assert.rejects(async () => { + await Readable.from([1])[op](1); + }, /ERR_INVALID_ARG_TYPE/, `${op} should throw for invalid function`).then(common.mustCall()); + assert.rejects(async () => { + await Readable.from([1])[op]((x) => x, { + concurrency: 'Foo' + }); + }, /ERR_OUT_OF_RANGE/, `${op} should throw for invalid concurrency`).then(common.mustCall()); + assert.rejects(async () => { + await Readable.from([1])[op]((x) => x, 1); + }, /ERR_INVALID_ARG_TYPE/, `${op} should throw for invalid concurrency`).then(common.mustCall()); + assert.rejects(async () => { + await Readable.from([1])[op]((x) => x, { + signal: true + }); + }, /ERR_INVALID_ARG_TYPE/, `${op} should throw for invalid signal`).then(common.mustCall()); + } } From ba84defb3c1e6056d2c54c9c6e2a9ae4b7c8c317 Mon Sep 17 00:00:00 2001 From: Nitzan Uziely <linkgoron@gmail.com> Date: Fri, 4 Feb 2022 14:24:41 +0200 Subject: [PATCH 2/2] fixup! stream: add iterator helper find --- doc/api/stream.md | 43 +++---- test/parallel/test-stream-filter.js | 8 ++ test/parallel/test-stream-flatMap.js | 8 ++ test/parallel/test-stream-forEach.js | 8 ++ ...ery.js => test-stream-some-find-every.mjs} | 114 +++++++++--------- 5 files changed, 102 insertions(+), 79 deletions(-) rename test/parallel/{test-stream-some-every.js => test-stream-some-find-every.mjs} (57%) diff --git a/doc/api/stream.md b/doc/api/stream.md index 9defb46ecb5d3f..10116fe4a6088c 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -1745,7 +1745,8 @@ added: v17.4.0 > Stability: 1 - Experimental -* `fn` {Function|AsyncFunction} a function to map over every item in the stream. +* `fn` {Function|AsyncFunction} a function to map over every chunk in the + stream. * `data` {any} a chunk of data from the stream. * `options` {Object} * `signal` {AbortSignal} aborted if the stream is destroyed allowing to @@ -1758,7 +1759,7 @@ added: v17.4.0 * Returns: {Readable} a stream mapped with the function `fn`. This method allows mapping over the stream. The `fn` function will be called -for every item in the stream. If the `fn` function returns a promise - that +for every chunk in the stream. If the `fn` function returns a promise - that promise will be `await`ed before being passed to the result stream. ```mjs @@ -1766,8 +1767,8 @@ import { Readable } from 'stream'; import { Resolver } from 'dns/promises'; // With a synchronous mapper. -for await (const item of Readable.from([1, 2, 3, 4]).map((x) => x * 2)) { - console.log(item); // 2, 4, 6, 8 +for await (const chunk of Readable.from([1, 2, 3, 4]).map((x) => x * 2)) { + console.log(chunk); // 2, 4, 6, 8 } // With an asynchronous mapper, making at most 2 queries at a time. const resolver = new Resolver(); @@ -1789,7 +1790,7 @@ added: v17.4.0 > Stability: 1 - Experimental -* `fn` {Function|AsyncFunction} a function to filter items from stream. +* `fn` {Function|AsyncFunction} a function to filter chunks from the stream. * `data` {any} a chunk of data from the stream. * `options` {Object} * `signal` {AbortSignal} aborted if the stream is destroyed allowing to @@ -1801,8 +1802,8 @@ added: v17.4.0 aborted. * Returns: {Readable} a stream filtered with the predicate `fn`. -This method allows filtering the stream. For each item in the stream the `fn` -function will be called and if it returns a truthy value, the item will be +This method allows filtering the stream. For each chunk in the stream the `fn` +function will be called and if it returns a truthy value, the chunk will be passed to the result stream. If the `fn` function returns a promise - that promise will be `await`ed. @@ -1811,8 +1812,8 @@ import { Readable } from 'stream'; import { Resolver } from 'dns/promises'; // With a synchronous predicate. -for await (const item of Readable.from([1, 2, 3, 4]).filter((x) => x > 2)) { - console.log(item); // 3, 4 +for await (const chunk of Readable.from([1, 2, 3, 4]).filter((x) => x > 2)) { + console.log(chunk); // 3, 4 } // With an asynchronous predicate, making at most 2 queries at a time. const resolver = new Resolver(); @@ -1838,7 +1839,7 @@ added: REPLACEME > Stability: 1 - Experimental -* `fn` {Function|AsyncFunction} a function to call on each item of the stream. +* `fn` {Function|AsyncFunction} a function to call on each chunk of the stream. * `data` {any} a chunk of data from the stream. * `options` {Object} * `signal` {AbortSignal} aborted if the stream is destroyed allowing to @@ -1850,12 +1851,12 @@ added: REPLACEME aborted. * Returns: {Promise} a promise for when the stream has finished. -This method allows iterating a stream. For each item in the stream the +This method allows iterating a stream. For each chunk in the stream the `fn` function will be called. If the `fn` function returns a promise - that promise will be `await`ed. This method is different from `for await...of` loops in that it can optionally -process items concurrently. In addition, a `forEach` iteration can only be +process chunks concurrently. In addition, a `forEach` iteration can only be stopped by having passed a `signal` option and aborting the related `AbortController` while `for await...of` can be stopped with `break` or `return`. In either case the stream will be destroyed. @@ -1869,8 +1870,8 @@ import { Readable } from 'stream'; import { Resolver } from 'dns/promises'; // With a synchronous predicate. -for await (const item of Readable.from([1, 2, 3, 4]).filter((x) => x > 2)) { - console.log(item); // 3, 4 +for await (const chunk of Readable.from([1, 2, 3, 4]).filter((x) => x > 2)) { + console.log(chunk); // 3, 4 } // With an asynchronous predicate, making at most 2 queries at a time. const resolver = new Resolver(); @@ -1935,7 +1936,7 @@ added: REPLACEME > Stability: 1 - Experimental -* `fn` {Function|AsyncFunction} a function to call on each item of the stream. +* `fn` {Function|AsyncFunction} a function to call on each chunk of the stream. * `data` {any} a chunk of data from the stream. * `options` {Object} * `signal` {AbortSignal} aborted if the stream is destroyed allowing to @@ -1984,7 +1985,7 @@ added: REPLACEME > Stability: 1 - Experimental -* `fn` {Function|AsyncFunction} a function to call on each item of the stream. +* `fn` {Function|AsyncFunction} a function to call on each chunk of the stream. * `data` {any} a chunk of data from the stream. * `options` {Object} * `signal` {AbortSignal} aborted if the stream is destroyed allowing to @@ -2000,7 +2001,7 @@ added: REPLACEME This method is similar to `Array.prototype.find` and calls `fn` on each chunk in the stream to find a chunk with a truthy value for `fn`. Once an `fn` call's awaited return value is truthy, the stream is destroyed and the promise is -fulfilled with value for which the value of `fn` was truthy for. If all of the +fulfilled with value for which `fn` returned a truthy value. If all of the `fn` calls on the chunks return a falsy value, the promise is fulfilled with `undefined`. @@ -2034,7 +2035,7 @@ added: REPLACEME > Stability: 1 - Experimental -* `fn` {Function|AsyncFunction} a function to call on each item of the stream. +* `fn` {Function|AsyncFunction} a function to call on each chunk of the stream. * `data` {any} a chunk of data from the stream. * `options` {Object} * `signal` {AbortSignal} aborted if the stream is destroyed allowing to @@ -2084,7 +2085,7 @@ added: REPLACEME > Stability: 1 - Experimental * `fn` {Function|AsyncGeneratorFunction|AsyncFunction} a function to map over - every item in the stream. + every chunk in the stream. * `data` {any} a chunk of data from the stream. * `options` {Object} * `signal` {AbortSignal} aborted if the stream is destroyed allowing to @@ -2108,8 +2109,8 @@ import { Readable } from 'stream'; import { createReadStream } from 'fs'; // With a synchronous mapper. -for await (const item of Readable.from([1, 2, 3, 4]).flatMap((x) => [x, x])) { - console.log(item); // 1, 1, 2, 2, 3, 3, 4, 4 +for await (const chunk of Readable.from([1, 2, 3, 4]).flatMap((x) => [x, x])) { + console.log(chunk); // 1, 1, 2, 2, 3, 3, 4, 4 } // With an asynchronous mapper, combine the contents of 4 files const concatResult = Readable.from([ diff --git a/test/parallel/test-stream-filter.js b/test/parallel/test-stream-filter.js index 494c94f02f8cb0..4eeb877980b971 100644 --- a/test/parallel/test-stream-filter.js +++ b/test/parallel/test-stream-filter.js @@ -98,3 +98,11 @@ const { setTimeout } = require('timers/promises'); const stream = Readable.from([1, 2, 3, 4, 5]).filter((x) => true); assert.strictEqual(stream.readable, true); } +{ + const stream = Readable.from([1, 2, 3, 4, 5]); + Object.defineProperty(stream, 'map', { + value: common.mustNotCall(() => {}), + }); + // Check that map isn't getting called. + stream.filter(() => true); +} diff --git a/test/parallel/test-stream-flatMap.js b/test/parallel/test-stream-flatMap.js index cd5e0c8a9c403e..07c534a9c1f481 100644 --- a/test/parallel/test-stream-flatMap.js +++ b/test/parallel/test-stream-flatMap.js @@ -121,3 +121,11 @@ function oneTo5() { const stream = oneTo5().flatMap((x) => x); assert.strictEqual(stream.readable, true); } +{ + const stream = oneTo5(); + Object.defineProperty(stream, 'map', { + value: common.mustNotCall(() => {}), + }); + // Check that map isn't getting called. + stream.flatMap(() => true); +} diff --git a/test/parallel/test-stream-forEach.js b/test/parallel/test-stream-forEach.js index 82013554cd2aa8..e26310e3bccb85 100644 --- a/test/parallel/test-stream-forEach.js +++ b/test/parallel/test-stream-forEach.js @@ -84,3 +84,11 @@ const { setTimeout } = require('timers/promises'); const stream = Readable.from([1, 2, 3, 4, 5]).forEach((_) => true); assert.strictEqual(typeof stream.then, 'function'); } +{ + const stream = Readable.from([1, 2, 3, 4, 5]); + Object.defineProperty(stream, 'map', { + value: common.mustNotCall(() => {}), + }); + // Check that map isn't getting called. + stream.forEach(() => true); +} diff --git a/test/parallel/test-stream-some-every.js b/test/parallel/test-stream-some-find-every.mjs similarity index 57% rename from test/parallel/test-stream-some-every.js rename to test/parallel/test-stream-some-find-every.mjs index c63d5c561af31d..1b657128a2695d 100644 --- a/test/parallel/test-stream-some-every.js +++ b/test/parallel/test-stream-some-find-every.mjs @@ -1,11 +1,8 @@ -'use strict'; +import * as common from '../common/index.mjs'; +import { setTimeout } from 'timers/promises'; +import { Readable } from 'stream'; +import assert from 'assert'; -const common = require('../common'); -const { setTimeout } = require('timers/promises'); -const { - Readable, -} = require('stream'); -const assert = require('assert'); function oneTo5() { return Readable.from([1, 2, 3, 4, 5]); @@ -19,53 +16,45 @@ function oneTo5Async() { } { // Some, find, and every work with a synchronous stream and predicate - (async () => { - assert.strictEqual(await oneTo5().some((x) => x > 3), true); - assert.strictEqual(await oneTo5().every((x) => x > 3), false); - assert.strictEqual(await oneTo5().find((x) => x > 3), 4); - assert.strictEqual(await oneTo5().some((x) => x > 6), false); - assert.strictEqual(await oneTo5().every((x) => x < 6), true); - assert.strictEqual(await oneTo5().find((x) => x > 6), undefined); - assert.strictEqual(await Readable.from([]).some(() => true), false); - assert.strictEqual(await Readable.from([]).every(() => true), true); - assert.strictEqual(await Readable.from([]).find(() => true), undefined); - })().then(common.mustCall()); + assert.strictEqual(await oneTo5().some((x) => x > 3), true); + assert.strictEqual(await oneTo5().every((x) => x > 3), false); + assert.strictEqual(await oneTo5().find((x) => x > 3), 4); + assert.strictEqual(await oneTo5().some((x) => x > 6), false); + assert.strictEqual(await oneTo5().every((x) => x < 6), true); + assert.strictEqual(await oneTo5().find((x) => x > 6), undefined); + assert.strictEqual(await Readable.from([]).some(() => true), false); + assert.strictEqual(await Readable.from([]).every(() => true), true); + assert.strictEqual(await Readable.from([]).find(() => true), undefined); } { // Some, find, and every work with an asynchronous stream and synchronous predicate - (async () => { - assert.strictEqual(await oneTo5Async().some((x) => x > 3), true); - assert.strictEqual(await oneTo5Async().every((x) => x > 3), false); - assert.strictEqual(await oneTo5Async().find((x) => x > 3), 4); - assert.strictEqual(await oneTo5Async().some((x) => x > 6), false); - assert.strictEqual(await oneTo5Async().every((x) => x < 6), true); - assert.strictEqual(await oneTo5Async().find((x) => x > 6), undefined); - })().then(common.mustCall()); + assert.strictEqual(await oneTo5Async().some((x) => x > 3), true); + assert.strictEqual(await oneTo5Async().every((x) => x > 3), false); + assert.strictEqual(await oneTo5Async().find((x) => x > 3), 4); + assert.strictEqual(await oneTo5Async().some((x) => x > 6), false); + assert.strictEqual(await oneTo5Async().every((x) => x < 6), true); + assert.strictEqual(await oneTo5Async().find((x) => x > 6), undefined); } { // Some, find, and every work on synchronous streams with an asynchronous predicate - (async () => { - assert.strictEqual(await oneTo5().some(async (x) => x > 3), true); - assert.strictEqual(await oneTo5().every(async (x) => x > 3), false); - assert.strictEqual(await oneTo5().find(async (x) => x > 3), 4); - assert.strictEqual(await oneTo5().some(async (x) => x > 6), false); - assert.strictEqual(await oneTo5().every(async (x) => x < 6), true); - assert.strictEqual(await oneTo5().find(async (x) => x > 6), undefined); - })().then(common.mustCall()); + assert.strictEqual(await oneTo5().some(async (x) => x > 3), true); + assert.strictEqual(await oneTo5().every(async (x) => x > 3), false); + assert.strictEqual(await oneTo5().find(async (x) => x > 3), 4); + assert.strictEqual(await oneTo5().some(async (x) => x > 6), false); + assert.strictEqual(await oneTo5().every(async (x) => x < 6), true); + assert.strictEqual(await oneTo5().find(async (x) => x > 6), undefined); } { // Some, find, and every work on asynchronous streams with an asynchronous predicate - (async () => { - assert.strictEqual(await oneTo5Async().some(async (x) => x > 3), true); - assert.strictEqual(await oneTo5Async().every(async (x) => x > 3), false); - assert.strictEqual(await oneTo5Async().find(async (x) => x > 3), 4); - assert.strictEqual(await oneTo5Async().some(async (x) => x > 6), false); - assert.strictEqual(await oneTo5Async().every(async (x) => x < 6), true); - assert.strictEqual(await oneTo5Async().find(async (x) => x > 6), undefined); - })().then(common.mustCall()); + assert.strictEqual(await oneTo5Async().some(async (x) => x > 3), true); + assert.strictEqual(await oneTo5Async().every(async (x) => x > 3), false); + assert.strictEqual(await oneTo5Async().find(async (x) => x > 3), 4); + assert.strictEqual(await oneTo5Async().some(async (x) => x > 6), false); + assert.strictEqual(await oneTo5Async().every(async (x) => x < 6), true); + assert.strictEqual(await oneTo5Async().find(async (x) => x > 6), undefined); } { @@ -74,8 +63,9 @@ function oneTo5Async() { await setTimeout(); assert.strictEqual(stream.destroyed, true); } - // Some, find, and every short circuit - (async () => { + + { + // Some, find, and every short circuit const someStream = oneTo5(); await someStream.some(common.mustCall((x) => x > 2, 3)); await checkDestroyed(someStream); @@ -92,10 +82,10 @@ function oneTo5Async() { await oneTo5().some(common.mustCall(() => false, 5)); await oneTo5().every(common.mustCall(() => true, 5)); await oneTo5().find(common.mustCall(() => false, 5)); - })().then(common.mustCall()); + } - // Some, find, and every short circuit async stream/predicate - (async () => { + { + // Some, find, and every short circuit async stream/predicate const someStream = oneTo5Async(); await someStream.some(common.mustCall(async (x) => x > 2, 3)); await checkDestroyed(someStream); @@ -112,21 +102,19 @@ function oneTo5Async() { await oneTo5Async().some(common.mustCall(async () => false, 5)); await oneTo5Async().every(common.mustCall(async () => true, 5)); await oneTo5Async().find(common.mustCall(async () => false, 5)); - })().then(common.mustCall()); + } } { // Concurrency doesn't affect which value is found. - (async () => { - const found = await Readable.from([1, 2]).find(async (val) => { - if (val === 1) { - // eslint-disable-next-line no-restricted-syntax - await setTimeout(100); - } - return true; - }, { concurrency: 2 }); - assert.strictEqual(found, 1); - })().then(common.mustCall()); + const found = await Readable.from([1, 2]).find(async (val) => { + if (val === 1) { + // eslint-disable-next-line no-restricted-syntax + await setTimeout(100); + } + return true; + }, { concurrency: 2 }); + assert.strictEqual(found, 1); } { @@ -174,3 +162,13 @@ function oneTo5Async() { }, /ERR_INVALID_ARG_TYPE/, `${op} should throw for invalid signal`).then(common.mustCall()); } } +{ + for (const op of ['some', 'every', 'find']) { + const stream = oneTo5(); + Object.defineProperty(stream, 'map', { + value: common.mustNotCall(() => {}), + }); + // Check that map isn't getting called. + stream[op](() => {}); + } +}