@@ -341,20 +341,25 @@ function onStreamClose(code) {
341
341
342
342
stream [ kState ] . fd = - 1 ;
343
343
// Defer destroy we actually emit end.
344
- if ( stream . _readableState . endEmitted || code !== NGHTTP2_NO_ERROR ) {
344
+ if ( ! stream . readable || code !== NGHTTP2_NO_ERROR ) {
345
345
// If errored or ended, we can destroy immediately.
346
- stream [ kMaybeDestroy ] ( null , code ) ;
346
+ stream [ kMaybeDestroy ] ( code ) ;
347
347
} else {
348
348
// Wait for end to destroy.
349
349
stream . on ( 'end' , stream [ kMaybeDestroy ] ) ;
350
350
// Push a null so the stream can end whenever the client consumes
351
351
// it completely.
352
352
stream . push ( null ) ;
353
- // If the client hasn't tried to consume the stream and there is no
354
- // resume scheduled (which would indicate they would consume in the future),
355
- // then just dump the incoming data so that the stream can be destroyed.
356
- if ( ! stream [ kState ] . didRead && ! stream . _readableState . resumeScheduled )
353
+
354
+ // If the user hasn't tried to consume the stream (and this is a server
355
+ // session) then just dump the incoming data so that the stream can
356
+ // be destroyed.
357
+ if ( stream [ kSession ] [ kType ] === NGHTTP2_SESSION_SERVER &&
358
+ ! stream [ kState ] . didRead &&
359
+ stream . readableFlowing === null )
357
360
stream . resume ( ) ;
361
+ else
362
+ stream . read ( 0 ) ;
358
363
}
359
364
}
360
365
@@ -379,7 +384,7 @@ function onStreamRead(nread, buf) {
379
384
`${ sessionName ( stream [ kSession ] [ kType ] ) } ]: ending readable.` ) ;
380
385
381
386
// defer this until we actually emit end
382
- if ( stream . _readableState . endEmitted ) {
387
+ if ( ! stream . readable ) {
383
388
stream [ kMaybeDestroy ] ( ) ;
384
389
} else {
385
390
stream . on ( 'end' , stream [ kMaybeDestroy ] ) ;
@@ -469,8 +474,7 @@ function onGoawayData(code, lastStreamID, buf) {
469
474
// goaway using NGHTTP2_NO_ERROR because there was no error
470
475
// condition on this side of the session that caused the
471
476
// shutdown.
472
- session . destroy ( new ERR_HTTP2_SESSION_ERROR ( code ) ,
473
- { errorCode : NGHTTP2_NO_ERROR } ) ;
477
+ session . destroy ( new ERR_HTTP2_SESSION_ERROR ( code ) , NGHTTP2_NO_ERROR ) ;
474
478
}
475
479
}
476
480
@@ -813,6 +817,21 @@ function emitClose(self, error) {
813
817
self . emit ( 'close' ) ;
814
818
}
815
819
820
+ function finishSessionDestroy ( session , error ) {
821
+ const socket = session [ kSocket ] ;
822
+ if ( ! socket . destroyed )
823
+ socket . destroy ( error ) ;
824
+
825
+ session [ kProxySocket ] = undefined ;
826
+ session [ kSocket ] = undefined ;
827
+ session [ kHandle ] = undefined ;
828
+ socket [ kSession ] = undefined ;
829
+ socket [ kServer ] = undefined ;
830
+
831
+ // Finally, emit the close and error events (if necessary) on next tick.
832
+ process . nextTick ( emitClose , session , error ) ;
833
+ }
834
+
816
835
// Upon creation, the Http2Session takes ownership of the socket. The session
817
836
// may not be ready to use immediately if the socket is not yet fully connected.
818
837
// In that case, the Http2Session will wait for the socket to connect. Once
@@ -869,6 +888,8 @@ class Http2Session extends EventEmitter {
869
888
870
889
this [ kState ] = {
871
890
flags : SESSION_FLAGS_PENDING ,
891
+ goawayCode : null ,
892
+ goawayLastStreamID : null ,
872
893
streams : new Map ( ) ,
873
894
pendingStreams : new Set ( ) ,
874
895
pendingAck : 0 ,
@@ -1171,25 +1192,13 @@ class Http2Session extends EventEmitter {
1171
1192
if ( handle !== undefined )
1172
1193
handle . destroy ( code , socket . destroyed ) ;
1173
1194
1174
- // If there is no error , use setImmediate to destroy the socket on the
1195
+ // If the socket is alive , use setImmediate to destroy the session on the
1175
1196
// next iteration of the event loop in order to give data time to transmit.
1176
1197
// Otherwise, destroy immediately.
1177
- if ( ! socket . destroyed ) {
1178
- if ( ! error ) {
1179
- setImmediate ( socket . destroy . bind ( socket ) ) ;
1180
- } else {
1181
- socket . destroy ( error ) ;
1182
- }
1183
- }
1184
-
1185
- this [ kProxySocket ] = undefined ;
1186
- this [ kSocket ] = undefined ;
1187
- this [ kHandle ] = undefined ;
1188
- socket [ kSession ] = undefined ;
1189
- socket [ kServer ] = undefined ;
1190
-
1191
- // Finally, emit the close and error events (if necessary) on next tick.
1192
- process . nextTick ( emitClose , this , error ) ;
1198
+ if ( ! socket . destroyed )
1199
+ setImmediate ( finishSessionDestroy , this , error ) ;
1200
+ else
1201
+ finishSessionDestroy ( this , error ) ;
1193
1202
}
1194
1203
1195
1204
// Closing the session will:
@@ -1441,11 +1450,8 @@ function afterDoStreamWrite(status, handle) {
1441
1450
}
1442
1451
1443
1452
function streamOnResume ( ) {
1444
- if ( ! this . destroyed && ! this . pending ) {
1445
- if ( ! this [ kState ] . didRead )
1446
- this [ kState ] . didRead = true ;
1453
+ if ( ! this . destroyed )
1447
1454
this [ kHandle ] . readStart ( ) ;
1448
- }
1449
1455
}
1450
1456
1451
1457
function streamOnPause ( ) {
@@ -1460,6 +1466,16 @@ function afterShutdown() {
1460
1466
stream [ kMaybeDestroy ] ( ) ;
1461
1467
}
1462
1468
1469
+ function finishSendTrailers ( stream , headersList ) {
1470
+ stream [ kState ] . flags &= ~ STREAM_FLAGS_HAS_TRAILERS ;
1471
+
1472
+ const ret = stream [ kHandle ] . trailers ( headersList ) ;
1473
+ if ( ret < 0 )
1474
+ stream . destroy ( new NghttpError ( ret ) ) ;
1475
+ else
1476
+ stream [ kMaybeDestroy ] ( ) ;
1477
+ }
1478
+
1463
1479
function closeStream ( stream , code , shouldSubmitRstStream = true ) {
1464
1480
const state = stream [ kState ] ;
1465
1481
state . flags |= STREAM_FLAGS_CLOSED ;
@@ -1521,6 +1537,10 @@ class Http2Stream extends Duplex {
1521
1537
this [ kSession ] = session ;
1522
1538
session [ kState ] . pendingStreams . add ( this ) ;
1523
1539
1540
+ // Allow our logic for determining whether any reads have happened to
1541
+ // work in all situations. This is similar to what we do in _http_incoming.
1542
+ this . _readableState . readingMore = true ;
1543
+
1524
1544
this [ kTimeout ] = null ;
1525
1545
1526
1546
this [ kState ] = {
@@ -1531,7 +1551,6 @@ class Http2Stream extends Duplex {
1531
1551
trailersReady : false
1532
1552
} ;
1533
1553
1534
- this . on ( 'resume' , streamOnResume ) ;
1535
1554
this . on ( 'pause' , streamOnPause ) ;
1536
1555
}
1537
1556
@@ -1725,6 +1744,10 @@ class Http2Stream extends Duplex {
1725
1744
this . push ( null ) ;
1726
1745
return ;
1727
1746
}
1747
+ if ( ! this [ kState ] . didRead ) {
1748
+ this . _readableState . readingMore = false ;
1749
+ this [ kState ] . didRead = true ;
1750
+ }
1728
1751
if ( ! this . pending ) {
1729
1752
streamOnResume . call ( this ) ;
1730
1753
} else {
@@ -1773,13 +1796,8 @@ class Http2Stream extends Duplex {
1773
1796
throw headersList ;
1774
1797
this [ kSentTrailers ] = headers ;
1775
1798
1776
- this [ kState ] . flags &= ~ STREAM_FLAGS_HAS_TRAILERS ;
1777
-
1778
- const ret = this [ kHandle ] . trailers ( headersList ) ;
1779
- if ( ret < 0 )
1780
- this . destroy ( new NghttpError ( ret ) ) ;
1781
- else
1782
- this [ kMaybeDestroy ] ( ) ;
1799
+ // Send the trailers in setImmediate so we don't do it on nghttp2 stack.
1800
+ setImmediate ( finishSendTrailers , this , headersList ) ;
1783
1801
}
1784
1802
1785
1803
get closed ( ) {
@@ -1866,15 +1884,15 @@ class Http2Stream extends Duplex {
1866
1884
}
1867
1885
// The Http2Stream can be destroyed if it has closed and if the readable
1868
1886
// side has received the final chunk.
1869
- [ kMaybeDestroy ] ( error , code = NGHTTP2_NO_ERROR ) {
1870
- if ( error || code !== NGHTTP2_NO_ERROR ) {
1871
- this . destroy ( error ) ;
1887
+ [ kMaybeDestroy ] ( code = NGHTTP2_NO_ERROR ) {
1888
+ if ( code !== NGHTTP2_NO_ERROR ) {
1889
+ this . destroy ( ) ;
1872
1890
return ;
1873
1891
}
1874
1892
1875
1893
// TODO(mcollina): remove usage of _*State properties
1876
- if ( this . _writableState . ended && this . _writableState . pendingcb === 0 ) {
1877
- if ( this . _readableState . ended && this . closed ) {
1894
+ if ( ! this . writable ) {
1895
+ if ( ! this . readable && this . closed ) {
1878
1896
this . destroy ( ) ;
1879
1897
return ;
1880
1898
}
@@ -1887,7 +1905,7 @@ class Http2Stream extends Duplex {
1887
1905
this [ kSession ] [ kType ] === NGHTTP2_SESSION_SERVER &&
1888
1906
! ( state . flags & STREAM_FLAGS_HAS_TRAILERS ) &&
1889
1907
! state . didRead &&
1890
- ! this . _readableState . resumeScheduled ) {
1908
+ this . readableFlowing === null ) {
1891
1909
this . close ( ) ;
1892
1910
}
1893
1911
}
@@ -2477,6 +2495,10 @@ Object.defineProperty(Http2Session.prototype, 'setTimeout', setTimeout);
2477
2495
function socketOnError ( error ) {
2478
2496
const session = this [ kSession ] ;
2479
2497
if ( session !== undefined ) {
2498
+ // We can ignore ECONNRESET after GOAWAY was received as there's nothing
2499
+ // we can do and the other side is fully within its rights to do so.
2500
+ if ( error . code === 'ECONNRESET' && session [ kState ] . goawayCode !== null )
2501
+ return session . destroy ( ) ;
2480
2502
debug ( `Http2Session ${ sessionName ( session [ kType ] ) } : socket error [` +
2481
2503
`${ error . message } ]` ) ;
2482
2504
session . destroy ( error ) ;
0 commit comments