Skip to content

Commit 24ed3ce

Browse files
benjamingrronag
andcommitted
stream: add drop and take
This adds the `drop` and `take` methods to readable streams allowing users easily drop and take items from the stream. This continues the iterator-helper proposal alignment task. Co-Authored-By: Robert Nagy <[email protected]>
1 parent ca48949 commit 24ed3ce

File tree

3 files changed

+195
-0
lines changed

3 files changed

+195
-0
lines changed

doc/api/stream.md

+44
Original file line numberDiff line numberDiff line change
@@ -2074,6 +2074,50 @@ for await (const result of concatResult) {
20742074
}
20752075
```
20762076

2077+
### `readable.drop(limit[, options])`
2078+
2079+
<!-- YAML
2080+
added: REPLACEME
2081+
-->
2082+
2083+
> Stability: 1 - Experimental
2084+
2085+
* `limit` {number} the number of chunks to drop from the readable.
2086+
* `options` {Object}
2087+
* `signal` {AbortSignal} allows destroying the stream if the signal is
2088+
aborted.
2089+
* Returns: {Readable} a stream with `limit` chunks dropped.
2090+
2091+
This method returns a new stream with the first `limit` chunks dropped.
2092+
2093+
```mjs
2094+
import { Readable } from 'stream';
2095+
2096+
await Readable.from([1, 2, 3, 4]).drop(2).toArray(); // [3, 4]
2097+
```
2098+
2099+
### `readable.take(limit[, options])`
2100+
2101+
<!-- YAML
2102+
added: REPLACEME
2103+
-->
2104+
2105+
> Stability: 1 - Experimental
2106+
2107+
* `limit` {number} the number of chunks to take from the readable.
2108+
* `options` {Object}
2109+
* `signal` {AbortSignal} allows destroying the stream if the signal is
2110+
aborted.
2111+
* Returns: {Readable} a stream with `limit` chunks dropped.
2112+
2113+
This method returns a new stream with the first `limit` chunks dropped.
2114+
2115+
```mjs
2116+
import { Readable } from 'stream';
2117+
2118+
await Readable.from([1, 2, 3, 4]).take(2).toArray(); // [1, 2]
2119+
```
2120+
20772121
### Duplex and transform streams
20782122

20792123
#### Class: `stream.Duplex`

lib/internal/streams/operators.js

+55
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ const { AbortController } = require('internal/abort_controller');
55
const {
66
codes: {
77
ERR_INVALID_ARG_TYPE,
8+
ERR_OUT_OF_RANGE,
89
},
910
AbortError,
1011
} = require('internal/errors');
@@ -14,6 +15,8 @@ const { kWeakHandler } = require('internal/event_target');
1415
const {
1516
ArrayPrototypePush,
1617
MathFloor,
18+
Number,
19+
NumberIsNaN,
1720
Promise,
1821
PromiseReject,
1922
PromisePrototypeCatch,
@@ -232,10 +235,62 @@ async function* flatMap(fn, options) {
232235
}
233236
}
234237

238+
function toIntegerOrInfinity(number) {
239+
// We coerce here to align with the spec
240+
// https://github.com/tc39/proposal-iterator-helpers/issues/169
241+
number = Number(number);
242+
if (NumberIsNaN(number)) {
243+
return 0;
244+
}
245+
if (number < 0) {
246+
throw new ERR_OUT_OF_RANGE('number', '>= 0', number);
247+
}
248+
return number;
249+
}
250+
251+
function drop(number, options) {
252+
number = toIntegerOrInfinity(number);
253+
return async function* drop() {
254+
if (options?.signal?.aborted) {
255+
throw new AbortError();
256+
}
257+
for await (const val of this) {
258+
if (options?.signal?.aborted) {
259+
throw new AbortError();
260+
}
261+
if (number-- <= 0) {
262+
yield val;
263+
}
264+
}
265+
}.call(this);
266+
}
267+
268+
269+
function take(number, options) {
270+
number = toIntegerOrInfinity(number);
271+
return async function* take() {
272+
if (options?.signal?.aborted) {
273+
throw new AbortError();
274+
}
275+
for await (const val of this) {
276+
if (options?.signal?.aborted) {
277+
throw new AbortError();
278+
}
279+
if (number-- > 0) {
280+
yield val;
281+
} else {
282+
return;
283+
}
284+
}
285+
}.call(this);
286+
}
287+
235288
module.exports.streamReturningOperators = {
289+
drop,
236290
filter,
237291
flatMap,
238292
map,
293+
take,
239294
};
240295

241296
module.exports.promiseReturningOperators = {
+96
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const {
5+
Readable,
6+
} = require('stream');
7+
const { deepStrictEqual, rejects, throws } = require('assert');
8+
9+
const { from } = Readable;
10+
11+
const fromAsync = (...args) => from(...args).map(async (x) => x);
12+
13+
const naturals = () => from(async function*() {
14+
let i = 1;
15+
while (true) {
16+
yield i++;
17+
}
18+
}());
19+
20+
{
21+
// Synchronous streams
22+
(async () => {
23+
deepStrictEqual(await from([1, 2, 3]).drop(2).toArray(), [3]);
24+
deepStrictEqual(await from([1, 2, 3]).take(1).toArray(), [1]);
25+
deepStrictEqual(await from([]).drop(2).toArray(), []);
26+
deepStrictEqual(await from([]).take(1).toArray(), []);
27+
deepStrictEqual(await from([1, 2, 3]).drop(1).take(1).toArray(), [2]);
28+
deepStrictEqual(await from([1, 2]).drop(0).toArray(), [1, 2]);
29+
deepStrictEqual(await from([1, 2]).take(0).toArray(), []);
30+
})().then(common.mustCall());
31+
// Asynchronous streams
32+
(async () => {
33+
deepStrictEqual(await fromAsync([1, 2, 3]).drop(2).toArray(), [3]);
34+
deepStrictEqual(await fromAsync([1, 2, 3]).take(1).toArray(), [1]);
35+
deepStrictEqual(await fromAsync([]).drop(2).toArray(), []);
36+
deepStrictEqual(await fromAsync([]).take(1).toArray(), []);
37+
deepStrictEqual(await fromAsync([1, 2, 3]).drop(1).take(1).toArray(), [2]);
38+
deepStrictEqual(await fromAsync([1, 2]).drop(0).toArray(), [1, 2]);
39+
deepStrictEqual(await fromAsync([1, 2]).take(0).toArray(), []);
40+
})().then(common.mustCall());
41+
// Infinite streams
42+
// Asynchronous streams
43+
(async () => {
44+
deepStrictEqual(await naturals().take(1).toArray(), [1]);
45+
deepStrictEqual(await naturals().drop(1).take(1).toArray(), [2]);
46+
const next10 = [11, 12, 13, 14, 15, 16, 17, 18, 19, 20];
47+
deepStrictEqual(await naturals().drop(10).take(10).toArray(), next10);
48+
deepStrictEqual(await naturals().take(5).take(1).toArray(), [1]);
49+
})().then(common.mustCall());
50+
}
51+
52+
{
53+
// Coercion
54+
(async () => {
55+
// The spec made me do this ^^
56+
deepStrictEqual(await naturals().take('cat').toArray(), []);
57+
deepStrictEqual(await naturals().take('2').toArray(), [1, 2]);
58+
deepStrictEqual(await naturals().take(true).toArray(), [1]);
59+
})().then(common.mustCall());
60+
}
61+
62+
{
63+
// Support for AbortSignal
64+
const ac = new AbortController();
65+
rejects(
66+
Readable.from([1, 2, 3]).take(1, { signal: ac.signal }).toArray(), {
67+
name: 'AbortError',
68+
}).then(common.mustCall());
69+
rejects(
70+
Readable.from([1, 2, 3]).drop(1, { signal: ac.signal }).toArray(), {
71+
name: 'AbortError',
72+
}).then(common.mustCall());
73+
ac.abort();
74+
}
75+
76+
{
77+
// Support for AbortSignal, already aborted
78+
const signal = AbortSignal.abort();
79+
rejects(
80+
Readable.from([1, 2, 3]).take(1, { signal }).toArray(), {
81+
name: 'AbortError',
82+
}).then(common.mustCall());
83+
}
84+
85+
{
86+
// Error cases
87+
const invalidArgs = [
88+
-1,
89+
-Infinity,
90+
-40,
91+
];
92+
93+
for (const example of invalidArgs) {
94+
throws(() => from([]).take(example).toArray(), /ERR_OUT_OF_RANGE/);
95+
}
96+
}

0 commit comments

Comments
 (0)