@@ -35,6 +35,9 @@ const {
35
35
isReadable,
36
36
isReadableNodeStream,
37
37
isNodeStream,
38
+ isTransformStream,
39
+ isWebStream,
40
+ isReadableStream,
38
41
} = require ( 'internal/streams/utils' ) ;
39
42
const { AbortController } = require ( 'internal/abort_controller' ) ;
40
43
@@ -88,7 +91,7 @@ async function* fromReadable(val) {
88
91
yield * Readable . prototype [ SymbolAsyncIterator ] . call ( val ) ;
89
92
}
90
93
91
- async function pump ( iterable , writable , finish , { end } ) {
94
+ async function pumpToNode ( iterable , writable , finish , { end } ) {
92
95
let error ;
93
96
let onresolve = null ;
94
97
@@ -147,6 +150,35 @@ async function pump(iterable, writable, finish, { end }) {
147
150
}
148
151
}
149
152
153
+ async function pumpToWeb ( readable , writable , finish , { end } ) {
154
+ if ( isTransformStream ( writable ) ) {
155
+ writable = writable . writable ;
156
+ }
157
+ // https://streams.spec.whatwg.org/#example-manual-write-with-backpressure
158
+ const writer = writable . getWriter ( ) ;
159
+ try {
160
+ for await ( const chunk of readable ) {
161
+ await writer . ready ;
162
+ writer . write ( chunk ) . catch ( ( ) => { } ) ;
163
+ }
164
+
165
+ await writer . ready ;
166
+
167
+ if ( end ) {
168
+ await writer . close ( ) ;
169
+ }
170
+
171
+ finish ( ) ;
172
+ } catch ( err ) {
173
+ try {
174
+ await writer . abort ( err ) ;
175
+ finish ( err ) ;
176
+ } catch ( err ) {
177
+ finish ( err ) ;
178
+ }
179
+ }
180
+ }
181
+
150
182
function pipeline ( ...streams ) {
151
183
return pipelineImpl ( streams , once ( popCallback ( streams ) ) ) ;
152
184
}
@@ -259,7 +291,11 @@ function pipelineImpl(streams, callback, opts) {
259
291
ret = Duplex . from ( stream ) ;
260
292
}
261
293
} else if ( typeof stream === 'function' ) {
262
- ret = makeAsyncIterable ( ret ) ;
294
+ if ( isTransformStream ( ret ) ) {
295
+ ret = makeAsyncIterable ( ret ?. readable ) ;
296
+ } else {
297
+ ret = makeAsyncIterable ( ret ) ;
298
+ }
263
299
ret = stream ( ret , { signal } ) ;
264
300
265
301
if ( reading ) {
@@ -303,7 +339,11 @@ function pipelineImpl(streams, callback, opts) {
303
339
) ;
304
340
} else if ( isIterable ( ret , true ) ) {
305
341
finishCount ++ ;
306
- pump ( ret , pt , finish , { end } ) ;
342
+ pumpToNode ( ret , pt , finish , { end } ) ;
343
+ } else if ( isReadableStream ( ret ) || isTransformStream ( ret ) ) {
344
+ const toRead = ret . readable || ret ;
345
+ finishCount ++ ;
346
+ pumpToNode ( toRead , pt , finish , { end } ) ;
307
347
} else {
308
348
throw new ERR_INVALID_RETURN_VALUE (
309
349
'AsyncIterable or Promise' , 'destination' , ret ) ;
@@ -324,12 +364,30 @@ function pipelineImpl(streams, callback, opts) {
324
364
if ( isReadable ( stream ) && isLastStream ) {
325
365
lastStreamCleanup . push ( cleanup ) ;
326
366
}
367
+ } else if ( isTransformStream ( ret ) || isReadableStream ( ret ) ) {
368
+ const toRead = ret . readable || ret ;
369
+ finishCount ++ ;
370
+ pumpToNode ( toRead , stream , finish , { end } ) ;
327
371
} else if ( isIterable ( ret ) ) {
328
372
finishCount ++ ;
329
- pump ( ret , stream , finish , { end } ) ;
373
+ pumpToNode ( ret , stream , finish , { end } ) ;
374
+ } else {
375
+ throw new ERR_INVALID_ARG_TYPE (
376
+ 'val' , [ 'Readable' , 'Iterable' , 'AsyncIterable' , 'ReadableStream' , 'TransformStream' ] , ret ) ;
377
+ }
378
+ ret = stream ;
379
+ } else if ( isWebStream ( stream ) ) {
380
+ if ( isReadableNodeStream ( ret ) ) {
381
+ finishCount ++ ;
382
+ pumpToWeb ( makeAsyncIterable ( ret ) , stream , finish , { end } ) ;
383
+ } else if ( isReadableStream ( ret ) || isIterable ( ret ) ) {
384
+ finishCount ++ ;
385
+ pumpToWeb ( ret , stream , finish , { end } ) ;
386
+ } else if ( isTransformStream ( ret ) ) {
387
+ pumpToWeb ( ret . readable , stream , finish , { end } ) ;
330
388
} else {
331
389
throw new ERR_INVALID_ARG_TYPE (
332
- 'val' , [ 'Readable' , 'Iterable' , 'AsyncIterable' ] , ret ) ;
390
+ 'val' , [ 'Readable' , 'Iterable' , 'AsyncIterable' , 'ReadableStream' , 'TransformStream' ] , ret ) ;
333
391
}
334
392
ret = stream ;
335
393
} else {
0 commit comments