Skip to content

Commit 89bc571

Browse files
mcollinatargos
authored andcommitted
stream: add support for captureRejection option
PR-URL: #27867 Reviewed-By: Benjamin Gruenbaum <[email protected]> Reviewed-By: James M Snell <[email protected]> Reviewed-By: Jeremiah Senkpiel <[email protected]> Reviewed-By: Anna Henningsen <[email protected]> Reviewed-By: Michaël Zasso <[email protected]>
1 parent 220a600 commit 89bc571

File tree

4 files changed

+69
-4
lines changed

4 files changed

+69
-4
lines changed

lib/_stream_readable.js

+9-1
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,7 @@ function Readable(options) {
190190
this._destroy = options.destroy;
191191
}
192192

193-
Stream.call(this);
193+
Stream.call(this, options);
194194
}
195195

196196
ObjectDefineProperty(Readable.prototype, 'destroyed', {
@@ -233,6 +233,14 @@ Readable.prototype._destroy = function(err, cb) {
233233
cb(err);
234234
};
235235

236+
Readable.prototype[EE.captureRejectionSymbol] = function(err) {
237+
// TODO(mcollina): remove the destroyed if once errorEmitted lands in
238+
// Readable.
239+
if (!this.destroyed) {
240+
this.destroy(err);
241+
}
242+
};
243+
236244
// Manually shove something into the read() buffer.
237245
// This returns true if the highWaterMark has not been hit yet,
238246
// similar to how Writable.write() returns true if you should

lib/_stream_writable.js

+6-1
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ module.exports = Writable;
3535
Writable.WritableState = WritableState;
3636

3737
const internalUtil = require('internal/util');
38+
const EE = require('events');
3839
const Stream = require('stream');
3940
const { Buffer } = require('buffer');
4041
const destroyImpl = require('internal/streams/destroy');
@@ -254,7 +255,7 @@ function Writable(options) {
254255
this._final = options.final;
255256
}
256257

257-
Stream.call(this);
258+
Stream.call(this, options);
258259
}
259260

260261
// Otherwise people can pipe Writable streams, which is just wrong.
@@ -804,3 +805,7 @@ Writable.prototype._undestroy = destroyImpl.undestroy;
804805
Writable.prototype._destroy = function(err, cb) {
805806
cb(err);
806807
};
808+
809+
Writable.prototype[EE.captureRejectionSymbol] = function(err) {
810+
this.destroy(err);
811+
};

lib/internal/streams/legacy.js

+2-2
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@ const {
66

77
const EE = require('events');
88

9-
function Stream() {
10-
EE.call(this);
9+
function Stream(opts) {
10+
EE.call(this, opts);
1111
}
1212
ObjectSetPrototypeOf(Stream.prototype, EE.prototype);
1313
ObjectSetPrototypeOf(Stream, EE);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const stream = require('stream');
5+
const assert = require('assert');
6+
7+
{
8+
const r = new stream.Readable({
9+
captureRejections: true,
10+
read() {
11+
this.push('hello');
12+
this.push('world');
13+
this.push(null);
14+
}
15+
});
16+
17+
const err = new Error('kaboom');
18+
19+
r.on('error', common.mustCall((_err) => {
20+
assert.strictEqual(err, _err);
21+
assert.strictEqual(r.destroyed, true);
22+
}));
23+
24+
r.on('data', async () => {
25+
throw err;
26+
});
27+
}
28+
29+
{
30+
const w = new stream.Writable({
31+
captureRejections: true,
32+
highWaterMark: 1,
33+
write(chunk, enc, cb) {
34+
cb();
35+
}
36+
});
37+
38+
const err = new Error('kaboom');
39+
40+
w.write('hello', () => {
41+
w.write('world');
42+
});
43+
44+
w.on('error', common.mustCall((_err) => {
45+
assert.strictEqual(err, _err);
46+
assert.strictEqual(w.destroyed, true);
47+
}));
48+
49+
w.on('drain', common.mustCall(async () => {
50+
throw err;
51+
}, 2));
52+
}

0 commit comments

Comments
 (0)