@@ -1696,7 +1696,8 @@ added: v16.14.0
1696
1696
1697
1697
> Stability: 1 - Experimental
1698
1698
1699
- * ` fn ` {Function|AsyncFunction} a function to map over every item in the stream.
1699
+ * ` fn ` {Function|AsyncFunction} a function to map over every chunk in the
1700
+ stream.
1700
1701
* ` data ` {any} a chunk of data from the stream.
1701
1702
* ` options ` {Object}
1702
1703
* ` signal ` {AbortSignal} aborted if the stream is destroyed allowing to
@@ -1709,16 +1710,16 @@ added: v16.14.0
1709
1710
* Returns: {Readable} a stream mapped with the function ` fn ` .
1710
1711
1711
1712
This method allows mapping over the stream. The ` fn ` function will be called
1712
- for every item in the stream. If the ` fn ` function returns a promise - that
1713
+ for every chunk in the stream. If the ` fn ` function returns a promise - that
1713
1714
promise will be ` await ` ed before being passed to the result stream.
1714
1715
1715
1716
``` mjs
1716
1717
import { Readable } from ' stream' ;
1717
1718
import { Resolver } from ' dns/promises' ;
1718
1719
1719
1720
// With a synchronous mapper.
1720
- for await (const item of Readable .from ([1 , 2 , 3 , 4 ]).map ((x ) => x * 2 )) {
1721
- console .log (item ); // 2, 4, 6, 8
1721
+ for await (const chunk of Readable .from ([1 , 2 , 3 , 4 ]).map ((x ) => x * 2 )) {
1722
+ console .log (chunk ); // 2, 4, 6, 8
1722
1723
}
1723
1724
// With an asynchronous mapper, making at most 2 queries at a time.
1724
1725
const resolver = new Resolver ();
@@ -1740,7 +1741,7 @@ added: v16.14.0
1740
1741
1741
1742
> Stability: 1 - Experimental
1742
1743
1743
- * ` fn ` {Function|AsyncFunction} a function to filter items from stream.
1744
+ * ` fn ` {Function|AsyncFunction} a function to filter chunks from the stream.
1744
1745
* ` data ` {any} a chunk of data from the stream.
1745
1746
* ` options ` {Object}
1746
1747
* ` signal ` {AbortSignal} aborted if the stream is destroyed allowing to
@@ -1752,8 +1753,8 @@ added: v16.14.0
1752
1753
aborted.
1753
1754
* Returns: {Readable} a stream filtered with the predicate ` fn ` .
1754
1755
1755
- This method allows filtering the stream. For each item in the stream the ` fn `
1756
- function will be called and if it returns a truthy value, the item will be
1756
+ This method allows filtering the stream. For each chunk in the stream the ` fn `
1757
+ function will be called and if it returns a truthy value, the chunk will be
1757
1758
passed to the result stream. If the ` fn ` function returns a promise - that
1758
1759
promise will be ` await ` ed.
1759
1760
@@ -1762,8 +1763,8 @@ import { Readable } from 'stream';
1762
1763
import { Resolver } from ' dns/promises' ;
1763
1764
1764
1765
// With a synchronous predicate.
1765
- for await (const item of Readable .from ([1 , 2 , 3 , 4 ]).filter ((x ) => x > 2 )) {
1766
- console .log (item ); // 3, 4
1766
+ for await (const chunk of Readable .from ([1 , 2 , 3 , 4 ]).filter ((x ) => x > 2 )) {
1767
+ console .log (chunk ); // 3, 4
1767
1768
}
1768
1769
// With an asynchronous predicate, making at most 2 queries at a time.
1769
1770
const resolver = new Resolver ();
@@ -1789,7 +1790,7 @@ added: v16.15.0
1789
1790
1790
1791
> Stability: 1 - Experimental
1791
1792
1792
- * ` fn ` {Function|AsyncFunction} a function to call on each item of the stream.
1793
+ * ` fn ` {Function|AsyncFunction} a function to call on each chunk of the stream.
1793
1794
* ` data ` {any} a chunk of data from the stream.
1794
1795
* ` options ` {Object}
1795
1796
* ` signal ` {AbortSignal} aborted if the stream is destroyed allowing to
@@ -1801,12 +1802,12 @@ added: v16.15.0
1801
1802
aborted.
1802
1803
* Returns: {Promise} a promise for when the stream has finished.
1803
1804
1804
- This method allows iterating a stream. For each item in the stream the
1805
+ This method allows iterating a stream. For each chunk in the stream the
1805
1806
` fn ` function will be called. If the ` fn ` function returns a promise - that
1806
1807
promise will be ` await ` ed.
1807
1808
1808
1809
This method is different from ` for await...of ` loops in that it can optionally
1809
- process items concurrently. In addition, a ` forEach ` iteration can only be
1810
+ process chunks concurrently. In addition, a ` forEach ` iteration can only be
1810
1811
stopped by having passed a ` signal ` option and aborting the related
1811
1812
` AbortController ` while ` for await...of ` can be stopped with ` break ` or
1812
1813
` return ` . In either case the stream will be destroyed.
@@ -1820,8 +1821,8 @@ import { Readable } from 'stream';
1820
1821
import { Resolver } from ' dns/promises' ;
1821
1822
1822
1823
// With a synchronous predicate.
1823
- for await (const item of Readable .from ([1 , 2 , 3 , 4 ]).filter ((x ) => x > 2 )) {
1824
- console .log (item ); // 3, 4
1824
+ for await (const chunk of Readable .from ([1 , 2 , 3 , 4 ]).filter ((x ) => x > 2 )) {
1825
+ console .log (chunk ); // 3, 4
1825
1826
}
1826
1827
// With an asynchronous predicate, making at most 2 queries at a time.
1827
1828
const resolver = new Resolver ();
@@ -1886,7 +1887,7 @@ added: v16.15.0
1886
1887
1887
1888
> Stability: 1 - Experimental
1888
1889
1889
- * ` fn ` {Function|AsyncFunction} a function to call on each item of the stream.
1890
+ * ` fn ` {Function|AsyncFunction} a function to call on each chunk of the stream.
1890
1891
* ` data ` {any} a chunk of data from the stream.
1891
1892
* ` options ` {Object}
1892
1893
* ` signal ` {AbortSignal} aborted if the stream is destroyed allowing to
@@ -1927,6 +1928,56 @@ console.log(anyBigFile); // `true` if any file in the list is bigger than 1MB
1927
1928
console .log (' done' ); // Stream has finished
1928
1929
```
1929
1930
1931
+ ##### ` readable.find(fn[, options]) `
1932
+
1933
+ <!-- YAML
1934
+ added: REPLACEME
1935
+ -->
1936
+
1937
+ > Stability: 1 - Experimental
1938
+
1939
+ * ` fn ` {Function|AsyncFunction} a function to call on each chunk of the stream.
1940
+ * ` data ` {any} a chunk of data from the stream.
1941
+ * ` options ` {Object}
1942
+ * ` signal ` {AbortSignal} aborted if the stream is destroyed allowing to
1943
+ abort the ` fn ` call early.
1944
+ * ` options ` {Object}
1945
+ * ` concurrency ` {number} the maximum concurrent invocation of ` fn ` to call
1946
+ on the stream at once. ** Default:** ` 1 ` .
1947
+ * ` signal ` {AbortSignal} allows destroying the stream if the signal is
1948
+ aborted.
1949
+ * Returns: {Promise} a promise evaluating to the first chunk for which ` fn `
1950
+ evaluated with a truthy value, or ` undefined ` if no element was found.
1951
+
1952
+ This method is similar to ` Array.prototype.find ` and calls ` fn ` on each chunk
1953
+ in the stream to find a chunk with a truthy value for ` fn ` . Once an ` fn ` call's
1954
+ awaited return value is truthy, the stream is destroyed and the promise is
1955
+ fulfilled with value for which ` fn ` returned a truthy value. If all of the
1956
+ ` fn ` calls on the chunks return a falsy value, the promise is fulfilled with
1957
+ ` undefined ` .
1958
+
1959
+ ``` mjs
1960
+ import { Readable } from ' stream' ;
1961
+ import { stat } from ' fs/promises' ;
1962
+
1963
+ // With a synchronous predicate.
1964
+ await Readable .from ([1 , 2 , 3 , 4 ]).find ((x ) => x > 2 ); // 3
1965
+ await Readable .from ([1 , 2 , 3 , 4 ]).find ((x ) => x > 0 ); // 1
1966
+ await Readable .from ([1 , 2 , 3 , 4 ]).find ((x ) => x > 10 ); // undefined
1967
+
1968
+ // With an asynchronous predicate, making at most 2 file checks at a time.
1969
+ const foundBigFile = await Readable .from ([
1970
+ ' file1' ,
1971
+ ' file2' ,
1972
+ ' file3' ,
1973
+ ]).find (async (fileName ) => {
1974
+ const stats = await stat (fileName);
1975
+ return stat .size > 1024 * 1024 ;
1976
+ }, { concurrency: 2 });
1977
+ console .log (foundBigFile); // File name of large file, if any file in the list is bigger than 1MB
1978
+ console .log (' done' ); // Stream has finished
1979
+ ```
1980
+
1930
1981
##### ` readable.every(fn[, options]) `
1931
1982
1932
1983
<!-- YAML
@@ -1935,7 +1986,7 @@ added: v16.15.0
1935
1986
1936
1987
> Stability: 1 - Experimental
1937
1988
1938
- * ` fn ` {Function|AsyncFunction} a function to call on each item of the stream.
1989
+ * ` fn ` {Function|AsyncFunction} a function to call on each chunk of the stream.
1939
1990
* ` data ` {any} a chunk of data from the stream.
1940
1991
* ` options ` {Object}
1941
1992
* ` signal ` {AbortSignal} aborted if the stream is destroyed allowing to
@@ -1985,7 +2036,7 @@ added: v16.15.0
1985
2036
> Stability: 1 - Experimental
1986
2037
1987
2038
* ` fn ` {Function|AsyncGeneratorFunction|AsyncFunction} a function to map over
1988
- every item in the stream.
2039
+ every chunk in the stream.
1989
2040
* ` data ` {any} a chunk of data from the stream.
1990
2041
* ` options ` {Object}
1991
2042
* ` signal ` {AbortSignal} aborted if the stream is destroyed allowing to
@@ -2009,8 +2060,8 @@ import { Readable } from 'stream';
2009
2060
import { createReadStream } from ' fs' ;
2010
2061
2011
2062
// With a synchronous mapper.
2012
- for await (const item of Readable .from ([1 , 2 , 3 , 4 ]).flatMap ((x ) => [x, x])) {
2013
- console .log (item ); // 1, 1, 2, 2, 3, 3, 4, 4
2063
+ for await (const chunk of Readable .from ([1 , 2 , 3 , 4 ]).flatMap ((x ) => [x, x])) {
2064
+ console .log (chunk ); // 1, 1, 2, 2, 3, 3, 4, 4
2014
2065
}
2015
2066
// With an asynchronous mapper, combine the contents of 4 files
2016
2067
const concatResult = Readable .from ([
0 commit comments