Skip to content

Commit 7904fb0

Browse files
committed
stream: duplexify
1 parent ab03ab4 commit 7904fb0

File tree

6 files changed

+309
-119
lines changed

6 files changed

+309
-119
lines changed

lib/internal/streams/compose.js

+5-114
Original file line numberDiff line numberDiff line change
@@ -2,69 +2,39 @@
22

33
const pipeline = require('internal/streams/pipeline');
44
const Duplex = require('internal/streams/duplex');
5-
const { createDeferredPromise } = require('internal/util');
65
const { destroyer } = require('internal/streams/destroy');
7-
const from = require('internal/streams/from');
86
const {
97
isNodeStream,
10-
isIterable,
118
isReadable,
129
isWritable,
1310
} = require('internal/streams/utils');
14-
const {
15-
PromiseResolve,
16-
} = primordials;
11+
const duplexify = require('internal/streams/duplexify');
1712
const {
1813
AbortError,
1914
codes: {
20-
ERR_INVALID_ARG_TYPE,
2115
ERR_INVALID_ARG_VALUE,
22-
ERR_INVALID_RETURN_VALUE,
2316
ERR_MISSING_ARGS,
2417
},
2518
} = require('internal/errors');
26-
const assert = require('internal/assert');
27-
28-
// This is needed for pre node 17.
29-
class ComposeDuplex extends Duplex {
30-
constructor(options) {
31-
super(options);
32-
33-
// https://github.com/nodejs/node/pull/34385
34-
35-
if (options?.readable === false) {
36-
this._readableState.readable = false;
37-
this._readableState.ended = true;
38-
this._readableState.endEmitted = true;
39-
}
40-
41-
if (options?.writable === false) {
42-
this._writableState.writable = false;
43-
this._writableState.ending = true;
44-
this._writableState.ended = true;
45-
this._writableState.finished = true;
46-
}
47-
}
48-
}
4919

5020
module.exports = function compose(...streams) {
5121
if (streams.length === 0) {
5222
throw new ERR_MISSING_ARGS('streams');
5323
}
5424

5525
if (streams.length === 1) {
56-
return makeDuplex(streams[0], 'streams[0]');
26+
return duplexify(streams[0], 'streams[0]');
5727
}
5828

5929
const orgStreams = [...streams];
6030

6131
if (typeof streams[0] === 'function') {
62-
streams[0] = makeDuplex(streams[0], 'streams[0]');
32+
streams[0] = duplexify(streams[0], 'streams[0]');
6333
}
6434

6535
if (typeof streams[streams.length - 1] === 'function') {
6636
const idx = streams.length - 1;
67-
streams[idx] = makeDuplex(streams[idx], `streams[${idx}]`);
37+
streams[idx] = duplexify(streams[idx], `streams[${idx}]`);
6838
}
6939

7040
for (let n = 0; n < streams.length; ++n) {
@@ -116,7 +86,7 @@ module.exports = function compose(...streams) {
11686
// TODO(ronag): Avoid double buffering.
11787
// Implement Writable/Readable/Duplex traits.
11888
// See, https://github.com/nodejs/node/pull/33515.
119-
d = new ComposeDuplex({
89+
d = new Duplex({
12090
highWaterMark: 1,
12191
writableObjectMode: !!head?.writableObjectMode,
12292
readableObjectMode: !!tail?.writableObjectMode,
@@ -203,82 +173,3 @@ module.exports = function compose(...streams) {
203173

204174
return d;
205175
};
206-
207-
function makeDuplex(stream, name) {
208-
let ret;
209-
if (typeof stream === 'function') {
210-
assert(stream.length > 0);
211-
212-
const { value, write, final } = fromAsyncGen(stream);
213-
214-
if (isIterable(value)) {
215-
ret = from(ComposeDuplex, value, {
216-
objectMode: true,
217-
highWaterMark: 1,
218-
write,
219-
final
220-
});
221-
} else if (typeof value?.then === 'function') {
222-
const promise = PromiseResolve(value)
223-
.then((val) => {
224-
if (val != null) {
225-
throw new ERR_INVALID_RETURN_VALUE('nully', name, val);
226-
}
227-
})
228-
.catch((err) => {
229-
destroyer(ret, err);
230-
});
231-
232-
ret = new ComposeDuplex({
233-
objectMode: true,
234-
highWaterMark: 1,
235-
readable: false,
236-
write,
237-
final(cb) {
238-
final(() => promise.then(cb, cb));
239-
}
240-
});
241-
} else {
242-
throw new ERR_INVALID_RETURN_VALUE(
243-
'Iterable, AsyncIterable or AsyncFunction', name, value);
244-
}
245-
} else if (isNodeStream(stream)) {
246-
ret = stream;
247-
} else if (isIterable(stream)) {
248-
ret = from(ComposeDuplex, stream, {
249-
objectMode: true,
250-
highWaterMark: 1,
251-
writable: false
252-
});
253-
} else {
254-
throw new ERR_INVALID_ARG_TYPE(
255-
name,
256-
['Stream', 'Iterable', 'AsyncIterable', 'Function'],
257-
stream)
258-
;
259-
}
260-
return ret;
261-
}
262-
263-
function fromAsyncGen(fn) {
264-
let { promise, resolve } = createDeferredPromise();
265-
const value = fn(async function*() {
266-
while (true) {
267-
const { chunk, done, cb } = await promise;
268-
process.nextTick(cb);
269-
if (done) return;
270-
yield chunk;
271-
({ promise, resolve } = createDeferredPromise());
272-
}
273-
}());
274-
275-
return {
276-
value,
277-
write(chunk, encoding, cb) {
278-
resolve({ chunk, done: false, cb });
279-
},
280-
final(cb) {
281-
resolve({ done: true, cb });
282-
}
283-
};
284-
}

0 commit comments

Comments
 (0)