Skip to content

Commit dd724f6

Browse files
committed
stream: duplexify
1 parent ab03ab4 commit dd724f6

File tree

7 files changed

+367
-132
lines changed

7 files changed

+367
-132
lines changed

lib/internal/streams/compose.js

+5-115
Original file line numberDiff line numberDiff line change
@@ -2,69 +2,38 @@
22

33
const pipeline = require('internal/streams/pipeline');
44
const Duplex = require('internal/streams/duplex');
5-
const { createDeferredPromise } = require('internal/util');
65
const { destroyer } = require('internal/streams/destroy');
7-
const from = require('internal/streams/from');
86
const {
97
isNodeStream,
10-
isIterable,
118
isReadable,
129
isWritable,
1310
} = require('internal/streams/utils');
14-
const {
15-
PromiseResolve,
16-
} = primordials;
1711
const {
1812
AbortError,
1913
codes: {
20-
ERR_INVALID_ARG_TYPE,
2114
ERR_INVALID_ARG_VALUE,
22-
ERR_INVALID_RETURN_VALUE,
2315
ERR_MISSING_ARGS,
2416
},
2517
} = require('internal/errors');
26-
const assert = require('internal/assert');
27-
28-
// This is needed for pre node 17.
29-
class ComposeDuplex extends Duplex {
30-
constructor(options) {
31-
super(options);
32-
33-
// https://github.com/nodejs/node/pull/34385
34-
35-
if (options?.readable === false) {
36-
this._readableState.readable = false;
37-
this._readableState.ended = true;
38-
this._readableState.endEmitted = true;
39-
}
40-
41-
if (options?.writable === false) {
42-
this._writableState.writable = false;
43-
this._writableState.ending = true;
44-
this._writableState.ended = true;
45-
this._writableState.finished = true;
46-
}
47-
}
48-
}
4918

5019
module.exports = function compose(...streams) {
5120
if (streams.length === 0) {
5221
throw new ERR_MISSING_ARGS('streams');
5322
}
5423

5524
if (streams.length === 1) {
56-
return makeDuplex(streams[0], 'streams[0]');
25+
return Duplex.from(streams[0]);
5726
}
5827

5928
const orgStreams = [...streams];
6029

6130
if (typeof streams[0] === 'function') {
62-
streams[0] = makeDuplex(streams[0], 'streams[0]');
31+
streams[0] = Duplex.from(streams[0]);
6332
}
6433

6534
if (typeof streams[streams.length - 1] === 'function') {
6635
const idx = streams.length - 1;
67-
streams[idx] = makeDuplex(streams[idx], `streams[${idx}]`);
36+
streams[idx] = Duplex.from(streams[idx]);
6837
}
6938

7039
for (let n = 0; n < streams.length; ++n) {
@@ -116,8 +85,8 @@ module.exports = function compose(...streams) {
11685
// TODO(ronag): Avoid double buffering.
11786
// Implement Writable/Readable/Duplex traits.
11887
// See, https://github.com/nodejs/node/pull/33515.
119-
d = new ComposeDuplex({
120-
highWaterMark: 1,
88+
d = new Duplex({
89+
// TODO (ronag): highWaterMark?
12190
writableObjectMode: !!head?.writableObjectMode,
12291
readableObjectMode: !!tail?.writableObjectMode,
12392
writable,
@@ -203,82 +172,3 @@ module.exports = function compose(...streams) {
203172

204173
return d;
205174
};
206-
207-
function makeDuplex(stream, name) {
208-
let ret;
209-
if (typeof stream === 'function') {
210-
assert(stream.length > 0);
211-
212-
const { value, write, final } = fromAsyncGen(stream);
213-
214-
if (isIterable(value)) {
215-
ret = from(ComposeDuplex, value, {
216-
objectMode: true,
217-
highWaterMark: 1,
218-
write,
219-
final
220-
});
221-
} else if (typeof value?.then === 'function') {
222-
const promise = PromiseResolve(value)
223-
.then((val) => {
224-
if (val != null) {
225-
throw new ERR_INVALID_RETURN_VALUE('nully', name, val);
226-
}
227-
})
228-
.catch((err) => {
229-
destroyer(ret, err);
230-
});
231-
232-
ret = new ComposeDuplex({
233-
objectMode: true,
234-
highWaterMark: 1,
235-
readable: false,
236-
write,
237-
final(cb) {
238-
final(() => promise.then(cb, cb));
239-
}
240-
});
241-
} else {
242-
throw new ERR_INVALID_RETURN_VALUE(
243-
'Iterable, AsyncIterable or AsyncFunction', name, value);
244-
}
245-
} else if (isNodeStream(stream)) {
246-
ret = stream;
247-
} else if (isIterable(stream)) {
248-
ret = from(ComposeDuplex, stream, {
249-
objectMode: true,
250-
highWaterMark: 1,
251-
writable: false
252-
});
253-
} else {
254-
throw new ERR_INVALID_ARG_TYPE(
255-
name,
256-
['Stream', 'Iterable', 'AsyncIterable', 'Function'],
257-
stream)
258-
;
259-
}
260-
return ret;
261-
}
262-
263-
function fromAsyncGen(fn) {
264-
let { promise, resolve } = createDeferredPromise();
265-
const value = fn(async function*() {
266-
while (true) {
267-
const { chunk, done, cb } = await promise;
268-
process.nextTick(cb);
269-
if (done) return;
270-
yield chunk;
271-
({ promise, resolve } = createDeferredPromise());
272-
}
273-
}());
274-
275-
return {
276-
value,
277-
write(chunk, encoding, cb) {
278-
resolve({ chunk, done: false, cb });
279-
},
280-
final(cb) {
281-
resolve({ done: true, cb });
282-
}
283-
};
284-
}

lib/internal/streams/duplex.js

+9
Original file line numberDiff line numberDiff line change
@@ -133,3 +133,12 @@ Duplex.fromWeb = function(pair, options) {
133133
Duplex.toWeb = function(duplex) {
134134
return lazyWebStreams().newReadableWritablePairFromDuplex(duplex);
135135
};
136+
137+
let duplexify;
138+
139+
Duplex.from = function(body) {
140+
if (!duplexify) {
141+
duplexify = require('internal/streams/duplexify');
142+
}
143+
return duplexify(body, 'body');
144+
};

0 commit comments

Comments
 (0)