Skip to content

Commit cb32f69

Browse files
ronagtargos
authored andcommitted
stream: cleanup async handling
Cleanup async stream method handling. PR-URL: #39329 Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: James M Snell <[email protected]>
1 parent 1fc6382 commit cb32f69

File tree

4 files changed

+63
-113
lines changed

4 files changed

+63
-113
lines changed

Diff for: lib/internal/streams/destroy.js

+36-94
Original file line numberDiff line numberDiff line change
@@ -69,12 +69,16 @@ function destroy(err, cb) {
6969

7070
function _destroy(self, err, cb) {
7171
let called = false;
72-
const result = self._destroy(err || null, (err) => {
73-
const r = self._readableState;
74-
const w = self._writableState;
7572

73+
function onDestroy(err) {
74+
if (called) {
75+
return;
76+
}
7677
called = true;
7778

79+
const r = self._readableState;
80+
const w = self._writableState;
81+
7882
checkError(err, w, r);
7983

8084
if (w) {
@@ -93,64 +97,24 @@ function _destroy(self, err, cb) {
9397
} else {
9498
process.nextTick(emitCloseNT, self);
9599
}
96-
});
97-
if (result !== undefined && result !== null) {
98-
try {
100+
}
101+
try {
102+
const result = self._destroy(err || null, onDestroy);
103+
if (result != null) {
99104
const then = result.then;
100105
if (typeof then === 'function') {
101106
then.call(
102107
result,
103108
function() {
104-
if (called)
105-
return;
106-
107-
const r = self._readableState;
108-
const w = self._writableState;
109-
110-
if (w) {
111-
w.closed = true;
112-
}
113-
if (r) {
114-
r.closed = true;
115-
}
116-
117-
if (typeof cb === 'function') {
118-
process.nextTick(cb);
119-
}
120-
121-
process.nextTick(emitCloseNT, self);
109+
process.nextTick(onDestroy, null);
122110
},
123111
function(err) {
124-
const r = self._readableState;
125-
const w = self._writableState;
126-
err.stack; // eslint-disable-line no-unused-expressions
127-
128-
called = true;
129-
130-
if (w && !w.errored) {
131-
w.errored = err;
132-
}
133-
if (r && !r.errored) {
134-
r.errored = err;
135-
}
136-
137-
if (w) {
138-
w.closed = true;
139-
}
140-
if (r) {
141-
r.closed = true;
142-
}
143-
144-
if (typeof cb === 'function') {
145-
process.nextTick(cb, err);
146-
}
147-
148-
process.nextTick(emitErrorCloseNT, self, err);
112+
process.nextTick(onDestroy, err);
149113
});
150114
}
151-
} catch (err) {
152-
process.nextTick(emitErrorNT, self, err);
153115
}
116+
} catch (err) {
117+
onDestroy(err);
154118
}
155119
}
156120

@@ -284,74 +248,52 @@ function construct(stream, cb) {
284248
}
285249

286250
function constructNT(stream) {
287-
const r = stream._readableState;
288-
const w = stream._writableState;
289-
// With duplex streams we use the writable side for state.
290-
const s = w || r;
291-
292251
let called = false;
293-
const result = stream._construct((err) => {
252+
253+
function onConstruct(err) {
254+
if (called) {
255+
errorOrDestroy(stream, err ?? new ERR_MULTIPLE_CALLBACK());
256+
return;
257+
}
258+
called = true;
259+
260+
const r = stream._readableState;
261+
const w = stream._writableState;
262+
const s = w || r;
263+
294264
if (r) {
295265
r.constructed = true;
296266
}
297267
if (w) {
298268
w.constructed = true;
299269
}
300270

301-
if (called) {
302-
err = new ERR_MULTIPLE_CALLBACK();
303-
} else {
304-
called = true;
305-
}
306-
307271
if (s.destroyed) {
308272
stream.emit(kDestroy, err);
309273
} else if (err) {
310274
errorOrDestroy(stream, err, true);
311275
} else {
312276
process.nextTick(emitConstructNT, stream);
313277
}
314-
});
315-
if (result !== undefined && result !== null) {
316-
try {
278+
}
279+
280+
try {
281+
const result = stream._construct(onConstruct);
282+
if (result != null) {
317283
const then = result.then;
318284
if (typeof then === 'function') {
319285
then.call(
320286
result,
321287
function() {
322-
// If the callback was invoked, do nothing further.
323-
if (called)
324-
return;
325-
if (r) {
326-
r.constructed = true;
327-
}
328-
if (w) {
329-
w.constructed = true;
330-
}
331-
if (s.destroyed) {
332-
process.nextTick(() => stream.emit(kDestroy));
333-
} else {
334-
process.nextTick(emitConstructNT, stream);
335-
}
288+
process.nextTick(onConstruct, null);
336289
},
337290
function(err) {
338-
if (r) {
339-
r.constructed = true;
340-
}
341-
if (w) {
342-
w.constructed = true;
343-
}
344-
called = true;
345-
if (s.destroyed) {
346-
process.nextTick(() => stream.emit(kDestroy, err));
347-
} else {
348-
process.nextTick(errorOrDestroy, stream, err);
349-
}
291+
process.nextTick(onConstruct, err);
350292
});
351293
}
352-
} catch (err) {
353-
process.nextTick(emitErrorNT, stream, err);
354294
}
295+
} catch (err) {
296+
onConstruct(err);
355297
}
356298
}
357299

Diff for: lib/internal/streams/readable.js

+2
Original file line numberDiff line numberDiff line change
@@ -479,8 +479,10 @@ Readable.prototype.read = function(n) {
479479
// If the length is currently zero, then we *need* a readable event.
480480
if (state.length === 0)
481481
state.needReadable = true;
482+
482483
// Call internal read method
483484
this._read(state.highWaterMark);
485+
484486
state.sync = false;
485487
// If _read pushed data synchronously, then `reading` will be false,
486488
// and we need to re-evaluate how much data we can return to the user.

Diff for: lib/internal/streams/writable.js

+22-19
Original file line numberDiff line numberDiff line change
@@ -660,9 +660,15 @@ function needFinish(state) {
660660
}
661661

662662
function callFinal(stream, state) {
663-
state.sync = true;
664-
state.pendingcb++;
665-
const result = stream._final((err) => {
663+
let called = false;
664+
665+
function onFinish(err) {
666+
if (called) {
667+
errorOrDestroy(stream, err ?? ERR_MULTIPLE_CALLBACK());
668+
return;
669+
}
670+
called = true;
671+
666672
state.pendingcb--;
667673
if (err) {
668674
const onfinishCallbacks = state[kOnFinished].splice(0);
@@ -679,33 +685,30 @@ function callFinal(stream, state) {
679685
state.pendingcb++;
680686
process.nextTick(finish, stream, state);
681687
}
682-
});
683-
if (result !== undefined && result !== null) {
684-
try {
688+
}
689+
690+
state.sync = true;
691+
state.pendingcb++;
692+
693+
try {
694+
const result = stream._final(onFinish);
695+
if (result != null) {
685696
const then = result.then;
686697
if (typeof then === 'function') {
687698
then.call(
688699
result,
689700
function() {
690-
if (state.prefinished || !needFinish(state))
691-
return;
692-
state.prefinish = true;
693-
process.nextTick(() => stream.emit('prefinish'));
694-
state.pendingcb++;
695-
process.nextTick(finish, stream, state);
701+
process.nextTick(onFinish, null);
696702
},
697703
function(err) {
698-
const onfinishCallbacks = state[kOnFinished].splice(0);
699-
for (let i = 0; i < onfinishCallbacks.length; i++) {
700-
process.nextTick(onfinishCallbacks[i], err);
701-
}
702-
process.nextTick(errorOrDestroy, stream, err, state.sync);
704+
process.nextTick(onFinish, err);
703705
});
704706
}
705-
} catch (err) {
706-
process.nextTick(errorOrDestroy, stream, err, state.sync);
707707
}
708+
} catch (err) {
709+
onFinish(stream, state, err);
708710
}
711+
709712
state.sync = false;
710713
}
711714

Diff for: test/parallel/test-stream-construct-async-error.js

+3
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,9 @@ const assert = require('assert');
9898

9999
const foo = new Foo();
100100
foo.write('test', common.mustCall());
101+
foo.on('error', common.expectsError({
102+
code: 'ERR_MULTIPLE_CALLBACK'
103+
}));
101104
}
102105

103106
{

0 commit comments

Comments
 (0)