Skip to content

Commit ea87809

Browse files
committed
stream: fix _final and 'prefinish' timing
This PR fixes a few different things: The timing of 'prefinish' depends on whether or not _final is defined. In on case the event is emitted synchronously with end() and otherwise asynchronously. _final is currently unecessarily called asynchronously which forces implementors to use 'prefinish' as a hack to emulate synchronous behaviour. Furthermore, this hack is subtly broken due to the above issue. Refs: #31401 Refs: #32763 (comment) PR-URL: #32780 Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: Rich Trott <[email protected]>
1 parent d08bd41 commit ea87809

6 files changed

+88
-23
lines changed

lib/_stream_transform.js

+4-4
Original file line numberDiff line numberDiff line change
@@ -99,10 +99,10 @@ function Transform(options) {
9999
this._flush = options.flush;
100100
}
101101

102-
// TODO(ronag): Unfortunately _final is invoked asynchronously.
103-
// Use `prefinish` hack. `prefinish` is emitted synchronously when
104-
// and only when `_final` is not defined. Implementing `_final`
105-
// to a Transform should be an error.
102+
// When the writable side finishes, then flush out anything remaining.
103+
// Backwards compat. Some Transform streams incorrectly implement _final
104+
// instead of or in addition to _flush. By using 'prefinish' instead of
105+
// implementing _final we continue supporting this unfortunate use case.
106106
this.on('prefinish', prefinish);
107107
}
108108

lib/_stream_writable.js

+15-9
Original file line numberDiff line numberDiff line change
@@ -635,24 +635,30 @@ function needFinish(state) {
635635
}
636636

637637
function callFinal(stream, state) {
638+
state.sync = true;
639+
state.pendingcb++;
638640
stream._final((err) => {
639641
state.pendingcb--;
640642
if (err) {
641-
errorOrDestroy(stream, err);
642-
} else {
643+
errorOrDestroy(stream, err, state.sync);
644+
} else if (needFinish(state)) {
643645
state.prefinished = true;
644646
stream.emit('prefinish');
645-
finishMaybe(stream, state);
647+
// Backwards compat. Don't check state.sync here.
648+
// Some streams assume 'finish' will be emitted
649+
// asynchronously relative to _final callback.
650+
state.pendingcb++;
651+
process.nextTick(finish, stream, state);
646652
}
647653
});
654+
state.sync = false;
648655
}
649656

650657
function prefinish(stream, state) {
651658
if (!state.prefinished && !state.finalCalled) {
652659
if (typeof stream._final === 'function' && !state.destroyed) {
653-
state.pendingcb++;
654660
state.finalCalled = true;
655-
process.nextTick(callFinal, stream, state);
661+
callFinal(stream, state);
656662
} else {
657663
state.prefinished = true;
658664
stream.emit('prefinish');
@@ -661,10 +667,9 @@ function prefinish(stream, state) {
661667
}
662668

663669
function finishMaybe(stream, state, sync) {
664-
const need = needFinish(state);
665-
if (need) {
670+
if (needFinish(state)) {
666671
prefinish(stream, state);
667-
if (state.pendingcb === 0) {
672+
if (state.pendingcb === 0 && needFinish(state)) {
668673
state.pendingcb++;
669674
if (sync) {
670675
process.nextTick(finish, stream, state);
@@ -673,14 +678,15 @@ function finishMaybe(stream, state, sync) {
673678
}
674679
}
675680
}
676-
return need;
677681
}
678682

679683
function finish(stream, state) {
680684
state.pendingcb--;
681685
if (state.errorEmitted)
682686
return;
683687

688+
// TODO(ronag): This could occur after 'close' is emitted.
689+
684690
state.finished = true;
685691
stream.emit('finish');
686692

lib/internal/http2/core.js

+6-3
Original file line numberDiff line numberDiff line change
@@ -1710,11 +1710,14 @@ function streamOnPause() {
17101710
}
17111711

17121712
function afterShutdown(status) {
1713+
const stream = this.handle[kOwner];
1714+
if (stream) {
1715+
stream.on('finish', () => {
1716+
stream[kMaybeDestroy]();
1717+
});
1718+
}
17131719
// Currently this status value is unused
17141720
this.callback();
1715-
const stream = this.handle[kOwner];
1716-
if (stream)
1717-
stream[kMaybeDestroy]();
17181721
}
17191722

17201723
function finishSendTrailers(stream, headersList) {

test/parallel/test-stream-transform-final-sync.js

+3-3
Original file line numberDiff line numberDiff line change
@@ -82,15 +82,15 @@ const t = new stream.Transform({
8282
process.nextTick(function() {
8383
state++;
8484
// fluchCallback part 2
85-
assert.strictEqual(state, 15);
85+
assert.strictEqual(state, 13);
8686
done();
8787
});
8888
}, 1)
8989
});
9090
t.on('finish', common.mustCall(function() {
9191
state++;
9292
// finishListener
93-
assert.strictEqual(state, 13);
93+
assert.strictEqual(state, 14);
9494
}, 1));
9595
t.on('end', common.mustCall(function() {
9696
state++;
@@ -106,5 +106,5 @@ t.write(4);
106106
t.end(7, common.mustCall(function() {
107107
state++;
108108
// endMethodCallback
109-
assert.strictEqual(state, 14);
109+
assert.strictEqual(state, 15);
110110
}, 1));

test/parallel/test-stream-transform-final.js

+3-3
Original file line numberDiff line numberDiff line change
@@ -84,15 +84,15 @@ const t = new stream.Transform({
8484
process.nextTick(function() {
8585
state++;
8686
// flushCallback part 2
87-
assert.strictEqual(state, 15);
87+
assert.strictEqual(state, 13);
8888
done();
8989
});
9090
}, 1)
9191
});
9292
t.on('finish', common.mustCall(function() {
9393
state++;
9494
// finishListener
95-
assert.strictEqual(state, 13);
95+
assert.strictEqual(state, 14);
9696
}, 1));
9797
t.on('end', common.mustCall(function() {
9898
state++;
@@ -108,5 +108,5 @@ t.write(4);
108108
t.end(7, common.mustCall(function() {
109109
state++;
110110
// endMethodCallback
111-
assert.strictEqual(state, 14);
111+
assert.strictEqual(state, 15);
112112
}, 1));

test/parallel/test-stream-writable-finished.js

+57-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ const assert = require('assert');
3030
}
3131

3232
{
33-
// Emit finish asynchronously
33+
// Emit finish asynchronously.
3434

3535
const w = new Writable({
3636
write(chunk, encoding, cb) {
@@ -41,3 +41,59 @@ const assert = require('assert');
4141
w.end();
4242
w.on('finish', common.mustCall());
4343
}
44+
45+
{
46+
// Emit prefinish synchronously.
47+
48+
const w = new Writable({
49+
write(chunk, encoding, cb) {
50+
cb();
51+
}
52+
});
53+
54+
let sync = true;
55+
w.on('prefinish', common.mustCall(() => {
56+
assert.strictEqual(sync, true);
57+
}));
58+
w.end();
59+
sync = false;
60+
}
61+
62+
{
63+
// Emit prefinish synchronously w/ final.
64+
65+
const w = new Writable({
66+
write(chunk, encoding, cb) {
67+
cb();
68+
},
69+
final(cb) {
70+
cb();
71+
}
72+
});
73+
74+
let sync = true;
75+
w.on('prefinish', common.mustCall(() => {
76+
assert.strictEqual(sync, true);
77+
}));
78+
w.end();
79+
sync = false;
80+
}
81+
82+
83+
{
84+
// Call _final synchronously.
85+
86+
let sync = true;
87+
const w = new Writable({
88+
write(chunk, encoding, cb) {
89+
cb();
90+
},
91+
final: common.mustCall((cb) => {
92+
assert.strictEqual(sync, true);
93+
cb();
94+
})
95+
});
96+
97+
w.end();
98+
sync = false;
99+
}

0 commit comments

Comments
 (0)