Skip to content

Commit 8914f7b

Browse files
mscdexjasnell
authored andcommitted
stream: improve readable push performance
PR-URL: #13113 Reviewed-By: James M Snell <[email protected]> Reviewed-By: Matteo Collina <[email protected]>
1 parent f146fe4 commit 8914f7b

File tree

2 files changed

+81
-72
lines changed

2 files changed

+81
-72
lines changed

benchmark/streams/readable-boundaryread.js

+7-5
Original file line numberDiff line numberDiff line change
@@ -4,20 +4,22 @@ const common = require('../common');
44
const Readable = require('stream').Readable;
55

66
const bench = common.createBenchmark(main, {
7-
n: [200e1]
7+
n: [200e1],
8+
type: ['string', 'buffer']
89
});
910

1011
function main(conf) {
1112
const n = +conf.n;
12-
const b = new Buffer(32);
1313
const s = new Readable();
14-
function noop() {}
15-
s._read = noop;
14+
var data = 'a'.repeat(32);
15+
if (conf.type === 'buffer')
16+
data = Buffer.from(data);
17+
s._read = function() {};
1618

1719
bench.start();
1820
for (var k = 0; k < n; ++k) {
1921
for (var i = 0; i < 1e4; ++i)
20-
s.push(b);
22+
s.push(data);
2123
while (s.read(32));
2224
}
2325
bench.end(n);

lib/_stream_readable.js

+74-67
Original file line numberDiff line numberDiff line change
@@ -141,81 +141,97 @@ function Readable(options) {
141141
// write() some more.
142142
Readable.prototype.push = function(chunk, encoding) {
143143
var state = this._readableState;
144-
145-
if (!state.objectMode && typeof chunk === 'string') {
146-
encoding = encoding || state.defaultEncoding;
147-
if (encoding !== state.encoding) {
148-
chunk = Buffer.from(chunk, encoding);
149-
encoding = '';
144+
var skipChunkCheck;
145+
146+
if (!state.objectMode) {
147+
if (typeof chunk === 'string') {
148+
encoding = encoding || state.defaultEncoding;
149+
if (encoding !== state.encoding) {
150+
chunk = Buffer.from(chunk, encoding);
151+
encoding = '';
152+
}
153+
skipChunkCheck = true;
150154
}
155+
} else {
156+
skipChunkCheck = true;
151157
}
152158

153-
return readableAddChunk(this, state, chunk, encoding, false);
159+
return readableAddChunk(this, chunk, encoding, false, skipChunkCheck);
154160
};
155161

156162
// Unshift should *always* be something directly out of read()
157163
Readable.prototype.unshift = function(chunk) {
158-
var state = this._readableState;
159-
return readableAddChunk(this, state, chunk, '', true);
160-
};
161-
162-
Readable.prototype.isPaused = function() {
163-
return this._readableState.flowing === false;
164+
return readableAddChunk(this, chunk, null, true, false);
164165
};
165166

166-
function readableAddChunk(stream, state, chunk, encoding, addToFront) {
167-
var er = chunkInvalid(state, chunk);
168-
if (er) {
169-
stream.emit('error', er);
170-
} else if (chunk === null) {
167+
function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) {
168+
var state = stream._readableState;
169+
if (chunk === null) {
171170
state.reading = false;
172171
onEofChunk(stream, state);
173-
} else if (state.objectMode || chunk && chunk.length > 0) {
174-
if (state.ended && !addToFront) {
175-
const e = new Error('stream.push() after EOF');
176-
stream.emit('error', e);
177-
} else if (state.endEmitted && addToFront) {
178-
const e = new Error('stream.unshift() after end event');
179-
stream.emit('error', e);
180-
} else {
181-
var skipAdd;
182-
if (state.decoder && !addToFront && !encoding) {
183-
chunk = state.decoder.write(chunk);
184-
skipAdd = (!state.objectMode && chunk.length === 0);
185-
}
186-
187-
if (!addToFront)
172+
} else {
173+
var er;
174+
if (!skipChunkCheck)
175+
er = chunkInvalid(state, chunk);
176+
if (er) {
177+
stream.emit('error', er);
178+
} else if (state.objectMode || chunk && chunk.length > 0) {
179+
if (addToFront) {
180+
if (state.endEmitted)
181+
stream.emit('error', new Error('stream.unshift() after end event'));
182+
else
183+
addChunk(stream, state, chunk, true);
184+
} else if (state.ended) {
185+
stream.emit('error', new Error('stream.push() after EOF'));
186+
} else {
188187
state.reading = false;
189-
190-
// Don't add to the buffer if we've decoded to an empty string chunk and
191-
// we're not in object mode
192-
if (!skipAdd) {
193-
// if we want the data now, just emit it.
194-
if (state.flowing && state.length === 0 && !state.sync) {
195-
stream.emit('data', chunk);
196-
stream.read(0);
197-
} else {
198-
// update the buffer info.
199-
state.length += state.objectMode ? 1 : chunk.length;
200-
if (addToFront)
201-
state.buffer.unshift(chunk);
188+
if (state.decoder && !encoding) {
189+
chunk = state.decoder.write(chunk);
190+
if (state.objectMode || chunk.length !== 0)
191+
addChunk(stream, state, chunk, false);
202192
else
203-
state.buffer.push(chunk);
204-
205-
if (state.needReadable)
206-
emitReadable(stream);
193+
maybeReadMore(stream, state);
194+
} else {
195+
addChunk(stream, state, chunk, false);
207196
}
208197
}
209-
210-
maybeReadMore(stream, state);
198+
} else if (!addToFront) {
199+
state.reading = false;
211200
}
212-
} else if (!addToFront) {
213-
state.reading = false;
214201
}
215202

216203
return needMoreData(state);
217204
}
218205

206+
function addChunk(stream, state, chunk, addToFront) {
207+
if (state.flowing && state.length === 0 && !state.sync) {
208+
stream.emit('data', chunk);
209+
stream.read(0);
210+
} else {
211+
// update the buffer info.
212+
state.length += state.objectMode ? 1 : chunk.length;
213+
if (addToFront)
214+
state.buffer.unshift(chunk);
215+
else
216+
state.buffer.push(chunk);
217+
218+
if (state.needReadable)
219+
emitReadable(stream);
220+
}
221+
maybeReadMore(stream, state);
222+
}
223+
224+
function chunkInvalid(state, chunk) {
225+
var er;
226+
if (!(chunk instanceof Buffer) &&
227+
typeof chunk !== 'string' &&
228+
chunk !== undefined &&
229+
!state.objectMode) {
230+
er = new TypeError('Invalid non-string/buffer chunk');
231+
}
232+
return er;
233+
}
234+
219235

220236
// if it's past the high water mark, we can push in some more.
221237
// Also, if we have no data yet, we can stand some
@@ -231,6 +247,10 @@ function needMoreData(state) {
231247
state.length === 0);
232248
}
233249

250+
Readable.prototype.isPaused = function() {
251+
return this._readableState.flowing === false;
252+
};
253+
234254
// backwards compatibility.
235255
Readable.prototype.setEncoding = function(enc) {
236256
if (!StringDecoder)
@@ -402,19 +422,6 @@ Readable.prototype.read = function(n) {
402422
return ret;
403423
};
404424

405-
function chunkInvalid(state, chunk) {
406-
var er = null;
407-
if (!(chunk instanceof Buffer) &&
408-
typeof chunk !== 'string' &&
409-
chunk !== null &&
410-
chunk !== undefined &&
411-
!state.objectMode) {
412-
er = new TypeError('Invalid non-string/buffer chunk');
413-
}
414-
return er;
415-
}
416-
417-
418425
function onEofChunk(stream, state) {
419426
if (state.ended) return;
420427
if (state.decoder) {

0 commit comments

Comments
 (0)