Skip to content

Commit e110c96

Browse files
ronagdanielleadams
authored andcommitted
stream: pipeline with end option
Currently pipeline cannot fully replace pipe due to the missing end option. This PR adds the end option to the promisified pipeline method. PR-URL: #40886 Reviewed-By: Luigi Pinca <[email protected]> Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: James M Snell <[email protected]>
1 parent e092fde commit e110c96

File tree

3 files changed

+50
-17
lines changed

3 files changed

+50
-17
lines changed

lib/internal/streams/pipeline.js

+25-15
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ async function* fromReadable(val) {
109109
yield* Readable.prototype[SymbolAsyncIterator].call(val);
110110
}
111111

112-
async function pump(iterable, writable, finish) {
112+
async function pump(iterable, writable, finish, opts) {
113113
let error;
114114
let onresolve = null;
115115

@@ -153,7 +153,9 @@ async function pump(iterable, writable, finish) {
153153
}
154154
}
155155

156-
writable.end();
156+
if (opts?.end !== false) {
157+
writable.end();
158+
}
157159

158160
await wait();
159161

@@ -227,17 +229,22 @@ function pipelineImpl(streams, callback, opts) {
227229
const stream = streams[i];
228230
const reading = i < streams.length - 1;
229231
const writing = i > 0;
232+
const end = reading || opts?.end !== false;
230233

231234
if (isNodeStream(stream)) {
232-
finishCount++;
233-
destroys.push(destroyer(stream, reading, writing, (err) => {
234-
if (!err && !reading && isReadableFinished(stream, false)) {
235-
stream.read(0);
236-
destroyer(stream, true, writing, finish);
237-
} else {
238-
finish(err);
239-
}
240-
}));
235+
if (end) {
236+
finishCount++;
237+
destroys.push(destroyer(stream, reading, writing, (err) => {
238+
if (!err && !reading && isReadableFinished(stream, false)) {
239+
stream.read(0);
240+
destroyer(stream, true, writing, finish);
241+
} else {
242+
finish(err);
243+
}
244+
}));
245+
} else {
246+
stream.on('error', finish);
247+
}
241248
}
242249

243250
if (i === 0) {
@@ -282,14 +289,17 @@ function pipelineImpl(streams, callback, opts) {
282289
then.call(ret,
283290
(val) => {
284291
value = val;
285-
pt.end(val);
292+
pt.write(val);
293+
if (end) {
294+
pt.end();
295+
}
286296
}, (err) => {
287297
pt.destroy(err);
288298
},
289299
);
290300
} else if (isIterable(ret, true)) {
291301
finishCount++;
292-
pump(ret, pt, finish);
302+
pump(ret, pt, finish, { end });
293303
} else {
294304
throw new ERR_INVALID_RETURN_VALUE(
295305
'AsyncIterable or Promise', 'destination', ret);
@@ -302,7 +312,7 @@ function pipelineImpl(streams, callback, opts) {
302312
}
303313
} else if (isNodeStream(stream)) {
304314
if (isReadableNodeStream(ret)) {
305-
ret.pipe(stream);
315+
ret.pipe(stream, { end });
306316

307317
// Compat. Before node v10.12.0 stdio used to throw an error so
308318
// pipe() did/does not end() stdio destinations.
@@ -314,7 +324,7 @@ function pipelineImpl(streams, callback, opts) {
314324
ret = makeAsyncIterable(ret);
315325

316326
finishCount++;
317-
pump(ret, stream, finish);
327+
pump(ret, stream, finish, { end });
318328
}
319329
ret = stream;
320330
} else {

lib/stream/promises.js

+3-1
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,13 @@ const eos = require('internal/streams/end-of-stream');
1616
function pipeline(...streams) {
1717
return new Promise((resolve, reject) => {
1818
let signal;
19+
let end;
1920
const lastArg = streams[streams.length - 1];
2021
if (lastArg && typeof lastArg === 'object' &&
2122
!isNodeStream(lastArg) && !isIterable(lastArg)) {
2223
const options = ArrayPrototypePop(streams);
2324
signal = options.signal;
25+
end = options.end;
2426
}
2527

2628
pl(streams, (err, value) => {
@@ -29,7 +31,7 @@ function pipeline(...streams) {
2931
} else {
3032
resolve(value);
3133
}
32-
}, { signal });
34+
}, { signal, end });
3335
});
3436
}
3537

test/parallel/test-stream-pipeline.js

+22-1
Original file line numberDiff line numberDiff line change
@@ -1465,5 +1465,26 @@ const tsp = require('timers/promises');
14651465
assert.strictEqual(duplex.destroyed, true);
14661466
}
14671467

1468-
run();
1468+
run().then(common.mustCall());
1469+
}
1470+
1471+
{
1472+
const pipelinePromise = promisify(pipeline);
1473+
1474+
async function run() {
1475+
const read = new Readable({
1476+
read() {}
1477+
});
1478+
1479+
const duplex = new PassThrough();
1480+
1481+
read.push(null);
1482+
1483+
await pipelinePromise(read, duplex, { end: false });
1484+
1485+
assert.strictEqual(duplex.destroyed, false);
1486+
assert.strictEqual(duplex.writableEnded, false);
1487+
}
1488+
1489+
run().then(common.mustCall());
14691490
}

0 commit comments

Comments
 (0)