Skip to content

Commit c950e76

Browse files
lundibundiMylesBorins
authored andcommitted
http2: wait for session socket writable end on close/destroy
This slightly alters the behaviour of session close by first using .end() on a session socket to finish writing the data and only then calls .destroy() to make sure the Readable side is closed. This allows the socket to finish transmitting data, receive proper FIN packet and avoid ECONNRESET errors upon graceful close. onStreamClose now directly calls stream.destroy() instead of kMaybeDestroy because the latter will first check that the stream has writableFinished set. And that may not be true as we have just (synchronously) called .end() on the stream if it was not closed and that doesn't give it enough time to finish. Furthermore there is no point in waiting for 'finish' as the other party have already closed the stream and we won't be able to write anyway. This also changes a few tests to correctly handle graceful session close. This includes: * not reading request data (on client side) * not reading push stream data (on client side) * relying on socket.destroy() (on client) to finish server session due to the destroy of the socket without closing the server session. As the goaway itself is *not* a session close. Added few 'close' event mustCall checks. Backport-PR-URL: #34845 PR-URL: #30854 Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: Anna Henningsen <[email protected]> Reviewed-By: James M Snell <[email protected]> Reviewed-By: Rich Trott <[email protected]>
1 parent 1cdf5d8 commit c950e76

10 files changed

+101
-45
lines changed

lib/internal/http2/core.js

+71-36
Original file line numberDiff line numberDiff line change
@@ -501,7 +501,10 @@ function onStreamClose(code) {
501501
if (!stream || stream.destroyed)
502502
return false;
503503

504-
debugStreamObj(stream, 'closed with code %d', code);
504+
debugStreamObj(
505+
stream, 'closed with code %d, closed %s, readable %s',
506+
code, stream.closed, stream.readable
507+
);
505508

506509
if (!stream.closed)
507510
closeStream(stream, code, kNoRstStream);
@@ -510,7 +513,7 @@ function onStreamClose(code) {
510513
// Defer destroy we actually emit end.
511514
if (!stream.readable || code !== NGHTTP2_NO_ERROR) {
512515
// If errored or ended, we can destroy immediately.
513-
stream[kMaybeDestroy](code);
516+
stream.destroy();
514517
} else {
515518
// Wait for end to destroy.
516519
stream.on('end', stream[kMaybeDestroy]);
@@ -1021,22 +1024,76 @@ function emitClose(self, error) {
10211024
self.emit('close');
10221025
}
10231026

1024-
function finishSessionDestroy(session, error) {
1025-
debugSessionObj(session, 'finishSessionDestroy');
1026-
1027+
function cleanupSession(session) {
10271028
const socket = session[kSocket];
1028-
if (!socket.destroyed)
1029-
socket.destroy(error);
1030-
1029+
const handle = session[kHandle];
10311030
session[kProxySocket] = undefined;
10321031
session[kSocket] = undefined;
10331032
session[kHandle] = undefined;
10341033
session[kNativeFields] = new Uint8Array(kSessionUint8FieldCount);
1035-
socket[kSession] = undefined;
1036-
socket[kServer] = undefined;
1034+
if (handle)
1035+
handle.ondone = null;
1036+
if (socket) {
1037+
socket[kSession] = undefined;
1038+
socket[kServer] = undefined;
1039+
}
1040+
}
1041+
1042+
function finishSessionClose(session, error) {
1043+
debugSessionObj(session, 'finishSessionClose');
1044+
1045+
const socket = session[kSocket];
1046+
cleanupSession(session);
1047+
1048+
if (socket && !socket.destroyed) {
1049+
// Always wait for writable side to finish.
1050+
socket.end((err) => {
1051+
debugSessionObj(session, 'finishSessionClose socket end', err);
1052+
// Due to the way the underlying stream is handled in Http2Session we
1053+
// won't get graceful Readable end from the other side even if it was sent
1054+
// as the stream is already considered closed and will neither be read
1055+
// from nor keep the event loop alive.
1056+
// Therefore destroy the socket immediately.
1057+
// Fixing this would require some heavy juggling of ReadStart/ReadStop
1058+
// mostly on Windows as on Unix it will be fine with just ReadStart
1059+
// after this 'ondone' callback.
1060+
socket.destroy(error);
1061+
emitClose(session, error);
1062+
});
1063+
} else {
1064+
process.nextTick(emitClose, session, error);
1065+
}
1066+
}
1067+
1068+
function closeSession(session, code, error) {
1069+
debugSessionObj(session, 'start closing/destroying');
1070+
1071+
const state = session[kState];
1072+
state.flags |= SESSION_FLAGS_DESTROYED;
1073+
state.destroyCode = code;
1074+
1075+
// Clear timeout and remove timeout listeners.
1076+
session.setTimeout(0);
1077+
session.removeAllListeners('timeout');
1078+
1079+
// Destroy any pending and open streams
1080+
if (state.pendingStreams.size > 0 || state.streams.size > 0) {
1081+
const cancel = new ERR_HTTP2_STREAM_CANCEL(error);
1082+
state.pendingStreams.forEach((stream) => stream.destroy(cancel));
1083+
state.streams.forEach((stream) => stream.destroy(error));
1084+
}
10371085

1038-
// Finally, emit the close and error events (if necessary) on next tick.
1039-
process.nextTick(emitClose, session, error);
1086+
// Disassociate from the socket and server.
1087+
const socket = session[kSocket];
1088+
const handle = session[kHandle];
1089+
1090+
// Destroy the handle if it exists at this point.
1091+
if (handle !== undefined) {
1092+
handle.ondone = finishSessionClose.bind(null, session, error);
1093+
handle.destroy(code, socket.destroyed);
1094+
} else {
1095+
finishSessionClose(session, error);
1096+
}
10401097
}
10411098

10421099
// Upon creation, the Http2Session takes ownership of the socket. The session
@@ -1363,6 +1420,7 @@ class Http2Session extends EventEmitter {
13631420
destroy(error = NGHTTP2_NO_ERROR, code) {
13641421
if (this.destroyed)
13651422
return;
1423+
13661424
debugSessionObj(this, 'destroying');
13671425

13681426
if (typeof error === 'number') {
@@ -1374,30 +1432,7 @@ class Http2Session extends EventEmitter {
13741432
if (code === undefined && error != null)
13751433
code = NGHTTP2_INTERNAL_ERROR;
13761434

1377-
const state = this[kState];
1378-
state.flags |= SESSION_FLAGS_DESTROYED;
1379-
state.destroyCode = code;
1380-
1381-
// Clear timeout and remove timeout listeners
1382-
this.setTimeout(0);
1383-
this.removeAllListeners('timeout');
1384-
1385-
// Destroy any pending and open streams
1386-
const cancel = new ERR_HTTP2_STREAM_CANCEL(error);
1387-
state.pendingStreams.forEach((stream) => stream.destroy(cancel));
1388-
state.streams.forEach((stream) => stream.destroy(error));
1389-
1390-
// Disassociate from the socket and server
1391-
const socket = this[kSocket];
1392-
const handle = this[kHandle];
1393-
1394-
// Destroy the handle if it exists at this point
1395-
if (handle !== undefined) {
1396-
handle.ondone = finishSessionDestroy.bind(null, this, error);
1397-
handle.destroy(code, socket.destroyed);
1398-
} else {
1399-
finishSessionDestroy(this, error);
1400-
}
1435+
closeSession(this, code, error);
14011436
}
14021437

14031438
// Closing the session will:

src/node_http2.cc

+1-1
Original file line numberDiff line numberDiff line change
@@ -1827,7 +1827,7 @@ void Http2Session::OnStreamRead(ssize_t nread, const uv_buf_t& buf_) {
18271827
Context::Scope context_scope(env()->context());
18281828
Http2Scope h2scope(this);
18291829
CHECK_NOT_NULL(stream_);
1830-
Debug(this, "receiving %d bytes", nread);
1830+
Debug(this, "receiving %d bytes, offset %d", nread, stream_buf_offset_);
18311831
AllocatedBuffer buf(env(), buf_);
18321832

18331833
// Only pass data on if nread > 0

test/parallel/test-http2-capture-rejection.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,6 @@ events.captureRejections = true;
7272
}));
7373
}
7474

75-
7675
{
7776
// Test error thrown in 'request' event
7877

@@ -136,6 +135,7 @@ events.captureRejections = true;
136135
const session = connect(`http://localhost:${port}`);
137136

138137
const req = session.request();
138+
req.resume();
139139

140140
session.on('stream', common.mustCall(async (stream) => {
141141
session.close();

test/parallel/test-http2-client-destroy.js

+1
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,7 @@ const Countdown = require('../common/countdown');
145145
server.on('stream', common.mustNotCall());
146146
server.listen(0, common.mustCall(() => {
147147
const client = h2.connect(`http://localhost:${server.address().port}`);
148+
client.on('close', common.mustCall());
148149
const socket = client[kSocket];
149150
socket.on('close', common.mustCall(() => {
150151
assert(socket.destroyed);

test/parallel/test-http2-client-stream-destroy-before-connect.js

+7-2
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ if (!common.hasCrypto)
66
const assert = require('assert');
77
const h2 = require('http2');
88
const NGHTTP2_INTERNAL_ERROR = h2.constants.NGHTTP2_INTERNAL_ERROR;
9+
const Countdown = require('../common/countdown');
910

1011
const server = h2.createServer();
1112

@@ -27,6 +28,11 @@ server.on('stream', (stream) => {
2728

2829
server.listen(0, common.mustCall(() => {
2930
const client = h2.connect(`http://localhost:${server.address().port}`);
31+
const countdown = new Countdown(2, () => {
32+
server.close();
33+
client.close();
34+
});
35+
client.on('connect', () => countdown.dec());
3036

3137
const req = client.request();
3238
req.destroy(new Error('test'));
@@ -39,8 +45,7 @@ server.listen(0, common.mustCall(() => {
3945
req.on('close', common.mustCall(() => {
4046
assert.strictEqual(req.rstCode, NGHTTP2_INTERNAL_ERROR);
4147
assert.strictEqual(req.rstCode, NGHTTP2_INTERNAL_ERROR);
42-
server.close();
43-
client.close();
48+
countdown.dec();
4449
}));
4550

4651
req.on('response', common.mustNotCall());

test/parallel/test-http2-compat-client-upload-reject.js

+2
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,11 @@ fs.readFile(loc, common.mustCall((err, data) => {
2323
res.end();
2424
});
2525
}));
26+
server.on('close', common.mustCall());
2627

2728
server.listen(0, common.mustCall(() => {
2829
const client = http2.connect(`http://localhost:${server.address().port}`);
30+
client.on('close', common.mustCall());
2931

3032
const req = client.request({ ':method': 'POST' });
3133
req.on('response', common.mustCall((headers) => {

test/parallel/test-http2-create-client-connect.js

+3-1
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,13 @@ const URL = url.URL;
3838
const client =
3939
h2.connect.apply(null, i)
4040
.on('connect', common.mustCall(() => maybeClose(client)));
41+
client.on('close', common.mustCall());
4142
});
4243

4344
// Will fail because protocol does not match the server.
44-
h2.connect({ port: port, protocol: 'https:' })
45+
const client = h2.connect({ port: port, protocol: 'https:' })
4546
.on('error', common.mustCall(() => serverClose.dec()));
47+
client.on('close', common.mustCall());
4648
}));
4749
}
4850

test/parallel/test-http2-goaway-opaquedata.js

+6-2
Original file line numberDiff line numberDiff line change
@@ -8,20 +8,24 @@ const http2 = require('http2');
88

99
const server = http2.createServer();
1010
const data = Buffer.from([0x1, 0x2, 0x3, 0x4, 0x5]);
11+
let session;
1112

1213
server.on('stream', common.mustCall((stream) => {
13-
stream.session.goaway(0, 0, data);
14+
session = stream.session;
15+
session.on('close', common.mustCall());
16+
session.goaway(0, 0, data);
1417
stream.respond();
1518
stream.end();
1619
}));
20+
server.on('close', common.mustCall());
1721

1822
server.listen(0, () => {
19-
2023
const client = http2.connect(`http://localhost:${server.address().port}`);
2124
client.once('goaway', common.mustCall((code, lastStreamID, buf) => {
2225
assert.deepStrictEqual(code, 0);
2326
assert.deepStrictEqual(lastStreamID, 1);
2427
assert.deepStrictEqual(data, buf);
28+
session.close();
2529
server.close();
2630
}));
2731
const req = client.request();

test/parallel/test-http2-ping-settings-heapdump.js

+7-2
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,12 @@ for (const variant of ['ping', 'settings']) {
3030
}));
3131

3232
server.listen(0, common.mustCall(() => {
33-
http2.connect(`http://localhost:${server.address().port}`,
34-
common.mustCall());
33+
const client = http2.connect(`http://localhost:${server.address().port}`,
34+
common.mustCall());
35+
client.on('error', (err) => {
36+
// We destroy the session so it's possible to get ECONNRESET here.
37+
if (err.code !== 'ECONNRESET')
38+
throw err;
39+
});
3540
}));
3641
}

test/parallel/test-http2-server-push-stream.js

+2
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ server.listen(0, common.mustCall(() => {
5555
assert.strictEqual(headers['x-push-data'], 'pushed by server');
5656
}));
5757
stream.on('aborted', common.mustNotCall());
58+
// We have to read the data of the push stream to end gracefully.
59+
stream.resume();
5860
}));
5961

6062
let data = '';

0 commit comments

Comments
 (0)