|
| 1 | +'use strict'; |
| 2 | +const common = require('../common'); |
| 3 | +const stream = require('stream'); |
| 4 | + |
| 5 | +// A consumer stream with a very low highWaterMark, which starts in a state |
| 6 | +// where it buffers the chunk it receives rather than indicating that they |
| 7 | +// have been consumed. |
| 8 | +const writable = new stream.Writable({ |
| 9 | + highWaterMark: 5 |
| 10 | +}); |
| 11 | + |
| 12 | +let isCurrentlyBufferingWrites = true; |
| 13 | +const queue = []; |
| 14 | + |
| 15 | +writable._write = (chunk, encoding, cb) => { |
| 16 | + if (isCurrentlyBufferingWrites) |
| 17 | + queue.push({chunk, cb}); |
| 18 | + else |
| 19 | + cb(); |
| 20 | +}; |
| 21 | + |
| 22 | +const readable = new stream.Readable({ |
| 23 | + read() {} |
| 24 | +}); |
| 25 | + |
| 26 | +readable.pipe(writable); |
| 27 | + |
| 28 | +readable.once('pause', common.mustCall(() => { |
| 29 | + // First pause, resume manually. The next write() to writable will still |
| 30 | + // return false, because chunks are still being buffered, so it will increase |
| 31 | + // the awaitDrain counter again. |
| 32 | + process.nextTick(common.mustCall(() => { |
| 33 | + readable.resume(); |
| 34 | + })); |
| 35 | + |
| 36 | + readable.once('pause', common.mustCall(() => { |
| 37 | + // Second pause, handle all chunks from now on. Once all callbacks that |
| 38 | + // are currently queued up are handled, the awaitDrain drain counter should |
| 39 | + // fall back to 0 and all chunks that are pending on the readable side |
| 40 | + // should be flushed. |
| 41 | + isCurrentlyBufferingWrites = false; |
| 42 | + for (const queued of queue) |
| 43 | + queued.cb(); |
| 44 | + })); |
| 45 | +})); |
| 46 | + |
| 47 | +readable.push(Buffer(100)); // Fill the writable HWM, first 'pause'. |
| 48 | +readable.push(Buffer(100)); // Second 'pause'. |
| 49 | +readable.push(Buffer(100)); // Should get through to the writable. |
| 50 | +readable.push(null); |
| 51 | + |
| 52 | +writable.on('finish', common.mustCall(() => { |
| 53 | + // Everything okay, all chunks were written. |
| 54 | +})); |
0 commit comments