Skip to content

Commit 68cde4c

Browse files
debadree25RafaelGSS
authored andcommitted
lib: add webstreams to Duplex.from()
Refs: #39519 PR-URL: #46190 Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: James M Snell <[email protected]> Reviewed-By: Robert Nagy <[email protected]>
1 parent 2f23f17 commit 68cde4c

File tree

2 files changed

+125
-15
lines changed

2 files changed

+125
-15
lines changed

lib/internal/streams/duplexify.js

+23-15
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ const {
2020
const { destroyer } = require('internal/streams/destroy');
2121
const Duplex = require('internal/streams/duplex');
2222
const Readable = require('internal/streams/readable');
23+
const Writable = require('internal/streams/writable');
2324
const { createDeferredPromise } = require('internal/util');
2425
const from = require('internal/streams/from');
2526

@@ -32,6 +33,16 @@ const {
3233
FunctionPrototypeCall
3334
} = primordials;
3435

36+
37+
const {
38+
isBrandCheck,
39+
} = require('internal/webstreams/util');
40+
41+
const isReadableStream =
42+
isBrandCheck('ReadableStream');
43+
const isWritableStream =
44+
isBrandCheck('WritableStream');
45+
3546
// This is needed for pre node 17.
3647
class Duplexify extends Duplex {
3748
constructor(options) {
@@ -71,15 +82,13 @@ module.exports = function duplexify(body, name) {
7182
return _duplexify({ writable: false, readable: false });
7283
}
7384

74-
// TODO: Webstreams
75-
// if (isReadableStream(body)) {
76-
// return _duplexify({ readable: Readable.fromWeb(body) });
77-
// }
85+
if (isReadableStream(body)) {
86+
return _duplexify({ readable: Readable.fromWeb(body) });
87+
}
7888

79-
// TODO: Webstreams
80-
// if (isWritableStream(body)) {
81-
// return _duplexify({ writable: Writable.fromWeb(body) });
82-
// }
89+
if (isWritableStream(body)) {
90+
return _duplexify({ writable: Writable.fromWeb(body) });
91+
}
8392

8493
if (typeof body === 'function') {
8594
const { value, write, final, destroy } = fromAsyncGen(body);
@@ -146,13 +155,12 @@ module.exports = function duplexify(body, name) {
146155
});
147156
}
148157

149-
// TODO: Webstreams.
150-
// if (
151-
// isReadableStream(body?.readable) &&
152-
// isWritableStream(body?.writable)
153-
// ) {
154-
// return Duplexify.fromWeb(body);
155-
// }
158+
if (
159+
isReadableStream(body?.readable) &&
160+
isWritableStream(body?.writable)
161+
) {
162+
return Duplexify.fromWeb(body);
163+
}
156164

157165
if (
158166
typeof body?.writable === 'object' ||

test/parallel/test-stream-duplex-from.js

+102
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
const common = require('../common');
44
const assert = require('assert');
55
const { Duplex, Readable, Writable, pipeline, PassThrough } = require('stream');
6+
const { ReadableStream, WritableStream } = require('stream/web');
67
const { Blob } = require('buffer');
78

89
{
@@ -299,3 +300,104 @@ const { Blob } = require('buffer');
299300
assert.strictEqual(res, 'foobar');
300301
})).on('close', common.mustCall());
301302
}
303+
304+
function makeATestReadableStream(value) {
305+
return new ReadableStream({
306+
start(controller) {
307+
controller.enqueue(value);
308+
controller.close();
309+
}
310+
});
311+
}
312+
313+
function makeATestWritableStream(writeFunc) {
314+
return new WritableStream({
315+
write(chunk) {
316+
writeFunc(chunk);
317+
}
318+
});
319+
}
320+
321+
{
322+
const d = Duplex.from({
323+
readable: makeATestReadableStream('foo'),
324+
});
325+
assert.strictEqual(d.readable, true);
326+
assert.strictEqual(d.writable, false);
327+
328+
d.on('data', common.mustCall((data) => {
329+
assert.strictEqual(data.toString(), 'foo');
330+
}));
331+
332+
d.on('end', common.mustCall(() => {
333+
assert.strictEqual(d.readable, false);
334+
}));
335+
}
336+
337+
{
338+
const d = Duplex.from(makeATestReadableStream('foo'));
339+
340+
assert.strictEqual(d.readable, true);
341+
assert.strictEqual(d.writable, false);
342+
343+
d.on('data', common.mustCall((data) => {
344+
assert.strictEqual(data.toString(), 'foo');
345+
}));
346+
347+
d.on('end', common.mustCall(() => {
348+
assert.strictEqual(d.readable, false);
349+
}));
350+
}
351+
352+
{
353+
let ret = '';
354+
const d = Duplex.from({
355+
writable: makeATestWritableStream((chunk) => ret += chunk),
356+
});
357+
358+
assert.strictEqual(d.readable, false);
359+
assert.strictEqual(d.writable, true);
360+
361+
d.end('foo');
362+
d.on('finish', common.mustCall(() => {
363+
assert.strictEqual(ret, 'foo');
364+
assert.strictEqual(d.writable, false);
365+
}));
366+
}
367+
368+
{
369+
let ret = '';
370+
const d = Duplex.from(makeATestWritableStream((chunk) => ret += chunk));
371+
372+
assert.strictEqual(d.readable, false);
373+
assert.strictEqual(d.writable, true);
374+
375+
d.end('foo');
376+
d.on('finish', common.mustCall(() => {
377+
assert.strictEqual(ret, 'foo');
378+
assert.strictEqual(d.writable, false);
379+
}));
380+
}
381+
382+
{
383+
let ret = '';
384+
const d = Duplex.from({
385+
readable: makeATestReadableStream('foo'),
386+
writable: makeATestWritableStream((chunk) => ret += chunk),
387+
});
388+
389+
d.end('bar');
390+
391+
d.on('data', common.mustCall((data) => {
392+
assert.strictEqual(data.toString(), 'foo');
393+
}));
394+
395+
d.on('end', common.mustCall(() => {
396+
assert.strictEqual(d.readable, false);
397+
}));
398+
399+
d.on('finish', common.mustCall(() => {
400+
assert.strictEqual(ret, 'bar');
401+
assert.strictEqual(d.writable, false);
402+
}));
403+
}

0 commit comments

Comments
 (0)