Skip to content

Commit 60e28ba

Browse files
benjamingrdanielleadams
authored andcommitted
stream: add asIndexedPairs
Add the asIndexedPairs method for readable streams. PR-URL: #41681 Refs: https://github.com/tc39/proposal-iterator-helpers#asindexedpairs Reviewed-By: Robert Nagy <[email protected]> Reviewed-By: Matteo Collina <[email protected]>
1 parent 11ec334 commit 60e28ba

File tree

3 files changed

+82
-0
lines changed

3 files changed

+82
-0
lines changed

doc/api/stream.md

+24
Original file line numberDiff line numberDiff line change
@@ -2065,6 +2065,30 @@ import { Readable } from 'stream';
20652065
await Readable.from([1, 2, 3, 4]).take(2).toArray(); // [1, 2]
20662066
```
20672067

2068+
### `readable.asIndexedPairs([options])`
2069+
2070+
<!-- YAML
2071+
added: REPLACEME
2072+
-->
2073+
2074+
> Stability: 1 - Experimental
2075+
2076+
* `options` {Object}
2077+
* `signal` {AbortSignal} allows destroying the stream if the signal is
2078+
aborted.
2079+
* Returns: {Readable} a stream of indexed pairs.
2080+
2081+
This method returns a new stream with chunks of the underlying stream paired
2082+
with a counter in the form `[index, chunk]`. The first index value is 0 and it
2083+
increases by 1 for each chunk produced.
2084+
2085+
```mjs
2086+
import { Readable } from 'stream';
2087+
2088+
const pairs = await Readable.from(['a', 'b', 'c']).asIndexedPairs().toArray();
2089+
console.log(pairs); // [[0, 'a'], [1, 'b'], [2, 'c']]
2090+
```
2091+
20682092
### Duplex and transform streams
20692093

20702094
#### Class: `stream.Duplex`

lib/internal/streams/operators.js

+11
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,16 @@ async function * map(fn, options) {
158158
}
159159
}
160160

161+
async function* asIndexedPairs(options) {
162+
let index = 0;
163+
for await (const val of this) {
164+
if (options?.signal?.aborted) {
165+
throw new AbortError({ cause: options.signal.reason });
166+
}
167+
yield [index++, val];
168+
}
169+
}
170+
161171
async function some(fn, options) {
162172
// https://tc39.es/proposal-iterator-helpers/#sec-iteratorprototype.some
163173
// Note that some does short circuit but also closes the iterator if it does
@@ -290,6 +300,7 @@ function take(number, options) {
290300
}
291301

292302
module.exports.streamReturningOperators = {
303+
asIndexedPairs,
293304
drop,
294305
filter,
295306
flatMap,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
import '../common/index.mjs';
2+
import { Readable } from 'stream';
3+
import { deepStrictEqual, rejects } from 'assert';
4+
5+
{
6+
// asIndexedPairs with a synchronous stream
7+
const pairs = await Readable.from([1, 2, 3]).asIndexedPairs().toArray();
8+
deepStrictEqual(pairs, [[0, 1], [1, 2], [2, 3]]);
9+
const empty = await Readable.from([]).asIndexedPairs().toArray();
10+
deepStrictEqual(empty, []);
11+
}
12+
13+
{
14+
// asIndexedPairs works an asynchronous streams
15+
const asyncFrom = (...args) => Readable.from(...args).map(async (x) => x);
16+
const pairs = await asyncFrom([1, 2, 3]).asIndexedPairs().toArray();
17+
deepStrictEqual(pairs, [[0, 1], [1, 2], [2, 3]]);
18+
const empty = await asyncFrom([]).asIndexedPairs().toArray();
19+
deepStrictEqual(empty, []);
20+
}
21+
22+
{
23+
// Does not enumerate an infinite stream
24+
const infinite = () => Readable.from(async function* () {
25+
while (true) yield 1;
26+
}());
27+
const pairs = await infinite().asIndexedPairs().take(3).toArray();
28+
deepStrictEqual(pairs, [[0, 1], [1, 1], [2, 1]]);
29+
const empty = await infinite().asIndexedPairs().take(0).toArray();
30+
deepStrictEqual(empty, []);
31+
}
32+
33+
{
34+
// AbortSignal
35+
await rejects(async () => {
36+
const ac = new AbortController();
37+
const { signal } = ac;
38+
const p = Readable.from([1, 2, 3]).asIndexedPairs({ signal }).toArray();
39+
ac.abort();
40+
await p;
41+
}, { name: 'AbortError' });
42+
43+
await rejects(async () => {
44+
const signal = AbortSignal.abort();
45+
await Readable.from([1, 2, 3]).asIndexedPairs({ signal }).toArray();
46+
}, /AbortError/);
47+
}

0 commit comments

Comments
 (0)