From 66853605e2e3bd4280cd5143ec4a1bbba4b6bdba Mon Sep 17 00:00:00 2001 From: Mikko Rantanen <jubjub@jubjubnest.net> Date: Sun, 9 Dec 2018 11:09:01 +0200 Subject: [PATCH 1/8] lib: ensure readable stream flows to end If a readable stream was set up with `highWaterMark 0`, the while-loop in `maybeReadMore_` function would never execute. The while loop now has an extra or-condition for the case where the stream is flowing and there are no items. The or-condition is adapted from the emit-condition of the `addChunk` function. The `addChunk` also contains a check for `state.sync`. However that part of the check was omitted here because the `maybeReadMore_` is executed using `process.nextTick`. `state.sync` is set and then unset within the `read()` function so it should never be in effect in `maybeReadMore_`. Fixes: https://github.com/nodejs/node/issues/24915 --- lib/_stream_readable.js | 3 ++- .../test-stream-readable-hwm-0-async.js | 27 +++++++++++++++++++ 2 files changed, 29 insertions(+), 1 deletion(-) create mode 100644 test/parallel/test-stream-readable-hwm-0-async.js diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index d1a17fd066076d..3e73fb2d62221c 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -570,7 +570,8 @@ function maybeReadMore(stream, state) { function maybeReadMore_(stream, state) { var len = state.length; while (!state.reading && !state.ended && - state.length < state.highWaterMark) { + (state.length < state.highWaterMark || + state.flowing && state.length === 0)) { debug('maybeReadMore read 0'); stream.read(0); if (len === state.length) diff --git a/test/parallel/test-stream-readable-hwm-0-async.js b/test/parallel/test-stream-readable-hwm-0-async.js new file mode 100644 index 00000000000000..ac64524feb33d9 --- /dev/null +++ b/test/parallel/test-stream-readable-hwm-0-async.js @@ -0,0 +1,27 @@ +'use strict'; + +const common = require('../common'); + +// This test ensures that Readable stream will call _read() for 
streams +// with highWaterMark === 0 upon .read(0) instead of just trying to +// emit 'readable' event. + +const { Readable } = require('stream'); + +let count = 5; + +const r = new Readable({ + // Called 6 times: First 5 return data, last one signals end of stream. + read: common.mustCall(() => { + process.nextTick(common.mustCall(() => { + if (count--) + r.push('a'); + else + r.push(null); + })); + }, 6), + highWaterMark: 0, +}); + +r.on('end', common.mustCall()); +r.on('data', common.mustCall(5)); From a7cc7a3ee85a409d3161619e12021f421c38cbb4 Mon Sep 17 00:00:00 2001 From: Mikko Rantanen <jubjub@jubjubnest.net> Date: Mon, 10 Dec 2018 23:37:21 +0200 Subject: [PATCH 2/8] lib: add parens to while condition --- lib/_stream_readable.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index 3e73fb2d62221c..8ba0aca375bb64 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -571,7 +571,7 @@ function maybeReadMore_(stream, state) { var len = state.length; while (!state.reading && !state.ended && (state.length < state.highWaterMark || - state.flowing && state.length === 0)) { + (state.flowing && state.length === 0))) { debug('maybeReadMore read 0'); stream.read(0); if (len === state.length) From a2931129ad770b89097f3f00dc812a0415901e28 Mon Sep 17 00:00:00 2001 From: Mikko Rantanen <jubjub@jubjubnest.net> Date: Tue, 11 Dec 2018 19:21:42 +0200 Subject: [PATCH 3/8] lib: clarify maybeReadMore_ loop conditions --- lib/_stream_readable.js | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index 8ba0aca375bb64..bb7434ad7de2fb 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -569,6 +569,30 @@ function maybeReadMore(stream, state) { function maybeReadMore_(stream, state) { var len = state.length; + + // Attempt to read more data if we should. 
+ // + // The conditions for reading more data are (one of): + // - Not enough data buffered (state.length < state.highWaterMark). The loop + // is responsible for filling the buffer with enough data if such data + // is available. If highWaterMark is 0 and we are not in the flowing mode + // we should _not_ attempt to buffer any extra data. We'll get more data + // when the stream consumer calls read() instead. + // - No data in the buffer, and the stream is in flowing mode. In this mode + // the loop below is responsible for ensuring read() is called. Failing to + // call read here would abort the flow and there's no other mechanism for + // continuing the flow if the stream consumer has just subscribed to the + // 'data' event. + // + // In addition to the above conditions to keep reading data, the following + // conditions prevent the data from being read: + // - The stream has ended (state.ended). + // - There is already a pending 'read' operation (state.reading). This is a + // case where the stream has called the implementation defined _read() + // method, but they are processing the call asynchronously and have _not_ + // called push() with new data. In this case we skip performing more + // read()s. The execution ends in this method again after the _read() ends + // up calling push() with more data. 
while (!state.reading && !state.ended && (state.length < state.highWaterMark || (state.flowing && state.length === 0))) { From 5d230ab2ff5bde1a3d22f6bd8118fa02ceba1f80 Mon Sep 17 00:00:00 2001 From: Mikko Rantanen <jubjub@jubjubnest.net> Date: Tue, 11 Dec 2018 19:59:11 +0200 Subject: [PATCH 4/8] lib: improve variable scoping in maybeReadMore --- lib/_stream_readable.js | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index bb7434ad7de2fb..19856bf6e63e93 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -568,8 +568,6 @@ function maybeReadMore(stream, state) { } function maybeReadMore_(stream, state) { - var len = state.length; - // Attempt to read more data if we should. // // The conditions for reading more data are (one of): @@ -596,13 +594,12 @@ function maybeReadMore_(stream, state) { while (!state.reading && !state.ended && (state.length < state.highWaterMark || (state.flowing && state.length === 0))) { + const len = state.length; debug('maybeReadMore read 0'); stream.read(0); if (len === state.length) // didn't get any data, stop spinning. 
break; - else - len = state.length; } state.readingMore = false; } From 7cb1ccca72337bba307e43ff4717f4c9b049f2a8 Mon Sep 17 00:00:00 2001 From: Mikko Rantanen <jubjub@jubjubnest.net> Date: Tue, 11 Dec 2018 21:33:04 +0200 Subject: [PATCH 5/8] lib: clarify hwm-0-async test description --- test/parallel/test-stream-readable-hwm-0-async.js | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/parallel/test-stream-readable-hwm-0-async.js b/test/parallel/test-stream-readable-hwm-0-async.js index ac64524feb33d9..866b524893d530 100644 --- a/test/parallel/test-stream-readable-hwm-0-async.js +++ b/test/parallel/test-stream-readable-hwm-0-async.js @@ -2,9 +2,9 @@ const common = require('../common'); -// This test ensures that Readable stream will call _read() for streams -// with highWaterMark === 0 upon .read(0) instead of just trying to -// emit 'readable' event. +// This test ensures that Readable stream will continue to call _read +// for streams with highWaterMark === 0 once the stream returns data +// by calling push() asynchronously. 
const { Readable } = require('stream'); From 30f6a505bb7a23a2374d4a3e10e38692055f7bfb Mon Sep 17 00:00:00 2001 From: Mikko Rantanen <jubjub@jubjubnest.net> Date: Tue, 11 Dec 2018 21:33:24 +0200 Subject: [PATCH 6/8] lib: add readable stream flow/no-flow mixed test --- ...test-stream-readable-flow-no-flow-mixed.js | 102 ++++++++++++++++++ 1 file changed, 102 insertions(+) create mode 100644 test/parallel/test-stream-readable-flow-no-flow-mixed.js diff --git a/test/parallel/test-stream-readable-flow-no-flow-mixed.js b/test/parallel/test-stream-readable-flow-no-flow-mixed.js new file mode 100644 index 00000000000000..44d58be5917763 --- /dev/null +++ b/test/parallel/test-stream-readable-flow-no-flow-mixed.js @@ -0,0 +1,102 @@ +'use strict'; + +const common = require('../common'); + +// This test ensures that Readable stream switches between flowing and +// non-flowing state properly when varying the 'readable' and 'data' event +// subscription. + +const assert = require('assert'); +const { Readable } = require('stream'); + +const flowingData = [ + { value: 'a' }, + { value: 'b' }, + { value: 'c', subscribeData: true }, + { value: 'd' }, + { value: 'e' }, + { value: 'f', removeReadable: true }, + { value: 'g' }, + { value: 'h' }, + { value: 'i', subscribeReadable: true }, + null, +]; + +const r = new Readable({ + read: common.mustCall(() => { + process.nextTick(() => { + r.push(flowingData.shift()); + }); + }, flowingData.length), + objectMode: true, + + // The water mark shouldn't matter but we'll want to ensure the stream won't + // buffer data before we have a chance to react to the subscribe/unsubscribe + // event controls. + highWaterMark: 0, +}); + +// Store data received through 'readable' events and 'data' events. 
+const actualReadable = []; +const actualData = []; + +r.on('end', common.mustCall(() => { + assert.deepStrictEqual(actualReadable, ['a', 'b', 'c', 'd', 'e', 'f']); + assert.deepStrictEqual(actualData, ['d', 'e', 'f', 'g', 'h', 'i']); +})); + +// Subscribing 'readable' should set flowing state to false. +assert.strictEqual(r.readableFlowing, null); +r.on('readable', common.mustCall(() => { + const v = r.read(); + actualReadable.push(v.value); + + if (v.subscribeData) { + + // Subsribing 'data' should not change flowing state. + assert.strictEqual(r.readableFlowing, false); + r.on('data', common.mustCall((data) => { + actualData.push(data.value); + + if (data.subscribeReadable) { + + // Re-subsribing readable should put the stream back to non-flowing + // state. + assert.strictEqual(r.readableFlowing, true); + r.on('readable', common.mustCall(() => { + // The stream is at the end, but 'readable' is signaled without the + // stream knowing this. The 'r.read()' here will result in _read + // getting executed, which will then push the final null. + // + // NOTE: The 'null' here signals non-synchronous read. It is NOT the + // same 'null' that the _read ends up pushing to signal end of + // stream. + assert.strictEqual(r.read(), null); + })); + assert.strictEqual(r.readableFlowing, false); + } + }, 6)); + assert.strictEqual(r.readableFlowing, false); + } + + if (v.removeReadable) { + // Removing 'readable' should allow the stream to flow into 'data' without + // us calling 'read()' manually. + // + // This should also cahgne the flowing state - although it is delayed into + // the next tick (within removeAllListeners). + assert.strictEqual(r.readableFlowing, false); + r.removeAllListeners('readable'); + process.nextTick(() => { + assert.strictEqual(r.readableFlowing, true); + }); + } else { + // We'll need to call r.read() to trigger the next read. 
+ // + // It should return 'null' as the actual _read implementation is + // asynchronous but we still need to call it to trigger the push on + // next tick. + assert.strictEqual(r.read(), null); + } +}, 6)); +assert.strictEqual(r.readableFlowing, false); From 216f5e0eabc3b6e1b960e2671c449adf411ca5d9 Mon Sep 17 00:00:00 2001 From: Mikko Rantanen <jubjub@jubjubnest.net> Date: Thu, 13 Dec 2018 02:13:47 +0200 Subject: [PATCH 7/8] lib: simplify the no-flow data test --- ...test-stream-readable-flow-no-flow-mixed.js | 102 ------------------ ...test-stream-readable-hwm-0-no-flow-data.js | 95 ++++++++++++++++ 2 files changed, 95 insertions(+), 102 deletions(-) delete mode 100644 test/parallel/test-stream-readable-flow-no-flow-mixed.js create mode 100644 test/parallel/test-stream-readable-hwm-0-no-flow-data.js diff --git a/test/parallel/test-stream-readable-flow-no-flow-mixed.js b/test/parallel/test-stream-readable-flow-no-flow-mixed.js deleted file mode 100644 index 44d58be5917763..00000000000000 --- a/test/parallel/test-stream-readable-flow-no-flow-mixed.js +++ /dev/null @@ -1,102 +0,0 @@ -'use strict'; - -const common = require('../common'); - -// This test ensures that Readable stream switches between flowing and -// non-flowing state properly when varying the 'readable' and 'data' event -// subscription. 
- -const assert = require('assert'); -const { Readable } = require('stream'); - -const flowingData = [ - { value: 'a' }, - { value: 'b' }, - { value: 'c', subscribeData: true }, - { value: 'd' }, - { value: 'e' }, - { value: 'f', removeReadable: true }, - { value: 'g' }, - { value: 'h' }, - { value: 'i', subscribeReadable: true }, - null, -]; - -const r = new Readable({ - read: common.mustCall(() => { - process.nextTick(() => { - r.push(flowingData.shift()); - }); - }, flowingData.length), - objectMode: true, - - // The water mark shouldn't matter but we'll want to ensure the stream won't - // buffer data before we have a chance to react to the subscribe/unsubscribe - // event controls. - highWaterMark: 0, -}); - -// Store data received through 'readable' events and 'data' events. -const actualReadable = []; -const actualData = []; - -r.on('end', common.mustCall(() => { - assert.deepStrictEqual(actualReadable, ['a', 'b', 'c', 'd', 'e', 'f']); - assert.deepStrictEqual(actualData, ['d', 'e', 'f', 'g', 'h', 'i']); -})); - -// Subscribing 'readable' should set flowing state to false. -assert.strictEqual(r.readableFlowing, null); -r.on('readable', common.mustCall(() => { - const v = r.read(); - actualReadable.push(v.value); - - if (v.subscribeData) { - - // Subsribing 'data' should not change flowing state. - assert.strictEqual(r.readableFlowing, false); - r.on('data', common.mustCall((data) => { - actualData.push(data.value); - - if (data.subscribeReadable) { - - // Re-subsribing readable should put the stream back to non-flowing - // state. - assert.strictEqual(r.readableFlowing, true); - r.on('readable', common.mustCall(() => { - // The stream is at the end, but 'readable' is signaled without the - // stream knowing this. The 'r.read()' here will result in _read - // getting executed, which will then push the final null. - // - // NOTE: The 'null' here signals non-synchronous read. 
It is NOT the - // same 'null' that the _read ends up pushing to signal end of - // stream. - assert.strictEqual(r.read(), null); - })); - assert.strictEqual(r.readableFlowing, false); - } - }, 6)); - assert.strictEqual(r.readableFlowing, false); - } - - if (v.removeReadable) { - // Removing 'readable' should allow the stream to flow into 'data' without - // us calling 'read()' manually. - // - // This should also cahgne the flowing state - although it is delayed into - // the next tick (within removeAllListeners). - assert.strictEqual(r.readableFlowing, false); - r.removeAllListeners('readable'); - process.nextTick(() => { - assert.strictEqual(r.readableFlowing, true); - }); - } else { - // We'll need to call r.read() to trigger the next read. - // - // It should return 'null' as the actual _read implementation is - // asynchronous but we still need to call it to trigger the push on - // next tick. - assert.strictEqual(r.read(), null); - } -}, 6)); -assert.strictEqual(r.readableFlowing, false); diff --git a/test/parallel/test-stream-readable-hwm-0-no-flow-data.js b/test/parallel/test-stream-readable-hwm-0-no-flow-data.js new file mode 100644 index 00000000000000..a688ded9d87e87 --- /dev/null +++ b/test/parallel/test-stream-readable-hwm-0-no-flow-data.js @@ -0,0 +1,95 @@ +'use strict'; + +const common = require('../common'); + +// Ensure that subscribing the 'data' event will not make the stream flow. +// The 'data' event will require calling read() by hand. +// +// The test is written for the (somewhat rare) highWaterMark: 0 streams to +// specifically catch any regressions that might occur with these streams. + +const assert = require('assert'); +const { Readable } = require('stream'); + +const streamData = [ 'a', null ]; + +// Track the calls so we can assert their order later. 
+const calls = []; +const r = new Readable({ + read: common.mustCall(() => { + calls.push('_read:' + streamData[0]); + process.nextTick(() => { + calls.push('push:' + streamData[0]); + r.push(streamData.shift()); + }); + }, streamData.length), + highWaterMark: 0, + + // Object mode is used here just for testing convenience. It really + // shouldn't affect the order of events. Just the data and its format. + objectMode: true, +}); + +assert.strictEqual(r.readableFlowing, null); +r.on('readable', common.mustCall(() => { + calls.push('readable'); +}, 2)); +assert.strictEqual(r.readableFlowing, false); +r.on('data', common.mustCall((data) => { + calls.push('data:' + data); +}, 1)); +r.on('end', common.mustCall(() => { + calls.push('end'); +})); +assert.strictEqual(r.readableFlowing, false); + +setTimeout(() => { + + // Only the _read, push, readable calls have happened. No data must be + // emitted yet. + assert.deepStrictEqual(calls, ['_read:a', 'push:a', 'readable']); + + // Calling 'r.read()' should trigger the data event. + assert.strictEqual(r.read(), 'a'); + assert.deepStrictEqual( + calls, + ['_read:a', 'push:a', 'readable', 'data:a']); + + // The next 'read()' will return null because hwm: 0 does not buffer any + // data and the _read implementation above does the push() asynchronously. + // + // Note: This 'null' signals "no data available". It isn't the end-of-stream + // null value as the stream doesn't know yet that it is about to reach the + // end. + assert.strictEqual(r.read(), null); + setTimeout(() => { + + // There's a new 'readable' event after the data has been pushed. + // The 'end' event will be emitted only after a 'read()'. + // + // This is somewhat special for the case where the '_read' implementation + // calls 'push' asynchronously. If 'push' was synchronous, the 'end' event + // would be emitted here _before_ we call read(). 
+ assert.deepStrictEqual( + calls, + ['_read:a', 'push:a', 'readable', 'data:a', + '_read:null', 'push:null', 'readable']); + + assert.strictEqual(r.read(), null); + + // While it isn't really specified whether the 'end' event should happen + // synchronously with read() or not, we'll assert the current behavior + // ('end' event happening on the next tick after read()) so any changes + // to it are noted and acknowledged in the future. + assert.deepStrictEqual( + calls, + ['_read:a', 'push:a', 'readable', 'data:a', + '_read:null', 'push:null', 'readable']); + process.nextTick(() => { + assert.deepStrictEqual( + calls, + ['_read:a', 'push:a', 'readable', 'data:a', + '_read:null', 'push:null', 'readable', 'end']); + }); + }, 1); +}, 1); From f2ab091a2a623a039b29bee605ff75ae3907bc4b Mon Sep 17 00:00:00 2001 From: Mikko Rantanen <jubjub@jubjubnest.net> Date: Fri, 14 Dec 2018 00:46:24 +0200 Subject: [PATCH 8/8] lib: implement review suggestions --- .../test-stream-readable-hwm-0-no-flow-data.js | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/test/parallel/test-stream-readable-hwm-0-no-flow-data.js b/test/parallel/test-stream-readable-hwm-0-no-flow-data.js index a688ded9d87e87..5f0186d720dd63 100644 --- a/test/parallel/test-stream-readable-hwm-0-no-flow-data.js +++ b/test/parallel/test-stream-readable-hwm-0-no-flow-data.js @@ -43,7 +43,13 @@ r.on('end', common.mustCall(() => { })); assert.strictEqual(r.readableFlowing, false); -setTimeout(() => { +// The stream emits the events asynchronously but that's not guaranteed to +// happen on the next tick (especially since the _read implementation above +// uses process.nextTick). +// +// We use setImmediate here to give the stream enough time to emit all the +// events it's about to emit. +setImmediate(() => { // Only the _read, push, readable calls have happened. No data must be // emitted yet. @@ -61,8 +67,11 @@ setTimeout(() => { // Note: This 'null' signals "no data available". 
It isn't the end-of-stream // null value as the stream doesn't know yet that it is about to reach the // end. + // + // Using setImmediate again to give the stream enough time to emit all the + // events it wants to emit. assert.strictEqual(r.read(), null); - setTimeout(() => { + setImmediate(() => { // There's a new 'readable' event after the data has been pushed. // The 'end' event will be emitted only after a 'read()'. @@ -91,5 +100,5 @@ setTimeout(() => { ['_read:a', 'push:a', 'readable', 'data:a', '_read:null', 'push:null', 'readable', 'end']); }); - }, 1); -}, 1); + }); +});