Skip to content

Commit 8b0a1b3

Browse files
addaleaxMylesBorins
authored andcommitted
http2: refactor read mechanism
Refactor the read mechanism to completely avoid copying. Instead of copying individual `DATA` frame contents into buffers, create `ArrayBuffer` instances for all socket reads and emit slices of those `ArrayBuffer`s to JS. Backport-PR-URL: #20456 PR-URL: #18030 Reviewed-By: James M Snell <[email protected]>
1 parent 4e807d6 commit 8b0a1b3

File tree

7 files changed

+174
-127
lines changed

7 files changed

+174
-127
lines changed

lib/internal/http2/core.js

+12-18
Original file line numberDiff line numberDiff line change
@@ -277,16 +277,15 @@ function submitRstStream(code) {
277277
// point, close them. If there is an open fd for file send, close that also.
278278
// At this point the underlying node::http2:Http2Stream handle is no
279279
// longer usable so destroy it also.
280-
function onStreamClose(code, hasData) {
280+
function onStreamClose(code) {
281281
const stream = this[kOwner];
282282
if (stream.destroyed)
283283
return;
284284

285285
const state = stream[kState];
286286

287287
debug(`Http2Stream ${stream[kID]} [Http2Session ` +
288-
`${sessionName(stream[kSession][kType])}]: closed with code ${code}` +
289-
` [has data? ${hasData}]`);
288+
`${sessionName(stream[kSession][kType])}]: closed with code ${code}`);
290289

291290
if (!stream.closed) {
292291
// Unenroll from timeouts
@@ -304,21 +303,22 @@ function onStreamClose(code, hasData) {
304303

305304
if (state.fd !== undefined)
306305
tryClose(state.fd);
307-
stream[kMaybeDestroy](null, code, hasData);
306+
stream.push(null);
307+
stream[kMaybeDestroy](null, code);
308308
}
309309

310310
// Receives a chunk of data for a given stream and forwards it on
311311
// to the Http2Stream Duplex for processing.
312-
function onStreamRead(nread, buf, handle) {
313-
const stream = handle[kOwner];
312+
function onStreamRead(nread, buf) {
313+
const stream = this[kOwner];
314314
if (nread >= 0 && !stream.destroyed) {
315315
debug(`Http2Stream ${stream[kID]} [Http2Session ` +
316316
`${sessionName(stream[kSession][kType])}]: receiving data chunk ` +
317317
`of size ${nread}`);
318318
stream[kUpdateTimer]();
319319
if (!stream.push(buf)) {
320320
if (!stream.destroyed) // we have to check a second time
321-
handle.readStop();
321+
this.readStop();
322322
}
323323
return;
324324
}
@@ -1427,13 +1427,8 @@ function streamOnResume() {
14271427
}
14281428

14291429
function streamOnPause() {
1430-
// if (!this.destroyed && !this.pending)
1431-
// this[kHandle].readStop();
1432-
}
1433-
1434-
function handleFlushData(self) {
14351430
if (!this.destroyed && !this.pending)
1436-
this[kHandle].flushData();
1431+
this[kHandle].readStop();
14371432
}
14381433

14391434
// If the writable side of the Http2Stream is still open, emit the
@@ -1679,11 +1674,10 @@ class Http2Stream extends Duplex {
16791674
this.push(null);
16801675
return;
16811676
}
1682-
const flushfn = handleFlushData.bind(this);
16831677
if (!this.pending) {
1684-
flushfn();
1678+
streamOnResume.call(this);
16851679
} else {
1686-
this.once('ready', flushfn);
1680+
this.once('ready', streamOnResume);
16871681
}
16881682
}
16891683

@@ -1822,10 +1816,10 @@ class Http2Stream extends Duplex {
18221816

18231817
// The Http2Stream can be destroyed if it has closed and if the readable
18241818
// side has received the final chunk.
1825-
[kMaybeDestroy](error, code = NGHTTP2_NO_ERROR, hasData = true) {
1819+
[kMaybeDestroy](error, code = NGHTTP2_NO_ERROR) {
18261820
if (error == null) {
18271821
if (code === NGHTTP2_NO_ERROR &&
1828-
((!this._readableState.ended && hasData) ||
1822+
(!this._readableState.ended ||
18291823
!this._writableState.ended ||
18301824
this._writableState.pendingcb > 0 ||
18311825
!this.closed)) {

src/node_http2.cc

+85-99
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
namespace node {
1111

12+
using v8::ArrayBuffer;
1213
using v8::Boolean;
1314
using v8::Context;
1415
using v8::Float64Array;
@@ -979,7 +980,6 @@ inline int Http2Session::OnStreamClose(nghttp2_session* handle,
979980
// Intentionally ignore the callback if the stream does not exist or has
980981
// already been destroyed
981982
if (stream != nullptr && !stream->IsDestroyed()) {
982-
stream->AddChunk(nullptr, 0);
983983
stream->Close(code);
984984
// It is possible for the stream close to occur before the stream is
985985
// ever passed on to the javascript side. If that happens, skip straight
@@ -990,9 +990,8 @@ inline int Http2Session::OnStreamClose(nghttp2_session* handle,
990990
stream->object()->Get(context, env->onstreamclose_string())
991991
.ToLocalChecked();
992992
if (fn->IsFunction()) {
993-
Local<Value> argv[2] = {
994-
Integer::NewFromUnsigned(isolate, code),
995-
Boolean::New(isolate, stream->HasDataChunks(true))
993+
Local<Value> argv[] = {
994+
Integer::NewFromUnsigned(isolate, code)
996995
};
997996
stream->MakeCallback(fn.As<Function>(), arraysize(argv), argv);
998997
} else {
@@ -1029,6 +1028,8 @@ inline int Http2Session::OnDataChunkReceived(nghttp2_session* handle,
10291028
Http2Session* session = static_cast<Http2Session*>(user_data);
10301029
DEBUG_HTTP2SESSION2(session, "buffering data chunk for stream %d, size: "
10311030
"%d, flags: %d", id, len, flags);
1031+
Environment* env = session->env();
1032+
HandleScope scope(env->isolate());
10321033
// We should never actually get a 0-length chunk so this check is
10331034
// only a precaution at this point.
10341035
if (len > 0) {
@@ -1040,8 +1041,25 @@ inline int Http2Session::OnDataChunkReceived(nghttp2_session* handle,
10401041
// If the stream has been destroyed, ignore this chunk
10411042
if (stream->IsDestroyed())
10421043
return 0;
1044+
10431045
stream->statistics_.received_bytes += len;
1044-
stream->AddChunk(data, len);
1046+
1047+
// There is a single large array buffer for the entire data read from the
1048+
// network; create a slice of that array buffer and emit it as the
1049+
// received data buffer.
1050+
CHECK(!session->stream_buf_ab_.IsEmpty());
1051+
size_t offset = reinterpret_cast<const char*>(data) - session->stream_buf_;
1052+
// Verify that the data offset is inside the current read buffer.
1053+
CHECK_LE(offset, session->stream_buf_size_);
1054+
1055+
Local<Object> buf =
1056+
Buffer::New(env, session->stream_buf_ab_, offset, len).ToLocalChecked();
1057+
1058+
stream->EmitData(len, buf, Local<Object>());
1059+
if (!stream->IsReading())
1060+
stream->inbound_consumed_data_while_paused_ += len;
1061+
else
1062+
nghttp2_session_consume_stream(handle, id, len);
10451063
}
10461064
return 0;
10471065
}
@@ -1227,9 +1245,8 @@ inline void Http2Session::HandlePriorityFrame(const nghttp2_frame* frame) {
12271245

12281246

12291247
// Called by OnFrameReceived when a complete DATA frame has been received.
1230-
// If we know that this is the last DATA frame (because the END_STREAM flag
1231-
// is set), then we'll terminate the readable side of the StreamBase. If
1232-
// the StreamBase is flowing, we'll push the chunks of data out to JS land.
1248+
// If we know that this was the last DATA frame (because the END_STREAM flag
1249+
// is set), then we'll terminate the readable side of the StreamBase.
12331250
inline void Http2Session::HandleDataFrame(const nghttp2_frame* frame) {
12341251
int32_t id = GetFrameID(frame);
12351252
DEBUG_HTTP2SESSION2(this, "handling data frame for stream %d", id);
@@ -1240,11 +1257,8 @@ inline void Http2Session::HandleDataFrame(const nghttp2_frame* frame) {
12401257
return;
12411258

12421259
if (frame->hd.flags & NGHTTP2_FLAG_END_STREAM) {
1243-
stream->AddChunk(nullptr, 0);
1260+
stream->EmitData(UV_EOF, Local<Object>(), Local<Object>());
12441261
}
1245-
1246-
if (stream->IsReading())
1247-
stream->FlushDataChunks();
12481262
}
12491263

12501264

@@ -1619,45 +1633,67 @@ void Http2Session::OnStreamAllocImpl(size_t suggested_size,
16191633
uv_buf_t* buf,
16201634
void* ctx) {
16211635
Http2Session* session = static_cast<Http2Session*>(ctx);
1622-
buf->base = session->stream_alloc();
1623-
buf->len = kAllocBufferSize;
1636+
CHECK_EQ(session->stream_buf_, nullptr);
1637+
CHECK_EQ(session->stream_buf_size_, 0);
1638+
buf->base = session->stream_buf_ = Malloc(suggested_size);
1639+
buf->len = session->stream_buf_size_ = suggested_size;
1640+
session->IncrementCurrentSessionMemory(suggested_size);
16241641
}
16251642

16261643
// Callback used to receive inbound data from the i/o stream
16271644
void Http2Session::OnStreamReadImpl(ssize_t nread,
1628-
const uv_buf_t* bufs,
1645+
const uv_buf_t* buf,
16291646
uv_handle_type pending,
16301647
void* ctx) {
16311648
Http2Session* session = static_cast<Http2Session*>(ctx);
16321649
Http2Scope h2scope(session);
16331650
CHECK_NE(session->stream_, nullptr);
16341651
DEBUG_HTTP2SESSION2(session, "receiving %d bytes", nread);
1635-
if (nread < 0) {
1636-
uv_buf_t tmp_buf;
1637-
tmp_buf.base = nullptr;
1638-
tmp_buf.len = 0;
1639-
session->prev_read_cb_.fn(nread,
1640-
&tmp_buf,
1641-
pending,
1642-
session->prev_read_cb_.ctx);
1643-
return;
1644-
}
1645-
if (bufs->len > 0) {
1652+
if (nread <= 0) {
1653+
free(session->stream_buf_);
1654+
if (nread < 0) {
1655+
uv_buf_t tmp_buf = uv_buf_init(nullptr, 0);
1656+
session->prev_read_cb_.fn(nread,
1657+
&tmp_buf,
1658+
pending,
1659+
session->prev_read_cb_.ctx);
1660+
}
1661+
} else {
16461662
// Only pass data on if nread > 0
1647-
uv_buf_t buf[] { uv_buf_init((*bufs).base, nread) };
1663+
1664+
// Verify that currently: There is memory allocated into which
1665+
// the data has been read, and that memory buffer is at least as large
1666+
// as the amount of data we have read, but we have not yet made an
1667+
// ArrayBuffer out of it.
1668+
CHECK_NE(session->stream_buf_, nullptr);
1669+
CHECK_EQ(session->stream_buf_, buf->base);
1670+
CHECK_EQ(session->stream_buf_size_, buf->len);
1671+
CHECK_GE(session->stream_buf_size_, static_cast<size_t>(nread));
1672+
CHECK(session->stream_buf_ab_.IsEmpty());
1673+
1674+
Environment* env = session->env();
1675+
Isolate* isolate = env->isolate();
1676+
HandleScope scope(isolate);
1677+
Local<Context> context = env->context();
1678+
Context::Scope context_scope(context);
1679+
1680+
// Create an array buffer for the read data. DATA frames will be emitted
1681+
// as slices of this array buffer to avoid having to copy memory.
1682+
session->stream_buf_ab_ =
1683+
ArrayBuffer::New(isolate,
1684+
session->stream_buf_,
1685+
session->stream_buf_size_,
1686+
v8::ArrayBufferCreationMode::kInternalized);
1687+
1688+
uv_buf_t buf_ = uv_buf_init(buf->base, nread);
16481689
session->statistics_.data_received += nread;
1649-
ssize_t ret = session->Write(buf, 1);
1690+
ssize_t ret = session->Write(&buf_, 1);
16501691

16511692
// Note: if ssize_t is not defined (e.g. on Win32), nghttp2 will typedef
16521693
// ssize_t to int. Cast here so that the < 0 check actually works on
16531694
// Windows.
16541695
if (static_cast<int>(ret) < 0) {
16551696
DEBUG_HTTP2SESSION2(session, "fatal error receiving data: %d", ret);
1656-
Environment* env = session->env();
1657-
Isolate* isolate = env->isolate();
1658-
HandleScope scope(isolate);
1659-
Local<Context> context = env->context();
1660-
Context::Scope context_scope(context);
16611697

16621698
Local<Value> argv[1] = {
16631699
Integer::New(isolate, ret),
@@ -1668,6 +1704,13 @@ void Http2Session::OnStreamReadImpl(ssize_t nread,
16681704
nghttp2_session_want_read(**session));
16691705
}
16701706
}
1707+
1708+
// Since we are finished handling this write, reset the stream buffer.
1709+
// The memory has either been free()d or was handed over to V8.
1710+
session->DecrementCurrentSessionMemory(session->stream_buf_size_);
1711+
session->stream_buf_ = nullptr;
1712+
session->stream_buf_size_ = 0;
1713+
session->stream_buf_ab_ = Local<ArrayBuffer>();
16711714
}
16721715

16731716
void Http2Session::OnStreamDestructImpl(void* ctx) {
@@ -1782,30 +1825,6 @@ void Http2Stream::OnTrailers(const SubmitTrailers& submit_trailers) {
17821825
}
17831826
}
17841827

1785-
inline bool Http2Stream::HasDataChunks(bool ignore_eos) {
1786-
return data_chunks_.size() > (ignore_eos ? 1 : 0);
1787-
}
1788-
1789-
// Appends a chunk of received DATA frame data to this Http2Streams internal
1790-
// queue. Note that we must memcpy each chunk because of the way that nghttp2
1791-
// handles it's internal memory`.
1792-
inline void Http2Stream::AddChunk(const uint8_t* data, size_t len) {
1793-
CHECK(!this->IsDestroyed());
1794-
if (this->statistics_.first_byte == 0)
1795-
this->statistics_.first_byte = uv_hrtime();
1796-
if (flags_ & NGHTTP2_STREAM_FLAG_EOS)
1797-
return;
1798-
char* buf = nullptr;
1799-
if (len > 0 && data != nullptr) {
1800-
buf = Malloc<char>(len);
1801-
memcpy(buf, data, len);
1802-
} else if (data == nullptr) {
1803-
flags_ |= NGHTTP2_STREAM_FLAG_EOS;
1804-
}
1805-
data_chunks_.emplace(uv_buf_init(buf, len));
1806-
}
1807-
1808-
18091828
inline void Http2Stream::Close(int32_t code) {
18101829
CHECK(!this->IsDestroyed());
18111830
flags_ |= NGHTTP2_STREAM_FLAG_CLOSED;
@@ -1842,13 +1861,6 @@ inline void Http2Stream::Destroy() {
18421861

18431862
DEBUG_HTTP2STREAM(this, "destroying stream");
18441863

1845-
// Free any remaining incoming data chunks.
1846-
while (!data_chunks_.empty()) {
1847-
uv_buf_t buf = data_chunks_.front();
1848-
free(buf.base);
1849-
data_chunks_.pop();
1850-
}
1851-
18521864
// Wait until the start of the next loop to delete because there
18531865
// may still be some pending operations queued for this stream.
18541866
env()->SetImmediate([](Environment* env, void* data) {
@@ -1874,39 +1886,6 @@ inline void Http2Stream::Destroy() {
18741886
}
18751887

18761888

1877-
// Uses the StreamBase API to push a single chunk of queued inbound DATA
1878-
// to JS land.
1879-
void Http2Stream::OnDataChunk(uv_buf_t* chunk) {
1880-
CHECK(!this->IsDestroyed());
1881-
Isolate* isolate = env()->isolate();
1882-
HandleScope scope(isolate);
1883-
ssize_t len = -1;
1884-
Local<Object> buf;
1885-
if (chunk != nullptr) {
1886-
len = chunk->len;
1887-
buf = Buffer::New(isolate, chunk->base, len).ToLocalChecked();
1888-
}
1889-
EmitData(len, buf, this->object());
1890-
}
1891-
1892-
1893-
inline void Http2Stream::FlushDataChunks() {
1894-
CHECK(!this->IsDestroyed());
1895-
Http2Scope h2scope(this);
1896-
if (!data_chunks_.empty()) {
1897-
uv_buf_t buf = data_chunks_.front();
1898-
data_chunks_.pop();
1899-
if (buf.len > 0) {
1900-
CHECK_EQ(nghttp2_session_consume_stream(session_->session(),
1901-
id_, buf.len), 0);
1902-
OnDataChunk(&buf);
1903-
} else {
1904-
OnDataChunk(nullptr);
1905-
}
1906-
}
1907-
}
1908-
1909-
19101889
// Initiates a response on the Http2Stream using data provided via the
19111890
// StreamBase Streams API.
19121891
inline int Http2Stream::SubmitResponse(nghttp2_nv* nva,
@@ -2013,13 +1992,20 @@ inline Http2Stream* Http2Stream::SubmitPushPromise(nghttp2_nv* nva,
20131992
// Switch the StreamBase into flowing mode to begin pushing chunks of data
20141993
// out to JS land.
20151994
inline int Http2Stream::ReadStart() {
1995+
Http2Scope h2scope(this);
20161996
CHECK(!this->IsDestroyed());
20171997
flags_ |= NGHTTP2_STREAM_FLAG_READ_START;
20181998
flags_ &= ~NGHTTP2_STREAM_FLAG_READ_PAUSED;
20191999

2020-
// Flush any queued data chunks immediately out to the JS layer
2021-
FlushDataChunks();
20222000
DEBUG_HTTP2STREAM(this, "reading starting");
2001+
2002+
// Tell nghttp2 about our consumption of the data that was handed
2003+
// off to JS land.
2004+
nghttp2_session_consume_stream(session_->session(),
2005+
id_,
2006+
inbound_consumed_data_while_paused_);
2007+
inbound_consumed_data_while_paused_ = 0;
2008+
20232009
return 0;
20242010
}
20252011

0 commit comments

Comments
 (0)