Skip to content

Commit 35b6669

Browse files
ronagtargos
authored andcommitted
stream: use finished for pump
Re-use existing compat logic for pump by using finished. PR-URL: #39203 Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: Benjamin Gruenbaum <[email protected]>
1 parent dfe5d11 commit 35b6669

File tree

2 files changed

+32
-73
lines changed

2 files changed

+32
-73
lines changed

Diff for: lib/internal/streams/pipeline.js

+32-40
Original file line numberDiff line numberDiff line change
@@ -20,20 +20,16 @@ const {
2020
ERR_INVALID_RETURN_VALUE,
2121
ERR_MISSING_ARGS,
2222
ERR_STREAM_DESTROYED,
23-
ERR_STREAM_PREMATURE_CLOSE,
2423
},
2524
} = require('internal/errors');
2625

2726
const { validateCallback } = require('internal/validators');
2827

29-
function noop() {}
30-
3128
const {
3229
isIterable,
3330
isReadable,
3431
isStream,
3532
} = require('internal/streams/utils');
36-
const assert = require('internal/assert');
3733

3834
let PassThrough;
3935
let Readable;
@@ -109,62 +105,58 @@ async function* fromReadable(val) {
109105

110106
async function pump(iterable, writable, finish) {
111107
let error;
112-
let callback = noop;
108+
let onresolve = null;
109+
113110
const resume = (err) => {
114-
error = aggregateTwoErrors(error, err);
115-
const _callback = callback;
116-
callback = noop;
117-
_callback();
118-
};
119-
const onClose = () => {
120-
resume(new ERR_STREAM_PREMATURE_CLOSE());
111+
if (err) {
112+
error = err;
113+
}
114+
115+
if (onresolve) {
116+
const callback = onresolve;
117+
onresolve = null;
118+
callback();
119+
}
121120
};
122121

123-
const waitForDrain = () => new Promise((resolve) => {
124-
assert(callback === noop);
125-
if (error || writable.destroyed) {
126-
resolve();
122+
const wait = () => new Promise((resolve, reject) => {
123+
if (error) {
124+
reject(error);
127125
} else {
128-
callback = resolve;
126+
onresolve = () => {
127+
if (error) {
128+
reject(error);
129+
} else {
130+
resolve();
131+
}
132+
};
129133
}
130134
});
131135

132-
writable
133-
.on('drain', resume)
134-
.on('error', resume)
135-
.on('close', onClose);
136+
writable.on('drain', resume);
137+
const cleanup = eos(writable, { readable: false }, resume);
136138

137139
try {
138140
if (writable.writableNeedDrain) {
139-
await waitForDrain();
140-
}
141-
142-
if (error) {
143-
return;
141+
await wait();
144142
}
145143

146144
for await (const chunk of iterable) {
147145
if (!writable.write(chunk)) {
148-
await waitForDrain();
146+
await wait();
149147
}
150-
if (error) {
151-
return;
152-
}
153-
}
154-
155-
if (error) {
156-
return;
157148
}
158149

159150
writable.end();
151+
152+
await wait();
153+
154+
finish();
160155
} catch (err) {
161-
error = aggregateTwoErrors(error, err);
156+
finish(error !== err ? aggregateTwoErrors(error, err) : err);
162157
} finally {
163-
writable
164-
.off('drain', resume)
165-
.off('error', resume)
166-
.off('close', onClose);
167-
finish(error);
158+
cleanup();
159+
writable.off('drain', resume);
168160
}
169161
}
170162

Diff for: test/parallel/test-stream-pipeline.js

-33
Original file line numberDiff line numberDiff line change
@@ -1387,36 +1387,3 @@ const net = require('net');
13871387
assert.strictEqual(res, content);
13881388
}));
13891389
}
1390-
1391-
{
1392-
const writableLike = new Stream();
1393-
writableLike.writableNeedDrain = true;
1394-
1395-
pipeline(
1396-
async function *() {},
1397-
writableLike,
1398-
common.mustCall((err) => {
1399-
assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
1400-
})
1401-
);
1402-
1403-
writableLike.emit('close');
1404-
}
1405-
1406-
{
1407-
const writableLike = new Stream();
1408-
writableLike.write = () => false;
1409-
1410-
pipeline(
1411-
async function *() {
1412-
yield null;
1413-
yield null;
1414-
},
1415-
writableLike,
1416-
common.mustCall((err) => {
1417-
assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
1418-
})
1419-
);
1420-
1421-
writableLike.emit('close');
1422-
}

0 commit comments

Comments
 (0)