Skip to content

Commit 66be8e5

Browse files
addaleaxMylesBorins
authored andcommittedJan 9, 2018
http2: refactor outgoing write mechanism
- Only finish outgoing `WriteWrap`s once data has actually been passed to the underlying socket. - This makes HTTP2 streams respect backpressure - Use `DoTryWrite` as a shortcut for sending out as much of the data synchronously without blocking as possible - Use `NGHTTP2_DATA_FLAG_NO_COPY` to avoid copying DATA frame contents into nghttp2’s buffers before sending them out. PR-URL: nodejs#17718 Reviewed-By: James M Snell <[email protected]>
1 parent 7ab5c62 commit 66be8e5

File tree

2 files changed

+231
-115
lines changed

2 files changed

+231
-115
lines changed
 

‎src/node_http2.cc

+208-109
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
#include "node_http2.h"
55
#include "node_http2_state.h"
66

7-
#include <queue>
87
#include <algorithm>
98

109
namespace node {
@@ -23,6 +22,23 @@ using v8::Undefined;
2322

2423
namespace http2 {
2524

25+
namespace {
26+
27+
const char zero_bytes_256[256] = {};
28+
29+
inline Http2Stream* GetStream(Http2Session* session,
30+
int32_t id,
31+
nghttp2_data_source* source) {
32+
Http2Stream* stream = static_cast<Http2Stream*>(source->ptr);
33+
if (stream == nullptr)
34+
stream = session->FindStream(id);
35+
CHECK_NE(stream, nullptr);
36+
CHECK_EQ(id, stream->id());
37+
return stream;
38+
}
39+
40+
} // anonymous namespace
41+
2642
// These configure the callbacks required by nghttp2 itself. There are
2743
// two sets of callback functions, one that is used if a padding callback
2844
// is set, and other that does not include the padding callback.
@@ -370,6 +386,8 @@ Http2Session::Callbacks::Callbacks(bool kHasGetPaddingCallback) {
370386
callbacks, OnInvalidHeader);
371387
nghttp2_session_callbacks_set_error_callback(
372388
callbacks, OnNghttpError);
389+
nghttp2_session_callbacks_set_send_data_callback(
390+
callbacks, OnSendData);
373391

374392
if (kHasGetPaddingCallback) {
375393
nghttp2_session_callbacks_set_select_padding_callback(
@@ -419,6 +437,9 @@ Http2Session::Http2Session(Environment* env,
419437
// be catching before it gets this far. Either way, crash if this
420438
// fails.
421439
CHECK_EQ(fn(&session_, callbacks, this, *opts), 0);
440+
441+
outgoing_storage_.reserve(4096);
442+
outgoing_buffers_.reserve(32);
422443
}
423444

424445
void Http2Session::Unconsume() {
@@ -508,6 +529,7 @@ inline ssize_t Http2Session::OnMaxFrameSizePadding(size_t frameLen,
508529
// not be the preferred option.
509530
inline ssize_t Http2Session::OnCallbackPadding(size_t frameLen,
510531
size_t maxPayloadLen) {
532+
if (frameLen == 0) return 0;
511533
DEBUG_HTTP2SESSION(this, "using callback to determine padding");
512534
Isolate* isolate = env()->isolate();
513535
HandleScope handle_scope(isolate);
@@ -1033,13 +1055,28 @@ inline void Http2Session::HandleSettingsFrame(const nghttp2_frame* frame) {
10331055
MakeCallback(env()->onsettings_string(), arraysize(argv), argv);
10341056
}
10351057

1058+
// Callback used when data has been written to the stream.
1059+
void Http2Session::OnStreamAfterWriteImpl(WriteWrap* w, int status, void* ctx) {
1060+
Http2Session* session = static_cast<Http2Session*>(ctx);
1061+
DEBUG_HTTP2SESSION2(session, "write finished with status %d", status);
1062+
1063+
// Inform all pending writes about their completion.
1064+
session->ClearOutgoing(status);
1065+
1066+
if (!(session->flags_ & SESSION_STATE_WRITE_SCHEDULED)) {
1067+
// Schedule a new write if nghttp2 wants to send data.
1068+
session->MaybeScheduleWrite();
1069+
}
1070+
}
1071+
10361072
// If the underlying nghttp2_session struct has data pending in its outbound
10371073
// queue, MaybeScheduleWrite will schedule a SendPendingData() call to occcur
10381074
// on the next iteration of the Node.js event loop (using the SetImmediate
10391075
// queue), but only if a write has not already been scheduled.
10401076
void Http2Session::MaybeScheduleWrite() {
10411077
CHECK_EQ(flags_ & SESSION_STATE_WRITE_SCHEDULED, 0);
10421078
if (session_ != nullptr && nghttp2_session_want_write(session_)) {
1079+
DEBUG_HTTP2SESSION(this, "scheduling write");
10431080
flags_ |= SESSION_STATE_WRITE_SCHEDULED;
10441081
env()->SetImmediate([](Environment* env, void* data) {
10451082
Http2Session* session = static_cast<Http2Session*>(data);
@@ -1059,6 +1096,39 @@ void Http2Session::MaybeScheduleWrite() {
10591096
}
10601097
}
10611098

1099+
// Unset the sending state, finish up all current writes, and reset
1100+
// storage for data and metadata that was associated with these writes.
1101+
void Http2Session::ClearOutgoing(int status) {
1102+
CHECK_NE(flags_ & SESSION_STATE_SENDING, 0);
1103+
flags_ &= ~SESSION_STATE_SENDING;
1104+
1105+
for (const nghttp2_stream_write& wr : outgoing_buffers_) {
1106+
WriteWrap* wrap = wr.req_wrap;
1107+
if (wrap != nullptr)
1108+
wrap->Done(status);
1109+
}
1110+
1111+
outgoing_buffers_.clear();
1112+
outgoing_storage_.clear();
1113+
}
1114+
1115+
// Queue a given block of data for sending. This always creates a copy,
1116+
// so it is used for the cases in which nghttp2 requests sending of a
1117+
// small chunk of data.
1118+
void Http2Session::CopyDataIntoOutgoing(const uint8_t* src, size_t src_length) {
1119+
size_t offset = outgoing_storage_.size();
1120+
outgoing_storage_.resize(offset + src_length);
1121+
memcpy(&outgoing_storage_[offset], src, src_length);
1122+
1123+
// Store with a base of `nullptr` initially, since future resizes
1124+
// of the outgoing_buffers_ vector may invalidate the pointer.
1125+
// The correct base pointers will be set later, before writing to the
1126+
// underlying socket.
1127+
outgoing_buffers_.emplace_back(nghttp2_stream_write {
1128+
uv_buf_init(nullptr, src_length)
1129+
});
1130+
}
1131+
10621132
// Prompts nghttp2 to begin serializing it's pending data and pushes each
10631133
// chunk out to the i/o socket to be sent. This is a particularly hot method
10641134
// that will generally be called at least twice be event loop iteration.
@@ -1075,64 +1145,133 @@ void Http2Session::SendPendingData() {
10751145
// SendPendingData should not be called recursively.
10761146
if (flags_ & SESSION_STATE_SENDING)
10771147
return;
1148+
// This is cleared by ClearOutgoing().
10781149
flags_ |= SESSION_STATE_SENDING;
10791150

1080-
WriteWrap* req = nullptr;
1081-
char* dest = nullptr;
1082-
size_t destRemaining = 0;
1083-
size_t destLength = 0; // amount of data stored in dest
1084-
size_t destOffset = 0; // current write offset of dest
1085-
1086-
const uint8_t* src; // pointer to the serialized data
1087-
ssize_t srcLength = 0; // length of serialized data chunk
1088-
1089-
// While srcLength is greater than zero
1090-
while ((srcLength = nghttp2_session_mem_send(session_, &src)) > 0) {
1091-
if (req == nullptr) {
1092-
req = AllocateSend();
1093-
destRemaining = req->ExtraSize();
1094-
dest = req->Extra();
1095-
}
1096-
DEBUG_HTTP2SESSION2(this, "nghttp2 has %d bytes to send", srcLength);
1097-
size_t srcRemaining = srcLength;
1098-
size_t srcOffset = 0;
1099-
1100-
// The amount of data we have to copy is greater than the space
1101-
// remaining. Copy what we can into the remaining space, send it,
1102-
// the proceed with the rest.
1103-
while (srcRemaining > destRemaining) {
1104-
DEBUG_HTTP2SESSION2(this, "pushing %d bytes to the socket",
1105-
destLength + destRemaining);
1106-
memcpy(dest + destOffset, src + srcOffset, destRemaining);
1107-
destLength += destRemaining;
1108-
Send(req, dest, destLength);
1109-
destOffset = 0;
1110-
destLength = 0;
1111-
srcRemaining -= destRemaining;
1112-
srcOffset += destRemaining;
1113-
req = AllocateSend();
1114-
destRemaining = req->ExtraSize();
1115-
dest = req->Extra();
1116-
}
1151+
ssize_t src_length;
1152+
const uint8_t* src;
1153+
1154+
CHECK_EQ(outgoing_buffers_.size(), 0);
1155+
CHECK_EQ(outgoing_storage_.size(), 0);
11171156

1118-
if (srcRemaining > 0) {
1119-
memcpy(dest + destOffset, src + srcOffset, srcRemaining);
1120-
destLength += srcRemaining;
1121-
destOffset += srcRemaining;
1122-
destRemaining -= srcRemaining;
1123-
srcRemaining = 0;
1124-
srcOffset = 0;
1157+
// Part One: Gather data from nghttp2
1158+
1159+
while ((src_length = nghttp2_session_mem_send(session_, &src)) > 0) {
1160+
DEBUG_HTTP2SESSION2(this, "nghttp2 has %d bytes to send", src_length);
1161+
CopyDataIntoOutgoing(src, src_length);
1162+
}
1163+
1164+
CHECK_NE(src_length, NGHTTP2_ERR_NOMEM);
1165+
1166+
if (stream_ == nullptr) {
1167+
// It would seem nice to bail out earlier, but `nghttp2_session_mem_send()`
1168+
// does take care of things like closing the individual streams after
1169+
// a socket has been torn down, so we still need to call it.
1170+
ClearOutgoing(UV_ECANCELED);
1171+
return;
1172+
}
1173+
1174+
// Part Two: Pass Data to the underlying stream
1175+
1176+
size_t count = outgoing_buffers_.size();
1177+
if (count == 0) {
1178+
flags_ &= ~SESSION_STATE_SENDING;
1179+
return;
1180+
}
1181+
MaybeStackBuffer<uv_buf_t, 32> bufs;
1182+
bufs.AllocateSufficientStorage(count);
1183+
1184+
// Set the buffer base pointers for copied data that ended up in the
1185+
// sessions's own storage since it might have shifted around during gathering.
1186+
// (Those are marked by having .base == nullptr.)
1187+
size_t offset = 0;
1188+
size_t i = 0;
1189+
for (const nghttp2_stream_write& write : outgoing_buffers_) {
1190+
if (write.buf.base == nullptr) {
1191+
bufs[i++] = uv_buf_init(
1192+
reinterpret_cast<char*>(outgoing_storage_.data() + offset),
1193+
write.buf.len);
1194+
offset += write.buf.len;
1195+
} else {
1196+
bufs[i++] = write.buf;
11251197
}
11261198
}
1127-
CHECK_NE(srcLength, NGHTTP2_ERR_NOMEM);
1128-
if (destLength > 0 && srcLength >= 0) {
1129-
DEBUG_HTTP2SESSION2(this, "pushing %d bytes to the socket", destLength);
1130-
Send(req, dest, destLength);
1199+
1200+
chunks_sent_since_last_write_++;
1201+
1202+
// DoTryWrite may modify both the buffer list start itself and the
1203+
// base pointers/length of the individual buffers.
1204+
uv_buf_t* writebufs = *bufs;
1205+
if (stream_->DoTryWrite(&writebufs, &count) != 0 || count == 0) {
1206+
// All writes finished synchronously, nothing more to do here.
1207+
ClearOutgoing(0);
1208+
return;
1209+
}
1210+
1211+
WriteWrap* req = AllocateSend();
1212+
if (stream_->DoWrite(req, writebufs, count, nullptr) != 0) {
1213+
req->Dispose();
11311214
}
1215+
11321216
DEBUG_HTTP2SESSION2(this, "wants data in return? %d",
11331217
nghttp2_session_want_read(session_));
1218+
}
11341219

1135-
flags_ &= ~SESSION_STATE_SENDING;
1220+
1221+
// This callback is called from nghttp2 when it wants to send DATA frames for a
1222+
// given Http2Stream, when we set the `NGHTTP2_DATA_FLAG_NO_COPY` flag earlier
1223+
// in the Http2Stream::Provider::Stream::OnRead callback.
1224+
// We take the write information directly out of the stream's data queue.
1225+
int Http2Session::OnSendData(
1226+
nghttp2_session* session_,
1227+
nghttp2_frame* frame,
1228+
const uint8_t* framehd,
1229+
size_t length,
1230+
nghttp2_data_source* source,
1231+
void* user_data) {
1232+
Http2Session* session = static_cast<Http2Session*>(user_data);
1233+
Http2Stream* stream = GetStream(session, frame->hd.stream_id, source);
1234+
1235+
// Send the frame header + a byte that indicates padding length.
1236+
session->CopyDataIntoOutgoing(framehd, 9);
1237+
if (frame->data.padlen > 0) {
1238+
uint8_t padding_byte = frame->data.padlen - 1;
1239+
CHECK_EQ(padding_byte, frame->data.padlen - 1);
1240+
session->CopyDataIntoOutgoing(&padding_byte, 1);
1241+
}
1242+
1243+
DEBUG_HTTP2SESSION2(session, "nghttp2 has %d bytes to send directly", length);
1244+
while (length > 0) {
1245+
// nghttp2 thinks that there is data available (length > 0), which means
1246+
// we told it so, which means that we *should* have data available.
1247+
CHECK(!stream->queue_.empty());
1248+
1249+
nghttp2_stream_write& write = stream->queue_.front();
1250+
if (write.buf.len <= length) {
1251+
// This write does not suffice by itself, so we can consume it completely.
1252+
length -= write.buf.len;
1253+
session->outgoing_buffers_.emplace_back(std::move(write));
1254+
stream->queue_.pop();
1255+
continue;
1256+
}
1257+
1258+
// Slice off `length` bytes of the first write in the queue.
1259+
session->outgoing_buffers_.emplace_back(nghttp2_stream_write {
1260+
uv_buf_init(write.buf.base, length)
1261+
});
1262+
write.buf.base += length;
1263+
write.buf.len -= length;
1264+
break;
1265+
}
1266+
1267+
if (frame->data.padlen > 0) {
1268+
// Send padding if that was requested.
1269+
session->outgoing_buffers_.emplace_back(nghttp2_stream_write {
1270+
uv_buf_init(const_cast<char*>(zero_bytes_256), frame->data.padlen - 1)
1271+
});
1272+
}
1273+
1274+
return 0;
11361275
}
11371276

11381277
// Creates a new Http2Stream and submits a new http2 request.
@@ -1163,25 +1302,7 @@ WriteWrap* Http2Session::AllocateSend() {
11631302
Local<Object> obj =
11641303
env()->write_wrap_constructor_function()
11651304
->NewInstance(env()->context()).ToLocalChecked();
1166-
// Base the amount allocated on the remote peers max frame size
1167-
uint32_t size =
1168-
nghttp2_session_get_remote_settings(
1169-
session(),
1170-
NGHTTP2_SETTINGS_MAX_FRAME_SIZE);
1171-
// Max frame size + 9 bytes for the header
1172-
return WriteWrap::New(env(), obj, stream_, size + 9);
1173-
}
1174-
1175-
// Pushes chunks of data to the i/o stream.
1176-
void Http2Session::Send(WriteWrap* req, char* buf, size_t length) {
1177-
DEBUG_HTTP2SESSION2(this, "attempting to send %d bytes", length);
1178-
if (stream_ == nullptr)
1179-
return;
1180-
chunks_sent_since_last_write_++;
1181-
uv_buf_t actual = uv_buf_init(buf, length);
1182-
if (stream_->DoWrite(req, &actual, 1, nullptr)) {
1183-
req->Dispose();
1184-
}
1305+
return WriteWrap::New(env(), obj, stream_);
11851306
}
11861307

11871308
// Allocates the data buffer used to receive inbound data from the i/o stream
@@ -1255,6 +1376,7 @@ void Http2Session::Consume(Local<External> external) {
12551376
prev_read_cb_ = stream->read_cb();
12561377
stream->set_alloc_cb({ Http2Session::OnStreamAllocImpl, this });
12571378
stream->set_read_cb({ Http2Session::OnStreamReadImpl, this });
1379+
stream->set_after_write_cb({ Http2Session::OnStreamAfterWriteImpl, this });
12581380
stream->set_destruct_cb({ Http2Session::OnStreamDestructImpl, this });
12591381
DEBUG_HTTP2SESSION(this, "i/o stream consumed");
12601382
}
@@ -1422,9 +1544,9 @@ inline void Http2Stream::Destroy() {
14221544
// here because it's possible for destroy to have been called while
14231545
// we still have qeueued outbound writes.
14241546
while (!stream->queue_.empty()) {
1425-
nghttp2_stream_write* head = stream->queue_.front();
1426-
head->req_wrap->Done(UV_ECANCELED);
1427-
delete head;
1547+
nghttp2_stream_write& head = stream->queue_.front();
1548+
if (head.req_wrap != nullptr)
1549+
head.req_wrap->Done(UV_ECANCELED);
14281550
stream->queue_.pop();
14291551
}
14301552

@@ -1616,12 +1738,15 @@ inline int Http2Stream::DoWrite(WriteWrap* req_wrap,
16161738
return 0;
16171739
}
16181740
DEBUG_HTTP2STREAM2(this, "queuing %d buffers to send", id_, nbufs);
1619-
nghttp2_stream_write* item = new nghttp2_stream_write;
1620-
item->req_wrap = req_wrap;
1621-
item->nbufs = nbufs;
1622-
item->bufs.AllocateSufficientStorage(nbufs);
1623-
memcpy(*(item->bufs), bufs, nbufs * sizeof(*bufs));
1624-
queue_.push(item);
1741+
for (size_t i = 0; i < nbufs; ++i) {
1742+
// Store the req_wrap on the last write info in the queue, so that it is
1743+
// only marked as finished once all buffers associated with it are finished.
1744+
queue_.emplace(nghttp2_stream_write {
1745+
i == nbufs - 1 ? req_wrap : nullptr,
1746+
bufs[i]
1747+
});
1748+
available_outbound_length_ += bufs[i].len;
1749+
}
16251750
CHECK_NE(nghttp2_session_resume_data(**session_, id_), NGHTTP2_ERR_NOMEM);
16261751
return 0;
16271752
}
@@ -1655,18 +1780,6 @@ inline bool Http2Stream::AddHeader(nghttp2_rcbuf* name,
16551780
return true;
16561781
}
16571782

1658-
1659-
Http2Stream* GetStream(Http2Session* session,
1660-
int32_t id,
1661-
nghttp2_data_source* source) {
1662-
Http2Stream* stream = static_cast<Http2Stream*>(source->ptr);
1663-
if (stream == nullptr)
1664-
stream = session->FindStream(id);
1665-
CHECK_NE(stream, nullptr);
1666-
CHECK_EQ(id, stream->id());
1667-
return stream;
1668-
}
1669-
16701783
// A Provider is the thing that provides outbound DATA frame data.
16711784
Http2Stream::Provider::Provider(Http2Stream* stream, int options) {
16721785
CHECK(!stream->IsDestroyed());
@@ -1787,30 +1900,16 @@ ssize_t Http2Stream::Provider::Stream::OnRead(nghttp2_session* handle,
17871900

17881901
size_t amount = 0; // amount of data being sent in this data frame.
17891902

1790-
uv_buf_t current;
1791-
17921903
if (!stream->queue_.empty()) {
17931904
DEBUG_HTTP2SESSION2(session, "stream %d has pending outbound data", id);
1794-
nghttp2_stream_write* head = stream->queue_.front();
1795-
current = head->bufs[stream->queue_index_];
1796-
size_t clen = current.len - stream->queue_offset_;
1797-
amount = std::min(clen, length);
1905+
amount = std::min(stream->available_outbound_length_, length);
17981906
DEBUG_HTTP2SESSION2(session, "sending %d bytes for data frame on stream %d",
17991907
amount, id);
18001908
if (amount > 0) {
1801-
memcpy(buf, current.base + stream->queue_offset_, amount);
1802-
stream->queue_offset_ += amount;
1803-
}
1804-
if (stream->queue_offset_ == current.len) {
1805-
stream->queue_index_++;
1806-
stream->queue_offset_ = 0;
1807-
}
1808-
if (stream->queue_index_ == head->nbufs) {
1809-
head->req_wrap->Done(0);
1810-
delete head;
1811-
stream->queue_.pop();
1812-
stream->queue_offset_ = 0;
1813-
stream->queue_index_ = 0;
1909+
// Just return the length, let Http2Session::OnSendData take care of
1910+
// actually taking the buffers out of the queue.
1911+
*flags |= NGHTTP2_DATA_FLAG_NO_COPY;
1912+
stream->available_outbound_length_ -= amount;
18141913
}
18151914
}
18161915

‎src/node_http2.h

+23-6
Original file line numberDiff line numberDiff line change
@@ -126,9 +126,12 @@ enum nghttp2_stream_options {
126126
};
127127

128128
struct nghttp2_stream_write {
129-
unsigned int nbufs = 0;
130129
WriteWrap* req_wrap = nullptr;
131-
MaybeStackBuffer<uv_buf_t, MAX_BUFFER_COUNT> bufs;
130+
uv_buf_t buf;
131+
132+
inline explicit nghttp2_stream_write(uv_buf_t buf_) : buf(buf_) {}
133+
inline nghttp2_stream_write(WriteWrap* req, uv_buf_t buf_) :
134+
req_wrap(req), buf(buf_) {}
132135
};
133136

134137
struct nghttp2_header {
@@ -725,11 +728,12 @@ class Http2Stream : public AsyncWrap,
725728

726729
// Outbound Data... This is the data written by the JS layer that is
727730
// waiting to be written out to the socket.
728-
std::queue<nghttp2_stream_write*> queue_;
729-
unsigned int queue_index_ = 0;
730-
size_t queue_offset_ = 0;
731+
std::queue<nghttp2_stream_write> queue_;
732+
size_t available_outbound_length_ = 0;
731733
int64_t fd_offset_ = 0;
732734
int64_t fd_length_ = -1;
735+
736+
friend class Http2Session;
733737
};
734738

735739
class Http2Stream::Provider {
@@ -860,6 +864,7 @@ class Http2Session : public AsyncWrap {
860864
const uv_buf_t* bufs,
861865
uv_handle_type pending,
862866
void* ctx);
867+
static void OnStreamAfterWriteImpl(WriteWrap* w, int status, void* ctx);
863868
static void OnStreamDestructImpl(void* ctx);
864869

865870
// The JavaScript API
@@ -882,7 +887,6 @@ class Http2Session : public AsyncWrap {
882887
template <get_setting fn>
883888
static void GetSettings(const FunctionCallbackInfo<Value>& args);
884889

885-
void Send(WriteWrap* req, char* buf, size_t length);
886890
WriteWrap* AllocateSend();
887891

888892
uv_loop_t* event_loop() const {
@@ -957,6 +961,13 @@ class Http2Session : public AsyncWrap {
957961
const char* message,
958962
size_t len,
959963
void* user_data);
964+
static inline int OnSendData(
965+
nghttp2_session* session,
966+
nghttp2_frame* frame,
967+
const uint8_t* framehd,
968+
size_t length,
969+
nghttp2_data_source* source,
970+
void* user_data);
960971

961972

962973
static inline ssize_t OnStreamReadFD(
@@ -1015,6 +1026,12 @@ class Http2Session : public AsyncWrap {
10151026
size_t max_outstanding_pings_ = DEFAULT_MAX_PINGS;
10161027
std::queue<Http2Ping*> outstanding_pings_;
10171028

1029+
std::vector<nghttp2_stream_write> outgoing_buffers_;
1030+
std::vector<uint8_t> outgoing_storage_;
1031+
1032+
void CopyDataIntoOutgoing(const uint8_t* src, size_t src_length);
1033+
void ClearOutgoing(int status);
1034+
10181035
friend class Http2Scope;
10191036
};
10201037

0 commit comments

Comments
 (0)
Please sign in to comment.