9
9
10
10
namespace node {
11
11
12
+ using v8::ArrayBuffer;
12
13
using v8::Boolean ;
13
14
using v8::Context;
14
15
using v8::Float64Array;
@@ -978,7 +979,6 @@ inline int Http2Session::OnStreamClose(nghttp2_session* handle,
978
979
// Intentionally ignore the callback if the stream does not exist or has
979
980
// already been destroyed
980
981
if (stream != nullptr && !stream->IsDestroyed ()) {
981
- stream->AddChunk (nullptr , 0 );
982
982
stream->Close (code);
983
983
// It is possible for the stream close to occur before the stream is
984
984
// ever passed on to the javascript side. If that happens, skip straight
@@ -989,9 +989,8 @@ inline int Http2Session::OnStreamClose(nghttp2_session* handle,
989
989
stream->object ()->Get (context, env->onstreamclose_string ())
990
990
.ToLocalChecked ();
991
991
if (fn->IsFunction ()) {
992
- Local<Value> argv[2 ] = {
993
- Integer::NewFromUnsigned (isolate, code),
994
- Boolean::New (isolate, stream->HasDataChunks (true ))
992
+ Local<Value> argv[] = {
993
+ Integer::NewFromUnsigned (isolate, code)
995
994
};
996
995
stream->MakeCallback (fn.As <Function>(), arraysize (argv), argv);
997
996
} else {
@@ -1028,6 +1027,8 @@ inline int Http2Session::OnDataChunkReceived(nghttp2_session* handle,
1028
1027
Http2Session* session = static_cast <Http2Session*>(user_data);
1029
1028
DEBUG_HTTP2SESSION2 (session, " buffering data chunk for stream %d, size: "
1030
1029
" %d, flags: %d" , id, len, flags);
1030
+ Environment* env = session->env ();
1031
+ HandleScope scope (env->isolate ());
1031
1032
// We should never actually get a 0-length chunk so this check is
1032
1033
// only a precaution at this point.
1033
1034
if (len > 0 ) {
@@ -1039,8 +1040,25 @@ inline int Http2Session::OnDataChunkReceived(nghttp2_session* handle,
1039
1040
// If the stream has been destroyed, ignore this chunk
1040
1041
if (stream->IsDestroyed ())
1041
1042
return 0 ;
1043
+
1042
1044
stream->statistics_ .received_bytes += len;
1043
- stream->AddChunk (data, len);
1045
+
1046
+ // There is a single large array buffer for the entire data read from the
1047
+ // network; create a slice of that array buffer and emit it as the
1048
+ // received data buffer.
1049
+ CHECK (!session->stream_buf_ab_ .IsEmpty ());
1050
+ size_t offset = reinterpret_cast <const char *>(data) - session->stream_buf_ ;
1051
+ // Verify that the data offset is inside the current read buffer.
1052
+ CHECK_LE (offset, session->stream_buf_size_ );
1053
+
1054
+ Local<Object> buf =
1055
+ Buffer::New (env, session->stream_buf_ab_ , offset, len).ToLocalChecked ();
1056
+
1057
+ stream->EmitData (len, buf, Local<Object>());
1058
+ if (!stream->IsReading ())
1059
+ stream->inbound_consumed_data_while_paused_ += len;
1060
+ else
1061
+ nghttp2_session_consume_stream (handle, id, len);
1044
1062
}
1045
1063
return 0 ;
1046
1064
}
@@ -1226,9 +1244,8 @@ inline void Http2Session::HandlePriorityFrame(const nghttp2_frame* frame) {
1226
1244
1227
1245
1228
1246
// Called by OnFrameReceived when a complete DATA frame has been received.
1229
- // If we know that this is the last DATA frame (because the END_STREAM flag
1230
- // is set), then we'll terminate the readable side of the StreamBase. If
1231
- // the StreamBase is flowing, we'll push the chunks of data out to JS land.
1247
+ // If we know that this was the last DATA frame (because the END_STREAM flag
1248
+ // is set), then we'll terminate the readable side of the StreamBase.
1232
1249
inline void Http2Session::HandleDataFrame (const nghttp2_frame* frame) {
1233
1250
int32_t id = GetFrameID (frame);
1234
1251
DEBUG_HTTP2SESSION2 (this , " handling data frame for stream %d" , id);
@@ -1239,11 +1256,8 @@ inline void Http2Session::HandleDataFrame(const nghttp2_frame* frame) {
1239
1256
return ;
1240
1257
1241
1258
if (frame->hd .flags & NGHTTP2_FLAG_END_STREAM) {
1242
- stream->AddChunk ( nullptr , 0 );
1259
+ stream->EmitData (UV_EOF, Local<Object>(), Local<Object>() );
1243
1260
}
1244
-
1245
- if (stream->IsReading ())
1246
- stream->FlushDataChunks ();
1247
1261
}
1248
1262
1249
1263
@@ -1618,45 +1632,67 @@ void Http2Session::OnStreamAllocImpl(size_t suggested_size,
1618
1632
uv_buf_t * buf,
1619
1633
void * ctx) {
1620
1634
Http2Session* session = static_cast <Http2Session*>(ctx);
1621
- buf->base = session->stream_alloc ();
1622
- buf->len = kAllocBufferSize ;
1635
+ CHECK_EQ (session->stream_buf_ , nullptr );
1636
+ CHECK_EQ (session->stream_buf_size_ , 0 );
1637
+ buf->base = session->stream_buf_ = Malloc (suggested_size);
1638
+ buf->len = session->stream_buf_size_ = suggested_size;
1639
+ session->IncrementCurrentSessionMemory (suggested_size);
1623
1640
}
1624
1641
1625
1642
// Callback used to receive inbound data from the i/o stream
1626
1643
void Http2Session::OnStreamReadImpl (ssize_t nread,
1627
- const uv_buf_t * bufs ,
1644
+ const uv_buf_t * buf ,
1628
1645
uv_handle_type pending,
1629
1646
void * ctx) {
1630
1647
Http2Session* session = static_cast <Http2Session*>(ctx);
1631
1648
Http2Scope h2scope (session);
1632
1649
CHECK_NE (session->stream_ , nullptr );
1633
1650
DEBUG_HTTP2SESSION2 (session, " receiving %d bytes" , nread);
1634
- if (nread < 0 ) {
1635
- uv_buf_t tmp_buf;
1636
- tmp_buf.base = nullptr ;
1637
- tmp_buf.len = 0 ;
1638
- session->prev_read_cb_ .fn (nread,
1639
- &tmp_buf,
1640
- pending,
1641
- session->prev_read_cb_ .ctx );
1642
- return ;
1643
- }
1644
- if (bufs->len > 0 ) {
1651
+ if (nread <= 0 ) {
1652
+ free (session->stream_buf_ );
1653
+ if (nread < 0 ) {
1654
+ uv_buf_t tmp_buf = uv_buf_init (nullptr , 0 );
1655
+ session->prev_read_cb_ .fn (nread,
1656
+ &tmp_buf,
1657
+ pending,
1658
+ session->prev_read_cb_ .ctx );
1659
+ }
1660
+ } else {
1645
1661
// Only pass data on if nread > 0
1646
- uv_buf_t buf[] { uv_buf_init ((*bufs).base , nread) };
1662
+
1663
+ // Verify that currently: There is memory allocated into which
1664
+ // the data has been read, and that memory buffer is at least as large
1665
+ // as the amount of data we have read, but we have not yet made an
1666
+ // ArrayBuffer out of it.
1667
+ CHECK_NE (session->stream_buf_ , nullptr );
1668
+ CHECK_EQ (session->stream_buf_ , buf->base );
1669
+ CHECK_EQ (session->stream_buf_size_ , buf->len );
1670
+ CHECK_GE (session->stream_buf_size_ , static_cast <size_t >(nread));
1671
+ CHECK (session->stream_buf_ab_ .IsEmpty ());
1672
+
1673
+ Environment* env = session->env ();
1674
+ Isolate* isolate = env->isolate ();
1675
+ HandleScope scope (isolate);
1676
+ Local<Context> context = env->context ();
1677
+ Context::Scope context_scope (context);
1678
+
1679
+ // Create an array buffer for the read data. DATA frames will be emitted
1680
+ // as slices of this array buffer to avoid having to copy memory.
1681
+ session->stream_buf_ab_ =
1682
+ ArrayBuffer::New (isolate,
1683
+ session->stream_buf_ ,
1684
+ session->stream_buf_size_ ,
1685
+ v8::ArrayBufferCreationMode::kInternalized );
1686
+
1687
+ uv_buf_t buf_ = uv_buf_init (buf->base , nread);
1647
1688
session->statistics_ .data_received += nread;
1648
- ssize_t ret = session->Write (buf , 1 );
1689
+ ssize_t ret = session->Write (&buf_ , 1 );
1649
1690
1650
1691
// Note: if ssize_t is not defined (e.g. on Win32), nghttp2 will typedef
1651
1692
// ssize_t to int. Cast here so that the < 0 check actually works on
1652
1693
// Windows.
1653
1694
if (static_cast <int >(ret) < 0 ) {
1654
1695
DEBUG_HTTP2SESSION2 (session, " fatal error receiving data: %d" , ret);
1655
- Environment* env = session->env ();
1656
- Isolate* isolate = env->isolate ();
1657
- HandleScope scope (isolate);
1658
- Local<Context> context = env->context ();
1659
- Context::Scope context_scope (context);
1660
1696
1661
1697
Local<Value> argv[1 ] = {
1662
1698
Integer::New (isolate, ret),
@@ -1667,6 +1703,13 @@ void Http2Session::OnStreamReadImpl(ssize_t nread,
1667
1703
nghttp2_session_want_read (**session));
1668
1704
}
1669
1705
}
1706
+
1707
+ // Since we are finished handling this write, reset the stream buffer.
1708
+ // The memory has either been free()d or was handed over to V8.
1709
+ session->DecrementCurrentSessionMemory (session->stream_buf_size_ );
1710
+ session->stream_buf_ = nullptr ;
1711
+ session->stream_buf_size_ = 0 ;
1712
+ session->stream_buf_ab_ = Local<ArrayBuffer>();
1670
1713
}
1671
1714
1672
1715
void Http2Session::OnStreamDestructImpl (void * ctx) {
@@ -1781,30 +1824,6 @@ void Http2Stream::OnTrailers(const SubmitTrailers& submit_trailers) {
1781
1824
}
1782
1825
}
1783
1826
1784
- inline bool Http2Stream::HasDataChunks (bool ignore_eos) {
1785
- return data_chunks_.size () > (ignore_eos ? 1 : 0 );
1786
- }
1787
-
1788
- // Appends a chunk of received DATA frame data to this Http2Streams internal
1789
- // queue. Note that we must memcpy each chunk because of the way that nghttp2
1790
- // handles it's internal memory`.
1791
- inline void Http2Stream::AddChunk (const uint8_t * data, size_t len) {
1792
- CHECK (!this ->IsDestroyed ());
1793
- if (this ->statistics_ .first_byte == 0 )
1794
- this ->statistics_ .first_byte = uv_hrtime ();
1795
- if (flags_ & NGHTTP2_STREAM_FLAG_EOS)
1796
- return ;
1797
- char * buf = nullptr ;
1798
- if (len > 0 && data != nullptr ) {
1799
- buf = Malloc<char >(len);
1800
- memcpy (buf, data, len);
1801
- } else if (data == nullptr ) {
1802
- flags_ |= NGHTTP2_STREAM_FLAG_EOS;
1803
- }
1804
- data_chunks_.emplace (uv_buf_init (buf, len));
1805
- }
1806
-
1807
-
1808
1827
inline void Http2Stream::Close (int32_t code) {
1809
1828
CHECK (!this ->IsDestroyed ());
1810
1829
flags_ |= NGHTTP2_STREAM_FLAG_CLOSED;
@@ -1841,13 +1860,6 @@ inline void Http2Stream::Destroy() {
1841
1860
1842
1861
DEBUG_HTTP2STREAM (this , " destroying stream" );
1843
1862
1844
- // Free any remaining incoming data chunks.
1845
- while (!data_chunks_.empty ()) {
1846
- uv_buf_t buf = data_chunks_.front ();
1847
- free (buf.base );
1848
- data_chunks_.pop ();
1849
- }
1850
-
1851
1863
// Wait until the start of the next loop to delete because there
1852
1864
// may still be some pending operations queued for this stream.
1853
1865
env ()->SetImmediate ([](Environment* env, void * data) {
@@ -1873,39 +1885,6 @@ inline void Http2Stream::Destroy() {
1873
1885
}
1874
1886
1875
1887
1876
- // Uses the StreamBase API to push a single chunk of queued inbound DATA
1877
- // to JS land.
1878
- void Http2Stream::OnDataChunk (uv_buf_t * chunk) {
1879
- CHECK (!this ->IsDestroyed ());
1880
- Isolate* isolate = env ()->isolate ();
1881
- HandleScope scope (isolate);
1882
- ssize_t len = -1 ;
1883
- Local<Object> buf;
1884
- if (chunk != nullptr ) {
1885
- len = chunk->len ;
1886
- buf = Buffer::New (isolate, chunk->base , len).ToLocalChecked ();
1887
- }
1888
- EmitData (len, buf, this ->object ());
1889
- }
1890
-
1891
-
1892
- inline void Http2Stream::FlushDataChunks () {
1893
- CHECK (!this ->IsDestroyed ());
1894
- Http2Scope h2scope (this );
1895
- if (!data_chunks_.empty ()) {
1896
- uv_buf_t buf = data_chunks_.front ();
1897
- data_chunks_.pop ();
1898
- if (buf.len > 0 ) {
1899
- CHECK_EQ (nghttp2_session_consume_stream (session_->session (),
1900
- id_, buf.len ), 0 );
1901
- OnDataChunk (&buf);
1902
- } else {
1903
- OnDataChunk (nullptr );
1904
- }
1905
- }
1906
- }
1907
-
1908
-
1909
1888
// Initiates a response on the Http2Stream using data provided via the
1910
1889
// StreamBase Streams API.
1911
1890
inline int Http2Stream::SubmitResponse (nghttp2_nv* nva,
@@ -2012,13 +1991,20 @@ inline Http2Stream* Http2Stream::SubmitPushPromise(nghttp2_nv* nva,
2012
1991
// Switch the StreamBase into flowing mode to begin pushing chunks of data
2013
1992
// out to JS land.
2014
1993
inline int Http2Stream::ReadStart () {
1994
+ Http2Scope h2scope (this );
2015
1995
CHECK (!this ->IsDestroyed ());
2016
1996
flags_ |= NGHTTP2_STREAM_FLAG_READ_START;
2017
1997
flags_ &= ~NGHTTP2_STREAM_FLAG_READ_PAUSED;
2018
1998
2019
- // Flush any queued data chunks immediately out to the JS layer
2020
- FlushDataChunks ();
2021
1999
DEBUG_HTTP2STREAM (this , " reading starting" );
2000
+
2001
+ // Tell nghttp2 about our consumption of the data that was handed
2002
+ // off to JS land.
2003
+ nghttp2_session_consume_stream (session_->session (),
2004
+ id_,
2005
+ inbound_consumed_data_while_paused_);
2006
+ inbound_consumed_data_while_paused_ = 0 ;
2007
+
2022
2008
return 0 ;
2023
2009
}
2024
2010
0 commit comments