Skip to content

Commit 46bd862

Browse files
apapirovskiMylesBorins
authored andcommitted
http2: fix responses to long payload reqs
When a request with a long payload is received, http2 does not allow a response that does not process all the incoming payload. Add a conditional Http2Stream.close call that runs only if the user hasn't attempted to read the stream. PR-URL: #20084 Fixes: #20060 Reviewed-By: Anna Henningsen <[email protected]> Reviewed-By: James M Snell <[email protected]> Reviewed-By: Matteo Collina <[email protected]>
1 parent 9b30bc4 commit 46bd862

10 files changed

+279
-105
lines changed

lib/internal/http2/core.js

+98-83
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,7 @@ const STREAM_FLAGS_CLOSED = 0x2;
206206
const STREAM_FLAGS_HEADERS_SENT = 0x4;
207207
const STREAM_FLAGS_HEAD_REQUEST = 0x8;
208208
const STREAM_FLAGS_ABORTED = 0x10;
209+
const STREAM_FLAGS_HAS_TRAILERS = 0x20;
209210

210211
const SESSION_FLAGS_PENDING = 0x0;
211212
const SESSION_FLAGS_READY = 0x1;
@@ -330,26 +331,13 @@ function onStreamClose(code) {
330331
if (stream.destroyed)
331332
return;
332333

333-
const state = stream[kState];
334-
335334
debug(`Http2Stream ${stream[kID]} [Http2Session ` +
336335
`${sessionName(stream[kSession][kType])}]: closed with code ${code}`);
337336

338-
if (!stream.closed) {
339-
// Clear timeout and remove timeout listeners
340-
stream.setTimeout(0);
341-
stream.removeAllListeners('timeout');
337+
if (!stream.closed)
338+
closeStream(stream, code, false);
342339

343-
// Set the state flags
344-
state.flags |= STREAM_FLAGS_CLOSED;
345-
state.rstCode = code;
346-
347-
// Close the writable side of the stream
348-
abort(stream);
349-
stream.end();
350-
}
351-
352-
state.fd = -1;
340+
stream[kState].fd = -1;
353341
// Defer destroy we actually emit end.
354342
if (stream._readableState.endEmitted || code !== NGHTTP2_NO_ERROR) {
355343
// If errored or ended, we can destroy immediately.
@@ -504,7 +492,7 @@ function requestOnConnect(headers, options) {
504492

505493
// At this point, the stream should have already been destroyed during
506494
// the session.destroy() method. Do nothing else.
507-
if (session.destroyed)
495+
if (session === undefined || session.destroyed)
508496
return;
509497

510498
// If the session was closed while waiting for the connect, destroy
@@ -1412,6 +1400,9 @@ class ClientHttp2Session extends Http2Session {
14121400
if (options.endStream)
14131401
stream.end();
14141402

1403+
if (options.waitForTrailers)
1404+
stream[kState].flags |= STREAM_FLAGS_HAS_TRAILERS;
1405+
14151406
const onConnect = requestOnConnect.bind(stream, headersList, options);
14161407
if (this.connecting) {
14171408
this.on('connect', onConnect);
@@ -1445,32 +1436,70 @@ function afterDoStreamWrite(status, handle) {
14451436
}
14461437

14471438
function streamOnResume() {
1448-
if (!this.destroyed && !this.pending)
1439+
if (!this.destroyed && !this.pending) {
1440+
if (!this[kState].didRead)
1441+
this[kState].didRead = true;
14491442
this[kHandle].readStart();
1443+
}
14501444
}
14511445

14521446
function streamOnPause() {
14531447
if (!this.destroyed && !this.pending)
14541448
this[kHandle].readStop();
14551449
}
14561450

1457-
// If the writable side of the Http2Stream is still open, emit the
1458-
// 'aborted' event and set the aborted flag.
1459-
function abort(stream) {
1460-
if (!stream.aborted &&
1461-
!(stream._writableState.ended || stream._writableState.ending)) {
1462-
stream[kState].flags |= STREAM_FLAGS_ABORTED;
1463-
stream.emit('aborted');
1464-
}
1465-
}
1466-
14671451
function afterShutdown() {
14681452
this.callback();
14691453
const stream = this.handle[kOwner];
14701454
if (stream)
14711455
stream[kMaybeDestroy]();
14721456
}
14731457

1458+
function closeStream(stream, code, shouldSubmitRstStream = true) {
1459+
const state = stream[kState];
1460+
state.flags |= STREAM_FLAGS_CLOSED;
1461+
state.rstCode = code;
1462+
1463+
// Clear timeout and remove timeout listeners
1464+
stream.setTimeout(0);
1465+
stream.removeAllListeners('timeout');
1466+
1467+
const { ending, finished } = stream._writableState;
1468+
1469+
if (!ending) {
1470+
// If the writable side of the Http2Stream is still open, emit the
1471+
// 'aborted' event and set the aborted flag.
1472+
if (!stream.aborted) {
1473+
state.flags |= STREAM_FLAGS_ABORTED;
1474+
stream.emit('aborted');
1475+
}
1476+
1477+
// Close the writable side.
1478+
stream.end();
1479+
}
1480+
1481+
if (shouldSubmitRstStream) {
1482+
const finishFn = finishCloseStream.bind(stream, code);
1483+
if (!ending || finished || code !== NGHTTP2_NO_ERROR)
1484+
finishFn();
1485+
else
1486+
stream.once('finish', finishFn);
1487+
}
1488+
}
1489+
1490+
function finishCloseStream(code) {
1491+
const rstStreamFn = submitRstStream.bind(this, code);
1492+
// If the handle has not yet been assigned, queue up the request to
1493+
// ensure that the RST_STREAM frame is sent after the stream ID has
1494+
// been determined.
1495+
if (this.pending) {
1496+
this.push(null);
1497+
this.once('ready', rstStreamFn);
1498+
return;
1499+
}
1500+
rstStreamFn();
1501+
}
1502+
14741503
// An Http2Stream is a Duplex stream that is backed by a
14751504
// node::http2::Http2Stream handle implementing StreamBase.
14761505
class Http2Stream extends Duplex {
@@ -1490,6 +1519,7 @@ class Http2Stream extends Duplex {
14901519
this[kTimeout] = null;
14911520

14921521
this[kState] = {
1522+
didRead: false,
14931523
flags: STREAM_FLAGS_PENDING,
14941524
rstCode: NGHTTP2_NO_ERROR,
14951525
writeQueueSize: 0,
@@ -1756,6 +1786,8 @@ class Http2Stream extends Duplex {
17561786
throw headersList;
17571787
this[kSentTrailers] = headers;
17581788

1789+
this[kState].flags &= ~STREAM_FLAGS_HAS_TRAILERS;
1790+
17591791
const ret = this[kHandle].trailers(headersList);
17601792
if (ret < 0)
17611793
this.destroy(new NghttpError(ret));
@@ -1786,38 +1818,13 @@ class Http2Stream extends Duplex {
17861818
if (callback !== undefined && typeof callback !== 'function')
17871819
throw new ERR_INVALID_CALLBACK();
17881820

1789-
// Clear timeout and remove timeout listeners
1790-
this.setTimeout(0);
1791-
this.removeAllListeners('timeout');
1792-
1793-
// Close the writable
1794-
abort(this);
1795-
this.end();
1796-
17971821
if (this.closed)
17981822
return;
17991823

1800-
const state = this[kState];
1801-
state.flags |= STREAM_FLAGS_CLOSED;
1802-
state.rstCode = code;
1803-
1804-
if (callback !== undefined) {
1824+
if (callback !== undefined)
18051825
this.once('close', callback);
1806-
}
1807-
1808-
if (this[kHandle] === undefined)
1809-
return;
18101826

1811-
const rstStreamFn = submitRstStream.bind(this, code);
1812-
// If the handle has not yet been assigned, queue up the request to
1813-
// ensure that the RST_STREAM frame is sent after the stream ID has
1814-
// been determined.
1815-
if (this.pending) {
1816-
this.push(null);
1817-
this.once('ready', rstStreamFn);
1818-
return;
1819-
}
1820-
rstStreamFn();
1827+
closeStream(this, code);
18211828
}
18221829

18231830
// Called by this.destroy().
@@ -1832,26 +1839,19 @@ class Http2Stream extends Duplex {
18321839
debug(`Http2Stream ${this[kID] || '<pending>'} [Http2Session ` +
18331840
`${sessionName(session[kType])}]: destroying stream`);
18341841
const state = this[kState];
1835-
const code = state.rstCode =
1836-
err != null ?
1837-
NGHTTP2_INTERNAL_ERROR :
1838-
state.rstCode || NGHTTP2_NO_ERROR;
1839-
if (handle !== undefined) {
1840-
// If the handle exists, we need to close, then destroy the handle
1841-
this.close(code);
1842-
if (!this._readableState.ended && !this._readableState.ending)
1843-
this.push(null);
1842+
const code = err != null ?
1843+
NGHTTP2_INTERNAL_ERROR : (state.rstCode || NGHTTP2_NO_ERROR);
1844+
1845+
const hasHandle = handle !== undefined;
1846+
1847+
if (!this.closed)
1848+
closeStream(this, code, hasHandle);
1849+
this.push(null);
1850+
1851+
if (hasHandle) {
18441852
handle.destroy();
18451853
session[kState].streams.delete(id);
18461854
} else {
1847-
// Clear timeout and remove timeout listeners
1848-
this.setTimeout(0);
1849-
this.removeAllListeners('timeout');
1850-
1851-
state.flags |= STREAM_FLAGS_CLOSED;
1852-
abort(this);
1853-
this.end();
1854-
this.push(null);
18551855
session[kState].pendingStreams.delete(this);
18561856
}
18571857

@@ -1884,13 +1884,23 @@ class Http2Stream extends Duplex {
18841884
}
18851885

18861886
// TODO(mcollina): remove usage of _*State properties
1887-
if (this._readableState.ended &&
1888-
this._writableState.ended &&
1889-
this._writableState.pendingcb === 0 &&
1890-
this.closed) {
1891-
this.destroy();
1892-
// This should return, but eslint complains.
1893-
// return
1887+
if (this._writableState.ended && this._writableState.pendingcb === 0) {
1888+
if (this._readableState.ended && this.closed) {
1889+
this.destroy();
1890+
return;
1891+
}
1892+
1893+
// We've submitted a response from our server session, have not attempted
1894+
// to process any incoming data, and have no trailers. This means we can
1895+
// attempt to gracefully close the session.
1896+
const state = this[kState];
1897+
if (this.headersSent &&
1898+
this[kSession][kType] === NGHTTP2_SESSION_SERVER &&
1899+
!(state.flags & STREAM_FLAGS_HAS_TRAILERS) &&
1900+
!state.didRead &&
1901+
!this._readableState.resumeScheduled) {
1902+
this.close();
1903+
}
18941904
}
18951905
}
18961906
}
@@ -2095,7 +2105,6 @@ function afterOpen(session, options, headers, streamOptions, err, fd) {
20952105
}
20962106
if (this.destroyed || this.closed) {
20972107
tryClose(fd);
2098-
abort(this);
20992108
return;
21002109
}
21012110
state.fd = fd;
@@ -2224,8 +2233,10 @@ class ServerHttp2Stream extends Http2Stream {
22242233
if (options.endStream)
22252234
streamOptions |= STREAM_OPTION_EMPTY_PAYLOAD;
22262235

2227-
if (options.waitForTrailers)
2236+
if (options.waitForTrailers) {
22282237
streamOptions |= STREAM_OPTION_GET_TRAILERS;
2238+
state.flags |= STREAM_FLAGS_HAS_TRAILERS;
2239+
}
22292240

22302241
headers = processHeaders(headers);
22312242
const statusCode = headers[HTTP2_HEADER_STATUS] |= 0;
@@ -2285,8 +2296,10 @@ class ServerHttp2Stream extends Http2Stream {
22852296
}
22862297

22872298
let streamOptions = 0;
2288-
if (options.waitForTrailers)
2299+
if (options.waitForTrailers) {
22892300
streamOptions |= STREAM_OPTION_GET_TRAILERS;
2301+
this[kState].flags |= STREAM_FLAGS_HAS_TRAILERS;
2302+
}
22902303

22912304
if (typeof fd !== 'number')
22922305
throw new ERR_INVALID_ARG_TYPE('fd', 'number', fd);
@@ -2346,8 +2359,10 @@ class ServerHttp2Stream extends Http2Stream {
23462359
}
23472360

23482361
let streamOptions = 0;
2349-
if (options.waitForTrailers)
2362+
if (options.waitForTrailers) {
23502363
streamOptions |= STREAM_OPTION_GET_TRAILERS;
2364+
this[kState].flags |= STREAM_FLAGS_HAS_TRAILERS;
2365+
}
23512366

23522367
const session = this[kSession];
23532368
debug(`Http2Stream ${this[kID]} [Http2Session ` +

0 commit comments

Comments
 (0)