Skip to content

Commit be9f726

Browse files
committed
stream: fix _final and 'prefinish' timing
This PR fixes a few different things: The timing of 'prefinish' depends on whether or not _final is defined. In on case the event is emitted synchronously with end() and otherwise asynchronously. _final is currently unecessarily called asynchronously which forces implementors to use 'prefinish' as a hack to emulate synchronous behaviour. Furthermore, this hack is subtly broken due to the above issue. The stream should not finish if errored or destroyed synchronously during the prefinish stage. Refs: #31401 Refs: #32763 (comment)
1 parent 0bd5595 commit be9f726

6 files changed

+87
-21
lines changed

lib/_stream_transform.js

+4-4
Original file line numberDiff line numberDiff line change
@@ -99,10 +99,10 @@ function Transform(options) {
9999
this._flush = options.flush;
100100
}
101101

102-
// TODO(ronag): Unfortunately _final is invoked asynchronously.
103-
// Use `prefinish` hack. `prefinish` is emitted synchronously when
104-
// and only when `_final` is not defined. Implementing `_final`
105-
// to a Transform should be an error.
102+
// When the writable side finishes, then flush out anything remaining.
103+
// Backwards compat. Some Transform streams incorrectly implement _final
104+
// instead of or in addition to _flush. By using 'prefinish' instead of
105+
// implementing _final we continue supporting this unfortunate use case.
106106
this.on('prefinish', prefinish);
107107
}
108108

lib/_stream_writable.js

+15-8
Original file line numberDiff line numberDiff line change
@@ -629,24 +629,33 @@ function needFinish(state) {
629629
}
630630

631631
function callFinal(stream, state) {
632+
state.sync = true;
633+
state.pendingcb++;
632634
stream._final((err) => {
633635
state.pendingcb--;
634636
if (err) {
635-
errorOrDestroy(stream, err);
637+
errorOrDestroy(stream, err, state.sync);
636638
} else {
639+
// Backwards compat. Don't check needFinish() here.
640+
// Some streams assume 'finish' will be emitted
641+
// even if stream has been destroyed.
637642
state.prefinished = true;
638643
stream.emit('prefinish');
639-
finishMaybe(stream, state);
644+
// Backwards compat. Don't check state.sync here.
645+
// Some stream assume 'finish' will be emitted
646+
// asynchronously relative to _final callback.
647+
state.pendingcb++;
648+
process.nextTick(finish, stream, state);
640649
}
641650
});
651+
state.sync = false;
642652
}
643653

644654
function prefinish(stream, state) {
645655
if (!state.prefinished && !state.finalCalled) {
646656
if (typeof stream._final === 'function' && !state.destroyed) {
647-
state.pendingcb++;
648657
state.finalCalled = true;
649-
process.nextTick(callFinal, stream, state);
658+
callFinal(stream, state);
650659
} else {
651660
state.prefinished = true;
652661
stream.emit('prefinish');
@@ -655,10 +664,9 @@ function prefinish(stream, state) {
655664
}
656665

657666
function finishMaybe(stream, state, sync) {
658-
const need = needFinish(state);
659-
if (need) {
667+
if (needFinish(state)) {
660668
prefinish(stream, state);
661-
if (state.pendingcb === 0) {
669+
if (state.pendingcb === 0 && needFinish(state)) {
662670
state.pendingcb++;
663671
if (sync) {
664672
process.nextTick(finish, stream, state);
@@ -667,7 +675,6 @@ function finishMaybe(stream, state, sync) {
667675
}
668676
}
669677
}
670-
return need;
671678
}
672679

673680
function finish(stream, state) {

lib/internal/http2/core.js

+6-3
Original file line numberDiff line numberDiff line change
@@ -1710,11 +1710,14 @@ function streamOnPause() {
17101710
}
17111711

17121712
function afterShutdown(status) {
1713+
const stream = this.handle[kOwner];
1714+
if (stream) {
1715+
stream.on('finish', () => {
1716+
stream[kMaybeDestroy]();
1717+
});
1718+
}
17131719
// Currently this status value is unused
17141720
this.callback();
1715-
const stream = this.handle[kOwner];
1716-
if (stream)
1717-
stream[kMaybeDestroy]();
17181721
}
17191722

17201723
function finishSendTrailers(stream, headersList) {

test/parallel/test-stream-transform-final-sync.js

+3-3
Original file line numberDiff line numberDiff line change
@@ -82,15 +82,15 @@ const t = new stream.Transform({
8282
process.nextTick(function() {
8383
state++;
8484
// fluchCallback part 2
85-
assert.strictEqual(state, 15);
85+
assert.strictEqual(state, 13);
8686
done();
8787
});
8888
}, 1)
8989
});
9090
t.on('finish', common.mustCall(function() {
9191
state++;
9292
// finishListener
93-
assert.strictEqual(state, 13);
93+
assert.strictEqual(state, 14);
9494
}, 1));
9595
t.on('end', common.mustCall(function() {
9696
state++;
@@ -106,5 +106,5 @@ t.write(4);
106106
t.end(7, common.mustCall(function() {
107107
state++;
108108
// endMethodCallback
109-
assert.strictEqual(state, 14);
109+
assert.strictEqual(state, 15);
110110
}, 1));

test/parallel/test-stream-transform-final.js

+3-3
Original file line numberDiff line numberDiff line change
@@ -84,15 +84,15 @@ const t = new stream.Transform({
8484
process.nextTick(function() {
8585
state++;
8686
// flushCallback part 2
87-
assert.strictEqual(state, 15);
87+
assert.strictEqual(state, 13);
8888
done();
8989
});
9090
}, 1)
9191
});
9292
t.on('finish', common.mustCall(function() {
9393
state++;
9494
// finishListener
95-
assert.strictEqual(state, 13);
95+
assert.strictEqual(state, 14);
9696
}, 1));
9797
t.on('end', common.mustCall(function() {
9898
state++;
@@ -108,5 +108,5 @@ t.write(4);
108108
t.end(7, common.mustCall(function() {
109109
state++;
110110
// endMethodCallback
111-
assert.strictEqual(state, 14);
111+
assert.strictEqual(state, 15);
112112
}, 1));

test/parallel/test-stream-writable-finished.js

+56
Original file line numberDiff line numberDiff line change
@@ -41,3 +41,59 @@ const assert = require('assert');
4141
w.end();
4242
w.on('finish', common.mustCall());
4343
}
44+
45+
{
46+
// Emit prefinish synchronously
47+
48+
const w = new Writable({
49+
write(chunk, encoding, cb) {
50+
cb();
51+
}
52+
});
53+
54+
let sync = true;
55+
w.on('prefinish', common.mustCall(() => {
56+
assert.strictEqual(sync, true);
57+
}));
58+
w.end();
59+
sync = false;
60+
}
61+
62+
{
63+
// Emit prefinish synchronously w/ final
64+
65+
const w = new Writable({
66+
write(chunk, encoding, cb) {
67+
cb();
68+
},
69+
final(cb) {
70+
cb();
71+
}
72+
});
73+
74+
let sync = true;
75+
w.on('prefinish', common.mustCall(() => {
76+
assert.strictEqual(sync, true);
77+
}));
78+
w.end();
79+
sync = false;
80+
}
81+
82+
83+
{
84+
// Call _final synchronouslyl
85+
86+
let sync = true;
87+
const w = new Writable({
88+
write(chunk, encoding, cb) {
89+
cb();
90+
},
91+
final: common.mustCall((cb) => {
92+
assert.strictEqual(sync, true);
93+
cb();
94+
})
95+
});
96+
97+
w.end();
98+
sync = false;
99+
}

0 commit comments

Comments
 (0)