Skip to content

Commit e8c90bf

Browse files
addaleaxBethGriggs
authored andcommitted
zlib: do not coalesce multiple .flush() calls
This is an approach to address the issue linked below. Previously, when `.write()` and `.flush()` calls to a zlib stream were interleaved synchronously (i.e. without waiting for these operations to finish), multiple flush calls would have been coalesced into a single flushing operation. This patch changes behaviour so that each `.flush()` all corresponds to one flushing operation on the underlying zlib resource, and the order of operations is as if the `.flush()` call were a `.write()` call. One test had to be removed because it specifically tested the previous behaviour. As a drive-by fix, this also makes sure that all flush callbacks are called. Previously, that was not the case. Fixes: #28478 PR-URL: #28520 Reviewed-By: Rich Trott <[email protected]> Reviewed-By: Ruben Bridgewater <[email protected]> Reviewed-By: Luigi Pinca <[email protected]>
1 parent ddb5152 commit e8c90bf

4 files changed

+79
-53
lines changed

lib/zlib.js

+22-13
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ const {
4545
} = require('buffer');
4646
const { owner_symbol } = require('internal/async_hooks').symbols;
4747

48+
const kFlushFlag = Symbol('kFlushFlag');
49+
4850
const constants = process.binding('constants').zlib;
4951
const {
5052
// Zlib flush levels
@@ -259,7 +261,6 @@ function ZlibBase(opts, mode, handle, { flush, finishFlush, fullFlush }) {
259261
this._chunkSize = chunkSize;
260262
this._defaultFlushFlag = flush;
261263
this._finishFlushFlag = finishFlush;
262-
this._nextFlush = -1;
263264
this._defaultFullFlushFlag = fullFlush;
264265
this.once('end', this.close);
265266
this._info = opts && opts.info;
@@ -304,21 +305,35 @@ ZlibBase.prototype._flush = function(callback) {
304305

305306
// If a flush is scheduled while another flush is still pending, a way to figure
306307
// out which one is the "stronger" flush is needed.
308+
// This is currently only used to figure out which flush flag to use for the
309+
// last chunk.
307310
// Roughly, the following holds:
308311
// Z_NO_FLUSH (< Z_TREES) < Z_BLOCK < Z_PARTIAL_FLUSH <
309312
// Z_SYNC_FLUSH < Z_FULL_FLUSH < Z_FINISH
310313
const flushiness = [];
311314
let i = 0;
312-
for (const flushFlag of [Z_NO_FLUSH, Z_BLOCK, Z_PARTIAL_FLUSH,
313-
Z_SYNC_FLUSH, Z_FULL_FLUSH, Z_FINISH]) {
315+
const kFlushFlagList = [Z_NO_FLUSH, Z_BLOCK, Z_PARTIAL_FLUSH,
316+
Z_SYNC_FLUSH, Z_FULL_FLUSH, Z_FINISH];
317+
for (const flushFlag of kFlushFlagList) {
314318
flushiness[flushFlag] = i++;
315319
}
316320

317321
function maxFlush(a, b) {
318322
return flushiness[a] > flushiness[b] ? a : b;
319323
}
320324

321-
const flushBuffer = Buffer.alloc(0);
325+
// Set up a list of 'special' buffers that can be written using .write()
326+
// from the .flush() code as a way of introducing flushing operations into the
327+
// write sequence.
328+
const kFlushBuffers = [];
329+
{
330+
const dummyArrayBuffer = new ArrayBuffer();
331+
for (const flushFlag of kFlushFlagList) {
332+
kFlushBuffers[flushFlag] = Buffer.from(dummyArrayBuffer);
333+
kFlushBuffers[flushFlag][kFlushFlag] = flushFlag;
334+
}
335+
}
336+
322337
ZlibBase.prototype.flush = function(kind, callback) {
323338
var ws = this._writableState;
324339

@@ -333,13 +348,8 @@ ZlibBase.prototype.flush = function(kind, callback) {
333348
} else if (ws.ending) {
334349
if (callback)
335350
this.once('end', callback);
336-
} else if (this._nextFlush !== -1) {
337-
// This means that there is a flush currently in the write queue.
338-
// We currently coalesce this flush into the pending one.
339-
this._nextFlush = maxFlush(this._nextFlush, kind);
340351
} else {
341-
this._nextFlush = kind;
342-
this.write(flushBuffer, '', callback);
352+
this.write(kFlushBuffers[kind], '', callback);
343353
}
344354
};
345355

@@ -357,9 +367,8 @@ ZlibBase.prototype._transform = function(chunk, encoding, cb) {
357367
var flushFlag = this._defaultFlushFlag;
358368
// We use a 'fake' zero-length chunk to carry information about flushes from
359369
// the public API to the actual stream implementation.
360-
if (chunk === flushBuffer) {
361-
flushFlag = this._nextFlush;
362-
this._nextFlush = -1;
370+
if (typeof chunk[kFlushFlag] === 'number') {
371+
flushFlag = chunk[kFlushFlag];
363372
}
364373

365374
// For the last chunk, also apply `_finishFlushFlag`.

test/parallel/test-zlib-flush-multiple-scheduled.js

-39
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
'use strict';
2+
const common = require('../common');
3+
const assert = require('assert');
4+
const { createGzip, createGunzip, Z_PARTIAL_FLUSH } = require('zlib');
5+
6+
// Verify that .flush() behaves like .write() in terms of ordering, e.g. in
7+
// a sequence like .write() + .flush() + .write() + .flush() each .flush() call
8+
// only affects the data written before it.
9+
// Refs: https://github.com/nodejs/node/issues/28478
10+
11+
const compress = createGzip();
12+
const decompress = createGunzip();
13+
decompress.setEncoding('utf8');
14+
15+
const events = [];
16+
const compressedChunks = [];
17+
18+
for (const chunk of ['abc', 'def', 'ghi']) {
19+
compress.write(chunk, common.mustCall(() => events.push({ written: chunk })));
20+
compress.flush(Z_PARTIAL_FLUSH, common.mustCall(() => {
21+
events.push('flushed');
22+
const chunk = compress.read();
23+
if (chunk !== null)
24+
compressedChunks.push(chunk);
25+
}));
26+
}
27+
28+
compress.end(common.mustCall(() => {
29+
events.push('compress end');
30+
writeToDecompress();
31+
}));
32+
33+
function writeToDecompress() {
34+
// Write the compressed chunks to a decompressor, one by one, in order to
35+
// verify that the flushes actually worked.
36+
const chunk = compressedChunks.shift();
37+
if (chunk === undefined) return decompress.end();
38+
decompress.write(chunk, common.mustCall(() => {
39+
events.push({ read: decompress.read() });
40+
writeToDecompress();
41+
}));
42+
}
43+
44+
process.on('exit', () => {
45+
assert.deepStrictEqual(events, [
46+
{ written: 'abc' },
47+
'flushed',
48+
{ written: 'def' },
49+
'flushed',
50+
{ written: 'ghi' },
51+
'flushed',
52+
'compress end',
53+
{ read: 'abc' },
54+
{ read: 'def' },
55+
{ read: 'ghi' }
56+
]);
57+
});

test/parallel/test-zlib-write-after-flush.js

-1
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@ for (const [ createCompress, createDecompress ] of [
3939
gunz.on('data', (c) => output += c);
4040
gunz.on('end', common.mustCall(() => {
4141
assert.strictEqual(output, input);
42-
assert.strictEqual(gzip._nextFlush, -1);
4342
}));
4443

4544
// Make sure that flush/write doesn't trigger an assert failure

0 commit comments

Comments
 (0)