Skip to content

Commit 4e3f6f3

Browse files
committedJul 11, 2020
stream: cleanup and fix Readable.wrap
Cleans up Readable.wrap and also ensures destroy is called for certain events. PR-URL: #34204 Reviewed-By: Anna Henningsen <[email protected]> Reviewed-By: Luigi Pinca <[email protected]> Reviewed-By: James M Snell <[email protected]>
1 parent 3751662 commit 4e3f6f3

File tree

3 files changed

+56
-52
lines changed

3 files changed

+56
-52
lines changed
 

‎lib/_stream_readable.js

+19-52
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ const {
2626
NumberIsInteger,
2727
NumberIsNaN,
2828
ObjectDefineProperties,
29+
ObjectKeys,
2930
ObjectSetPrototypeOf,
3031
Set,
3132
SymbolAsyncIterator,
@@ -1007,83 +1008,49 @@ function flow(stream) {
10071008
// This is *not* part of the readable stream interface.
10081009
// It is an ugly unfortunate mess of history.
10091010
Readable.prototype.wrap = function(stream) {
1010-
const state = this._readableState;
10111011
let paused = false;
10121012

1013-
stream.on('end', () => {
1014-
debug('wrapped end');
1015-
if (state.decoder && !state.ended) {
1016-
const chunk = state.decoder.end();
1017-
if (chunk && chunk.length)
1018-
this.push(chunk);
1019-
}
1020-
1021-
this.push(null);
1022-
});
1013+
// TODO (ronag): Should this.destroy(err) emit
1014+
// 'error' on the wrapped stream? Would require
1015+
// a static factory method, e.g. Readable.wrap(stream).
10231016

10241017
stream.on('data', (chunk) => {
1025-
debug('wrapped data');
1026-
if (state.decoder)
1027-
chunk = state.decoder.write(chunk);
1028-
1029-
// Don't skip over falsy values in objectMode.
1030-
if (state.objectMode && (chunk === null || chunk === undefined))
1031-
return;
1032-
else if (!state.objectMode && (!chunk || !chunk.length))
1033-
return;
1034-
1035-
const ret = this.push(chunk);
1036-
if (!ret) {
1018+
if (!this.push(chunk) && stream.pause) {
10371019
paused = true;
10381020
stream.pause();
10391021
}
10401022
});
10411023

1042-
// Proxy all the other methods. Important when wrapping filters and duplexes.
1043-
for (const i in stream) {
1044-
if (this[i] === undefined && typeof stream[i] === 'function') {
1045-
this[i] = function methodWrap(method) {
1046-
return function methodWrapReturnFunction() {
1047-
return stream[method].apply(stream, arguments);
1048-
};
1049-
}(i);
1050-
}
1051-
}
1024+
stream.on('end', () => {
1025+
this.push(null);
1026+
});
10521027

10531028
stream.on('error', (err) => {
10541029
errorOrDestroy(this, err);
10551030
});
10561031

10571032
stream.on('close', () => {
1058-
// TODO(ronag): Update readable state?
1059-
this.emit('close');
1033+
this.destroy();
10601034
});
10611035

10621036
stream.on('destroy', () => {
1063-
// TODO(ronag): this.destroy()?
1064-
this.emit('destroy');
1037+
this.destroy();
10651038
});
10661039

1067-
stream.on('pause', () => {
1068-
// TODO(ronag): this.pause()?
1069-
this.emit('pause');
1070-
});
1071-
1072-
stream.on('resume', () => {
1073-
// TODO(ronag): this.resume()?
1074-
this.emit('resume');
1075-
});
1076-
1077-
// When we try to consume some more bytes, simply unpause the
1078-
// underlying stream.
1079-
this._read = (n) => {
1080-
debug('wrapped _read', n);
1081-
if (paused) {
1040+
this._read = () => {
1041+
if (paused && stream.resume) {
10821042
paused = false;
10831043
stream.resume();
10841044
}
10851045
};
10861046

1047+
// Proxy all the other methods. Important when wrapping filters and duplexes.
1048+
for (const i of ObjectKeys(stream)) {
1049+
if (this[i] === undefined && typeof stream[i] === 'function') {
1050+
this[i] = stream[i].bind(stream);
1051+
}
1052+
}
1053+
10871054
return this;
10881055
};
10891056

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
'use strict';
2+
const common = require('../common');
3+
4+
const Readable = require('_stream_readable');
5+
const EE = require('events').EventEmitter;
6+
7+
const oldStream = new EE();
8+
oldStream.pause = () => {};
9+
oldStream.resume = () => {};
10+
11+
{
12+
new Readable({
13+
autoDestroy: false,
14+
destroy: common.mustCall()
15+
})
16+
.wrap(oldStream);
17+
oldStream.emit('destroy');
18+
}
19+
20+
{
21+
new Readable({
22+
autoDestroy: false,
23+
destroy: common.mustCall()
24+
})
25+
.wrap(oldStream);
26+
oldStream.emit('close');
27+
}

‎test/parallel/test-stream2-readable-wrap.js

+10
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,16 @@ function runTest(highWaterMark, objectMode, produce) {
4444
flow();
4545
};
4646

47+
// Make sure pause is only emitted once.
48+
let pausing = false;
49+
r.on('pause', () => {
50+
assert.strictEqual(pausing, false);
51+
pausing = true;
52+
process.nextTick(() => {
53+
pausing = false;
54+
});
55+
});
56+
4757
let flowing;
4858
let chunks = 10;
4959
let oldEnded = false;

0 commit comments

Comments
 (0)
Please sign in to comment.