Skip to content

Commit 0a00552

Browse files
mcollinaMylesBorins
authored andcommittedMar 11, 2020
stream: do not swallow errors with async iterators and pipeline
Before this patch, pipeline() could swallow errors by pre-emptively producing a ERR_STREAM_PREMATURE_CLOSE that was not really helpful to the user. Co-Authored-By: Robert Nagy <[email protected]> PR-URL: #32051 Reviewed-By: Robert Nagy <[email protected]> Reviewed-By: Benjamin Gruenbaum <[email protected]>
1 parent 55a8ca8 commit 0a00552

File tree

2 files changed

+51
-15
lines changed

2 files changed

+51
-15
lines changed
 

‎lib/internal/streams/pipeline.js

+24-15
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ async function pump(iterable, writable, finish) {
109109
if (!EE) {
110110
EE = require('events');
111111
}
112+
let error;
112113
try {
113114
for await (const chunk of iterable) {
114115
if (!writable.write(chunk)) {
@@ -118,7 +119,9 @@ async function pump(iterable, writable, finish) {
118119
}
119120
writable.end();
120121
} catch (err) {
121-
finish(err);
122+
error = err;
123+
} finally {
124+
finish(error);
122125
}
123126
}
124127

@@ -135,36 +138,37 @@ function pipeline(...streams) {
135138
let value;
136139
const destroys = [];
137140

138-
function finish(err, final) {
139-
if (!error && err) {
141+
let finishCount = 0;
142+
143+
function finish(err) {
144+
const final = --finishCount === 0;
145+
146+
if (err && (!error || error.code === 'ERR_STREAM_PREMATURE_CLOSE')) {
140147
error = err;
141148
}
142149

143-
if (error || final) {
144-
for (const destroy of destroys) {
145-
destroy(error);
146-
}
150+
if (!error && !final) {
151+
return;
152+
}
153+
154+
while (destroys.length) {
155+
destroys.shift()(error);
147156
}
148157

149158
if (final) {
150159
callback(error, value);
151160
}
152161
}
153162

154-
function wrap(stream, reading, writing, final) {
155-
destroys.push(destroyer(stream, reading, writing, final, (err) => {
156-
finish(err, final);
157-
}));
158-
}
159-
160163
let ret;
161164
for (let i = 0; i < streams.length; i++) {
162165
const stream = streams[i];
163166
const reading = i < streams.length - 1;
164167
const writing = i > 0;
165168

166169
if (isStream(stream)) {
167-
wrap(stream, reading, writing, !reading);
170+
finishCount++;
171+
destroys.push(destroyer(stream, reading, writing, !reading, finish));
168172
}
169173

170174
if (i === 0) {
@@ -210,20 +214,25 @@ function pipeline(...streams) {
210214
pt.destroy(err);
211215
});
212216
} else if (isIterable(ret, true)) {
217+
finishCount++;
213218
pump(ret, pt, finish);
214219
} else {
215220
throw new ERR_INVALID_RETURN_VALUE(
216221
'AsyncIterable or Promise', 'destination', ret);
217222
}
218223

219224
ret = pt;
220-
wrap(ret, false, true, true);
225+
226+
finishCount++;
227+
destroys.push(destroyer(ret, false, true, true, finish));
221228
}
222229
} else if (isStream(stream)) {
223230
if (isReadable(ret)) {
224231
ret.pipe(stream);
225232
} else {
226233
ret = makeAsyncIterable(ret);
234+
235+
finishCount++;
227236
pump(ret, stream, finish);
228237
}
229238
ret = stream;

‎test/parallel/test-stream-pipeline.js

+27
Original file line numberDiff line numberDiff line change
@@ -968,3 +968,30 @@ const { promisify } = require('util');
968968
}));
969969
src.end();
970970
}
971+
972+
{
973+
let res = '';
974+
const rs = new Readable({
975+
read() {
976+
setImmediate(() => {
977+
rs.push('hello');
978+
});
979+
}
980+
});
981+
const ws = new Writable({
982+
write: common.mustNotCall()
983+
});
984+
pipeline(rs, async function*(stream) {
985+
/* eslint no-unused-vars: off */
986+
for await (const chunk of stream) {
987+
throw new Error('kaboom');
988+
}
989+
}, async function *(source) {
990+
for await (const chunk of source) {
991+
res += chunk;
992+
}
993+
}, ws, common.mustCall((err) => {
994+
assert.strictEqual(err.message, 'kaboom');
995+
assert.strictEqual(res, '');
996+
}));
997+
}

0 commit comments

Comments
 (0)
Please sign in to comment.