Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 607439b

Browse files
committedMay 2, 2021
stream: Bypass legacy destroy for pipeline and async iteration.
1 parent 369f239 commit 607439b

File tree

4 files changed

+45
-5
lines changed

4 files changed

+45
-5
lines changed
 

‎lib/_http_incoming.js

+9
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ const {
3333
} = primordials;
3434

3535
const { Readable, finished } = require('stream');
36+
const { kDestroy, destroy } = require('internal/stream')
3637

3738
const kHeaders = Symbol('kHeaders');
3839
const kHeadersCount = Symbol('kHeadersCount');
@@ -172,6 +173,14 @@ IncomingMessage.prototype._read = function _read(n) {
172173
readStart(this.socket);
173174
};
174175

176+
IncomingMessage.prototype[kDestroy] = function (err) {
177+
if (!err && !this.res.destroyed) {
178+
this.socket = null
179+
}
180+
181+
this.destroy(err)
182+
}
183+
175184
// It's possible that the socket will be destroyed, and removed from
176185
// any messages, before ever calling this. In that case, just skip
177186
// it, since something else is destroying this connection anyway.

‎lib/_http_outgoing.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ const {
7878
} = require('internal/errors');
7979
const { validateString } = require('internal/validators');
8080
const { isUint8Array } = require('internal/util/types');
81+
const { kDestroy } = require('internal/stream')
8182

8283
let debug = require('internal/util/debuglog').debuglog('http', (fn) => {
8384
debug = fn;
@@ -298,7 +299,6 @@ OutgoingMessage.prototype.setTimeout = function setTimeout(msecs, callback) {
298299
return this;
299300
};
300301

301-
302302
// It's possible that the socket will be destroyed, and removed from
303303
// any messages, before ever calling this. In that case, just skip
304304
// it, since something else is destroying this connection anyway.

‎lib/internal/streams/destroy.js

+33-4
Original file line numberDiff line numberDiff line change
@@ -366,15 +366,44 @@ function isRequest(stream) {
366366
return stream && stream.setHeader && typeof stream.abort === 'function';
367367
}
368368

369+
const kDestroyed = Symbol('kDestroyed');
370+
369371
// Normalize destroy for legacy.
370372
function destroyer(stream, err) {
371-
if (isRequest(stream)) return stream.abort();
372-
if (isRequest(stream.req)) return stream.req.abort();
373-
if (typeof stream.destroy === 'function') return stream.destroy(err);
374-
if (typeof stream.close === 'function') return stream.close();
373+
if (stream[kDestroyed]) {
374+
return
375+
}
376+
377+
if (typeof stream[kDestroy] === 'function') {
378+
stream[kDestroy]();
379+
} else if (isRequest(stream)) {
380+
stream.abort();
381+
} else if (isRequest(stream.req)) {
382+
stream.req.abort();
383+
} else if (typeof stream.destroy === 'function') {
384+
stream.destroy(err);
385+
} else if (typeof stream.close === 'function') {
386+
stream.close();
387+
} else if (err) {
388+
process.nextTick((stream, err) => {
389+
stream.emit('error', err);
390+
process.nextTick((stream) => {
391+
stream.emit('close');
392+
}, stream);
393+
}, stream, err);
394+
} else {
395+
process.nextTick((stream) => {
396+
stream.emit('close');
397+
}, stream);
398+
}
399+
400+
if (!stream.destroyed) {
401+
stream[kDestroyed] = true;
402+
}
375403
}
376404

377405
module.exports = {
406+
kDestroy,
378407
construct,
379408
destroyer,
380409
destroy,

‎lib/stream.js

+2
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ const {
3030
} = require('internal/util');
3131

3232
const pipeline = require('internal/streams/pipeline');
33+
const { destroyer } = require('internal/streams/destroy');
3334
const eos = require('internal/streams/end-of-stream');
3435
const internalBuffer = require('internal/buffer');
3536

@@ -46,6 +47,7 @@ Stream.pipeline = pipeline;
4647
const { addAbortSignal } = require('internal/streams/add-abort-signal');
4748
Stream.addAbortSignal = addAbortSignal;
4849
Stream.finished = eos;
50+
Stream.destroy = destroyer;
4951

5052
function lazyLoadPromises() {
5153
if (promises === null) promises = require('stream/promises');

0 commit comments

Comments
 (0)
Please sign in to comment.