Skip to content

Commit 347b307

Browse files
committed
stream: use synchronous error validation & validate abort signal option
made sure top level methods aren't async/generators so that validation errors could be caught synchronously also added validation for the abort signal option
1 parent 253f934 commit 347b307

9 files changed

+174
-81
lines changed

lib/internal/streams/operators.js

+130-59
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,10 @@ const {
1010
},
1111
AbortError,
1212
} = require('internal/errors');
13-
const { validateInteger } = require('internal/validators');
13+
const {
14+
validateAbortSignal,
15+
validateInteger,
16+
} = require('internal/validators');
1417
const { kWeakHandler } = require('internal/event_target');
1518
const { finished } = require('internal/streams/end-of-stream');
1619

@@ -38,6 +41,10 @@ function map(fn, options) {
3841
throw new ERR_INVALID_ARG_TYPE('options', ['Object']);
3942
}
4043

44+
if (options?.signal != null) {
45+
validateAbortSignal(options.signal, 'options.signal');
46+
}
47+
4148
let concurrency = 1;
4249
if (options?.concurrency != null) {
4350
concurrency = MathFloor(options.concurrency);
@@ -161,17 +168,35 @@ function map(fn, options) {
161168
}.call(this);
162169
}
163170

164-
async function* asIndexedPairs(options) {
165-
let index = 0;
166-
for await (const val of this) {
167-
if (options?.signal?.aborted) {
168-
throw new AbortError({ cause: options.signal.reason });
169-
}
170-
yield [index++, val];
171+
function asIndexedPairs(options) {
172+
if (options != null && typeof options !== 'object') {
173+
throw new ERR_INVALID_ARG_TYPE('options', ['Object']);
171174
}
175+
176+
if (options?.signal != null) {
177+
validateAbortSignal(options.signal, 'options.signal');
178+
}
179+
180+
return async function* asIndexedPairs() {
181+
let index = 0;
182+
for await (const val of this) {
183+
if (options?.signal?.aborted) {
184+
throw new AbortError({ cause: options.signal.reason });
185+
}
186+
yield [index++, val];
187+
}
188+
}.call(this);
172189
}
173190

174-
async function some(fn, options) {
191+
function some(fn, options) {
192+
if (options != null && typeof options !== 'object') {
193+
throw new ERR_INVALID_ARG_TYPE('options', ['Object']);
194+
}
195+
196+
if (options?.signal != null) {
197+
validateAbortSignal(options.signal, 'options.signal');
198+
}
199+
175200
// https://tc39.es/proposal-iterator-helpers/#sec-iteratorprototype.some
176201
// Note that some does short circuit but also closes the iterator if it does
177202
const ac = new AbortController();
@@ -185,27 +210,32 @@ async function some(fn, options) {
185210
});
186211
}
187212
const mapped = this.map(fn, { ...options, signal: ac.signal });
188-
for await (const result of mapped) {
189-
if (result) {
190-
ac.abort();
191-
return true;
213+
return async function some() {
214+
for await (const result of mapped) {
215+
if (result) {
216+
ac.abort();
217+
return true;
218+
}
192219
}
193-
}
194-
return false;
220+
return false;
221+
}.call(this);
195222
}
196223

197-
async function every(fn, options) {
224+
function every(fn, options) {
198225
if (typeof fn !== 'function') {
199226
throw new ERR_INVALID_ARG_TYPE(
200227
'fn', ['Function', 'AsyncFunction'], fn);
201228
}
202229
// https://en.wikipedia.org/wiki/De_Morgan%27s_laws
203-
return !(await some.call(this, async (...args) => {
230+
const somePromise = some.call(this, async (...args) => {
204231
return !(await fn(...args));
205-
}, options));
232+
}, options);
233+
return async function every() {
234+
return !(await somePromise);
235+
}.call(this);
206236
}
207237

208-
async function forEach(fn, options) {
238+
function forEach(fn, options) {
209239
if (typeof fn !== 'function') {
210240
throw new ERR_INVALID_ARG_TYPE(
211241
'fn', ['Function', 'AsyncFunction'], fn);
@@ -214,8 +244,11 @@ async function forEach(fn, options) {
214244
await fn(value, options);
215245
return kEmpty;
216246
}
217-
// eslint-disable-next-line no-unused-vars
218-
for await (const unused of this.map(forEachFn, options));
247+
const values = this.map(forEachFn, options);
248+
return async function forEach() {
249+
// eslint-disable-next-line no-unused-vars
250+
for await (const unused of values);
251+
}.call(this);
219252
}
220253

221254
function filter(fn, options) {
@@ -241,56 +274,78 @@ class ReduceAwareErrMissingArgs extends ERR_MISSING_ARGS {
241274
}
242275
}
243276

244-
async function reduce(reducer, initialValue, options) {
277+
function reduce(reducer, initialValue, options) {
245278
if (typeof reducer !== 'function') {
246279
throw new ERR_INVALID_ARG_TYPE(
247280
'reducer', ['Function', 'AsyncFunction'], reducer);
248281
}
249-
let hasInitialValue = arguments.length > 1;
250-
if (options?.signal?.aborted) {
251-
const err = new AbortError(undefined, { cause: options.signal.reason });
252-
this.once('error', () => {}); // The error is already propagated
253-
await finished(this.destroy(err));
254-
throw err;
282+
283+
if (options != null && typeof options !== 'object') {
284+
throw new ERR_INVALID_ARG_TYPE('options', ['Object']);
255285
}
256-
const ac = new AbortController();
257-
const signal = ac.signal;
258-
if (options?.signal) {
259-
const opts = { once: true, [kWeakHandler]: this };
260-
options.signal.addEventListener('abort', () => ac.abort(), opts);
286+
287+
if (options?.signal != null) {
288+
validateAbortSignal(options.signal, 'options.signal');
261289
}
262-
let gotAnyItemFromStream = false;
263-
try {
264-
for await (const value of this) {
265-
gotAnyItemFromStream = true;
266-
if (options?.signal?.aborted) {
267-
throw new AbortError();
290+
291+
let hasInitialValue = arguments.length > 1;
292+
293+
return async function reduce() {
294+
if (options?.signal?.aborted) {
295+
const err = new AbortError(undefined, { cause: options.signal.reason });
296+
this.once('error', () => {}); // The error is already propagated
297+
await finished(this.destroy(err));
298+
throw err;
299+
}
300+
const ac = new AbortController();
301+
const signal = ac.signal;
302+
if (options?.signal) {
303+
const opts = { once: true, [kWeakHandler]: this };
304+
options.signal.addEventListener('abort', () => ac.abort(), opts);
305+
}
306+
let gotAnyItemFromStream = false;
307+
try {
308+
for await (const value of this) {
309+
gotAnyItemFromStream = true;
310+
if (options?.signal?.aborted) {
311+
throw new AbortError();
312+
}
313+
if (!hasInitialValue) {
314+
initialValue = value;
315+
hasInitialValue = true;
316+
} else {
317+
initialValue = await reducer(initialValue, value, { signal });
318+
}
268319
}
269-
if (!hasInitialValue) {
270-
initialValue = value;
271-
hasInitialValue = true;
272-
} else {
273-
initialValue = await reducer(initialValue, value, { signal });
320+
if (!gotAnyItemFromStream && !hasInitialValue) {
321+
throw new ReduceAwareErrMissingArgs();
274322
}
323+
} finally {
324+
ac.abort();
275325
}
276-
if (!gotAnyItemFromStream && !hasInitialValue) {
277-
throw new ReduceAwareErrMissingArgs();
278-
}
279-
} finally {
280-
ac.abort();
281-
}
282-
return initialValue;
326+
return initialValue;
327+
}.call(this);
283328
}
284329

285-
async function toArray(options) {
286-
const result = [];
287-
for await (const val of this) {
288-
if (options?.signal?.aborted) {
289-
throw new AbortError(undefined, { cause: options.signal.reason });
290-
}
291-
ArrayPrototypePush(result, val);
330+
function toArray(options) {
331+
if (options != null && typeof options !== 'object') {
332+
throw new ERR_INVALID_ARG_TYPE('options', ['Object']);
333+
}
334+
335+
if (options?.signal != null) {
336+
validateAbortSignal(options.signal, 'options.signal');
292337
}
293-
return result;
338+
339+
return async function toArray() {
340+
const result = [];
341+
for await (const val of this) {
342+
if (options?.signal?.aborted) {
343+
throw new AbortError(undefined, { cause: options.signal.reason });
344+
}
345+
ArrayPrototypePush(result, val);
346+
}
347+
return result;
348+
}.call(this);
294349
}
295350

296351
function flatMap(fn, options) {
@@ -316,6 +371,14 @@ function toIntegerOrInfinity(number) {
316371
}
317372

318373
function drop(number, options) {
374+
if (options != null && typeof options !== 'object') {
375+
throw new ERR_INVALID_ARG_TYPE('options', ['Object']);
376+
}
377+
378+
if (options?.signal != null) {
379+
validateAbortSignal(options.signal, 'options.signal');
380+
}
381+
319382
number = toIntegerOrInfinity(number);
320383
return async function* drop() {
321384
if (options?.signal?.aborted) {
@@ -334,6 +397,14 @@ function drop(number, options) {
334397

335398

336399
function take(number, options) {
400+
if (options != null && typeof options !== 'object') {
401+
throw new ERR_INVALID_ARG_TYPE('options', ['Object']);
402+
}
403+
404+
if (options?.signal != null) {
405+
validateAbortSignal(options.signal, 'options.signal');
406+
}
407+
337408
number = toIntegerOrInfinity(number);
338409
return async function* take() {
339410
if (options?.signal?.aborted) {

test/parallel/test-stream-asIndexedPairs.mjs

+7-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import '../common/index.mjs';
22
import { Readable } from 'stream';
3-
import { deepStrictEqual, rejects } from 'assert';
3+
import { deepStrictEqual, rejects, throws } from 'assert';
44

55
{
66
// asIndexedPairs with a synchronous stream
@@ -45,3 +45,9 @@ import { deepStrictEqual, rejects } from 'assert';
4545
await Readable.from([1, 2, 3]).asIndexedPairs({ signal }).toArray();
4646
}, /AbortError/);
4747
}
48+
49+
{
50+
// Error cases
51+
throws(() => Readable.from([1]).asIndexedPairs(1), /ERR_INVALID_ARG_TYPE/);
52+
throws(() => Readable.from([1]).asIndexedPairs({ signal: true }), /ERR_INVALID_ARG_TYPE/);
53+
}

test/parallel/test-stream-drop-take.js

+6
Original file line numberDiff line numberDiff line change
@@ -93,4 +93,10 @@ const naturals = () => from(async function*() {
9393
for (const example of invalidArgs) {
9494
throws(() => from([]).take(example).toArray(), /ERR_OUT_OF_RANGE/);
9595
}
96+
97+
throws(() => Readable.from([1]).drop(1, 1), /ERR_INVALID_ARG_TYPE/);
98+
throws(() => Readable.from([1]).drop(1, { signal: true }), /ERR_INVALID_ARG_TYPE/);
99+
100+
throws(() => Readable.from([1]).take(1, 1), /ERR_INVALID_ARG_TYPE/);
101+
throws(() => Readable.from([1]).take(1, { signal: true }), /ERR_INVALID_ARG_TYPE/);
96102
}

test/parallel/test-stream-flatMap.js

+1
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ function oneTo5() {
114114
concurrency: 'Foo'
115115
}), /ERR_OUT_OF_RANGE/);
116116
assert.throws(() => Readable.from([1]).flatMap((x) => x, 1), /ERR_INVALID_ARG_TYPE/);
117+
assert.throws(() => Readable.from([1]).flatMap((x) => x, { signal: true }), /ERR_INVALID_ARG_TYPE/);
117118
}
118119
{
119120
// Test result is a Readable

test/parallel/test-stream-forEach.js

+6-11
Original file line numberDiff line numberDiff line change
@@ -67,17 +67,12 @@ const { setTimeout } = require('timers/promises');
6767

6868
{
6969
// Error cases
70-
assert.rejects(async () => {
71-
await Readable.from([1]).forEach(1);
72-
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
73-
assert.rejects(async () => {
74-
await Readable.from([1]).forEach((x) => x, {
75-
concurrency: 'Foo'
76-
});
77-
}, /ERR_OUT_OF_RANGE/).then(common.mustCall());
78-
assert.rejects(async () => {
79-
await Readable.from([1]).forEach((x) => x, 1);
80-
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
70+
assert.throws(() => Readable.from([1]).forEach(1), /ERR_INVALID_ARG_TYPE/);
71+
assert.throws(() => Readable.from([1]).forEach((x) => x, {
72+
concurrency: 'Foo'
73+
}), /ERR_OUT_OF_RANGE/);
74+
assert.throws(() => Readable.from([1]).forEach((x) => x, 1), /ERR_INVALID_ARG_TYPE/);
75+
assert.throws(() => Readable.from([1]).forEach((x) => x, { signal: true }), /ERR_INVALID_ARG_TYPE/);
8176
}
8277
{
8378
// Test result is a Promise

test/parallel/test-stream-map.js

+1
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,7 @@ const { setTimeout } = require('timers/promises');
180180
concurrency: 'Foo'
181181
}), /ERR_OUT_OF_RANGE/);
182182
assert.throws(() => Readable.from([1]).map((x) => x, 1), /ERR_INVALID_ARG_TYPE/);
183+
assert.throws(() => Readable.from([1]).map((x) => x, { signal: true }), /ERR_INVALID_ARG_TYPE/);
183184
}
184185
{
185186
// Test result is a Readable

test/parallel/test-stream-reduce.js

+5-2
Original file line numberDiff line numberDiff line change
@@ -119,8 +119,11 @@ function sum(p, c) {
119119

120120
{
121121
// Error cases
122-
assert.rejects(() => Readable.from([]).reduce(1), /TypeError/);
123-
assert.rejects(() => Readable.from([]).reduce('5'), /TypeError/);
122+
assert.throws(() => Readable.from([]).reduce(1), /TypeError/);
123+
assert.throws(() => Readable.from([]).reduce('5'), /TypeError/);
124+
125+
assert.throws(() => Readable.from([]).reduce((x, y) => x + y, 0, 1), /ERR_INVALID_ARG_TYPE/);
126+
assert.throws(() => Readable.from([]).reduce((x, y) => x + y, 0, { signal: true }), /ERR_INVALID_ARG_TYPE/);
124127
}
125128

126129
{

test/parallel/test-stream-some-every.js

+13-8
Original file line numberDiff line numberDiff line change
@@ -84,12 +84,17 @@ function oneTo5Async() {
8484
}
8585
{
8686
// Error cases
87-
assert.rejects(async () => {
88-
await Readable.from([1]).every(1);
89-
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
90-
assert.rejects(async () => {
91-
await Readable.from([1]).every((x) => x, {
92-
concurrency: 'Foo'
93-
});
94-
}, /ERR_OUT_OF_RANGE/).then(common.mustCall());
87+
assert.throws(() => Readable.from([1]).some(1), /ERR_INVALID_ARG_TYPE/);
88+
assert.throws(() => Readable.from([1]).some((x) => x, {
89+
concurrency: 'Foo'
90+
}), /ERR_OUT_OF_RANGE/);
91+
assert.throws(() => Readable.from([1]).some((x) => x, 1), /ERR_INVALID_ARG_TYPE/);
92+
assert.throws(() => Readable.from([1]).some((x) => x, { signal: true }), /ERR_INVALID_ARG_TYPE/);
93+
94+
assert.throws(() => Readable.from([1]).every(1), /ERR_INVALID_ARG_TYPE/);
95+
assert.throws(() => Readable.from([1]).every((x) => x, {
96+
concurrency: 'Foo'
97+
}), /ERR_OUT_OF_RANGE/);
98+
assert.throws(() => Readable.from([1]).every((x) => x, 1), /ERR_INVALID_ARG_TYPE/);
99+
assert.throws(() => Readable.from([1]).every((x) => x, { signal: true }), /ERR_INVALID_ARG_TYPE/);
95100
}

test/parallel/test-stream-toArray.js

+5
Original file line numberDiff line numberDiff line change
@@ -79,3 +79,8 @@ const assert = require('assert');
7979
const result = Readable.from([1, 2, 3, 4, 5]).toArray();
8080
assert.strictEqual(result instanceof Promise, true);
8181
}
82+
{
83+
// Error cases
84+
assert.throws(() => Readable.from([1]).toArray(1), /ERR_INVALID_ARG_TYPE/);
85+
assert.throws(() => Readable.from([1]).toArray({ signal: true }), /ERR_INVALID_ARG_TYPE/);
86+
}

0 commit comments

Comments
 (0)