@@ -154,6 +154,7 @@ const STREAM_FLAGS_CLOSED = 0x2;
154
154
const STREAM_FLAGS_HEADERS_SENT = 0x4 ;
155
155
const STREAM_FLAGS_HEAD_REQUEST = 0x8 ;
156
156
const STREAM_FLAGS_ABORTED = 0x10 ;
157
+ const STREAM_FLAGS_HAS_TRAILERS = 0x20 ;
157
158
158
159
const SESSION_FLAGS_PENDING = 0x0 ;
159
160
const SESSION_FLAGS_READY = 0x1 ;
@@ -278,27 +279,14 @@ function onStreamClose(code) {
278
279
if ( stream . destroyed )
279
280
return ;
280
281
281
- const state = stream [ kState ] ;
282
-
283
282
debug ( `Http2Stream ${ stream [ kID ] } [Http2Session ` +
284
283
`${ sessionName ( stream [ kSession ] [ kType ] ) } ]: closed with code ${ code } ` ) ;
285
284
286
- if ( ! stream . closed ) {
287
- // Unenroll from timeouts
288
- unenroll ( stream ) ;
289
- stream . removeAllListeners ( 'timeout' ) ;
290
-
291
- // Set the state flags
292
- state . flags |= STREAM_FLAGS_CLOSED ;
293
- state . rstCode = code ;
294
-
295
- // Close the writable side of the stream
296
- abort ( stream ) ;
297
- stream . end ( ) ;
298
- }
285
+ if ( ! stream . closed )
286
+ closeStream ( stream , code , false ) ;
299
287
300
- if ( state . fd !== undefined )
301
- tryClose ( state . fd ) ;
288
+ if ( stream [ kState ] . fd !== undefined )
289
+ tryClose ( stream [ kState ] . fd ) ;
302
290
303
291
// Defer destroy we actually emit end.
304
292
if ( stream . _readableState . endEmitted || code !== NGHTTP2_NO_ERROR ) {
@@ -454,7 +442,7 @@ function requestOnConnect(headers, options) {
454
442
455
443
// At this point, the stream should have already been destroyed during
456
444
// the session.destroy() method. Do nothing else.
457
- if ( session . destroyed )
445
+ if ( session === undefined || session . destroyed )
458
446
return ;
459
447
460
448
// If the session was closed while waiting for the connect, destroy
@@ -1369,6 +1357,9 @@ class ClientHttp2Session extends Http2Session {
1369
1357
if ( options . endStream )
1370
1358
stream . end ( ) ;
1371
1359
1360
+ if ( options . waitForTrailers )
1361
+ stream [ kState ] . flags |= STREAM_FLAGS_HAS_TRAILERS ;
1362
+
1372
1363
const onConnect = requestOnConnect . bind ( stream , headersList , options ) ;
1373
1364
if ( this . connecting ) {
1374
1365
this . on ( 'connect' , onConnect ) ;
@@ -1425,32 +1416,70 @@ function afterDoStreamWrite(status, handle, req) {
1425
1416
}
1426
1417
1427
1418
function streamOnResume ( ) {
1428
- if ( ! this . destroyed && ! this . pending )
1419
+ if ( ! this . destroyed && ! this . pending ) {
1420
+ if ( ! this [ kState ] . didRead )
1421
+ this [ kState ] . didRead = true ;
1429
1422
this [ kHandle ] . readStart ( ) ;
1423
+ }
1430
1424
}
1431
1425
1432
1426
function streamOnPause ( ) {
1433
1427
if ( ! this . destroyed && ! this . pending )
1434
1428
this [ kHandle ] . readStop ( ) ;
1435
1429
}
1436
1430
1437
- // If the writable side of the Http2Stream is still open, emit the
1438
- // 'aborted' event and set the aborted flag.
1439
- function abort ( stream ) {
1440
- if ( ! stream . aborted &&
1441
- ! ( stream . _writableState . ended || stream . _writableState . ending ) ) {
1442
- stream [ kState ] . flags |= STREAM_FLAGS_ABORTED ;
1443
- stream . emit ( 'aborted' ) ;
1444
- }
1445
- }
1446
-
1447
1431
function afterShutdown ( ) {
1448
1432
this . callback ( ) ;
1449
1433
const stream = this . handle [ kOwner ] ;
1450
1434
if ( stream )
1451
1435
stream [ kMaybeDestroy ] ( ) ;
1452
1436
}
1453
1437
1438
+ function closeStream ( stream , code , shouldSubmitRstStream = true ) {
1439
+ const state = stream [ kState ] ;
1440
+ state . flags |= STREAM_FLAGS_CLOSED ;
1441
+ state . rstCode = code ;
1442
+
1443
+ // Clear timeout and remove timeout listeners
1444
+ stream . setTimeout ( 0 ) ;
1445
+ stream . removeAllListeners ( 'timeout' ) ;
1446
+
1447
+ const { ending, finished } = stream . _writableState ;
1448
+
1449
+ if ( ! ending ) {
1450
+ // If the writable side of the Http2Stream is still open, emit the
1451
+ // 'aborted' event and set the aborted flag.
1452
+ if ( ! stream . aborted ) {
1453
+ state . flags |= STREAM_FLAGS_ABORTED ;
1454
+ stream . emit ( 'aborted' ) ;
1455
+ }
1456
+
1457
+ // Close the writable side.
1458
+ stream . end ( ) ;
1459
+ }
1460
+
1461
+ if ( shouldSubmitRstStream ) {
1462
+ const finishFn = finishCloseStream . bind ( stream , code ) ;
1463
+ if ( ! ending || finished || code !== NGHTTP2_NO_ERROR )
1464
+ finishFn ( ) ;
1465
+ else
1466
+ stream . once ( 'finish' , finishFn ) ;
1467
+ }
1468
+ }
1469
+
1470
+ function finishCloseStream ( code ) {
1471
+ const rstStreamFn = submitRstStream . bind ( this , code ) ;
1472
+ // If the handle has not yet been assigned, queue up the request to
1473
+ // ensure that the RST_STREAM frame is sent after the stream ID has
1474
+ // been determined.
1475
+ if ( this . pending ) {
1476
+ this . push ( null ) ;
1477
+ this . once ( 'ready' , rstStreamFn ) ;
1478
+ return ;
1479
+ }
1480
+ rstStreamFn ( ) ;
1481
+ }
1482
+
1454
1483
// An Http2Stream is a Duplex stream that is backed by a
1455
1484
// node::http2::Http2Stream handle implementing StreamBase.
1456
1485
class Http2Stream extends Duplex {
@@ -1468,6 +1497,7 @@ class Http2Stream extends Duplex {
1468
1497
session [ kState ] . pendingStreams . add ( this ) ;
1469
1498
1470
1499
this [ kState ] = {
1500
+ didRead : false ,
1471
1501
flags : STREAM_FLAGS_PENDING ,
1472
1502
rstCode : NGHTTP2_NO_ERROR ,
1473
1503
writeQueueSize : 0 ,
@@ -1749,6 +1779,8 @@ class Http2Stream extends Duplex {
1749
1779
throw headersList ;
1750
1780
this [ kSentTrailers ] = headers ;
1751
1781
1782
+ this [ kState ] . flags &= ~ STREAM_FLAGS_HAS_TRAILERS ;
1783
+
1752
1784
const ret = this [ kHandle ] . trailers ( headersList ) ;
1753
1785
if ( ret < 0 )
1754
1786
this . destroy ( new NghttpError ( ret ) ) ;
@@ -1779,38 +1811,13 @@ class Http2Stream extends Duplex {
1779
1811
if ( callback !== undefined && typeof callback !== 'function' )
1780
1812
throw new errors . TypeError ( 'ERR_INVALID_CALLBACK' ) ;
1781
1813
1782
- // Unenroll the timeout.
1783
- unenroll ( this ) ;
1784
- this . removeAllListeners ( 'timeout' ) ;
1785
-
1786
- // Close the writable
1787
- abort ( this ) ;
1788
- this . end ( ) ;
1789
-
1790
1814
if ( this . closed )
1791
1815
return ;
1792
1816
1793
- const state = this [ kState ] ;
1794
- state . flags |= STREAM_FLAGS_CLOSED ;
1795
- state . rstCode = code ;
1796
-
1797
- if ( callback !== undefined ) {
1817
+ if ( callback !== undefined )
1798
1818
this . once ( 'close' , callback ) ;
1799
- }
1800
1819
1801
- if ( this [ kHandle ] === undefined )
1802
- return ;
1803
-
1804
- const rstStreamFn = submitRstStream . bind ( this , code ) ;
1805
- // If the handle has not yet been assigned, queue up the request to
1806
- // ensure that the RST_STREAM frame is sent after the stream ID has
1807
- // been determined.
1808
- if ( this . pending ) {
1809
- this . push ( null ) ;
1810
- this . once ( 'ready' , rstStreamFn ) ;
1811
- return ;
1812
- }
1813
- rstStreamFn ( ) ;
1820
+ closeStream ( this , code ) ;
1814
1821
}
1815
1822
1816
1823
// Called by this.destroy().
@@ -1825,24 +1832,19 @@ class Http2Stream extends Duplex {
1825
1832
debug ( `Http2Stream ${ this [ kID ] || '<pending>' } [Http2Session ` +
1826
1833
`${ sessionName ( session [ kType ] ) } ]: destroying stream` ) ;
1827
1834
const state = this [ kState ] ;
1828
- const code = state . rstCode =
1829
- err != null ?
1830
- NGHTTP2_INTERNAL_ERROR :
1831
- state . rstCode || NGHTTP2_NO_ERROR ;
1832
- if ( handle !== undefined ) {
1833
- // If the handle exists, we need to close, then destroy the handle
1834
- this . close ( code ) ;
1835
- if ( ! this . _readableState . ended && ! this . _readableState . ending )
1836
- this . push ( null ) ;
1835
+ const code = err != null ?
1836
+ NGHTTP2_INTERNAL_ERROR : ( state . rstCode || NGHTTP2_NO_ERROR ) ;
1837
+
1838
+ const hasHandle = handle !== undefined ;
1839
+
1840
+ if ( ! this . closed )
1841
+ closeStream ( this , code , hasHandle ) ;
1842
+ this . push ( null ) ;
1843
+
1844
+ if ( hasHandle ) {
1837
1845
handle . destroy ( ) ;
1838
1846
session [ kState ] . streams . delete ( id ) ;
1839
1847
} else {
1840
- unenroll ( this ) ;
1841
- this . removeAllListeners ( 'timeout' ) ;
1842
- state . flags |= STREAM_FLAGS_CLOSED ;
1843
- abort ( this ) ;
1844
- this . end ( ) ;
1845
- this . push ( null ) ;
1846
1848
session [ kState ] . pendingStreams . delete ( this ) ;
1847
1849
}
1848
1850
@@ -1878,13 +1880,23 @@ class Http2Stream extends Duplex {
1878
1880
}
1879
1881
1880
1882
// TODO(mcollina): remove usage of _*State properties
1881
- if ( this . _readableState . ended &&
1882
- this . _writableState . ended &&
1883
- this . _writableState . pendingcb === 0 &&
1884
- this . closed ) {
1885
- this . destroy ( ) ;
1886
- // This should return, but eslint complains.
1887
- // return
1883
+ if ( this . _writableState . ended && this . _writableState . pendingcb === 0 ) {
1884
+ if ( this . _readableState . ended && this . closed ) {
1885
+ this . destroy ( ) ;
1886
+ return ;
1887
+ }
1888
+
1889
+ // We've submitted a response from our server session, have not attempted
1890
+ // to process any incoming data, and have no trailers. This means we can
1891
+ // attempt to gracefully close the session.
1892
+ const state = this [ kState ] ;
1893
+ if ( this . headersSent &&
1894
+ this [ kSession ] [ kType ] === NGHTTP2_SESSION_SERVER &&
1895
+ ! ( state . flags & STREAM_FLAGS_HAS_TRAILERS ) &&
1896
+ ! state . didRead &&
1897
+ ! this . _readableState . resumeScheduled ) {
1898
+ this . close ( ) ;
1899
+ }
1888
1900
}
1889
1901
}
1890
1902
}
@@ -2045,7 +2057,6 @@ function afterOpen(session, options, headers, streamOptions, err, fd) {
2045
2057
}
2046
2058
if ( this . destroyed || this . closed ) {
2047
2059
tryClose ( fd ) ;
2048
- abort ( this ) ;
2049
2060
return ;
2050
2061
}
2051
2062
state . fd = fd ;
@@ -2181,8 +2192,10 @@ class ServerHttp2Stream extends Http2Stream {
2181
2192
if ( options . endStream )
2182
2193
streamOptions |= STREAM_OPTION_EMPTY_PAYLOAD ;
2183
2194
2184
- if ( options . waitForTrailers )
2195
+ if ( options . waitForTrailers ) {
2185
2196
streamOptions |= STREAM_OPTION_GET_TRAILERS ;
2197
+ state . flags |= STREAM_FLAGS_HAS_TRAILERS ;
2198
+ }
2186
2199
2187
2200
headers = processHeaders ( headers ) ;
2188
2201
const statusCode = headers [ HTTP2_HEADER_STATUS ] |= 0 ;
@@ -2248,8 +2261,10 @@ class ServerHttp2Stream extends Http2Stream {
2248
2261
}
2249
2262
2250
2263
let streamOptions = 0 ;
2251
- if ( options . waitForTrailers )
2264
+ if ( options . waitForTrailers ) {
2252
2265
streamOptions |= STREAM_OPTION_GET_TRAILERS ;
2266
+ this [ kState ] . flags |= STREAM_FLAGS_HAS_TRAILERS ;
2267
+ }
2253
2268
2254
2269
if ( typeof fd !== 'number' )
2255
2270
throw new errors . TypeError ( 'ERR_INVALID_ARG_TYPE' ,
@@ -2315,8 +2330,10 @@ class ServerHttp2Stream extends Http2Stream {
2315
2330
}
2316
2331
2317
2332
let streamOptions = 0 ;
2318
- if ( options . waitForTrailers )
2333
+ if ( options . waitForTrailers ) {
2319
2334
streamOptions |= STREAM_OPTION_GET_TRAILERS ;
2335
+ this [ kState ] . flags |= STREAM_FLAGS_HAS_TRAILERS ;
2336
+ }
2320
2337
2321
2338
const session = this [ kSession ] ;
2322
2339
debug ( `Http2Stream ${ this [ kID ] } [Http2Session ` +
0 commit comments