Skip to content

Commit 0a9e96e

Browse files
mcollinaaddaleax
authored andcommitted
stream: finish must always follow error
When _write completes with an Error, 'finish' was emitted before 'error' if the callback was asynchronous. This commit restore the previous behavior. The logic is still less then ideal, because we call the write() callback before emitting error if asynchronous, but after if synchronous. This commit do not try to change the behavior. This commit fixes a regression introduced by: #13195. Fixes: #13812 PR-URL: #13850 Reviewed-By: James M Snell <[email protected]> Reviewed-By: Anna Henningsen <[email protected]> Reviewed-By: Calvin Metcalf <[email protected]> Reviewed-By: Colin Ihrig <[email protected]>
1 parent 0bb53a7 commit 0a9e96e

3 files changed

+175
-64
lines changed

lib/_stream_writable.js

+19-11
Original file line numberDiff line numberDiff line change
@@ -374,18 +374,26 @@ function doWrite(stream, state, writev, len, chunk, encoding, cb) {
374374

375375
function onwriteError(stream, state, sync, er, cb) {
376376
--state.pendingcb;
377-
if (sync)
378-
process.nextTick(afterError, stream, state, cb, er);
379-
else
380-
afterError(stream, state, cb, er);
381-
382-
stream._writableState.errorEmitted = true;
383-
stream.emit('error', er);
384-
}
385377

386-
function afterError(stream, state, cb, err) {
387-
cb(err);
388-
finishMaybe(stream, state);
378+
if (sync) {
379+
// defer the callback if we are being called synchronously
380+
// to avoid piling up things on the stack
381+
process.nextTick(cb, er);
382+
// this can emit finish, and it will always happen
383+
// after error
384+
process.nextTick(finishMaybe, stream, state);
385+
stream._writableState.errorEmitted = true;
386+
stream.emit('error', er);
387+
} else {
388+
// the caller expect this to happen before if
389+
// it is async
390+
cb(er);
391+
stream._writableState.errorEmitted = true;
392+
stream.emit('error', er);
393+
// this can emit finish, but finish must
394+
// always follow error
395+
finishMaybe(stream, state);
396+
}
389397
}
390398

391399
function onwriteStateUpdate(state) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const assert = require('assert');
5+
const stream = require('stream');
6+
7+
// ensure consistency between the finish event when using cork()
8+
// and writev and when not using them
9+
10+
{
11+
const writable = new stream.Writable();
12+
13+
writable._write = (chunks, encoding, cb) => {
14+
cb(new Error('write test error'));
15+
};
16+
17+
let firstError = false;
18+
writable.on('finish', common.mustCall(function() {
19+
assert.strictEqual(firstError, true);
20+
}));
21+
22+
writable.on('prefinish', common.mustCall());
23+
24+
writable.on('error', common.mustCall((er) => {
25+
assert.strictEqual(er.message, 'write test error');
26+
firstError = true;
27+
}));
28+
29+
writable.end('test');
30+
}
31+
32+
{
33+
const writable = new stream.Writable();
34+
35+
writable._write = (chunks, encoding, cb) => {
36+
setImmediate(cb, new Error('write test error'));
37+
};
38+
39+
let firstError = false;
40+
writable.on('finish', common.mustCall(function() {
41+
assert.strictEqual(firstError, true);
42+
}));
43+
44+
writable.on('prefinish', common.mustCall());
45+
46+
writable.on('error', common.mustCall((er) => {
47+
assert.strictEqual(er.message, 'write test error');
48+
firstError = true;
49+
}));
50+
51+
writable.end('test');
52+
}
53+
54+
{
55+
const writable = new stream.Writable();
56+
57+
writable._write = (chunks, encoding, cb) => {
58+
cb(new Error('write test error'));
59+
};
60+
61+
writable._writev = (chunks, cb) => {
62+
cb(new Error('writev test error'));
63+
};
64+
65+
let firstError = false;
66+
writable.on('finish', common.mustCall(function() {
67+
assert.strictEqual(firstError, true);
68+
}));
69+
70+
writable.on('prefinish', common.mustCall());
71+
72+
writable.on('error', common.mustCall((er) => {
73+
assert.strictEqual(er.message, 'writev test error');
74+
firstError = true;
75+
}));
76+
77+
writable.cork();
78+
writable.write('test');
79+
80+
setImmediate(function() {
81+
writable.end('test');
82+
});
83+
}
84+
85+
{
86+
const writable = new stream.Writable();
87+
88+
writable._write = (chunks, encoding, cb) => {
89+
setImmediate(cb, new Error('write test error'));
90+
};
91+
92+
writable._writev = (chunks, cb) => {
93+
setImmediate(cb, new Error('writev test error'));
94+
};
95+
96+
let firstError = false;
97+
writable.on('finish', common.mustCall(function() {
98+
assert.strictEqual(firstError, true);
99+
}));
100+
101+
writable.on('prefinish', common.mustCall());
102+
103+
writable.on('error', common.mustCall((er) => {
104+
assert.strictEqual(er.message, 'writev test error');
105+
firstError = true;
106+
}));
107+
108+
writable.cork();
109+
writable.write('test');
110+
111+
setImmediate(function() {
112+
writable.end('test');
113+
});
114+
}
115+
116+
// Regression test for
117+
// https://github.com/nodejs/node/issues/13812
118+
119+
{
120+
const rs = new stream.Readable();
121+
rs.push('ok');
122+
rs.push(null);
123+
rs._read = () => {};
124+
125+
const ws = new stream.Writable();
126+
let firstError = false;
127+
128+
ws.on('finish', common.mustCall(function() {
129+
assert.strictEqual(firstError, true);
130+
}));
131+
ws.on('error', common.mustCall(function() {
132+
firstError = true;
133+
}));
134+
135+
ws._write = (chunk, encoding, done) => {
136+
setImmediate(done, new Error());
137+
};
138+
rs.pipe(ws);
139+
}
140+
141+
{
142+
const rs = new stream.Readable();
143+
rs.push('ok');
144+
rs.push(null);
145+
rs._read = () => {};
146+
147+
const ws = new stream.Writable();
148+
149+
ws.on('finish', common.mustNotCall());
150+
ws.on('error', common.mustCall());
151+
152+
ws._write = (chunk, encoding, done) => {
153+
done(new Error());
154+
};
155+
rs.pipe(ws);
156+
}

test/parallel/test-stream-writable-writev-finish.js

-53
This file was deleted.

0 commit comments

Comments
 (0)