Skip to content

Commit 9d09969

Browse files
ronagaddaleax
authored andcommitted
stream: always invoke end callback
Ensure that the callback passed into end() is always invoke in order to avoid bug such as deadlock the user. PR-URL: #29747 Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: James M Snell <[email protected]> Reviewed-By: Trivikram Kamat <[email protected]>
1 parent 535e957 commit 9d09969

4 files changed

+153
-4
lines changed

lib/_stream_writable.js

+30-4
Original file line numberDiff line numberDiff line change
@@ -611,11 +611,11 @@ Writable.prototype.end = function(chunk, encoding, cb) {
611611
}
612612

613613
// Ignore unnecessary end() calls.
614-
if (!state.ending)
614+
if (!state.ending) {
615615
endWritable(this, state, cb);
616-
else if (typeof cb === 'function') {
616+
} else if (typeof cb === 'function') {
617617
if (!state.finished) {
618-
this.once('finish', cb);
618+
onFinished(this, state, cb);
619619
} else {
620620
cb(new ERR_STREAM_ALREADY_FINISHED('end'));
621621
}
@@ -695,7 +695,7 @@ function endWritable(stream, state, cb) {
695695
if (state.finished)
696696
process.nextTick(cb);
697697
else
698-
stream.once('finish', cb);
698+
onFinished(stream, state, cb);
699699
}
700700
state.ended = true;
701701
stream.writable = false;
@@ -715,6 +715,32 @@ function onCorkedFinish(corkReq, state, err) {
715715
state.corkedRequestsFree.next = corkReq;
716716
}
717717

718+
function onFinished(stream, state, cb) {
719+
if (state.destroyed && state.errorEmitted) {
720+
// TODO(ronag): Backwards compat. Should be moved to end() without
721+
// errorEmitted check and with errorOrDestroy.
722+
const err = new ERR_STREAM_DESTROYED('end');
723+
process.nextTick(cb, err);
724+
return;
725+
}
726+
727+
function onerror(err) {
728+
stream.removeListener('finish', onfinish);
729+
stream.removeListener('error', onerror);
730+
cb(err);
731+
if (stream.listenerCount('error') === 0) {
732+
stream.emit('error', err);
733+
}
734+
}
735+
function onfinish() {
736+
stream.removeListener('finish', onfinish);
737+
stream.removeListener('error', onerror);
738+
cb();
739+
}
740+
stream.on('finish', onfinish);
741+
stream.prependListener('error', onerror);
742+
}
743+
718744
Object.defineProperty(Writable.prototype, 'destroyed', {
719745
// Making it explicit this property is not enumerable
720746
// because otherwise some prototype manipulation in

test/parallel/test-stream-writable-destroy.js

+52
Original file line numberDiff line numberDiff line change
@@ -292,3 +292,55 @@ const assert = require('assert');
292292
}));
293293
write.uncork();
294294
}
295+
296+
{
297+
// Call end(cb) after error & destroy
298+
299+
const write = new Writable({
300+
write(chunk, enc, cb) { cb(new Error('asd')); }
301+
});
302+
write.on('error', common.mustCall(() => {
303+
write.destroy();
304+
let ticked = false;
305+
write.end(common.mustCall((err) => {
306+
assert.strictEqual(ticked, true);
307+
assert.strictEqual(err.code, 'ERR_STREAM_DESTROYED');
308+
}));
309+
ticked = true;
310+
}));
311+
write.write('asd');
312+
}
313+
314+
{
315+
// Call end(cb) after finish & destroy
316+
317+
const write = new Writable({
318+
write(chunk, enc, cb) { cb(); }
319+
});
320+
write.on('finish', common.mustCall(() => {
321+
write.destroy();
322+
let ticked = false;
323+
write.end(common.mustCall((err) => {
324+
assert.strictEqual(ticked, false);
325+
assert.strictEqual(err.code, 'ERR_STREAM_ALREADY_FINISHED');
326+
}));
327+
ticked = true;
328+
}));
329+
write.end();
330+
}
331+
332+
{
333+
// Call end(cb) after error & destroy and don't trigger
334+
// unhandled exception.
335+
336+
const write = new Writable({
337+
write(chunk, enc, cb) { process.nextTick(cb); }
338+
});
339+
write.once('error', common.mustCall((err) => {
340+
assert.strictEqual(err.message, 'asd');
341+
}));
342+
write.end('asd', common.mustCall((err) => {
343+
assert.strictEqual(err.message, 'asd');
344+
}));
345+
write.destroy(new Error('asd'));
346+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const assert = require('assert');
5+
const stream = require('stream');
6+
7+
{
8+
// Invoke end callback on failure.
9+
const writable = new stream.Writable();
10+
11+
writable._write = (chunk, encoding, cb) => {
12+
process.nextTick(cb, new Error('kaboom'));
13+
};
14+
15+
writable.on('error', common.mustCall((err) => {
16+
assert.strictEqual(err.message, 'kaboom');
17+
}));
18+
writable.write('asd');
19+
writable.end(common.mustCall((err) => {
20+
assert.strictEqual(err.message, 'kaboom');
21+
}));
22+
writable.end(common.mustCall((err) => {
23+
assert.strictEqual(err.message, 'kaboom');
24+
}));
25+
}
26+
27+
{
28+
// Don't invoke end callback twice
29+
const writable = new stream.Writable();
30+
31+
writable._write = (chunk, encoding, cb) => {
32+
process.nextTick(cb);
33+
};
34+
35+
let called = false;
36+
writable.end('asd', common.mustCall((err) => {
37+
called = true;
38+
assert.strictEqual(err, undefined);
39+
}));
40+
41+
writable.on('error', common.mustCall((err) => {
42+
assert.strictEqual(err.message, 'kaboom');
43+
}));
44+
writable.on('finish', common.mustCall(() => {
45+
assert.strictEqual(called, true);
46+
writable.emit('error', new Error('kaboom'));
47+
}));
48+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const assert = require('assert');
5+
const stream = require('stream');
6+
7+
process.on('uncaughtException', common.mustCall((err) => {
8+
assert.strictEqual(err.message, 'kaboom');
9+
}));
10+
11+
const writable = new stream.Writable();
12+
13+
writable._write = (chunk, encoding, cb) => {
14+
cb();
15+
};
16+
writable._final = (cb) => {
17+
cb(new Error('kaboom'));
18+
};
19+
20+
writable.write('asd');
21+
writable.end(common.mustCall((err) => {
22+
assert.strictEqual(err.message, 'kaboom');
23+
}));

0 commit comments

Comments
 (0)