Skip to content

Commit cc2393c

Browse files
ntedgiRafaelGSS
authored andcommitted
lib: promise version of streams.finished call clean up
implement autoCleanup logic. update docs add autoCleanup description ref: #44556 PR-URL: #44862 Refs: #44556 Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: Robert Nagy <[email protected]>
1 parent 17ef1bb commit cc2393c

File tree

3 files changed

+72
-13
lines changed

3 files changed

+72
-13
lines changed

doc/api/stream.md

+5
Original file line numberDiff line numberDiff line change
@@ -2363,6 +2363,7 @@ changes:
23632363
-->
23642364

23652365
* `stream` {Stream} A readable and/or writable stream.
2366+
23662367
* `options` {Object}
23672368
* `error` {boolean} If set to `false`, then a call to `emit('error', err)` is
23682369
not treated as finished. **Default:** `true`.
@@ -2376,8 +2377,12 @@ changes:
23762377
underlying stream will _not_ be aborted if the signal is aborted. The
23772378
callback will get called with an `AbortError`. All registered
23782379
listeners added by this function will also be removed.
2380+
* `cleanup` {boolean} remove all registered stream listeners.
2381+
**Default:** `false`.
2382+
23792383
* `callback` {Function} A callback function that takes an optional error
23802384
argument.
2385+
23812386
* Returns: {Function} A cleanup function which removes all registered
23822387
listeners.
23832388

lib/internal/streams/end-of-stream.js

+13-1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ const {
1919
validateAbortSignal,
2020
validateFunction,
2121
validateObject,
22+
validateBoolean
2223
} = require('internal/validators');
2324

2425
const { Promise } = primordials;
@@ -243,8 +244,19 @@ function eos(stream, options, callback) {
243244
}
244245

245246
function finished(stream, opts) {
247+
let autoCleanup = false;
248+
if (opts === null) {
249+
opts = kEmptyObject;
250+
}
251+
if (opts?.cleanup) {
252+
validateBoolean(opts.cleanup, 'cleanup');
253+
autoCleanup = opts.cleanup;
254+
}
246255
return new Promise((resolve, reject) => {
247-
eos(stream, opts, (err) => {
256+
const cleanup = eos(stream, opts, (err) => {
257+
if (autoCleanup) {
258+
cleanup();
259+
}
248260
if (err) {
249261
reject(err);
250262
} else {

test/parallel/test-stream-promises.js

+54-12
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,10 @@
33
const common = require('../common');
44
const stream = require('stream');
55
const {
6-
Readable,
7-
Writable,
8-
promises,
6+
Readable, Writable, promises,
97
} = stream;
108
const {
11-
finished,
12-
pipeline,
9+
finished, pipeline,
1310
} = require('stream/promises');
1411
const fs = require('fs');
1512
const assert = require('assert');
@@ -24,14 +21,11 @@ assert.strictEqual(finished, promisify(stream.finished));
2421
{
2522
let finished = false;
2623
const processed = [];
27-
const expected = [
28-
Buffer.from('a'),
29-
Buffer.from('b'),
30-
Buffer.from('c'),
31-
];
24+
const expected = [Buffer.from('a'), Buffer.from('b'), Buffer.from('c')];
3225

3326
const read = new Readable({
34-
read() { }
27+
read() {
28+
}
3529
});
3630

3731
const write = new Writable({
@@ -59,7 +53,8 @@ assert.strictEqual(finished, promisify(stream.finished));
5953
// pipeline error
6054
{
6155
const read = new Readable({
62-
read() { }
56+
read() {
57+
}
6358
});
6459

6560
const write = new Writable({
@@ -101,3 +96,50 @@ assert.strictEqual(finished, promisify(stream.finished));
10196
code: 'ENOENT'
10297
}).then(common.mustCall());
10398
}
99+
100+
{
101+
const streamObj = new Readable();
102+
assert.throws(() => {
103+
// Passing cleanup option not as boolean
104+
// should throw error
105+
finished(streamObj, { cleanup: 2 });
106+
}, { code: 'ERR_INVALID_ARG_TYPE' });
107+
}
108+
109+
// Below code should not throw any errors as the
110+
// streamObj is `Stream` and cleanup is boolean
111+
{
112+
const streamObj = new Readable();
113+
finished(streamObj, { cleanup: true });
114+
}
115+
116+
117+
// Cleanup function should not be called when cleanup is set to false
118+
// listenerCount should be 1 after calling finish
119+
{
120+
const streamObj = new Writable();
121+
assert.strictEqual(streamObj.listenerCount('end'), 0);
122+
finished(streamObj, { cleanup: false }).then(() => {
123+
assert.strictEqual(streamObj.listenerCount('end'), 1);
124+
});
125+
}
126+
127+
// Cleanup function should be called when cleanup is set to true
128+
// listenerCount should be 0 after calling finish
129+
{
130+
const streamObj = new Writable();
131+
assert.strictEqual(streamObj.listenerCount('end'), 0);
132+
finished(streamObj, { cleanup: true }).then(() => {
133+
assert.strictEqual(streamObj.listenerCount('end'), 0);
134+
});
135+
}
136+
137+
// Cleanup function should not be called when cleanup has not been set
138+
// listenerCount should be 1 after calling finish
139+
{
140+
const streamObj = new Writable();
141+
assert.strictEqual(streamObj.listenerCount('end'), 0);
142+
finished(streamObj).then(() => {
143+
assert.strictEqual(streamObj.listenerCount('end'), 1);
144+
});
145+
}

0 commit comments

Comments
 (0)