Skip to content

Commit f750a74

Browse files
committed
stream: fix legacy pipe error handling
Fixes: #35237
1 parent fb88257 commit f750a74

File tree

5 files changed

+38
-27
lines changed

5 files changed

+38
-27
lines changed

lib/internal/streams/legacy.js

+23-4
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
'use strict';
22

33
const {
4+
ArrayIsArray,
45
ObjectSetPrototypeOf,
56
} = primordials;
67

@@ -58,12 +59,12 @@ Stream.prototype.pipe = function(dest, options) {
5859
function onerror(er) {
5960
cleanup();
6061
if (EE.listenerCount(this, 'error') === 0) {
61-
throw er; // Unhandled stream error in pipe.
62+
this.emit('error', er);
6263
}
6364
}
6465

65-
source.on('error', onerror);
66-
dest.on('error', onerror);
66+
prependListener(source, 'error', onerror);
67+
prependListener(dest, 'error', onerror);
6768

6869
// Remove all the event listeners that were added.
6970
function cleanup() {
@@ -92,4 +93,22 @@ Stream.prototype.pipe = function(dest, options) {
9293
return dest;
9394
};
9495

95-
module.exports = Stream;
96+
function prependListener(emitter, event, fn) {
97+
// Sadly this is not cacheable as some libraries bundle their own
98+
// event emitter implementation with them.
99+
if (typeof emitter.prependListener === 'function')
100+
return emitter.prependListener(event, fn);
101+
102+
// This is a hack to make sure that our error handler is attached before any
103+
// userland ones. NEVER DO THIS. This is here only because this code needs
104+
// to continue to work with older versions of Node.js that do not include
105+
// the prependListener() method. The goal is to eventually remove this hack.
106+
if (!emitter._events || !emitter._events[event])
107+
emitter.on(event, fn);
108+
else if (ArrayIsArray(emitter._events[event]))
109+
emitter._events[event].unshift(fn);
110+
else
111+
emitter._events[event] = [fn, emitter._events[event]];
112+
}
113+
114+
module.exports = { Stream, prependListener };

lib/internal/streams/readable.js

+1-20
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
'use strict';
2323

2424
const {
25-
ArrayIsArray,
2625
NumberIsInteger,
2726
NumberIsNaN,
2827
ObjectDefineProperties,
@@ -38,7 +37,7 @@ module.exports = Readable;
3837
Readable.ReadableState = ReadableState;
3938

4039
const EE = require('events');
41-
const Stream = require('internal/streams/legacy');
40+
const { Stream, prependListener } = require('internal/streams/legacy');
4241
const { Buffer } = require('buffer');
4342

4443
let debug = require('internal/util/debuglog').debuglog('stream', (fn) => {
@@ -69,24 +68,6 @@ function nop() {}
6968

7069
const { errorOrDestroy } = destroyImpl;
7170

72-
function prependListener(emitter, event, fn) {
73-
// Sadly this is not cacheable as some libraries bundle their own
74-
// event emitter implementation with them.
75-
if (typeof emitter.prependListener === 'function')
76-
return emitter.prependListener(event, fn);
77-
78-
// This is a hack to make sure that our error handler is attached before any
79-
// userland ones. NEVER DO THIS. This is here only because this code needs
80-
// to continue to work with older versions of Node.js that do not include
81-
// the prependListener() method. The goal is to eventually remove this hack.
82-
if (!emitter._events || !emitter._events[event])
83-
emitter.on(event, fn);
84-
else if (ArrayIsArray(emitter._events[event]))
85-
emitter._events[event].unshift(fn);
86-
else
87-
emitter._events[event] = [fn, emitter._events[event]];
88-
}
89-
9071
function ReadableState(options, stream, isDuplex) {
9172
// Duplex streams are both readable and writable, but share
9273
// the same options object.

lib/internal/streams/writable.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ module.exports = Writable;
3838
Writable.WritableState = WritableState;
3939

4040
const EE = require('events');
41-
const Stream = require('internal/streams/legacy');
41+
const Stream = require('internal/streams/legacy').Stream;
4242
const { Buffer } = require('buffer');
4343
const destroyImpl = require('internal/streams/destroy');
4444
const {

lib/stream.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ const internalBuffer = require('internal/buffer');
3636
// Lazy loaded
3737
let promises = null;
3838

39-
const Stream = module.exports = require('internal/streams/legacy');
39+
const Stream = module.exports = require('internal/streams/legacy').Stream;
4040
Stream.Readable = require('internal/streams/readable');
4141
Stream.Writable = require('internal/streams/writable');
4242
Stream.Duplex = require('internal/streams/duplex');

test/parallel/test-stream-pipe-error-handling.js

+12-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
'use strict';
2323
const common = require('../common');
2424
const assert = require('assert');
25-
const Stream = require('stream').Stream;
25+
const { Stream, PassThrough } = require('stream');
2626

2727
{
2828
const source = new Stream();
@@ -108,3 +108,14 @@ const Stream = require('stream').Stream;
108108
w.removeListener('error', () => {});
109109
removed = true;
110110
}
111+
112+
{
113+
const destination = new PassThrough();
114+
destination.once('error', common.mustCall());
115+
116+
const stream = new Stream();
117+
stream
118+
.pipe(destination);
119+
120+
destination.destroy(new Error('this should be handled'));
121+
}

0 commit comments

Comments
 (0)