Skip to content

Commit 1787bfa

Browse files
ronagdanielleadams
authored andcommitted
stream: allow readable to end early without error
PR-URL: #40881 Reviewed-By: James M Snell <[email protected]> Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: Luigi Pinca <[email protected]>
1 parent a7dfa43 commit 1787bfa

File tree

2 files changed

+90
-57
lines changed

2 files changed

+90
-57
lines changed

lib/internal/streams/pipeline.js

+64-55
Original file line numberDiff line numberDiff line change
@@ -33,52 +33,26 @@ const {
3333
isIterable,
3434
isReadableNodeStream,
3535
isNodeStream,
36-
isReadableFinished,
3736
} = require('internal/streams/utils');
3837
const { AbortController } = require('internal/abort_controller');
3938

4039
let PassThrough;
4140
let Readable;
4241

43-
function destroyer(stream, reading, writing, callback) {
44-
callback = once(callback);
45-
42+
function destroyer(stream, reading, writing) {
4643
let finished = false;
4744
stream.on('close', () => {
4845
finished = true;
4946
});
5047

5148
eos(stream, { readable: reading, writable: writing }, (err) => {
5249
finished = !err;
53-
54-
const rState = stream._readableState;
55-
if (
56-
err &&
57-
err.code === 'ERR_STREAM_PREMATURE_CLOSE' &&
58-
reading &&
59-
(rState && rState.ended && !rState.errored && !rState.errorEmitted)
60-
) {
61-
// Some readable streams will emit 'close' before 'end'. However, since
62-
// this is on the readable side 'end' should still be emitted if the
63-
// stream has been ended and no error emitted. This should be allowed in
64-
// favor of backwards compatibility. Since the stream is piped to a
65-
// destination this should not result in any observable difference.
66-
// We don't need to check if this is a writable premature close since
67-
// eos will only fail with premature close on the reading side for
68-
// duplex streams.
69-
stream
70-
.once('end', callback)
71-
.once('error', callback);
72-
} else {
73-
callback(err);
74-
}
7550
});
7651

7752
return (err) => {
7853
if (finished) return;
7954
finished = true;
80-
destroyImpl.destroyer(stream, err);
81-
callback(err || new ERR_STREAM_DESTROYED('pipe'));
55+
destroyImpl.destroyer(stream, err || new ERR_STREAM_DESTROYED('pipe'));
8256
};
8357
}
8458

@@ -109,7 +83,7 @@ async function* fromReadable(val) {
10983
yield* Readable.prototype[SymbolAsyncIterator].call(val);
11084
}
11185

112-
async function pump(iterable, writable, finish, opts) {
86+
async function pump(iterable, writable, finish, { end }) {
11387
let error;
11488
let onresolve = null;
11589

@@ -153,7 +127,7 @@ async function pump(iterable, writable, finish, opts) {
153127
}
154128
}
155129

156-
if (opts?.end !== false) {
130+
if (end) {
157131
writable.end();
158132
}
159133

@@ -220,7 +194,7 @@ function pipelineImpl(streams, callback, opts) {
220194
ac.abort();
221195

222196
if (final) {
223-
callback(error, value);
197+
process.nextTick(callback, error, value);
224198
}
225199
}
226200

@@ -233,18 +207,19 @@ function pipelineImpl(streams, callback, opts) {
233207

234208
if (isNodeStream(stream)) {
235209
if (end) {
236-
finishCount++;
237-
destroys.push(destroyer(stream, reading, writing, (err) => {
238-
if (!err && !reading && isReadableFinished(stream, false)) {
239-
stream.read(0);
240-
destroyer(stream, true, writing, finish);
241-
} else {
242-
finish(err);
243-
}
244-
}));
245-
} else {
246-
stream.on('error', finish);
210+
destroys.push(destroyer(stream, reading, writing));
247211
}
212+
213+
// Catch stream errors that occur after pipe/pump has completed.
214+
stream.on('error', (err) => {
215+
if (
216+
err &&
217+
err.name !== 'AbortError' &&
218+
err.code !== 'ERR_STREAM_PREMATURE_CLOSE'
219+
) {
220+
finish(err);
221+
}
222+
});
248223
}
249224

250225
if (i === 0) {
@@ -286,15 +261,18 @@ function pipelineImpl(streams, callback, opts) {
286261
// second use.
287262
const then = ret?.then;
288263
if (typeof then === 'function') {
264+
finishCount++;
289265
then.call(ret,
290266
(val) => {
291267
value = val;
292268
pt.write(val);
293269
if (end) {
294270
pt.end();
295271
}
272+
process.nextTick(finish);
296273
}, (err) => {
297274
pt.destroy(err);
275+
process.nextTick(finish, err);
298276
},
299277
);
300278
} else if (isIterable(ret, true)) {
@@ -307,24 +285,18 @@ function pipelineImpl(streams, callback, opts) {
307285

308286
ret = pt;
309287

310-
finishCount++;
311-
destroys.push(destroyer(ret, false, true, finish));
288+
destroys.push(destroyer(ret, false, true));
312289
}
313290
} else if (isNodeStream(stream)) {
314291
if (isReadableNodeStream(ret)) {
315-
ret.pipe(stream, { end });
316-
317-
// Compat. Before node v10.12.0 stdio used to throw an error so
318-
// pipe() did/does not end() stdio destinations.
319-
// Now they allow it but "secretly" don't close the underlying fd.
320-
if (stream === process.stdout || stream === process.stderr) {
321-
ret.on('end', () => stream.end());
322-
}
323-
} else {
324-
ret = makeAsyncIterable(ret);
325-
292+
finishCount += 2;
293+
pipe(ret, stream, finish, { end });
294+
} else if (isIterable(ret)) {
326295
finishCount++;
327296
pump(ret, stream, finish, { end });
297+
} else {
298+
throw new ERR_INVALID_ARG_TYPE(
299+
'val', ['Readable', 'Iterable', 'AsyncIterable'], ret);
328300
}
329301
ret = stream;
330302
} else {
@@ -339,4 +311,41 @@ function pipelineImpl(streams, callback, opts) {
339311
return ret;
340312
}
341313

314+
function pipe(src, dst, finish, { end }) {
315+
src.pipe(dst, { end });
316+
317+
if (end) {
318+
// Compat. Before node v10.12.0 stdio used to throw an error so
319+
// pipe() did/does not end() stdio destinations.
320+
// Now they allow it but "secretly" don't close the underlying fd.
321+
src.once('end', () => dst.end());
322+
} else {
323+
finish();
324+
}
325+
326+
eos(src, { readable: true, writable: false }, (err) => {
327+
const rState = src._readableState;
328+
if (
329+
err &&
330+
err.code === 'ERR_STREAM_PREMATURE_CLOSE' &&
331+
(rState && rState.ended && !rState.errored && !rState.errorEmitted)
332+
) {
333+
// Some readable streams will emit 'close' before 'end'. However, since
334+
// this is on the readable side 'end' should still be emitted if the
335+
// stream has been ended and no error emitted. This should be allowed in
336+
// favor of backwards compatibility. Since the stream is piped to a
337+
// destination this should not result in any observable difference.
338+
// We don't need to check if this is a writable premature close since
339+
// eos will only fail with premature close on the reading side for
340+
// duplex streams.
341+
src
342+
.once('end', finish)
343+
.once('error', finish);
344+
} else {
345+
finish(err);
346+
}
347+
});
348+
eos(dst, { readable: false, writable: true }, finish);
349+
}
350+
342351
module.exports = { pipelineImpl, pipeline };

test/parallel/test-stream-pipeline.js

+26-2
Original file line numberDiff line numberDiff line change
@@ -1027,7 +1027,7 @@ const tsp = require('timers/promises');
10271027
const src = new PassThrough();
10281028
const dst = new PassThrough();
10291029
pipeline(src, dst, common.mustSucceed(() => {
1030-
assert.strictEqual(dst.destroyed, true);
1030+
assert.strictEqual(dst.destroyed, false);
10311031
}));
10321032
src.end();
10331033
}
@@ -1462,7 +1462,7 @@ const tsp = require('timers/promises');
14621462

14631463
await pipelinePromise(read, duplex);
14641464

1465-
assert.strictEqual(duplex.destroyed, true);
1465+
assert.strictEqual(duplex.destroyed, false);
14661466
}
14671467

14681468
run().then(common.mustCall());
@@ -1488,3 +1488,27 @@ const tsp = require('timers/promises');
14881488

14891489
run().then(common.mustCall());
14901490
}
1491+
1492+
{
1493+
const s = new PassThrough({ objectMode: true });
1494+
pipeline(async function*() {
1495+
await Promise.resolve();
1496+
yield 'hello';
1497+
yield 'world';
1498+
yield 'world';
1499+
}, s, async function(source) {
1500+
let ret = '';
1501+
let n = 0;
1502+
for await (const chunk of source) {
1503+
if (n++ > 1) {
1504+
break;
1505+
}
1506+
ret += chunk;
1507+
}
1508+
return ret;
1509+
}, common.mustCall((err, val) => {
1510+
assert.strictEqual(err, undefined);
1511+
assert.strictEqual(val, 'helloworld');
1512+
assert.strictEqual(s.destroyed, true);
1513+
}));
1514+
}

0 commit comments

Comments
 (0)