Skip to content

Commit 18cbde5

Browse files
addaleaxtargos
authored andcommitted
zlib: simplify flushing mechanism
Previously, flushing on zlib streams was implemented through stream 'drain' handlers. This has a number of downsides; in particular, it is complex, and could lead to unpredictable behaviour, since it meant that in a sequence like ```js compressor.write('abc'); compressor.flush(); waitForMoreDataAsynchronously(() => { compressor.write('def'); }); ``` it was not fully deterministic whether the flush happens after the second chunk is written or the first one. This commit replaces this mechanism by one that piggy-backs along the stream’s write queue, using a “special” `Buffer` instance that signals that a flush is currently due. PR-URL: #23186 Reviewed-By: James M Snell <[email protected]>
1 parent 70abcf2 commit 18cbde5

File tree

3 files changed

+21
-31
lines changed

3 files changed

+21
-31
lines changed

lib/zlib.js

+19-29
Original file line numberDiff line numberDiff line change
@@ -311,10 +311,9 @@ function Zlib(opts, mode) {
311311
this._level = level;
312312
this._strategy = strategy;
313313
this._chunkSize = chunkSize;
314-
this._flushFlag = flush;
315-
this._scheduledFlushFlag = Z_NO_FLUSH;
316-
this._origFlushFlag = flush;
314+
this._defaultFlushFlag = flush;
317315
this._finishFlushFlag = finishFlush;
316+
this._nextFlush = -1;
318317
this._info = opts && opts.info;
319318
this.once('end', this.close);
320319
}
@@ -398,6 +397,7 @@ function maxFlush(a, b) {
398397
return flushiness[a] > flushiness[b] ? a : b;
399398
}
400399

400+
const flushBuffer = Buffer.alloc(0);
401401
Zlib.prototype.flush = function flush(kind, callback) {
402402
var ws = this._writableState;
403403

@@ -412,21 +412,13 @@ Zlib.prototype.flush = function flush(kind, callback) {
412412
} else if (ws.ending) {
413413
if (callback)
414414
this.once('end', callback);
415-
} else if (ws.needDrain) {
416-
const alreadyHadFlushScheduled = this._scheduledFlushFlag !== Z_NO_FLUSH;
417-
this._scheduledFlushFlag = maxFlush(kind, this._scheduledFlushFlag);
418-
419-
// If a callback was passed, always register a new `drain` + flush handler,
420-
// mostly because that's simpler and flush callbacks piling up is a rare
421-
// thing anyway.
422-
if (!alreadyHadFlushScheduled || callback) {
423-
const drainHandler = () => this.flush(this._scheduledFlushFlag, callback);
424-
this.once('drain', drainHandler);
425-
}
415+
} else if (this._nextFlush !== -1) {
416+
// This means that there is a flush currently in the write queue.
417+
// We currently coalesce this flush into the pending one.
418+
this._nextFlush = maxFlush(this._nextFlush, kind);
426419
} else {
427-
this._flushFlag = kind;
428-
this.write(Buffer.alloc(0), '', callback);
429-
this._scheduledFlushFlag = Z_NO_FLUSH;
420+
this._nextFlush = kind;
421+
this.write(flushBuffer, '', callback);
430422
}
431423
};
432424

@@ -436,20 +428,18 @@ Zlib.prototype.close = function close(callback) {
436428
};
437429

438430
Zlib.prototype._transform = function _transform(chunk, encoding, cb) {
439-
// If it's the last chunk, or a final flush, we use the Z_FINISH flush flag
440-
// (or whatever flag was provided using opts.finishFlush).
441-
// If it's explicitly flushing at some other time, then we use
442-
// Z_FULL_FLUSH. Otherwise, use the original opts.flush flag.
443-
var flushFlag;
431+
var flushFlag = this._defaultFlushFlag;
432+
// We use a 'fake' zero-length chunk to carry information about flushes from
433+
// the public API to the actual stream implementation.
434+
if (chunk === flushBuffer) {
435+
flushFlag = this._nextFlush;
436+
this._nextFlush = -1;
437+
}
438+
439+
// For the last chunk, also apply `_finishFlushFlag`.
444440
var ws = this._writableState;
445441
if ((ws.ending || ws.ended) && ws.length === chunk.byteLength) {
446-
flushFlag = this._finishFlushFlag;
447-
} else {
448-
flushFlag = this._flushFlag;
449-
// once we've flushed the last of the queue, stop flushing and
450-
// go back to the normal behavior.
451-
if (chunk.byteLength >= ws.length)
452-
this._flushFlag = this._origFlushFlag;
442+
flushFlag = maxFlush(flushFlag, this._finishFlushFlag);
453443
}
454444
processChunk(this, chunk, flushFlag, cb);
455445
};

test/parallel/test-zlib-flush-drain.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -44,5 +44,5 @@ process.once('exit', function() {
4444
assert.strictEqual(
4545
drainCount, 1);
4646
assert.strictEqual(
47-
flushCount, 2);
47+
flushCount, 1);
4848
});

test/parallel/test-zlib-write-after-flush.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ gunz.setEncoding('utf8');
3535
gunz.on('data', (c) => output += c);
3636
gunz.on('end', common.mustCall(() => {
3737
assert.strictEqual(output, input);
38-
assert.strictEqual(gzip._flushFlag, zlib.constants.Z_NO_FLUSH);
38+
assert.strictEqual(gzip._nextFlush, -1);
3939
}));
4040

4141
// make sure that flush/write doesn't trigger an assert failure

0 commit comments

Comments
 (0)