@@ -292,20 +292,25 @@ function onStreamClose(code) {
292
292
tryClose ( stream [ kState ] . fd ) ;
293
293
294
294
// Defer destroy we actually emit end.
295
- if ( stream . _readableState . endEmitted || code !== NGHTTP2_NO_ERROR ) {
295
+ if ( ! stream . readable || code !== NGHTTP2_NO_ERROR ) {
296
296
// If errored or ended, we can destroy immediately.
297
- stream [ kMaybeDestroy ] ( null , code ) ;
297
+ stream [ kMaybeDestroy ] ( code ) ;
298
298
} else {
299
299
// Wait for end to destroy.
300
300
stream . on ( 'end' , stream [ kMaybeDestroy ] ) ;
301
301
// Push a null so the stream can end whenever the client consumes
302
302
// it completely.
303
303
stream . push ( null ) ;
304
- // If the client hasn't tried to consume the stream and there is no
305
- // resume scheduled (which would indicate they would consume in the future),
306
- // then just dump the incoming data so that the stream can be destroyed.
307
- if ( ! stream [ kState ] . didRead && ! stream . _readableState . resumeScheduled )
304
+
305
+ // If the user hasn't tried to consume the stream (and this is a server
306
+ // session) then just dump the incoming data so that the stream can
307
+ // be destroyed.
308
+ if ( stream [ kSession ] [ kType ] === NGHTTP2_SESSION_SERVER &&
309
+ ! stream [ kState ] . didRead &&
310
+ stream . _readableState . flowing === null )
308
311
stream . resume ( ) ;
312
+ else
313
+ stream . read ( 0 ) ;
309
314
}
310
315
}
311
316
@@ -330,7 +335,7 @@ function onStreamRead(nread, buf) {
330
335
`${ sessionName ( stream [ kSession ] [ kType ] ) } ]: ending readable.` ) ;
331
336
332
337
// defer this until we actually emit end
333
- if ( stream . _readableState . endEmitted ) {
338
+ if ( ! stream . readable ) {
334
339
stream [ kMaybeDestroy ] ( ) ;
335
340
} else {
336
341
stream . on ( 'end' , stream [ kMaybeDestroy ] ) ;
@@ -421,7 +426,7 @@ function onGoawayData(code, lastStreamID, buf) {
421
426
// condition on this side of the session that caused the
422
427
// shutdown.
423
428
session . destroy ( new errors . Error ( 'ERR_HTTP2_SESSION_ERROR' , code ) ,
424
- { errorCode : NGHTTP2_NO_ERROR } ) ;
429
+ NGHTTP2_NO_ERROR ) ;
425
430
}
426
431
}
427
432
@@ -772,6 +777,21 @@ function emitClose(self, error) {
772
777
self . emit ( 'close' ) ;
773
778
}
774
779
780
+ function finishSessionDestroy ( session , error ) {
781
+ const socket = session [ kSocket ] ;
782
+ if ( ! socket . destroyed )
783
+ socket . destroy ( error ) ;
784
+
785
+ session [ kProxySocket ] = undefined ;
786
+ session [ kSocket ] = undefined ;
787
+ session [ kHandle ] = undefined ;
788
+ socket [ kSession ] = undefined ;
789
+ socket [ kServer ] = undefined ;
790
+
791
+ // Finally, emit the close and error events (if necessary) on next tick.
792
+ process . nextTick ( emitClose , session , error ) ;
793
+ }
794
+
775
795
// Upon creation, the Http2Session takes ownership of the socket. The session
776
796
// may not be ready to use immediately if the socket is not yet fully connected.
777
797
// In that case, the Http2Session will wait for the socket to connect. Once
@@ -828,6 +848,8 @@ class Http2Session extends EventEmitter {
828
848
829
849
this [ kState ] = {
830
850
flags : SESSION_FLAGS_PENDING ,
851
+ goawayCode : null ,
852
+ goawayLastStreamID : null ,
831
853
streams : new Map ( ) ,
832
854
pendingStreams : new Set ( ) ,
833
855
pendingAck : 0 ,
@@ -1130,25 +1152,13 @@ class Http2Session extends EventEmitter {
1130
1152
if ( handle !== undefined )
1131
1153
handle . destroy ( code , socket . destroyed ) ;
1132
1154
1133
- // If there is no error , use setImmediate to destroy the socket on the
1155
+ // If the socket is alive , use setImmediate to destroy the session on the
1134
1156
// next iteration of the event loop in order to give data time to transmit.
1135
1157
// Otherwise, destroy immediately.
1136
- if ( ! socket . destroyed ) {
1137
- if ( ! error ) {
1138
- setImmediate ( socket . destroy . bind ( socket ) ) ;
1139
- } else {
1140
- socket . destroy ( error ) ;
1141
- }
1142
- }
1143
-
1144
- this [ kProxySocket ] = undefined ;
1145
- this [ kSocket ] = undefined ;
1146
- this [ kHandle ] = undefined ;
1147
- socket [ kSession ] = undefined ;
1148
- socket [ kServer ] = undefined ;
1149
-
1150
- // Finally, emit the close and error events (if necessary) on next tick.
1151
- process . nextTick ( emitClose , this , error ) ;
1158
+ if ( ! socket . destroyed )
1159
+ setImmediate ( finishSessionDestroy , this , error ) ;
1160
+ else
1161
+ finishSessionDestroy ( this , error ) ;
1152
1162
}
1153
1163
1154
1164
// Closing the session will:
@@ -1422,11 +1432,8 @@ function afterDoStreamWrite(status, handle, req) {
1422
1432
}
1423
1433
1424
1434
function streamOnResume ( ) {
1425
- if ( ! this . destroyed && ! this . pending ) {
1426
- if ( ! this [ kState ] . didRead )
1427
- this [ kState ] . didRead = true ;
1435
+ if ( ! this . destroyed )
1428
1436
this [ kHandle ] . readStart ( ) ;
1429
- }
1430
1437
}
1431
1438
1432
1439
function streamOnPause ( ) {
@@ -1441,6 +1448,16 @@ function afterShutdown() {
1441
1448
stream [ kMaybeDestroy ] ( ) ;
1442
1449
}
1443
1450
1451
+ function finishSendTrailers ( stream , headersList ) {
1452
+ stream [ kState ] . flags &= ~ STREAM_FLAGS_HAS_TRAILERS ;
1453
+
1454
+ const ret = stream [ kHandle ] . trailers ( headersList ) ;
1455
+ if ( ret < 0 )
1456
+ stream . destroy ( new NghttpError ( ret ) ) ;
1457
+ else
1458
+ stream [ kMaybeDestroy ] ( ) ;
1459
+ }
1460
+
1444
1461
function closeStream ( stream , code , shouldSubmitRstStream = true ) {
1445
1462
const state = stream [ kState ] ;
1446
1463
state . flags |= STREAM_FLAGS_CLOSED ;
@@ -1502,6 +1519,10 @@ class Http2Stream extends Duplex {
1502
1519
this [ kSession ] = session ;
1503
1520
session [ kState ] . pendingStreams . add ( this ) ;
1504
1521
1522
+ // Allow our logic for determining whether any reads have happened to
1523
+ // work in all situations. This is similar to what we do in _http_incoming.
1524
+ this . _readableState . readingMore = true ;
1525
+
1505
1526
this [ kState ] = {
1506
1527
didRead : false ,
1507
1528
flags : STREAM_FLAGS_PENDING ,
@@ -1510,7 +1531,6 @@ class Http2Stream extends Duplex {
1510
1531
trailersReady : false
1511
1532
} ;
1512
1533
1513
- this . on ( 'resume' , streamOnResume ) ;
1514
1534
this . on ( 'pause' , streamOnPause ) ;
1515
1535
}
1516
1536
@@ -1717,6 +1737,10 @@ class Http2Stream extends Duplex {
1717
1737
this . push ( null ) ;
1718
1738
return ;
1719
1739
}
1740
+ if ( ! this [ kState ] . didRead ) {
1741
+ this . _readableState . readingMore = false ;
1742
+ this [ kState ] . didRead = true ;
1743
+ }
1720
1744
if ( ! this . pending ) {
1721
1745
streamOnResume . call ( this ) ;
1722
1746
} else {
@@ -1765,13 +1789,8 @@ class Http2Stream extends Duplex {
1765
1789
throw headersList ;
1766
1790
this [ kSentTrailers ] = headers ;
1767
1791
1768
- this [ kState ] . flags &= ~ STREAM_FLAGS_HAS_TRAILERS ;
1769
-
1770
- const ret = this [ kHandle ] . trailers ( headersList ) ;
1771
- if ( ret < 0 )
1772
- this . destroy ( new NghttpError ( ret ) ) ;
1773
- else
1774
- this [ kMaybeDestroy ] ( ) ;
1792
+ // Send the trailers in setImmediate so we don't do it on nghttp2 stack.
1793
+ setImmediate ( finishSendTrailers , this , headersList ) ;
1775
1794
}
1776
1795
1777
1796
get closed ( ) {
@@ -1861,15 +1880,15 @@ class Http2Stream extends Duplex {
1861
1880
}
1862
1881
// The Http2Stream can be destroyed if it has closed and if the readable
1863
1882
// side has received the final chunk.
1864
- [ kMaybeDestroy ] ( error , code = NGHTTP2_NO_ERROR ) {
1865
- if ( error || code !== NGHTTP2_NO_ERROR ) {
1866
- this . destroy ( error ) ;
1883
+ [ kMaybeDestroy ] ( code = NGHTTP2_NO_ERROR ) {
1884
+ if ( code !== NGHTTP2_NO_ERROR ) {
1885
+ this . destroy ( ) ;
1867
1886
return ;
1868
1887
}
1869
1888
1870
1889
// TODO(mcollina): remove usage of _*State properties
1871
- if ( this . _writableState . ended && this . _writableState . pendingcb === 0 ) {
1872
- if ( this . _readableState . ended && this . closed ) {
1890
+ if ( ! this . writable ) {
1891
+ if ( ! this . readable && this . closed ) {
1873
1892
this . destroy ( ) ;
1874
1893
return ;
1875
1894
}
@@ -1882,7 +1901,7 @@ class Http2Stream extends Duplex {
1882
1901
this [ kSession ] [ kType ] === NGHTTP2_SESSION_SERVER &&
1883
1902
! ( state . flags & STREAM_FLAGS_HAS_TRAILERS ) &&
1884
1903
! state . didRead &&
1885
- ! this . _readableState . resumeScheduled ) {
1904
+ this . _readableState . flowing === null ) {
1886
1905
this . close ( ) ;
1887
1906
}
1888
1907
}
@@ -2445,6 +2464,10 @@ Object.defineProperty(Http2Session.prototype, 'setTimeout', setTimeout);
2445
2464
function socketOnError ( error ) {
2446
2465
const session = this [ kSession ] ;
2447
2466
if ( session !== undefined ) {
2467
+ // We can ignore ECONNRESET after GOAWAY was received as there's nothing
2468
+ // we can do and the other side is fully within its rights to do so.
2469
+ if ( error . code === 'ECONNRESET' && session [ kState ] . goawayCode !== null )
2470
+ return session . destroy ( ) ;
2448
2471
debug ( `Http2Session ${ sessionName ( session [ kType ] ) } : socket error [` +
2449
2472
`${ error . message } ]` ) ;
2450
2473
session . destroy ( error ) ;
0 commit comments