Skip to content

Commit d914605

Browse files
committed
stream: Bypass legacy destroy for pipeline and async iteration.
1 parent 4e17ffc commit d914605

File tree

6 files changed

+187
-13
lines changed

6 files changed

+187
-13
lines changed

lib/_http_incoming.js

+10-2
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ const {
3131
} = primordials;
3232

3333
const { Readable, finished } = require('stream');
34+
const { kDestroy } = require('internal/streams/destroy');
3435

3536
const kHeaders = Symbol('kHeaders');
3637
const kHeadersCount = Symbol('kHeadersCount');
@@ -188,13 +189,20 @@ IncomingMessage.prototype._destroy = function _destroy(err, cb) {
188189
this.socket.destroy(err);
189190
const cleanup = finished(this.socket, (e) => {
190191
cleanup();
191-
onError(this, e || err, cb);
192+
process.nextTick(onError, this, e || err, cb);
192193
});
193194
} else {
194-
onError(this, err, cb);
195+
process.nextTick(onError, this, err, cb);
195196
}
196197
};
197198

199+
IncomingMessage.prototype[kDestroy] = function (err) {
200+
if (this.req && !this.req.destroyed) {
201+
this.socket = null;
202+
}
203+
this.destroy(err);
204+
};
205+
198206
IncomingMessage.prototype._addHeaderLines = _addHeaderLines;
199207
function _addHeaderLines(headers, n) {
200208
if (headers && headers.length) {

lib/_http_outgoing.js

-1
Original file line numberDiff line numberDiff line change
@@ -313,7 +313,6 @@ OutgoingMessage.prototype.destroy = function destroy(error) {
313313
return this;
314314
};
315315

316-
317316
// This abstract either writing directly to the socket or buffering it.
318317
OutgoingMessage.prototype._send = function _send(data, encoding, callback) {
319318
// This is a shameful hack to get the headers and first body chunk onto

lib/_http_server.js

+6-6
Original file line numberDiff line numberDiff line change
@@ -231,9 +231,7 @@ function onServerResponseClose() {
231231
// where the ServerResponse object has already been deconstructed.
232232
// Fortunately, that requires only a single if check. :-)
233233
if (this._httpMessage) {
234-
this._httpMessage.destroyed = true;
235-
this._httpMessage._closed = true;
236-
this._httpMessage.emit('close');
234+
emitCloseNT(this._httpMessage);
237235
}
238236
}
239237

@@ -837,9 +835,11 @@ function resOnFinish(req, res, socket, state, server) {
837835
}
838836

839837
function emitCloseNT(self) {
840-
self.destroyed = true;
841-
self._closed = true;
842-
self.emit('close');
838+
if (!self.destroyed) {
839+
self.destroyed = true;
840+
self._closed = true;
841+
self.emit('close');
842+
}
843843
}
844844

845845
// The following callback is issued after the headers have been read on a

lib/internal/streams/destroy.js

+54-4
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
'use strict';
22

3+
const console = require('console');
34
const {
45
aggregateTwoErrors,
56
codes: {
67
ERR_MULTIPLE_CALLBACK,
78
},
9+
AbortError,
810
} = require('internal/errors');
911
const {
1012
Symbol,
@@ -363,15 +365,63 @@ function isRequest(stream) {
363365
return stream && stream.setHeader && typeof stream.abort === 'function';
364366
}
365367

368+
const kDestroyed = Symbol('kDestroyed');
369+
370+
function emitCloseLegacy (stream) {
371+
stream.emit('close');
372+
}
373+
374+
function emitErrorCloseLegacy(stream, err) {
375+
stream.emit('error', err);
376+
process.nextTick(emitCloseLegacy, stream);
377+
}
378+
379+
function isDestroyed (stream) {
380+
return stream.destroyed || stream[kDestroyed];
381+
}
382+
383+
function isReadable (stream) {
384+
return stream.readable && !stream.readableEnded && !isDestroyed(stream);
385+
}
386+
387+
function isWritable (stream) {
388+
return stream.writable && !stream.writableEnded && !isDestroyed(stream);
389+
}
390+
366391
// Normalize destroy for legacy.
367392
function destroyer(stream, err) {
368-
if (isRequest(stream)) return stream.abort();
369-
if (isRequest(stream.req)) return stream.req.abort();
370-
if (typeof stream.destroy === 'function') return stream.destroy(err);
371-
if (typeof stream.close === 'function') return stream.close();
393+
if (isDestroyed(stream)) {
394+
return;
395+
}
396+
397+
if (!err && (isReadable(stream) || isWritable(stream))) {
398+
err = new AbortError();
399+
}
400+
401+
if (typeof stream[kDestroy] === 'function') {
402+
stream[kDestroy](err);
403+
} else if (isRequest(stream)) {
404+
stream.abort();
405+
} else if (isRequest(stream.req)) {
406+
stream.req.abort();
407+
} else if (typeof stream.destroy === 'function') {
408+
stream.destroy(err);
409+
} else if (typeof stream.close === 'function') {
410+
stream.close();
411+
} else if (err) {
412+
process.nextTick(emitErrorCloseLegacy, stream);
413+
} else {
414+
process.nextTick(emitCloseLegacy, stream);
415+
}
416+
417+
if (!stream.destroyed) {
418+
stream[kDestroyed] = true;
419+
}
372420
}
373421

374422
module.exports = {
423+
kDestroy,
424+
isDestroyed,
375425
construct,
376426
destroyer,
377427
destroy,

lib/stream.js

+2
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ const {
3030
} = require('internal/util');
3131

3232
const pipeline = require('internal/streams/pipeline');
33+
const { destroyer } = require('internal/streams/destroy');
3334
const eos = require('internal/streams/end-of-stream');
3435
const internalBuffer = require('internal/buffer');
3536

@@ -45,6 +46,7 @@ Stream.pipeline = pipeline;
4546
const { addAbortSignal } = require('internal/streams/add-abort-signal');
4647
Stream.addAbortSignal = addAbortSignal;
4748
Stream.finished = eos;
49+
Stream.destroy = destroyer;
4850

4951
ObjectDefineProperty(Stream, 'promises', {
5052
configurable: true,

test/parallel/test-stream-destroy.js

+115
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const {
5+
Writable,
6+
Readable,
7+
destroy
8+
} = require('stream');
9+
const assert = require('assert');
10+
const http = require('http');
11+
12+
{
13+
const r = new Readable({ read() {} });
14+
destroy(r);
15+
assert.strictEqual(r.destroyed, true);
16+
r.on('error', common.mustCall(err => {
17+
assert.strictEqual(err.name, 'AbortError');
18+
}));
19+
r.on('close', common.mustCall());
20+
}
21+
22+
{
23+
const r = new Readable({ read() {} });
24+
destroy(r, new Error('asd'));
25+
assert.strictEqual(r.destroyed, true);
26+
r.on('error', common.mustCall(err => {
27+
assert.strictEqual(err.message, 'asd');
28+
}));
29+
r.on('close', common.mustCall());
30+
}
31+
32+
{
33+
const w = new Writable({ write() {} });
34+
destroy(w);
35+
assert.strictEqual(w.destroyed, true);
36+
w.on('error', common.mustCall(err => {
37+
assert.strictEqual(err.name, 'AbortError');
38+
}));
39+
w.on('close', common.mustCall());
40+
}
41+
42+
{
43+
const w = new Writable({ write() {} });
44+
destroy(w, new Error('asd'));
45+
assert.strictEqual(w.destroyed, true);
46+
w.on('error', common.mustCall(err => {
47+
assert.strictEqual(err.message, 'asd');
48+
}));
49+
w.on('close', common.mustCall());
50+
}
51+
52+
{
53+
const server = http.createServer((req, res) => {
54+
destroy(req);
55+
req.on('error', common.mustCall((err) => {
56+
assert.strictEqual(err.name, 'AbortError');
57+
}));
58+
req.on('close', common.mustCall(() => {
59+
res.end('hello');
60+
}));
61+
});
62+
63+
server.listen(0, () => {
64+
const req = http.request({
65+
port: server.address().port
66+
});
67+
68+
req.write('asd');
69+
req.on('response', (res) => {
70+
const buf = [];
71+
res.on('data', (data) => buf.push(data));
72+
res.on('end', common.mustCall(() => {
73+
assert.deepStrictEqual(
74+
Buffer.concat(buf),
75+
Buffer.from('hello')
76+
);
77+
server.close();
78+
}));
79+
});
80+
});
81+
}
82+
83+
{
84+
const server = http.createServer((req, res) => {
85+
req
86+
.resume()
87+
.on('end', () => {
88+
destroy(req);
89+
})
90+
.on('error', common.mustNotCall());
91+
92+
req.on('close', common.mustCall(() => {
93+
res.end('hello');
94+
}));
95+
});
96+
97+
server.listen(0, () => {
98+
const req = http.request({
99+
port: server.address().port
100+
});
101+
102+
req.write('asd');
103+
req.on('response', (res) => {
104+
const buf = [];
105+
res.on('data', (data) => buf.push(data));
106+
res.on('end', common.mustCall(() => {
107+
assert.deepStrictEqual(
108+
Buffer.concat(buf),
109+
Buffer.from('hello')
110+
);
111+
server.close();
112+
}));
113+
});
114+
});
115+
}

0 commit comments

Comments
 (0)