@@ -869,31 +869,52 @@ ssize_t Http2Session::OnCallbackPadding(size_t frameLen,
869
869
// various callback functions. Each of these will typically result in a call
870
870
// out to JavaScript so this particular function is rather hot and can be
871
871
// quite expensive. This is a potential performance optimization target later.
872
- ssize_t Http2Session::Write (const uv_buf_t * bufs, size_t nbufs) {
873
- size_t total = 0 ;
874
- // Note that nghttp2_session_mem_recv is a synchronous operation that
875
- // will trigger a number of other callbacks. Those will, in turn have
872
+ ssize_t Http2Session::ConsumeHTTP2Data () {
873
+ CHECK_NOT_NULL (stream_buf_.base );
874
+ CHECK_LT (stream_buf_offset_, stream_buf_.len );
875
+ size_t read_len = stream_buf_.len - stream_buf_offset_;
876
+
876
877
// multiple side effects.
877
- for (size_t n = 0 ; n < nbufs; n++) {
878
- Debug (this , " receiving %d bytes [wants data? %d]" ,
879
- bufs[n].len ,
880
- nghttp2_session_want_read (session_));
881
- ssize_t ret =
882
- nghttp2_session_mem_recv (session_,
883
- reinterpret_cast <uint8_t *>(bufs[n].base ),
884
- bufs[n].len );
885
- CHECK_NE (ret, NGHTTP2_ERR_NOMEM);
886
-
887
- if (ret < 0 )
888
- return ret;
878
+ Debug (this , " receiving %d bytes [wants data? %d]" ,
879
+ read_len,
880
+ nghttp2_session_want_read (session_));
881
+ flags_ &= ~SESSION_STATE_NGHTTP2_RECV_PAUSED;
882
+ ssize_t ret =
883
+ nghttp2_session_mem_recv (session_,
884
+ reinterpret_cast <uint8_t *>(stream_buf_.base ) +
885
+ stream_buf_offset_,
886
+ read_len);
887
+ CHECK_NE (ret, NGHTTP2_ERR_NOMEM);
889
888
890
- total += ret;
889
+ if (flags_ & SESSION_STATE_NGHTTP2_RECV_PAUSED) {
890
+ CHECK_NE (flags_ & SESSION_STATE_READING_STOPPED, 0 );
891
+
892
+ CHECK_GT (ret, 0 );
893
+ CHECK_LE (static_cast <size_t >(ret), read_len);
894
+
895
+ if (static_cast <size_t >(ret) < read_len) {
896
+ // Mark the remainder of the data as available for later consumption.
897
+ stream_buf_offset_ += ret;
898
+ return ret;
899
+ }
891
900
}
901
+
902
+ // We are done processing the current input chunk.
903
+ DecrementCurrentSessionMemory (stream_buf_.len );
904
+ stream_buf_offset_ = 0 ;
905
+ stream_buf_ab_.Reset ();
906
+ free (stream_buf_allocation_.base );
907
+ stream_buf_allocation_ = uv_buf_init (nullptr , 0 );
908
+ stream_buf_ = uv_buf_init (nullptr , 0 );
909
+
910
+ if (ret < 0 )
911
+ return ret;
912
+
892
913
// Send any data that was queued up while processing the received data.
893
914
if (!IsDestroyed ()) {
894
915
SendPendingData ();
895
916
}
896
- return total ;
917
+ return ret ;
897
918
}
898
919
899
920
@@ -1196,8 +1217,18 @@ int Http2Session::OnDataChunkReceived(nghttp2_session* handle,
1196
1217
nghttp2_session_consume_stream (handle, id, avail);
1197
1218
else
1198
1219
stream->inbound_consumed_data_while_paused_ += avail;
1220
+
1221
+ // If we have a gathered a lot of data for output, try sending it now.
1222
+ if (session->outgoing_length_ > 4096 ) session->SendPendingData ();
1199
1223
} while (len != 0 );
1200
1224
1225
+ // If we are currently waiting for a write operation to finish, we should
1226
+ // tell nghttp2 that we want to wait before we process more input data.
1227
+ if (session->flags_ & SESSION_STATE_WRITE_IN_PROGRESS) {
1228
+ session->flags_ |= SESSION_STATE_NGHTTP2_RECV_PAUSED;
1229
+ return NGHTTP2_ERR_PAUSE;
1230
+ }
1231
+
1201
1232
return 0 ;
1202
1233
}
1203
1234
@@ -1289,6 +1320,7 @@ void Http2StreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf) {
1289
1320
size_t offset = buf.base - session->stream_buf_ .base ;
1290
1321
1291
1322
// Verify that the data offset is inside the current read buffer.
1323
+ CHECK_GE (offset, session->stream_buf_offset_ );
1292
1324
CHECK_LE (offset, session->stream_buf_ .len );
1293
1325
CHECK_LE (offset + buf.len , session->stream_buf_ .len );
1294
1326
@@ -1586,6 +1618,11 @@ void Http2Session::OnStreamAfterWrite(WriteWrap* w, int status) {
1586
1618
stream_->ReadStart ();
1587
1619
}
1588
1620
1621
+ // If there is more incoming data queued up, consume it.
1622
+ if (stream_buf_offset_ > 0 ) {
1623
+ ConsumeHTTP2Data ();
1624
+ }
1625
+
1589
1626
if (!(flags_ & SESSION_STATE_WRITE_SCHEDULED)) {
1590
1627
// Schedule a new write if nghttp2 wants to send data.
1591
1628
MaybeScheduleWrite ();
@@ -1643,6 +1680,7 @@ void Http2Session::ClearOutgoing(int status) {
1643
1680
1644
1681
if (outgoing_buffers_.size () > 0 ) {
1645
1682
outgoing_storage_.clear ();
1683
+ outgoing_length_ = 0 ;
1646
1684
1647
1685
std::vector<nghttp2_stream_write> current_outgoing_buffers_;
1648
1686
current_outgoing_buffers_.swap (outgoing_buffers_);
@@ -1669,6 +1707,11 @@ void Http2Session::ClearOutgoing(int status) {
1669
1707
}
1670
1708
}
1671
1709
1710
+ void Http2Session::PushOutgoingBuffer (nghttp2_stream_write&& write) {
1711
+ outgoing_length_ += write .buf .len ;
1712
+ outgoing_buffers_.emplace_back (std::move (write ));
1713
+ }
1714
+
1672
1715
// Queue a given block of data for sending. This always creates a copy,
1673
1716
// so it is used for the cases in which nghttp2 requests sending of a
1674
1717
// small chunk of data.
@@ -1681,7 +1724,7 @@ void Http2Session::CopyDataIntoOutgoing(const uint8_t* src, size_t src_length) {
1681
1724
// of the outgoing_buffers_ vector may invalidate the pointer.
1682
1725
// The correct base pointers will be set later, before writing to the
1683
1726
// underlying socket.
1684
- outgoing_buffers_. emplace_back (nghttp2_stream_write {
1727
+ PushOutgoingBuffer (nghttp2_stream_write {
1685
1728
uv_buf_init (nullptr , src_length)
1686
1729
});
1687
1730
}
@@ -1804,13 +1847,13 @@ int Http2Session::OnSendData(
1804
1847
if (write .buf .len <= length) {
1805
1848
// This write does not suffice by itself, so we can consume it completely.
1806
1849
length -= write .buf .len ;
1807
- session->outgoing_buffers_ . emplace_back (std::move (write ));
1850
+ session->PushOutgoingBuffer (std::move (write ));
1808
1851
stream->queue_ .pop ();
1809
1852
continue ;
1810
1853
}
1811
1854
1812
1855
// Slice off `length` bytes of the first write in the queue.
1813
- session->outgoing_buffers_ . emplace_back (nghttp2_stream_write {
1856
+ session->PushOutgoingBuffer (nghttp2_stream_write {
1814
1857
uv_buf_init (write .buf .base , length)
1815
1858
});
1816
1859
write .buf .base += length;
@@ -1820,7 +1863,7 @@ int Http2Session::OnSendData(
1820
1863
1821
1864
if (frame->data .padlen > 0 ) {
1822
1865
// Send padding if that was requested.
1823
- session->outgoing_buffers_ . emplace_back (nghttp2_stream_write {
1866
+ session->PushOutgoingBuffer (nghttp2_stream_write {
1824
1867
uv_buf_init (const_cast <char *>(zero_bytes_256), frame->data .padlen - 1 )
1825
1868
});
1826
1869
}
@@ -1853,8 +1896,6 @@ void Http2Session::OnStreamRead(ssize_t nread, const uv_buf_t& buf_) {
1853
1896
Http2Scope h2scope (this );
1854
1897
CHECK_NOT_NULL (stream_);
1855
1898
Debug (this , " receiving %d bytes" , nread);
1856
- CHECK_EQ (stream_buf_allocation_.base , nullptr );
1857
- CHECK (stream_buf_ab_.IsEmpty ());
1858
1899
1859
1900
// Only pass data on if nread > 0
1860
1901
if (nread <= 0 ) {
@@ -1865,26 +1906,33 @@ void Http2Session::OnStreamRead(ssize_t nread, const uv_buf_t& buf_) {
1865
1906
return ;
1866
1907
}
1867
1908
1868
- // Shrink to the actual amount of used data.
1869
1909
uv_buf_t buf = buf_;
1870
- buf.base = Realloc (buf.base , nread);
1871
1910
1872
- IncrementCurrentSessionMemory (nread);
1873
- OnScopeLeave on_scope_leave ([&]() {
1874
- // Once finished handling this write, reset the stream buffer.
1875
- // The memory has either been free()d or was handed over to V8.
1876
- // We use `nread` instead of `buf.size()` here, because the buffer is
1877
- // cleared as part of the `.ToArrayBuffer()` call below.
1878
- DecrementCurrentSessionMemory (nread);
1911
+ statistics_.data_received += nread;
1912
+
1913
+ if (UNLIKELY (stream_buf_offset_ > 0 )) {
1914
+ // This is a very unlikely case, and should only happen if the ReadStart()
1915
+ // call in OnStreamAfterWrite() immediately provides data. If that does
1916
+ // happen, we concatenate the data we received with the already-stored
1917
+ // pending input data, slicing off the already processed part.
1918
+ char * new_buf = Malloc (stream_buf_.len - stream_buf_offset_ + nread);
1919
+ memcpy (new_buf,
1920
+ stream_buf_.base + stream_buf_offset_,
1921
+ stream_buf_.len - stream_buf_offset_);
1922
+ memcpy (new_buf + stream_buf_.len - stream_buf_offset_,
1923
+ buf.base ,
1924
+ nread);
1925
+ free (buf.base );
1926
+ nread = stream_buf_.len - stream_buf_offset_ + nread;
1927
+ buf = uv_buf_init (new_buf, nread);
1928
+ stream_buf_offset_ = 0 ;
1879
1929
stream_buf_ab_.Reset ();
1880
- free (stream_buf_allocation_.base );
1881
- stream_buf_allocation_ = uv_buf_init (nullptr , 0 );
1882
- stream_buf_ = uv_buf_init (nullptr , 0 );
1883
- });
1930
+ DecrementCurrentSessionMemory (stream_buf_offset_);
1931
+ }
1884
1932
1885
- // Make sure that there was no read previously active .
1886
- CHECK_NULL (stream_buf_ .base );
1887
- CHECK_EQ (stream_buf_. len , 0 );
1933
+ // Shrink to the actual amount of used data .
1934
+ buf. base = Realloc (buf .base , nread );
1935
+ IncrementCurrentSessionMemory (nread );
1888
1936
1889
1937
// Remember the current buffer, so that OnDataChunkReceived knows the
1890
1938
// offset of a DATA frame's data into the socket read buffer.
@@ -1903,8 +1951,7 @@ void Http2Session::OnStreamRead(ssize_t nread, const uv_buf_t& buf_) {
1903
1951
// to copy memory.
1904
1952
stream_buf_allocation_ = buf;
1905
1953
1906
- statistics_.data_received += nread;
1907
- ssize_t ret = Write (&stream_buf_, 1 );
1954
+ ssize_t ret = ConsumeHTTP2Data ();
1908
1955
1909
1956
if (UNLIKELY (ret < 0 )) {
1910
1957
Debug (this , " fatal error receiving data: %d" , ret);
0 commit comments