Skip to content

Commit f3afed1

Browse files
committed
stream: pipeline with end option
Currently pipeline cannot fully replace pipe due to the missing end option. This PR adds the end option to the promisified pipeline method.
1 parent 340b770 commit f3afed1

File tree

2 files changed

+15
-7
lines changed

2 files changed

+15
-7
lines changed

lib/internal/streams/pipeline.js

+12-6
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ async function* fromReadable(val) {
109109
yield* Readable.prototype[SymbolAsyncIterator].call(val);
110110
}
111111

112-
async function pump(iterable, writable, finish) {
112+
async function pump(iterable, writable, finish, opts) {
113113
let error;
114114
let onresolve = null;
115115

@@ -153,7 +153,9 @@ async function pump(iterable, writable, finish) {
153153
}
154154
}
155155

156-
writable.end();
156+
if (opts !== false) {
157+
writable.end();
158+
}
157159

158160
await wait();
159161

@@ -227,6 +229,7 @@ function pipelineImpl(streams, callback, opts) {
227229
const stream = streams[i];
228230
const reading = i < streams.length - 1;
229231
const writing = i > 0;
232+
const end = reading || opts?.end !== false;
230233

231234
if (isNodeStream(stream)) {
232235
finishCount++;
@@ -282,14 +285,17 @@ function pipelineImpl(streams, callback, opts) {
282285
then.call(ret,
283286
(val) => {
284287
value = val;
285-
pt.end(val);
288+
pt.write(val);
289+
if (end) {
290+
pt.end();
291+
}
286292
}, (err) => {
287293
pt.destroy(err);
288294
},
289295
);
290296
} else if (isIterable(ret, true)) {
291297
finishCount++;
292-
pump(ret, pt, finish);
298+
pump(ret, pt, finish, { end });
293299
} else {
294300
throw new ERR_INVALID_RETURN_VALUE(
295301
'AsyncIterable or Promise', 'destination', ret);
@@ -302,7 +308,7 @@ function pipelineImpl(streams, callback, opts) {
302308
}
303309
} else if (isNodeStream(stream)) {
304310
if (isReadableNodeStream(ret)) {
305-
ret.pipe(stream);
311+
ret.pipe(stream, { end });
306312

307313
// Compat. Before node v10.12.0 stdio used to throw an error so
308314
// pipe() did/does not end() stdio destinations.
@@ -314,7 +320,7 @@ function pipelineImpl(streams, callback, opts) {
314320
ret = makeAsyncIterable(ret);
315321

316322
finishCount++;
317-
pump(ret, stream, finish);
323+
pump(ret, stream, finish, { end });
318324
}
319325
ret = stream;
320326
} else {

lib/stream/promises.js

+3-1
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,13 @@ const eos = require('internal/streams/end-of-stream');
1616
function pipeline(...streams) {
1717
return new Promise((resolve, reject) => {
1818
let signal;
19+
let end;
1920
const lastArg = streams[streams.length - 1];
2021
if (lastArg && typeof lastArg === 'object' &&
2122
!isNodeStream(lastArg) && !isIterable(lastArg)) {
2223
const options = ArrayPrototypePop(streams);
2324
signal = options.signal;
25+
end = options.end;
2426
}
2527

2628
pl(streams, (err, value) => {
@@ -29,7 +31,7 @@ function pipeline(...streams) {
2931
} else {
3032
resolve(value);
3133
}
32-
}, { signal });
34+
}, { signal, end });
3335
});
3436
}
3537

0 commit comments

Comments
 (0)