Skip to content

Commit fa544c8

Browse files
mcollinatargos
authored andcommitted
stream: add support for captureRejection option
PR-URL: #27867 Reviewed-By: Benjamin Gruenbaum <[email protected]> Reviewed-By: James M Snell <[email protected]> Reviewed-By: Jeremiah Senkpiel <[email protected]> Reviewed-By: Anna Henningsen <[email protected]> Reviewed-By: Michaël Zasso <[email protected]>
1 parent 4265d57 commit fa544c8

File tree

4 files changed

+69
-4
lines changed

4 files changed

+69
-4
lines changed

lib/_stream_readable.js

+9-1
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ function Readable(options) {
180180
this._destroy = options.destroy;
181181
}
182182

183-
Stream.call(this);
183+
Stream.call(this, options);
184184
}
185185

186186
ObjectDefineProperty(Readable.prototype, 'destroyed', {
@@ -223,6 +223,14 @@ Readable.prototype._destroy = function(err, cb) {
223223
cb(err);
224224
};
225225

226+
Readable.prototype[EE.captureRejectionSymbol] = function(err) {
227+
// TODO(mcollina): remove the destroyed if once errorEmitted lands in
228+
// Readable.
229+
if (!this.destroyed) {
230+
this.destroy(err);
231+
}
232+
};
233+
226234
// Manually shove something into the read() buffer.
227235
// This returns true if the highWaterMark has not been hit yet,
228236
// similar to how Writable.write() returns true if you should

lib/_stream_writable.js

+6-1
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ module.exports = Writable;
3737
Writable.WritableState = WritableState;
3838

3939
const internalUtil = require('internal/util');
40+
const EE = require('events');
4041
const Stream = require('stream');
4142
const { Buffer } = require('buffer');
4243
const destroyImpl = require('internal/streams/destroy');
@@ -250,7 +251,7 @@ function Writable(options) {
250251
this._final = options.final;
251252
}
252253

253-
Stream.call(this);
254+
Stream.call(this, options);
254255
}
255256

256257
// Otherwise people can pipe Writable streams, which is just wrong.
@@ -782,3 +783,7 @@ Writable.prototype._undestroy = destroyImpl.undestroy;
782783
Writable.prototype._destroy = function(err, cb) {
783784
cb(err);
784785
};
786+
787+
Writable.prototype[EE.captureRejectionSymbol] = function(err) {
788+
this.destroy(err);
789+
};

lib/internal/streams/legacy.js

+2-2
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@ const {
66

77
const EE = require('events');
88

9-
function Stream() {
10-
EE.call(this);
9+
function Stream(opts) {
10+
EE.call(this, opts);
1111
}
1212
ObjectSetPrototypeOf(Stream.prototype, EE.prototype);
1313
ObjectSetPrototypeOf(Stream, EE);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const stream = require('stream');
5+
const assert = require('assert');
6+
7+
{
8+
const r = new stream.Readable({
9+
captureRejections: true,
10+
read() {
11+
this.push('hello');
12+
this.push('world');
13+
this.push(null);
14+
}
15+
});
16+
17+
const err = new Error('kaboom');
18+
19+
r.on('error', common.mustCall((_err) => {
20+
assert.strictEqual(err, _err);
21+
assert.strictEqual(r.destroyed, true);
22+
}));
23+
24+
r.on('data', async () => {
25+
throw err;
26+
});
27+
}
28+
29+
{
30+
const w = new stream.Writable({
31+
captureRejections: true,
32+
highWaterMark: 1,
33+
write(chunk, enc, cb) {
34+
cb();
35+
}
36+
});
37+
38+
const err = new Error('kaboom');
39+
40+
w.write('hello', () => {
41+
w.write('world');
42+
});
43+
44+
w.on('error', common.mustCall((_err) => {
45+
assert.strictEqual(err, _err);
46+
assert.strictEqual(w.destroyed, true);
47+
}));
48+
49+
w.on('drain', common.mustCall(async () => {
50+
throw err;
51+
}, 2));
52+
}

0 commit comments

Comments
 (0)