diff --git a/doc/api/stream.md b/doc/api/stream.md index 9f233cd325132f..86286906b7f297 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -762,6 +762,9 @@ changes: description: > 'readable' is always emitted in the next tick after .push() is called + - version: REPLACEME + pr-url: https://github.com/nodejs/node/pull/18994 + description: Using 'readable' requires calling .read(). --> The `'readable'` event is emitted when there is data available to be read from @@ -770,10 +773,16 @@ cause some amount of data to be read into an internal buffer. ```javascript const readable = getReadableStreamSomehow(); -readable.on('readable', () => { +readable.on('readable', function() { // there is some data to read now + let data; + + while (data = this.read()) { + console.log(data); + } }); ``` + The `'readable'` event will also be emitted once the end of the stream data has been reached but before the `'end'` event is emitted. @@ -806,6 +815,10 @@ In general, the `readable.pipe()` and `'data'` event mechanisms are easier to understand than the `'readable'` event. However, handling `'readable'` might result in increased throughput. +If both `'readable'` and [`'data'`][] are used at the same time, `'readable'` +takes precedence in controlling the flow, i.e. `'data'` will be emitted +only when [`stream.read()`][stream-read] is called. + ##### readable.destroy([error]) <!-- YAML added: v8.0.0 @@ -997,6 +1010,10 @@ the status of the `highWaterMark`. ##### readable.resume() <!-- YAML added: v0.9.4 +changes: + - version: REPLACEME + pr-url: https://github.com/nodejs/node/pull/18994 + description: Resume has no effect if there is a 'readable' event listening --> * Returns: {this} @@ -1016,6 +1033,9 @@ getReadableStreamSomehow() }); ``` +The `readable.resume()` method has no effect if there is a `'readable'` +event listener. + ##### readable.setEncoding(encoding) <!-- YAML added: v0.9.4 diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index 7c6671fcd0760d..5b044e79c1b18d 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -223,6 +223,7 @@ Readable.prototype.unshift = function(chunk) { }; function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) { + debug('readableAddChunk', chunk); var state = stream._readableState; if (chunk === null) { state.reading = false; @@ -799,20 +800,24 @@ Readable.prototype.unpipe = function(dest) { // Ensure readable listeners eventually get something Readable.prototype.on = function(ev, fn) { const res = Stream.prototype.on.call(this, ev, fn); + const state = this._readableState; if (ev === 'data') { - // Start flowing on next tick if stream isn't explicitly paused - if (this._readableState.flowing !== false) + // update readableListening so that resume() may be a no-op + // a few lines down. This is needed to support once('readable'). + state.readableListening = this.listenerCount('readable') > 0; + + // Try start flowing on next tick if stream isn't explicitly paused + if (state.flowing !== false) this.resume(); } else if (ev === 'readable') { - const state = this._readableState; if (!state.endEmitted && !state.readableListening) { state.readableListening = state.needReadable = true; state.emittedReadable = false; - if (!state.reading) { - process.nextTick(nReadingNextTick, this); - } else if (state.length) { + if (state.length) { emitReadable(this); + } else if (!state.reading) { + process.nextTick(nReadingNextTick, this); } } } @@ -821,6 +826,42 @@ Readable.prototype.on = function(ev, fn) { }; Readable.prototype.addListener = Readable.prototype.on; +Readable.prototype.removeListener = function(ev, fn) { + const res = Stream.prototype.removeListener.call(this, ev, fn); + + if (ev === 'readable') { + // We need to check if there is someone still listening to + // to readable and reset the state. However this needs to happen + // after readable has been emitted but before I/O (nextTick) to + // support once('readable', fn) cycles. This means that calling + // resume within the same tick will have no + // effect. + process.nextTick(updateReadableListening, this); + } + + return res; +}; + +Readable.prototype.removeAllListeners = function(ev) { + const res = Stream.prototype.removeAllListeners.call(this, ev); + + if (ev === 'readable' || ev === undefined) { + // We need to check if there is someone still listening to + // to readable and reset the state. However this needs to happen + // after readable has been emitted but before I/O (nextTick) to + // support once('readable', fn) cycles. This means that calling + // resume within the same tick will have no + // effect. + process.nextTick(updateReadableListening, this); + } + + return res; +}; + +function updateReadableListening(self) { + self._readableState.readableListening = self.listenerCount('readable') > 0; +} + function nReadingNextTick(self) { debug('readable nexttick read 0'); self.read(0); @@ -832,7 +873,9 @@ Readable.prototype.resume = function() { var state = this._readableState; if (!state.flowing) { debug('resume'); - state.flowing = true; + // we flow only if there is no one listening + // for readable + state.flowing = !state.readableListening; resume(this, state); } return this; diff --git a/test/parallel/test-http-readable-data-event.js b/test/parallel/test-http-readable-data-event.js new file mode 100644 index 00000000000000..74c9a59b268083 --- /dev/null +++ b/test/parallel/test-http-readable-data-event.js @@ -0,0 +1,52 @@ +'use strict'; + +const common = require('../common'); +const assert = require('assert'); +const http = require('http'); +const helloWorld = 'Hello World!'; +const helloAgainLater = 'Hello again later!'; + +const server = http.createServer((req, res) => { + res.writeHead(200, { + 'Content-Length': '' + (helloWorld.length + helloAgainLater.length) + }); + res.write(helloWorld); + + // we need to make sure the data is flushed + setTimeout(() => { + res.end(helloAgainLater); + }, common.platformTimeout(10)); +}).listen(0, function() { + const opts = { + hostname: 'localhost', + port: server.address().port, + path: '/' + }; + + const expectedData = [helloWorld, helloAgainLater]; + const expectedRead = [helloWorld, null, helloAgainLater, null]; + + const req = http.request(opts, (res) => { + res.on('error', common.mustNotCall); + + res.on('readable', common.mustCall(() => { + let data; + + do { + data = res.read(); + assert.strictEqual(data, expectedRead.shift()); + } while (data !== null); + }, 2)); + + res.setEncoding('utf8'); + res.on('data', common.mustCall((data) => { + assert.strictEqual(data, expectedData.shift()); + }, 2)); + + res.on('end', common.mustCall(() => { + server.close(); + })); + }); + + req.end(); +}); diff --git a/test/parallel/test-stream-readable-reading-readingMore.js b/test/parallel/test-stream-readable-reading-readingMore.js index 0af2eeb71f2b1b..21361abc346555 100644 --- a/test/parallel/test-stream-readable-reading-readingMore.js +++ b/test/parallel/test-stream-readable-reading-readingMore.js @@ -3,64 +3,166 @@ const common = require('../common'); const assert = require('assert'); const Readable = require('stream').Readable; -const readable = new Readable({ - read(size) {} -}); +{ + const readable = new Readable({ + read(size) {} + }); -const state = readable._readableState; + const state = readable._readableState; -// Starting off with false initially. -assert.strictEqual(state.reading, false); -assert.strictEqual(state.readingMore, false); + // Starting off with false initially. + assert.strictEqual(state.reading, false); + assert.strictEqual(state.readingMore, false); -readable.on('data', common.mustCall((data) => { - // while in a flowing state, should try to read more. - if (readable.readableFlowing) + readable.on('data', common.mustCall((data) => { + // while in a flowing state with a 'readable' listener + // we should not be reading more + if (readable.readableFlowing) + assert.strictEqual(state.readingMore, true); + + // reading as long as we've not ended + assert.strictEqual(state.reading, !state.ended); + }, 2)); + + function onStreamEnd() { + // End of stream; state.reading is false + // And so should be readingMore. + assert.strictEqual(state.readingMore, false); + assert.strictEqual(state.reading, false); + } + + readable.on('readable', common.mustCall(() => { + // 'readable' always gets called before 'end' + // since 'end' hasn't been emitted, more data could be incoming assert.strictEqual(state.readingMore, true); - // reading as long as we've not ended - assert.strictEqual(state.reading, !state.ended); -}, 2)); + // if the stream has ended, we shouldn't be reading + assert.strictEqual(state.ended, !state.reading); + + const data = readable.read(); + if (data === null) // reached end of stream + process.nextTick(common.mustCall(onStreamEnd, 1)); + }, 2)); + + readable.on('end', common.mustCall(onStreamEnd)); + readable.push('pushed'); + + readable.read(6); + + // reading + assert.strictEqual(state.reading, true); + assert.strictEqual(state.readingMore, true); + + // add chunk to front + readable.unshift('unshifted'); -function onStreamEnd() { - // End of stream; state.reading is false - // And so should be readingMore. + // end + readable.push(null); +} + +{ + const readable = new Readable({ + read(size) {} + }); + + const state = readable._readableState; + + // Starting off with false initially. + assert.strictEqual(state.reading, false); assert.strictEqual(state.readingMore, false); + + readable.on('data', common.mustCall((data) => { + // while in a flowing state without a 'readable' listener + // we should be reading more + if (readable.readableFlowing) + assert.strictEqual(state.readingMore, true); + + // reading as long as we've not ended + assert.strictEqual(state.reading, !state.ended); + }, 2)); + + function onStreamEnd() { + // End of stream; state.reading is false + // And so should be readingMore. + assert.strictEqual(state.readingMore, false); + assert.strictEqual(state.reading, false); + } + + readable.on('end', common.mustCall(onStreamEnd)); + readable.push('pushed'); + + // stop emitting 'data' events + assert.strictEqual(state.flowing, true); + readable.pause(); + + // paused + assert.strictEqual(state.reading, false); + assert.strictEqual(state.flowing, false); + + readable.resume(); assert.strictEqual(state.reading, false); + assert.strictEqual(state.flowing, true); + + // add chunk to front + readable.unshift('unshifted'); + + // end + readable.push(null); } -readable.on('readable', common.mustCall(() => { - // 'readable' always gets called before 'end' - // since 'end' hasn't been emitted, more data could be incoming - assert.strictEqual(state.readingMore, true); +{ + const readable = new Readable({ + read(size) {} + }); - // if the stream has ended, we shouldn't be reading - assert.strictEqual(state.ended, !state.reading); + const state = readable._readableState; - const data = readable.read(); - if (data === null) // reached end of stream - process.nextTick(common.mustCall(onStreamEnd, 1)); -}, 2)); + // Starting off with false initially. + assert.strictEqual(state.reading, false); + assert.strictEqual(state.readingMore, false); + + const onReadable = common.mustNotCall; + + readable.on('readable', onReadable); -readable.on('end', common.mustCall(onStreamEnd)); + readable.on('data', common.mustCall((data) => { + // reading as long as we've not ended + assert.strictEqual(state.reading, !state.ended); + }, 2)); -readable.push('pushed'); + readable.removeListener('readable', onReadable); -// stop emitting 'data' events -readable.pause(); + function onStreamEnd() { + // End of stream; state.reading is false + // And so should be readingMore. + assert.strictEqual(state.readingMore, false); + assert.strictEqual(state.reading, false); + } -// read() should only be called while operating in paused mode -readable.read(6); + readable.on('end', common.mustCall(onStreamEnd)); + readable.push('pushed'); -// reading -assert.strictEqual(state.reading, true); -assert.strictEqual(state.readingMore, true); + // we are still not flowing, we will be resuming in the next tick + assert.strictEqual(state.flowing, false); -// resume emitting 'data' events -readable.resume(); + // wait for nextTick, so the readableListener flag resets + process.nextTick(function() { + readable.resume(); -// add chunk to front -readable.unshift('unshifted'); + // stop emitting 'data' events + assert.strictEqual(state.flowing, true); + readable.pause(); -// end -readable.push(null); + // paused + assert.strictEqual(state.flowing, false); + + readable.resume(); + assert.strictEqual(state.flowing, true); + + // add chunk to front + readable.unshift('unshifted'); + + // end + readable.push(null); + }); +} diff --git a/test/parallel/test-stream3-pause-then-read.js b/test/parallel/test-stream3-pause-then-read.js index 4e1b71d74df10b..5de8d8ec512c2b 100644 --- a/test/parallel/test-stream3-pause-then-read.js +++ b/test/parallel/test-stream3-pause-then-read.js @@ -35,6 +35,7 @@ let expectEndingData = expectTotalData; const r = new Readable({ highWaterMark: 1000 }); let chunks = totalChunks; r._read = function(n) { + console.log('_read called', chunks); if (!(chunks % 2)) setImmediate(push); else if (!(chunks % 3)) @@ -49,6 +50,7 @@ function push() { if (chunk) { totalPushed += chunk.length; } + console.log('chunks', chunks); r.push(chunk); } @@ -64,6 +66,7 @@ function readn(n, then) { expectEndingData -= n; (function read() { const c = r.read(n); + console.error('c', c); if (!c) r.once('readable', read); else {