Skip to content

Commit ffae5f3

Browse files
ronagrichardlau
authored andcommitted
stream: save error in state
Useful for future PR's to resolve situations where e.g. finished() is invoked on an already errored streams. PR-URL: #34103 Backport-PR-URL: #34887 Refs: #34680 Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: Denys Otrishko <[email protected]> Reviewed-By: Anna Henningsen <[email protected]> Reviewed-By: Luigi Pinca <[email protected]>
1 parent bbf7b92 commit ffae5f3

6 files changed

+90
-33
lines changed

lib/_stream_readable.js

+5-2
Original file line numberDiff line numberDiff line change
@@ -145,8 +145,11 @@ function ReadableState(options, stream, isDuplex) {
145145
// Has it been destroyed.
146146
this.destroyed = false;
147147

148-
// Indicates whether the stream has errored.
149-
this.errored = false;
148+
// Indicates whether the stream has errored. When true no further
149+
// _read calls, 'data' or 'readable' events should occur. This is needed
150+
// since when autoDestroy is disabled we need a way to tell whether the
151+
// stream has failed.
152+
this.errored = null;
150153

151154
// Indicates whether the stream has finished destroying.
152155
this.closed = false;

lib/_stream_writable.js

+14-2
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ function WritableState(options, stream, isDuplex) {
171171
// Indicates whether the stream has errored. When true all write() calls
172172
// should return false. This is needed since when autoDestroy
173173
// is disabled we need a way to tell whether the stream has failed.
174-
this.errored = false;
174+
this.errored = null;
175175

176176
// Indicates whether the stream has finished destroying.
177177
this.closed = false;
@@ -407,7 +407,19 @@ function onwrite(stream, er) {
407407
state.writelen = 0;
408408

409409
if (er) {
410-
state.errored = true;
410+
// Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
411+
er.stack;
412+
413+
if (!state.errored) {
414+
state.errored = er;
415+
}
416+
417+
// In case of duplex streams we need to notify the readable side of the
418+
// error.
419+
if (stream._readableState && !stream._readableState.errored) {
420+
stream._readableState.errored = er;
421+
}
422+
411423
if (sync) {
412424
process.nextTick(onwriteError, stream, state, er, cb);
413425
} else {

lib/internal/streams/destroy.js

+23-14
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,14 @@ function destroy(err, cb) {
1515
}
1616

1717
if (err) {
18-
if (w) {
19-
w.errored = true;
18+
// Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
19+
err.stack;
20+
21+
if (w && !w.errored) {
22+
w.errored = err;
2023
}
21-
if (r) {
22-
r.errored = true;
24+
if (r && !r.errored) {
25+
r.errored = err;
2326
}
2427
}
2528

@@ -35,11 +38,14 @@ function destroy(err, cb) {
3538

3639
this._destroy(err || null, (err) => {
3740
if (err) {
38-
if (w) {
39-
w.errored = true;
41+
// Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
42+
err.stack;
43+
44+
if (w && !w.errored) {
45+
w.errored = err;
4046
}
41-
if (r) {
42-
r.errored = true;
47+
if (r && !r.errored) {
48+
r.errored = err;
4349
}
4450
}
4551

@@ -108,7 +114,7 @@ function undestroy() {
108114
r.closed = false;
109115
r.closeEmitted = false;
110116
r.destroyed = false;
111-
r.errored = false;
117+
r.errored = null;
112118
r.errorEmitted = false;
113119
r.reading = false;
114120
r.ended = false;
@@ -118,7 +124,7 @@ function undestroy() {
118124
if (w) {
119125
w.closed = false;
120126
w.destroyed = false;
121-
w.errored = false;
127+
w.errored = null;
122128
w.ended = false;
123129
w.ending = false;
124130
w.finalCalled = false;
@@ -145,11 +151,14 @@ function errorOrDestroy(stream, err, sync) {
145151
if ((r && r.autoDestroy) || (w && w.autoDestroy))
146152
stream.destroy(err);
147153
else if (err) {
148-
if (w) {
149-
w.errored = true;
154+
// Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
155+
err.stack;
156+
157+
if (w && !w.errored) {
158+
w.errored = err;
150159
}
151-
if (r) {
152-
r.errored = true;
160+
if (r && !r.errored) {
161+
r.errored = err;
153162
}
154163

155164
if (sync) {

test/parallel/test-stream-readable-destroy.js

+22-7
Original file line numberDiff line numberDiff line change
@@ -134,13 +134,13 @@ const assert = require('assert');
134134
read.on('error', common.mustCall((err) => {
135135
assert.strictEqual(ticked, true);
136136
assert.strictEqual(read._readableState.errorEmitted, true);
137-
assert.strictEqual(read._readableState.errored, true);
137+
assert.strictEqual(read._readableState.errored, expected);
138138
assert.strictEqual(err, expected);
139139
}));
140140

141141
read.destroy();
142142
assert.strictEqual(read._readableState.errorEmitted, false);
143-
assert.strictEqual(read._readableState.errored, true);
143+
assert.strictEqual(read._readableState.errored, expected);
144144
assert.strictEqual(read.destroyed, true);
145145
ticked = true;
146146
}
@@ -190,15 +190,15 @@ const assert = require('assert');
190190
assert.strictEqual(err, expected);
191191
}));
192192

193-
assert.strictEqual(read._readableState.errored, false);
193+
assert.strictEqual(read._readableState.errored, null);
194194
assert.strictEqual(read._readableState.errorEmitted, false);
195195

196196
read.destroy(expected, common.mustCall(function(err) {
197-
assert.strictEqual(read._readableState.errored, true);
197+
assert.strictEqual(read._readableState.errored, expected);
198198
assert.strictEqual(err, expected);
199199
}));
200200
assert.strictEqual(read._readableState.errorEmitted, false);
201-
assert.strictEqual(read._readableState.errored, true);
201+
assert.strictEqual(read._readableState.errored, expected);
202202
ticked = true;
203203
}
204204

@@ -223,14 +223,14 @@ const assert = require('assert');
223223

224224
readable.destroy();
225225
assert.strictEqual(readable.destroyed, true);
226-
assert.strictEqual(readable._readableState.errored, false);
226+
assert.strictEqual(readable._readableState.errored, null);
227227
assert.strictEqual(readable._readableState.errorEmitted, false);
228228

229229
// Test case where `readable.destroy()` is called again with an error before
230230
// the `_destroy()` callback is called.
231231
readable.destroy(new Error('kaboom 2'));
232232
assert.strictEqual(readable._readableState.errorEmitted, false);
233-
assert.strictEqual(readable._readableState.errored, false);
233+
assert.strictEqual(readable._readableState.errored, null);
234234

235235
ticked = true;
236236
}
@@ -253,3 +253,18 @@ const assert = require('assert');
253253
assert.strictEqual(read.destroyed, true);
254254
read.read();
255255
}
256+
257+
{
258+
const read = new Readable({
259+
autoDestroy: false,
260+
read() {
261+
this.push(null);
262+
this.push('asd');
263+
}
264+
});
265+
266+
read.on('error', common.mustCall(() => {
267+
assert(read._readableState.errored);
268+
}));
269+
read.resume();
270+
}

test/parallel/test-stream-writable-destroy.js

+20-4
Original file line numberDiff line numberDiff line change
@@ -167,9 +167,10 @@ const assert = require('assert');
167167
assert.strictEqual(write._writableState.errorEmitted, true);
168168
}));
169169

170-
write.destroy(new Error('kaboom 1'));
170+
const expected = new Error('kaboom 1');
171+
write.destroy(expected);
171172
write.destroy(new Error('kaboom 2'));
172-
assert.strictEqual(write._writableState.errored, true);
173+
assert.strictEqual(write._writableState.errored, expected);
173174
assert.strictEqual(write._writableState.errorEmitted, false);
174175
assert.strictEqual(write.destroyed, true);
175176
ticked = true;
@@ -200,14 +201,14 @@ const assert = require('assert');
200201

201202
writable.destroy();
202203
assert.strictEqual(writable.destroyed, true);
203-
assert.strictEqual(writable._writableState.errored, false);
204+
assert.strictEqual(writable._writableState.errored, null);
204205
assert.strictEqual(writable._writableState.errorEmitted, false);
205206

206207
// Test case where `writable.destroy()` is called again with an error before
207208
// the `_destroy()` callback is called.
208209
writable.destroy(new Error('kaboom 2'));
209210
assert.strictEqual(writable._writableState.errorEmitted, false);
210-
assert.strictEqual(writable._writableState.errored, false);
211+
assert.strictEqual(writable._writableState.errored, null);
211212

212213
ticked = true;
213214
}
@@ -401,3 +402,18 @@ const assert = require('assert');
401402
}));
402403
write.destroy();
403404
}
405+
406+
{
407+
const write = new Writable({
408+
autoDestroy: false,
409+
write(chunk, enc, cb) {
410+
cb();
411+
cb();
412+
}
413+
});
414+
415+
write.on('error', common.mustCall(() => {
416+
assert(write._writableState.errored);
417+
}));
418+
write.write('asd');
419+
}

test/parallel/test-stream2-readable-wrap-error.js

+6-4
Original file line numberDiff line numberDiff line change
@@ -10,23 +10,25 @@ oldStream.pause = () => {};
1010
oldStream.resume = () => {};
1111

1212
{
13+
const err = new Error();
1314
const r = new Readable({ autoDestroy: true })
1415
.wrap(oldStream)
1516
.on('error', common.mustCall(() => {
1617
assert.strictEqual(r._readableState.errorEmitted, true);
17-
assert.strictEqual(r._readableState.errored, true);
18+
assert.strictEqual(r._readableState.errored, err);
1819
assert.strictEqual(r.destroyed, true);
1920
}));
20-
oldStream.emit('error', new Error());
21+
oldStream.emit('error', err);
2122
}
2223

2324
{
25+
const err = new Error();
2426
const r = new Readable({ autoDestroy: false })
2527
.wrap(oldStream)
2628
.on('error', common.mustCall(() => {
2729
assert.strictEqual(r._readableState.errorEmitted, true);
28-
assert.strictEqual(r._readableState.errored, true);
30+
assert.strictEqual(r._readableState.errored, err);
2931
assert.strictEqual(r.destroyed, false);
3032
}));
31-
oldStream.emit('error', new Error());
33+
oldStream.emit('error', err);
3234
}

0 commit comments

Comments
 (0)