Skip to content

Commit 07c7f19

Browse files
calvinmetcalfjasnell
authored andcommitted
stream: add final method
Adds the ability to for write streams to have an _final method which acts similarly to the _flush method that transform streams have but is called before the finish event is emitted and if asynchronous delays the stream from finishing. The `final` option may also be passed in order to set it. PR-URL: #12828 Reviewed-By: James M Snell <[email protected]> Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: Refael Ackermann <[email protected]>
1 parent 87cef63 commit 07c7f19

8 files changed

+317
-32
lines changed

doc/api/stream.md

+25-3
Original file line numberDiff line numberDiff line change
@@ -1198,7 +1198,8 @@ on the type of stream being created, as detailed in the chart below:
11981198
<p>[Writable](#stream_class_stream_writable)</p>
11991199
</td>
12001200
<td>
1201-
<p><code>[_write][stream-_write]</code>, <code>[_writev][stream-_writev]</code></p>
1201+
<p><code>[_write][stream-_write]</code>, <code>[_writev][stream-_writev]</code>,
1202+
<code>[_final][stream-_final]</code></p>
12021203
</td>
12031204
</tr>
12041205
<tr>
@@ -1209,7 +1210,8 @@ on the type of stream being created, as detailed in the chart below:
12091210
<p>[Duplex](#stream_class_stream_duplex)</p>
12101211
</td>
12111212
<td>
1212-
<p><code>[_read][stream-_read]</code>, <code>[_write][stream-_write]</code>, <code>[_writev][stream-_writev]</code></p>
1213+
<p><code>[_read][stream-_read]</code>, <code>[_write][stream-_write]</code>, <code>[_writev][stream-_writev]</code>,
1214+
<code>[_final][stream-_final]</code></p>
12131215
</td>
12141216
</tr>
12151217
<tr>
@@ -1220,7 +1222,8 @@ on the type of stream being created, as detailed in the chart below:
12201222
<p>[Transform](#stream_class_stream_transform)</p>
12211223
</td>
12221224
<td>
1223-
<p><code>[_transform][stream-_transform]</code>, <code>[_flush][stream-_flush]</code></p>
1225+
<p><code>[_transform][stream-_transform]</code>, <code>[_flush][stream-_flush]</code>,
1226+
<code>[_final][stream-_final]</code></p>
12241227
</td>
12251228
</tr>
12261229
</table>
@@ -1279,6 +1282,8 @@ constructor and implement the `writable._write()` method. The
12791282
[`stream._writev()`][stream-_writev] method.
12801283
* `destroy` {Function} Implementation for the
12811284
[`stream._destroy()`][writable-_destroy] method.
1285+
* `final` {Function} Implementation for the
1286+
[`stream._final()`][stream-_final] method.
12821287

12831288
For example:
12841289

@@ -1398,6 +1403,22 @@ added: REPLACEME
13981403
* `callback` {Function} A callback function that takes an optional error argument
13991404
which is invoked when the writable is destroyed.
14001405

1406+
#### writable.\_final(callback)
1407+
<!-- YAML
1408+
added: REPLACEME
1409+
-->
1410+
1411+
* `callback` {Function} Call this function (optionally with an error
1412+
argument) when you are done writing any remaining data.
1413+
1414+
Note: `_final()` **must not** be called directly. It MAY be implemented
1415+
by child classes, and if so, will be called by the internal Writable
1416+
class methods only.
1417+
1418+
This optional function will be called before the stream closes, delaying the
1419+
`finish` event until `callback` is called. This is useful to close resources
1420+
or write buffered data before a stream ends.
1421+
14011422
#### Errors While Writing
14021423

14031424
It is recommended that errors occurring during the processing of the
@@ -2115,6 +2136,7 @@ readable buffer so there is nothing for a user to consume.
21152136
[stream-_transform]: #stream_transform_transform_chunk_encoding_callback
21162137
[stream-_write]: #stream_writable_write_chunk_encoding_callback_1
21172138
[stream-_writev]: #stream_writable_writev_chunks_callback
2139+
[stream-_final]: #stream_writable_final_callback
21182140
[stream-end]: #stream_writable_end_chunk_encoding_callback
21192141
[stream-pause]: #stream_readable_pause
21202142
[stream-push]: #stream_readable_push_chunk_encoding

lib/_stream_writable.js

+29-6
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,12 @@ function WritableState(options, stream) {
5858
// cast to ints.
5959
this.highWaterMark = Math.floor(this.highWaterMark);
6060

61+
// if _final has been called
62+
this.finalCalled = false;
63+
64+
// if _final has been called
65+
this.finalCalled = false;
66+
6167
// drain event flag.
6268
this.needDrain = false;
6369
// at the start of calling end()
@@ -199,6 +205,9 @@ function Writable(options) {
199205

200206
if (typeof options.destroy === 'function')
201207
this._destroy = options.destroy;
208+
209+
if (typeof options.final === 'function')
210+
this._final = options.final;
202211
}
203212

204213
Stream.call(this);
@@ -520,23 +529,37 @@ function needFinish(state) {
520529
!state.finished &&
521530
!state.writing);
522531
}
523-
524-
function prefinish(stream, state) {
525-
if (!state.prefinished) {
532+
function callFinal(stream, state) {
533+
stream._final((err) => {
534+
state.pendingcb--;
535+
if (err) {
536+
stream.emit('error', err);
537+
}
526538
state.prefinished = true;
527539
stream.emit('prefinish');
540+
finishMaybe(stream, state);
541+
});
542+
}
543+
function prefinish(stream, state) {
544+
if (!state.prefinished && !state.finalCalled) {
545+
if (typeof stream._final === 'function') {
546+
state.pendingcb++;
547+
state.finalCalled = true;
548+
process.nextTick(callFinal, stream, state);
549+
} else {
550+
state.prefinished = true;
551+
stream.emit('prefinish');
552+
}
528553
}
529554
}
530555

531556
function finishMaybe(stream, state) {
532557
var need = needFinish(state);
533558
if (need) {
559+
prefinish(stream, state);
534560
if (state.pendingcb === 0) {
535-
prefinish(stream, state);
536561
state.finished = true;
537562
stream.emit('finish');
538-
} else {
539-
prefinish(stream, state);
540563
}
541564
}
542565
return need;
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,11 @@
11
'use strict';
2-
require('../common');
3-
const assert = require('assert');
2+
const common = require('../common');
43

54
const Readable = require('stream').Readable;
65

7-
let _readCalled = false;
8-
function _read(n) {
9-
_readCalled = true;
6+
const _read = common.mustCall(function _read(n) {
107
this.push(null);
11-
}
8+
});
129

1310
const r = new Readable({ read: _read });
1411
r.resume();
15-
16-
process.on('exit', function() {
17-
assert.strictEqual(r._read, _read);
18-
assert(_readCalled);
19-
});
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,25 @@
11
'use strict';
2-
require('../common');
2+
const common = require('../common');
33
const assert = require('assert');
44

55
const Transform = require('stream').Transform;
66

7-
let _transformCalled = false;
8-
function _transform(d, e, n) {
9-
_transformCalled = true;
7+
const _transform = common.mustCall(function _transform(d, e, n) {
108
n();
11-
}
9+
});
1210

13-
let _flushCalled = false;
14-
function _flush(n) {
15-
_flushCalled = true;
11+
const _final = common.mustCall(function _final(n) {
1612
n();
17-
}
13+
});
14+
15+
const _flush = common.mustCall(function _flush(n) {
16+
n();
17+
});
1818

1919
const t = new Transform({
2020
transform: _transform,
21-
flush: _flush
21+
flush: _flush,
22+
final: _final
2223
});
2324

2425
const t2 = new Transform({});
@@ -34,6 +35,5 @@ assert.throws(() => {
3435
process.on('exit', () => {
3536
assert.strictEqual(t._transform, _transform);
3637
assert.strictEqual(t._flush, _flush);
37-
assert.strictEqual(_transformCalled, true);
38-
assert.strictEqual(_flushCalled, true);
38+
assert.strictEqual(t._final, _final);
3939
});
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
'use strict';
2+
const common = require('../common');
3+
const assert = require('assert');
4+
5+
const stream = require('stream');
6+
let state = 0;
7+
8+
/*
9+
What you do
10+
var stream = new tream.Transform({
11+
transform: function transformCallback(chunk, _, next) {
12+
// part 1
13+
this.push(chunk);
14+
//part 2
15+
next();
16+
},
17+
final: function endCallback(done) {
18+
// part 1
19+
process.nextTick(function () {
20+
// part 2
21+
done();
22+
});
23+
},
24+
flush: function flushCallback(done) {
25+
// part 1
26+
process.nextTick(function () {
27+
// part 2
28+
done();
29+
});
30+
}
31+
});
32+
t.on('data', dataListener);
33+
t.on('end', endListener);
34+
t.on('finish', finishListener);
35+
t.write(1);
36+
t.write(4);
37+
t.end(7, endMethodCallback);
38+
39+
The order things are called
40+
41+
1. transformCallback part 1
42+
2. dataListener
43+
3. transformCallback part 2
44+
4. transformCallback part 1
45+
5. dataListener
46+
6. transformCallback part 2
47+
7. transformCallback part 1
48+
8. dataListener
49+
9. transformCallback part 2
50+
10. finalCallback part 1
51+
11. finalCallback part 2
52+
12. flushCallback part 1
53+
13. finishListener
54+
14. endMethodCallback
55+
15. flushCallback part 2
56+
16. endListener
57+
*/
58+
59+
const t = new stream.Transform({
60+
objectMode: true,
61+
transform: common.mustCall(function(chunk, _, next) {
62+
assert.strictEqual(++state, chunk, 'transformCallback part 1');
63+
this.push(state);
64+
assert.strictEqual(++state, chunk + 2, 'transformCallback part 2');
65+
process.nextTick(next);
66+
}, 3),
67+
final: common.mustCall(function(done) {
68+
state++;
69+
assert.strictEqual(state, 10, 'finalCallback part 1');
70+
state++;
71+
assert.strictEqual(state, 11, 'finalCallback part 2');
72+
done();
73+
}, 1),
74+
flush: common.mustCall(function(done) {
75+
state++;
76+
assert.strictEqual(state, 12, 'flushCallback part 1');
77+
process.nextTick(function() {
78+
state++;
79+
assert.strictEqual(state, 15, 'flushCallback part 2');
80+
done();
81+
});
82+
}, 1)
83+
});
84+
t.on('finish', common.mustCall(function() {
85+
state++;
86+
assert.strictEqual(state, 13, 'finishListener');
87+
}, 1));
88+
t.on('end', common.mustCall(function() {
89+
state++;
90+
assert.strictEqual(state, 16, 'end event');
91+
}, 1));
92+
t.on('data', common.mustCall(function(d) {
93+
assert.strictEqual(++state, d + 1, 'dataListener');
94+
}, 3));
95+
t.write(1);
96+
t.write(4);
97+
t.end(7, common.mustCall(function() {
98+
state++;
99+
assert.strictEqual(state, 14, 'endMethodCallback');
100+
}, 1));

0 commit comments

Comments
 (0)