Skip to content

Commit 66f334a

Browse files
committed
stream: refactor streams
- Remove unnecessary scope. - Refactor to use more validators. - Avoid using deprecated APIs.
1 parent 224b78f commit 66f334a

File tree

3 files changed

+27
-43
lines changed

3 files changed

+27
-43
lines changed

lib/internal/streams/duplex.js

+4-6
Original file line numberDiff line numberDiff line change
@@ -41,12 +41,10 @@ const Writable = require('internal/streams/writable');
4141
ObjectSetPrototypeOf(Duplex.prototype, Readable.prototype);
4242
ObjectSetPrototypeOf(Duplex, Readable);
4343

44-
{
45-
// Allow the keys array to be GC'ed.
46-
for (const method of ObjectKeys(Writable.prototype)) {
47-
if (!Duplex.prototype[method])
48-
Duplex.prototype[method] = Writable.prototype[method];
49-
}
44+
// Allow the keys array to be GC'ed.
45+
for (const method of ObjectKeys(Writable.prototype)) {
46+
if (!Duplex.prototype[method])
47+
Duplex.prototype[method] = Writable.prototype[method];
5048
}
5149

5250
function Duplex(options) {

lib/internal/streams/operators.js

+21-35
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,16 @@ const { AbortController } = require('internal/abort_controller');
44

55
const {
66
codes: {
7-
ERR_INVALID_ARG_TYPE,
87
ERR_MISSING_ARGS,
98
ERR_OUT_OF_RANGE,
109
},
1110
AbortError,
1211
} = require('internal/errors');
1312
const {
1413
validateAbortSignal,
14+
validateFunction,
1515
validateInteger,
16+
validateObject,
1617
} = require('internal/validators');
1718
const { kWeakHandler } = require('internal/event_target');
1819
const { finished } = require('internal/streams/end-of-stream');
@@ -32,12 +33,9 @@ const kEmpty = Symbol('kEmpty');
3233
const kEof = Symbol('kEof');
3334

3435
function map(fn, options) {
35-
if (typeof fn !== 'function') {
36-
throw new ERR_INVALID_ARG_TYPE(
37-
'fn', ['Function', 'AsyncFunction'], fn);
38-
}
39-
if (options != null && typeof options !== 'object') {
40-
throw new ERR_INVALID_ARG_TYPE('options', ['Object']);
36+
validateFunction(fn, 'fn');
37+
if (options != null) {
38+
validateObject(options, 'options');
4139
}
4240
if (options?.signal != null) {
4341
validateAbortSignal(options.signal, 'options.signal');
@@ -167,8 +165,8 @@ function map(fn, options) {
167165
}
168166

169167
function asIndexedPairs(options = undefined) {
170-
if (options != null && typeof options !== 'object') {
171-
throw new ERR_INVALID_ARG_TYPE('options', ['Object']);
168+
if (options != null) {
169+
validateObject(options, 'options');
172170
}
173171
if (options?.signal != null) {
174172
validateAbortSignal(options.signal, 'options.signal');
@@ -186,8 +184,8 @@ function asIndexedPairs(options = undefined) {
186184
}
187185

188186
async function some(fn, options) {
189-
if (options != null && typeof options !== 'object') {
190-
throw new ERR_INVALID_ARG_TYPE('options', ['Object']);
187+
if (options != null) {
188+
validateObject(options, 'options');
191189
}
192190
if (options?.signal != null) {
193191
validateAbortSignal(options.signal, 'options.signal');
@@ -216,21 +214,15 @@ async function some(fn, options) {
216214
}
217215

218216
async function every(fn, options) {
219-
if (typeof fn !== 'function') {
220-
throw new ERR_INVALID_ARG_TYPE(
221-
'fn', ['Function', 'AsyncFunction'], fn);
222-
}
217+
validateFunction(fn, 'fn');
223218
// https://en.wikipedia.org/wiki/De_Morgan%27s_laws
224219
return !(await some.call(this, async (...args) => {
225220
return !(await fn(...args));
226221
}, options));
227222
}
228223

229224
async function forEach(fn, options) {
230-
if (typeof fn !== 'function') {
231-
throw new ERR_INVALID_ARG_TYPE(
232-
'fn', ['Function', 'AsyncFunction'], fn);
233-
}
225+
validateFunction(fn, 'fn');
234226
async function forEachFn(value, options) {
235227
await fn(value, options);
236228
return kEmpty;
@@ -240,10 +232,7 @@ async function forEach(fn, options) {
240232
}
241233

242234
function filter(fn, options) {
243-
if (typeof fn !== 'function') {
244-
throw new ERR_INVALID_ARG_TYPE(
245-
'fn', ['Function', 'AsyncFunction'], fn);
246-
}
235+
validateFunction(fn, 'fn');
247236
async function filterFn(value, options) {
248237
if (await fn(value, options)) {
249238
return value;
@@ -263,12 +252,9 @@ class ReduceAwareErrMissingArgs extends ERR_MISSING_ARGS {
263252
}
264253

265254
async function reduce(reducer, initialValue, options) {
266-
if (typeof reducer !== 'function') {
267-
throw new ERR_INVALID_ARG_TYPE(
268-
'reducer', ['Function', 'AsyncFunction'], reducer);
269-
}
270-
if (options != null && typeof options !== 'object') {
271-
throw new ERR_INVALID_ARG_TYPE('options', ['Object']);
255+
validateFunction(reducer, 'reducer');
256+
if (options != null) {
257+
validateObject(options, 'options');
272258
}
273259
if (options?.signal != null) {
274260
validateAbortSignal(options.signal, 'options.signal');
@@ -311,8 +297,8 @@ async function reduce(reducer, initialValue, options) {
311297
}
312298

313299
async function toArray(options) {
314-
if (options != null && typeof options !== 'object') {
315-
throw new ERR_INVALID_ARG_TYPE('options', ['Object']);
300+
if (options != null) {
301+
validateObject(options, 'options');
316302
}
317303
if (options?.signal != null) {
318304
validateAbortSignal(options.signal, 'options.signal');
@@ -351,8 +337,8 @@ function toIntegerOrInfinity(number) {
351337
}
352338

353339
function drop(number, options = undefined) {
354-
if (options != null && typeof options !== 'object') {
355-
throw new ERR_INVALID_ARG_TYPE('options', ['Object']);
340+
if (options != null) {
341+
validateObject(options, 'options');
356342
}
357343
if (options?.signal != null) {
358344
validateAbortSignal(options.signal, 'options.signal');
@@ -375,8 +361,8 @@ function drop(number, options = undefined) {
375361
}
376362

377363
function take(number, options = undefined) {
378-
if (options != null && typeof options !== 'object') {
379-
throw new ERR_INVALID_ARG_TYPE('options', ['Object']);
364+
if (options != null) {
365+
validateObject(options, 'options');
380366
}
381367
if (options?.signal != null) {
382368
validateAbortSignal(options.signal, 'options.signal');

lib/internal/streams/readable.js

+2-2
Original file line numberDiff line numberDiff line change
@@ -787,7 +787,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
787787
debug('onerror', er);
788788
unpipe();
789789
dest.removeListener('error', onerror);
790-
if (EE.listenerCount(dest, 'error') === 0) {
790+
if (dest.listenerCount('error') === 0) {
791791
const s = dest._writableState || dest._readableState;
792792
if (s && !s.errorEmitted) {
793793
// User incorrectly emitted 'error' directly on the stream.
@@ -852,7 +852,7 @@ function pipeOnDrain(src, dest) {
852852
}
853853

854854
if ((!state.awaitDrainWriters || state.awaitDrainWriters.size === 0) &&
855-
EE.listenerCount(src, 'data')) {
855+
src.listenerCount('data')) {
856856
src.resume();
857857
}
858858
};

0 commit comments

Comments
 (0)