Skip to content

Commit 313ecaa

Browse files
ronagcodebytere
authored andcommitted
stream: fix broken pipeline error propagation
If the destination was an async function any error thrown from that function would be swallowed. Backport-PR-URL: #31975 PR-URL: #31835 Reviewed-By: Benjamin Gruenbaum <[email protected]> Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: Denys Otrishko <[email protected]>
1 parent 8ad64b8 commit 313ecaa

File tree

3 files changed

+34
-13
lines changed

3 files changed

+34
-13
lines changed

lib/internal/streams/pipeline.js

+8-8
Original file line numberDiff line numberDiff line change
@@ -163,9 +163,10 @@ function pipeline(...streams) {
163163
}
164164

165165
let error;
166+
let value;
166167
const destroys = [];
167168

168-
function finish(err, val, final) {
169+
function finish(err, final) {
169170
if (!error && err) {
170171
error = err;
171172
}
@@ -177,13 +178,13 @@ function pipeline(...streams) {
177178
}
178179

179180
if (final) {
180-
callback(error, val);
181+
callback(error, value);
181182
}
182183
}
183184

184185
function wrap(stream, reading, writing, final) {
185186
destroys.push(destroyer(stream, reading, writing, (err) => {
186-
finish(err, null, final);
187+
finish(err, final);
187188
}));
188189
}
189190

@@ -229,11 +230,10 @@ function pipeline(...streams) {
229230
if (isPromise(ret)) {
230231
ret
231232
.then((val) => {
233+
value = val;
232234
pt.end(val);
233-
finish(null, val, true);
234-
})
235-
.catch((err) => {
236-
finish(err, null, true);
235+
}, (err) => {
236+
pt.destroy(err);
237237
});
238238
} else if (isIterable(ret, true)) {
239239
pump(ret, pt, finish);
@@ -243,7 +243,7 @@ function pipeline(...streams) {
243243
}
244244

245245
ret = pt;
246-
wrap(ret, true, false, true);
246+
wrap(ret, false, true, true);
247247
}
248248
} else if (isStream(stream)) {
249249
if (isReadable(ret)) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const {
5+
pipeline,
6+
PassThrough
7+
} = require('stream');
8+
const assert = require('assert');
9+
10+
process.on('uncaughtException', common.mustCall((err) => {
11+
assert.strictEqual(err.message, 'error');
12+
}));
13+
14+
// Ensure that pipeline that ends with Promise
15+
// still propagates error to uncaughtException.
16+
const s = new PassThrough();
17+
s.end('data');
18+
pipeline(s, async function(source) {
19+
for await (const chunk of source) {
20+
chunk;
21+
}
22+
}, common.mustCall((err) => {
23+
assert.ifError(err);
24+
throw new Error('error');
25+
}));

test/parallel/test-stream-pipeline.js

+1-5
Original file line numberDiff line numberDiff line change
@@ -613,11 +613,9 @@ const { promisify } = require('util');
613613
yield 'hello';
614614
yield 'world';
615615
}, async function*(source) {
616-
const ret = [];
617616
for await (const chunk of source) {
618-
ret.push(chunk.toUpperCase());
617+
yield chunk.toUpperCase();
619618
}
620-
yield ret;
621619
}, async function(source) {
622620
let ret = '';
623621
for await (const chunk of source) {
@@ -754,7 +752,6 @@ const { promisify } = require('util');
754752
}, common.mustCall((err) => {
755753
assert.strictEqual(err, undefined);
756754
assert.strictEqual(ret, 'asd');
757-
assert.strictEqual(s.destroyed, true);
758755
}));
759756
}
760757

@@ -775,7 +772,6 @@ const { promisify } = require('util');
775772
}, common.mustCall((err) => {
776773
assert.strictEqual(err, undefined);
777774
assert.strictEqual(ret, 'asd');
778-
assert.strictEqual(s.destroyed, true);
779775
}));
780776
}
781777

0 commit comments

Comments
 (0)