Skip to content

Commit e40b78c

Browse files
ronagMylesBorins
authored andcommitted
stream: destroy wrapped streams on error
Stream should be destroyed and update state accordingly when the wrapped stream emits error. Does some additional cleanup with future TODOs that might be worth looking into. Backport-PR-URL: #35557 PR-URL: #34102 Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: Luigi Pinca <[email protected]> Reviewed-By: Anna Henningsen <[email protected]>
1 parent 3e297cf commit e40b78c

File tree

2 files changed

+54
-5
lines changed

2 files changed

+54
-5
lines changed

lib/_stream_readable.js

+23-5
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,6 @@ ObjectSetPrototypeOf(Readable.prototype, Stream.prototype);
6666
ObjectSetPrototypeOf(Readable, Stream);
6767

6868
const { errorOrDestroy } = destroyImpl;
69-
const kProxyEvents = ['error', 'close', 'destroy', 'pause', 'resume'];
7069

7170
function prependListener(emitter, event, fn) {
7271
// Sadly this is not cacheable as some libraries bundle their own
@@ -1055,10 +1054,29 @@ Readable.prototype.wrap = function(stream) {
10551054
}
10561055
}
10571056

1058-
// Proxy certain important events.
1059-
for (const kProxyEvent of kProxyEvents) {
1060-
stream.on(kProxyEvent, this.emit.bind(this, kProxyEvent));
1061-
}
1057+
stream.on('error', (err) => {
1058+
errorOrDestroy(this, err);
1059+
});
1060+
1061+
stream.on('close', () => {
1062+
// TODO(ronag): Update readable state?
1063+
this.emit('close');
1064+
});
1065+
1066+
stream.on('destroy', () => {
1067+
// TODO(ronag): this.destroy()?
1068+
this.emit('destroy');
1069+
});
1070+
1071+
stream.on('pause', () => {
1072+
// TODO(ronag): this.pause()?
1073+
this.emit('pause');
1074+
});
1075+
1076+
stream.on('resume', () => {
1077+
// TODO(ronag): this.resume()?
1078+
this.emit('resume');
1079+
});
10621080

10631081
// When we try to consume some more bytes, simply unpause the
10641082
// underlying stream.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
'use strict';
2+
const common = require('../common');
3+
const assert = require('assert');
4+
5+
const Readable = require('_stream_readable');
6+
const EE = require('events').EventEmitter;
7+
8+
class LegacyStream extends EE {
9+
pause() {}
10+
resume() {}
11+
}
12+
13+
{
14+
const oldStream = new LegacyStream();
15+
const r = new Readable({ autoDestroy: true })
16+
.wrap(oldStream)
17+
.on('error', common.mustCall(() => {
18+
assert.strictEqual(r.destroyed, true);
19+
}));
20+
oldStream.emit('error', new Error());
21+
}
22+
23+
{
24+
const oldStream = new LegacyStream();
25+
const r = new Readable({ autoDestroy: false })
26+
.wrap(oldStream)
27+
.on('error', common.mustCall(() => {
28+
assert.strictEqual(r.destroyed, false);
29+
}));
30+
oldStream.emit('error', new Error());
31+
}

0 commit comments

Comments
 (0)