Skip to content

Commit 1e0f331

Browse files
committed
stream: always defer 'readable' with nextTick
Emit 'readable' always in the next tick, resulting in a single call to _read() per microtick. This removes the need for the user to implement buffering if they wanted to call this.push() multiple times in an asynchronous fashion, as this.push() triggers this._read() call. PR-URL: #17979 Fixes: #3203 Reviewed-By: James M Snell <[email protected]> Reviewed-By: Anna Henningsen <[email protected]> Reviewed-By: Benjamin Gruenbaum <[email protected]>
1 parent 800caac commit 1e0f331

9 files changed

+263
-57
lines changed

doc/api/stream.md

+15
Original file line numberDiff line numberDiff line change
@@ -747,6 +747,12 @@ The listener callback will be passed a single `Error` object.
747747
##### Event: 'readable'
748748
<!-- YAML
749749
added: v0.9.4
750+
changes:
751+
- version: REPLACEME
752+
pr-url: https://github.com/nodejs/node/pull/17979
753+
description: >
754+
'readable' is always emitted in the next tick after
755+
.push() is called
750756
-->
751757

752758
The `'readable'` event is emitted when there is data available to be read from
@@ -1647,6 +1653,13 @@ const myReadable = new Readable({
16471653
```
16481654

16491655
#### readable.\_read(size)
1656+
<!-- YAML
1657+
added: v0.9.4
1658+
changes:
1659+
- version: REPLACEME
1660+
pr-url: https://github.com/nodejs/node/pull/17979
1661+
description: call _read() only once per microtick
1662+
-->
16501663

16511664
* `size` {number} Number of bytes to read asynchronously
16521665

@@ -1666,6 +1679,8 @@ additional data onto the queue.
16661679

16671680
*Note*: Once the `readable._read()` method has been called, it will not be
16681681
called again until the [`readable.push()`][stream-push] method is called.
1682+
`readable._read()` is guaranteed to be called only once within a
1683+
synchronous execution, i.e. a microtick.
16691684

16701685
The `size` argument is advisory. For implementations where a "read" is a
16711686
single operation that returns data can use the `size` argument to determine how

lib/_stream_readable.js

+14-8
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,6 @@ function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) {
267267
function addChunk(stream, state, chunk, addToFront) {
268268
if (state.flowing && state.length === 0 && !state.sync) {
269269
stream.emit('data', chunk);
270-
stream.read(0);
271270
} else {
272271
// update the buffer info.
273272
state.length += state.objectMode ? 1 : chunk.length;
@@ -496,7 +495,11 @@ function onEofChunk(stream, state) {
496495
state.ended = true;
497496

498497
// emit 'readable' now to make sure it gets picked up.
499-
emitReadable(stream);
498+
state.needReadable = false;
499+
if (!state.emittedReadable) {
500+
state.emittedReadable = true;
501+
emitReadable_(stream);
502+
}
500503
}
501504

502505
// Don't emit readable right away in sync mode, because this can trigger
@@ -508,16 +511,15 @@ function emitReadable(stream) {
508511
if (!state.emittedReadable) {
509512
debug('emitReadable', state.flowing);
510513
state.emittedReadable = true;
511-
if (state.sync)
512-
process.nextTick(emitReadable_, stream);
513-
else
514-
emitReadable_(stream);
514+
process.nextTick(emitReadable_, stream);
515515
}
516516
}
517517

518518
function emitReadable_(stream) {
519+
var state = stream._readableState;
519520
debug('emit readable');
520521
stream.emit('readable');
522+
state.needReadable = !state.flowing && !state.ended;
521523
flow(stream);
522524
}
523525

@@ -537,7 +539,7 @@ function maybeReadMore(stream, state) {
537539

538540
function maybeReadMore_(stream, state) {
539541
var len = state.length;
540-
while (!state.reading && !state.flowing && !state.ended &&
542+
while (!state.reading && !state.ended &&
541543
state.length < state.highWaterMark) {
542544
debug('maybeReadMore read 0');
543545
stream.read(0);
@@ -644,6 +646,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
644646
debug('ondata');
645647
increasedAwaitDrain = false;
646648
var ret = dest.write(chunk);
649+
debug('dest.write', ret);
647650
if (false === ret && !increasedAwaitDrain) {
648651
// If the user unpiped during `dest.write()`, it is possible
649652
// to get stuck in a permanently paused state if that write
@@ -824,8 +827,8 @@ function resume(stream, state) {
824827
}
825828

826829
function resume_(stream, state) {
830+
debug('resume', state.reading);
827831
if (!state.reading) {
828-
debug('resume read 0');
829832
stream.read(0);
830833
}
831834

@@ -1087,13 +1090,16 @@ function copyFromBuffer(n, list) {
10871090
function endReadable(stream) {
10881091
var state = stream._readableState;
10891092

1093+
debug('endReadable', state.endEmitted);
10901094
if (!state.endEmitted) {
10911095
state.ended = true;
10921096
process.nextTick(endReadableNT, state, stream);
10931097
}
10941098
}
10951099

10961100
function endReadableNT(state, stream) {
1101+
debug('endReadableNT', state.endEmitted, state.length);
1102+
10971103
// Check that we didn't get one last unshift.
10981104
if (!state.endEmitted && state.length === 0) {
10991105
state.endEmitted = true;

test/parallel/test-net-end-close.js

+8-4
Original file line numberDiff line numberDiff line change
@@ -8,18 +8,22 @@ const uv = process.binding('uv');
88
const s = new net.Socket({
99
handle: {
1010
readStart: function() {
11-
process.nextTick(() => this.onread(uv.UV_EOF, null));
11+
setImmediate(() => this.onread(uv.UV_EOF, null));
1212
},
13-
close: (cb) => process.nextTick(cb)
13+
close: (cb) => setImmediate(cb)
1414
},
1515
writable: false
1616
});
1717
assert.strictEqual(s, s.resume());
1818

1919
const events = [];
2020

21-
s.on('end', () => events.push('end'));
22-
s.on('close', () => events.push('close'));
21+
s.on('end', () => {
22+
events.push('end');
23+
});
24+
s.on('close', () => {
25+
events.push('close');
26+
});
2327

2428
process.on('exit', () => {
2529
assert.deepStrictEqual(events, [ 'end', 'close' ]);

test/parallel/test-stream-pipe-await-drain-push-while-write.js

+12-20
Original file line numberDiff line numberDiff line change
@@ -3,32 +3,24 @@ const common = require('../common');
33
const stream = require('stream');
44
const assert = require('assert');
55

6-
const awaitDrainStates = [
7-
1, // after first chunk before callback
8-
1, // after second chunk before callback
9-
0 // resolving chunk pushed after first chunk, awaitDrain is decreased
10-
];
11-
12-
// A writable stream which pushes data onto the stream which pipes into it,
13-
// but only the first time it's written to. Since it's not paused at this time,
14-
// a second write will occur. If the pipe increases awaitDrain twice, we'll
15-
// never get subsequent chunks because 'drain' is only emitted once.
166
const writable = new stream.Writable({
177
write: common.mustCall(function(chunk, encoding, cb) {
18-
if (chunk.length === 32 * 1024) { // first chunk
19-
const beforePush = readable._readableState.awaitDrain;
20-
readable.push(Buffer.alloc(34 * 1024)); // above hwm
21-
// We should check if awaitDrain counter is increased.
22-
const afterPush = readable._readableState.awaitDrain;
23-
assert.strictEqual(afterPush - beforePush, 1,
24-
'Counter is not increased for awaitDrain');
25-
}
26-
278
assert.strictEqual(
28-
awaitDrainStates.shift(),
299
readable._readableState.awaitDrain,
10+
0,
3011
'State variable awaitDrain is not correct.'
3112
);
13+
14+
if (chunk.length === 32 * 1024) { // first chunk
15+
readable.push(Buffer.alloc(34 * 1024)); // above hwm
16+
// We should check if awaitDrain counter is increased in the next
17+
// tick, because awaitDrain is incremented after this method finished
18+
process.nextTick(() => {
19+
assert.strictEqual(readable._readableState.awaitDrain, 1,
20+
'Counter is not increased for awaitDrain');
21+
});
22+
}
23+
3224
cb();
3325
}, 3)
3426
});

test/parallel/test-stream-readable-emittedReadable.js

+10-7
Original file line numberDiff line numberDiff line change
@@ -10,30 +10,33 @@ const readable = new Readable({
1010
// Initialized to false.
1111
assert.strictEqual(readable._readableState.emittedReadable, false);
1212

13+
const expected = [Buffer.from('foobar'), Buffer.from('quo'), null];
1314
readable.on('readable', common.mustCall(() => {
1415
// emittedReadable should be true when the readable event is emitted
1516
assert.strictEqual(readable._readableState.emittedReadable, true);
16-
readable.read();
17+
assert.deepStrictEqual(readable.read(), expected.shift());
1718
// emittedReadable is reset to false during read()
1819
assert.strictEqual(readable._readableState.emittedReadable, false);
19-
}, 4));
20+
}, 3));
2021

2122
// When the first readable listener is just attached,
2223
// emittedReadable should be false
2324
assert.strictEqual(readable._readableState.emittedReadable, false);
2425

25-
// Each one of these should trigger a readable event.
26+
// These trigger a single 'readable', as things are batched up
2627
process.nextTick(common.mustCall(() => {
2728
readable.push('foo');
2829
}));
2930
process.nextTick(common.mustCall(() => {
3031
readable.push('bar');
3132
}));
32-
process.nextTick(common.mustCall(() => {
33+
34+
// these triggers two readable events
35+
setImmediate(common.mustCall(() => {
3336
readable.push('quo');
34-
}));
35-
process.nextTick(common.mustCall(() => {
36-
readable.push(null);
37+
process.nextTick(common.mustCall(() => {
38+
readable.push(null);
39+
}));
3740
}));
3841

3942
const noRead = new Readable({

test/parallel/test-stream-readable-needReadable.js

+12-11
Original file line numberDiff line numberDiff line change
@@ -38,16 +38,17 @@ asyncReadable.on('readable', common.mustCall(() => {
3838
// then we need to notify the reader on future changes.
3939
assert.strictEqual(asyncReadable._readableState.needReadable, true);
4040
}
41-
}, 3));
41+
}, 2));
4242

4343
process.nextTick(common.mustCall(() => {
4444
asyncReadable.push('foooo');
4545
}));
4646
process.nextTick(common.mustCall(() => {
4747
asyncReadable.push('bar');
4848
}));
49-
process.nextTick(common.mustCall(() => {
49+
setImmediate(common.mustCall(() => {
5050
asyncReadable.push(null);
51+
assert.strictEqual(asyncReadable._readableState.needReadable, false);
5152
}));
5253

5354
const flowing = new Readable({
@@ -84,13 +85,13 @@ slowProducer.on('readable', common.mustCall(() => {
8485

8586
process.nextTick(common.mustCall(() => {
8687
slowProducer.push('foo');
87-
}));
88-
process.nextTick(common.mustCall(() => {
89-
slowProducer.push('foo');
90-
}));
91-
process.nextTick(common.mustCall(() => {
92-
slowProducer.push('foo');
93-
}));
94-
process.nextTick(common.mustCall(() => {
95-
slowProducer.push(null);
88+
process.nextTick(common.mustCall(() => {
89+
slowProducer.push('foo');
90+
process.nextTick(common.mustCall(() => {
91+
slowProducer.push('foo');
92+
process.nextTick(common.mustCall(() => {
93+
slowProducer.push(null);
94+
}));
95+
}));
96+
}));
9697
}));

0 commit comments

Comments
 (0)