Skip to content

Commit 9933495

Browse files
committed
stream: add pipeline() for webstreams
Refs: nodejs#39316 PR-URL: nodejs#46307 Reviewed-By: Robert Nagy <[email protected]> Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: Benjamin Gruenbaum <[email protected]>
1 parent 07ea797 commit 9933495

File tree

5 files changed

+500
-10
lines changed

5 files changed

+500
-10
lines changed

doc/api/stream.md

+8-4
Original file line numberDiff line numberDiff line change
@@ -2669,6 +2669,9 @@ const cleanup = finished(rs, (err) => {
26692669
<!-- YAML
26702670
added: v10.0.0
26712671
changes:
2672+
- version: REPLACEME
2673+
pr-url: https://github.com/nodejs/node/pull/46307
2674+
description: Added support for webstreams.
26722675
- version: v18.0.0
26732676
pr-url: https://github.com/nodejs/node/pull/41678
26742677
description: Passing an invalid callback to the `callback` argument
@@ -2685,13 +2688,14 @@ changes:
26852688
description: Add support for async generators.
26862689
-->
26872690

2688-
* `streams` {Stream\[]|Iterable\[]|AsyncIterable\[]|Function\[]}
2689-
* `source` {Stream|Iterable|AsyncIterable|Function}
2691+
* `streams` {Stream\[]|Iterable\[]|AsyncIterable\[]|Function\[]|
2692+
ReadableStream\[]|WritableStream\[]|TransformStream\[]}
2693+
* `source` {Stream|Iterable|AsyncIterable|Function|ReadableStream}
26902694
* Returns: {Iterable|AsyncIterable}
2691-
* `...transforms` {Stream|Function}
2695+
* `...transforms` {Stream|Function|TransformStream}
26922696
* `source` {AsyncIterable}
26932697
* Returns: {AsyncIterable}
2694-
* `destination` {Stream|Function}
2698+
* `destination` {Stream|Function|WritableStream}
26952699
* `source` {AsyncIterable}
26962700
* Returns: {AsyncIterable|Promise}
26972701
* `callback` {Function} Called when the pipeline is fully done.

lib/internal/streams/pipeline.js

+63-5
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,9 @@ const {
3535
isReadable,
3636
isReadableNodeStream,
3737
isNodeStream,
38+
isTransformStream,
39+
isWebStream,
40+
isReadableStream,
3841
} = require('internal/streams/utils');
3942
const { AbortController } = require('internal/abort_controller');
4043

@@ -88,7 +91,7 @@ async function* fromReadable(val) {
8891
yield* Readable.prototype[SymbolAsyncIterator].call(val);
8992
}
9093

91-
async function pump(iterable, writable, finish, { end }) {
94+
async function pumpToNode(iterable, writable, finish, { end }) {
9295
let error;
9396
let onresolve = null;
9497

@@ -147,6 +150,35 @@ async function pump(iterable, writable, finish, { end }) {
147150
}
148151
}
149152

153+
async function pumpToWeb(readable, writable, finish, { end }) {
154+
if (isTransformStream(writable)) {
155+
writable = writable.writable;
156+
}
157+
// https://streams.spec.whatwg.org/#example-manual-write-with-backpressure
158+
const writer = writable.getWriter();
159+
try {
160+
for await (const chunk of readable) {
161+
await writer.ready;
162+
writer.write(chunk).catch(() => {});
163+
}
164+
165+
await writer.ready;
166+
167+
if (end) {
168+
await writer.close();
169+
}
170+
171+
finish();
172+
} catch (err) {
173+
try {
174+
await writer.abort(err);
175+
finish(err);
176+
} catch (err) {
177+
finish(err);
178+
}
179+
}
180+
}
181+
150182
function pipeline(...streams) {
151183
return pipelineImpl(streams, once(popCallback(streams)));
152184
}
@@ -259,7 +291,11 @@ function pipelineImpl(streams, callback, opts) {
259291
ret = Duplex.from(stream);
260292
}
261293
} else if (typeof stream === 'function') {
262-
ret = makeAsyncIterable(ret);
294+
if (isTransformStream(ret)) {
295+
ret = makeAsyncIterable(ret?.readable);
296+
} else {
297+
ret = makeAsyncIterable(ret);
298+
}
263299
ret = stream(ret, { signal });
264300

265301
if (reading) {
@@ -303,7 +339,11 @@ function pipelineImpl(streams, callback, opts) {
303339
);
304340
} else if (isIterable(ret, true)) {
305341
finishCount++;
306-
pump(ret, pt, finish, { end });
342+
pumpToNode(ret, pt, finish, { end });
343+
} else if (isReadableStream(ret) || isTransformStream(ret)) {
344+
const toRead = ret.readable || ret;
345+
finishCount++;
346+
pumpToNode(toRead, pt, finish, { end });
307347
} else {
308348
throw new ERR_INVALID_RETURN_VALUE(
309349
'AsyncIterable or Promise', 'destination', ret);
@@ -324,12 +364,30 @@ function pipelineImpl(streams, callback, opts) {
324364
if (isReadable(stream) && isLastStream) {
325365
lastStreamCleanup.push(cleanup);
326366
}
367+
} else if (isTransformStream(ret) || isReadableStream(ret)) {
368+
const toRead = ret.readable || ret;
369+
finishCount++;
370+
pumpToNode(toRead, stream, finish, { end });
327371
} else if (isIterable(ret)) {
328372
finishCount++;
329-
pump(ret, stream, finish, { end });
373+
pumpToNode(ret, stream, finish, { end });
374+
} else {
375+
throw new ERR_INVALID_ARG_TYPE(
376+
'val', ['Readable', 'Iterable', 'AsyncIterable', 'ReadableStream', 'TransformStream'], ret);
377+
}
378+
ret = stream;
379+
} else if (isWebStream(stream)) {
380+
if (isReadableNodeStream(ret)) {
381+
finishCount++;
382+
pumpToWeb(makeAsyncIterable(ret), stream, finish, { end });
383+
} else if (isReadableStream(ret) || isIterable(ret)) {
384+
finishCount++;
385+
pumpToWeb(ret, stream, finish, { end });
386+
} else if (isTransformStream(ret)) {
387+
pumpToWeb(ret.readable, stream, finish, { end });
330388
} else {
331389
throw new ERR_INVALID_ARG_TYPE(
332-
'val', ['Readable', 'Iterable', 'AsyncIterable'], ret);
390+
'val', ['Readable', 'Iterable', 'AsyncIterable', 'ReadableStream', 'TransformStream'], ret);
333391
}
334392
ret = stream;
335393
} else {

lib/internal/streams/utils.js

+15
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,19 @@ function isWritableStream(obj) {
7878
);
7979
}
8080

81+
function isTransformStream(obj) {
82+
return !!(
83+
obj &&
84+
!isNodeStream(obj) &&
85+
typeof obj.readable === 'object' &&
86+
typeof obj.writable === 'object'
87+
);
88+
}
89+
90+
function isWebStream(obj) {
91+
return isReadableStream(obj) || isWritableStream(obj) || isTransformStream(obj);
92+
}
93+
8194
function isIterable(obj, isAsync) {
8295
if (obj == null) return false;
8396
if (isAsync === true) return typeof obj[SymbolAsyncIterator] === 'function';
@@ -305,6 +318,7 @@ module.exports = {
305318
isReadableFinished,
306319
isReadableErrored,
307320
isNodeStream,
321+
isWebStream,
308322
isWritable,
309323
isWritableNodeStream,
310324
isWritableStream,
@@ -314,4 +328,5 @@ module.exports = {
314328
isServerRequest,
315329
isServerResponse,
316330
willEmitClose,
331+
isTransformStream,
317332
};

lib/stream/promises.js

+2-1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ const {
88
const {
99
isIterable,
1010
isNodeStream,
11+
isWebStream,
1112
} = require('internal/streams/utils');
1213

1314
const { pipelineImpl: pl } = require('internal/streams/pipeline');
@@ -19,7 +20,7 @@ function pipeline(...streams) {
1920
let end;
2021
const lastArg = streams[streams.length - 1];
2122
if (lastArg && typeof lastArg === 'object' &&
22-
!isNodeStream(lastArg) && !isIterable(lastArg)) {
23+
!isNodeStream(lastArg) && !isIterable(lastArg) && !isWebStream(lastArg)) {
2324
const options = ArrayPrototypePop(streams);
2425
signal = options.signal;
2526
end = options.end;

0 commit comments

Comments
 (0)