Skip to content

Commit 53fb7af

Browse files
mcollinatargos
authored andcommitted
stream: restore flow if there are 'data' handlers after once('readable')
Fixes: #21398 See: #21696 PR-URL: #22209 Reviewed-By: James M Snell <[email protected]> Reviewed-By: Mathias Buus <[email protected]>
1 parent 107c8c0 commit 53fb7af

4 files changed

+92
-7
lines changed

doc/api/stream.md

+19-3
Original file line numberDiff line numberDiff line change
@@ -618,6 +618,12 @@ instance, when the `readable.resume()` method is called without a listener
618618
attached to the `'data'` event, or when a `'data'` event handler is removed
619619
from the stream.
620620

621+
Adding a [`'readable'`][] event handler automatically make the stream to
622+
stop flowing, and the data to be consumed via
623+
[`readable.read()`][stream-read]. If the [`'readable'`] event handler is
624+
removed, then the stream will start flowing again if there is a
625+
[`'data'`][] event handler.
626+
621627
#### Three States
622628

623629
The "two modes" of operation for a `Readable` stream are a simplified
@@ -666,12 +672,15 @@ within the streams internal buffer.
666672
The `Readable` stream API evolved across multiple Node.js versions and provides
667673
multiple methods of consuming stream data. In general, developers should choose
668674
*one* of the methods of consuming data and *should never* use multiple methods
669-
to consume data from a single stream.
675+
to consume data from a single stream. Specifically, using a combination
676+
of `on('data')`, `on('readable')`, `pipe()` or async iterators could
677+
lead to unintuitive behavior.
670678

671679
Use of the `readable.pipe()` method is recommended for most users as it has been
672680
implemented to provide the easiest way of consuming stream data. Developers that
673681
require more fine-grained control over the transfer and generation of data can
674-
use the [`EventEmitter`][] and `readable.pause()`/`readable.resume()` APIs.
682+
use the [`EventEmitter`][] and `readable.on('readable')`/`readable.read()`
683+
or the `readable.pause()`/`readable.resume()` APIs.
675684

676685
#### Class: stream.Readable
677686
<!-- YAML
@@ -825,7 +834,11 @@ result in increased throughput.
825834

826835
If both `'readable'` and [`'data'`][] are used at the same time, `'readable'`
827836
takes precedence in controlling the flow, i.e. `'data'` will be emitted
828-
only when [`stream.read()`][stream-read] is called.
837+
only when [`stream.read()`][stream-read] is called. The
838+
`readableFlowing` property would become `false`.
839+
If there are `'data'` listeners when `'readable'` is removed, the stream
840+
will start flowing, i.e. `'data'` events will be emitted without calling
841+
`.resume()`.
829842

830843
##### readable.destroy([error])
831844
<!-- YAML
@@ -887,6 +900,9 @@ readable.on('data', (chunk) => {
887900
});
888901
```
889902

903+
The `readable.pause()` method has no effect if there is a `'readable'`
904+
event listener.
905+
890906
##### readable.pipe(destination[, options])
891907
<!-- YAML
892908
added: v0.9.4

lib/_stream_readable.js

+8-1
Original file line numberDiff line numberDiff line change
@@ -810,6 +810,7 @@ Readable.prototype.on = function(ev, fn) {
810810
} else if (ev === 'readable') {
811811
if (!state.endEmitted && !state.readableListening) {
812812
state.readableListening = state.needReadable = true;
813+
state.flowing = false;
813814
state.emittedReadable = false;
814815
debug('on readable', state.length, state.reading);
815816
if (state.length) {
@@ -858,6 +859,11 @@ Readable.prototype.removeAllListeners = function(ev) {
858859

859860
function updateReadableListening(self) {
860861
self._readableState.readableListening = self.listenerCount('readable') > 0;
862+
863+
// crude way to check if we should resume
864+
if (self.listenerCount('data') > 0) {
865+
self.resume();
866+
}
861867
}
862868

863869
function nReadingNextTick(self) {
@@ -872,7 +878,8 @@ Readable.prototype.resume = function() {
872878
if (!state.flowing) {
873879
debug('resume');
874880
// we flow only if there is no one listening
875-
// for readable
881+
// for readable, but we still have to call
882+
// resume()
876883
state.flowing = !state.readableListening;
877884
resume(this, state);
878885
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const assert = require('assert');
5+
const { Readable, Writable } = require('stream');
6+
7+
// This test ensures that if have 'readable' listener
8+
// on Readable instance it will not disrupt the pipe.
9+
10+
{
11+
let receivedData = '';
12+
const w = new Writable({
13+
write: (chunk, env, callback) => {
14+
receivedData += chunk;
15+
callback();
16+
},
17+
});
18+
19+
const data = ['foo', 'bar', 'baz'];
20+
const r = new Readable({
21+
read: () => {},
22+
});
23+
24+
r.once('readable', common.mustCall());
25+
26+
r.pipe(w);
27+
r.push(data[0]);
28+
r.push(data[1]);
29+
r.push(data[2]);
30+
r.push(null);
31+
32+
w.on('finish', common.mustCall(() => {
33+
assert.strictEqual(receivedData, data.join(''));
34+
}));
35+
}
36+
37+
{
38+
let receivedData = '';
39+
const w = new Writable({
40+
write: (chunk, env, callback) => {
41+
receivedData += chunk;
42+
callback();
43+
},
44+
});
45+
46+
const data = ['foo', 'bar', 'baz'];
47+
const r = new Readable({
48+
read: () => {},
49+
});
50+
51+
r.pipe(w);
52+
r.push(data[0]);
53+
r.push(data[1]);
54+
r.push(data[2]);
55+
r.push(null);
56+
r.once('readable', common.mustCall());
57+
58+
w.on('finish', common.mustCall(() => {
59+
assert.strictEqual(receivedData, data.join(''));
60+
}));
61+
}

test/parallel/test-stream-readable-reading-readingMore.js

+4-3
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,11 @@ const Readable = require('stream').Readable;
3131
assert.strictEqual(state.reading, false);
3232
}
3333

34+
const expectedReadingMore = [true, false];
3435
readable.on('readable', common.mustCall(() => {
35-
// 'readable' always gets called before 'end'
36-
// since 'end' hasn't been emitted, more data could be incoming
37-
assert.strictEqual(state.readingMore, true);
36+
// there is only one readingMore scheduled from on('data'),
37+
// after which everything is governed by the .read() call
38+
assert.strictEqual(state.readingMore, expectedReadingMore.shift());
3839

3940
// if the stream has ended, we shouldn't be reading
4041
assert.strictEqual(state.ended, !state.reading);

0 commit comments

Comments
 (0)