Skip to content

Commit 3becb02

Browse files
mscdexjasnell
authored andcommitted
lib,src: improve writev() performance for Buffers
PR-URL: #13187 Reviewed-By: Benjamin Gruenbaum <[email protected]> Reviewed-By: Anna Henningsen <[email protected]>
1 parent 686e753 commit 3becb02

File tree

3 files changed

+114
-70
lines changed

3 files changed

+114
-70
lines changed

lib/_stream_writable.js

+12-1
Original file line numberDiff line numberDiff line change
@@ -320,6 +320,7 @@ function writeOrBuffer(stream, state, isBuf, chunk, encoding, cb) {
320320
if (!isBuf) {
321321
var newChunk = decodeChunk(state, chunk, encoding);
322322
if (chunk !== newChunk) {
323+
isBuf = true;
323324
encoding = 'buffer';
324325
chunk = newChunk;
325326
}
@@ -335,7 +336,13 @@ function writeOrBuffer(stream, state, isBuf, chunk, encoding, cb) {
335336

336337
if (state.writing || state.corked) {
337338
var last = state.lastBufferedRequest;
338-
state.lastBufferedRequest = { chunk, encoding, callback: cb, next: null };
339+
state.lastBufferedRequest = {
340+
chunk,
341+
encoding,
342+
isBuf,
343+
callback: cb,
344+
next: null
345+
};
339346
if (last) {
340347
last.next = state.lastBufferedRequest;
341348
} else {
@@ -438,11 +445,15 @@ function clearBuffer(stream, state) {
438445
holder.entry = entry;
439446

440447
var count = 0;
448+
var allBuffers = true;
441449
while (entry) {
442450
buffer[count] = entry;
451+
if (!entry.isBuf)
452+
allBuffers = false;
443453
entry = entry.next;
444454
count += 1;
445455
}
456+
buffer.allBuffers = allBuffers;
446457

447458
doWrite(stream, state, true, state.length, buffer, '', holder.finish);
448459

lib/net.js

+15-6
Original file line numberDiff line numberDiff line change
@@ -726,13 +726,22 @@ Socket.prototype._writeGeneric = function(writev, data, encoding, cb) {
726726
var err;
727727

728728
if (writev) {
729-
var chunks = new Array(data.length << 1);
730-
for (var i = 0; i < data.length; i++) {
731-
var entry = data[i];
732-
chunks[i * 2] = entry.chunk;
733-
chunks[i * 2 + 1] = entry.encoding;
729+
var allBuffers = data.allBuffers;
730+
var chunks;
731+
var i;
732+
if (allBuffers) {
733+
chunks = data;
734+
for (i = 0; i < data.length; i++)
735+
data[i] = data[i].chunk;
736+
} else {
737+
chunks = new Array(data.length << 1);
738+
for (i = 0; i < data.length; i++) {
739+
var entry = data[i];
740+
chunks[i * 2] = entry.chunk;
741+
chunks[i * 2 + 1] = entry.encoding;
742+
}
734743
}
735-
err = this._handle.writev(req, chunks);
744+
err = this._handle.writev(req, chunks, allBuffers);
736745

737746
// Retain chunks
738747
if (err === 0) req._chunks = chunks;

src/stream_base.cc

+87-63
Original file line numberDiff line numberDiff line change
@@ -100,92 +100,116 @@ int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
100100

101101
Local<Object> req_wrap_obj = args[0].As<Object>();
102102
Local<Array> chunks = args[1].As<Array>();
103+
bool all_buffers = args[2]->IsTrue();
103104

104-
size_t count = chunks->Length() >> 1;
105+
size_t count;
106+
if (all_buffers)
107+
count = chunks->Length();
108+
else
109+
count = chunks->Length() >> 1;
105110

106111
MaybeStackBuffer<uv_buf_t, 16> bufs(count);
112+
uv_buf_t* buf_list = *bufs;
107113

108-
// Determine storage size first
109114
size_t storage_size = 0;
110-
for (size_t i = 0; i < count; i++) {
111-
storage_size = ROUND_UP(storage_size, WriteWrap::kAlignSize);
112-
113-
Local<Value> chunk = chunks->Get(i * 2);
114-
115-
if (Buffer::HasInstance(chunk))
116-
continue;
117-
// Buffer chunk, no additional storage required
118-
119-
// String chunk
120-
Local<String> string = chunk->ToString(env->isolate());
121-
enum encoding encoding = ParseEncoding(env->isolate(),
122-
chunks->Get(i * 2 + 1));
123-
size_t chunk_size;
124-
if (encoding == UTF8 && string->Length() > 65535)
125-
chunk_size = StringBytes::Size(env->isolate(), string, encoding);
126-
else
127-
chunk_size = StringBytes::StorageSize(env->isolate(), string, encoding);
128-
129-
storage_size += chunk_size;
130-
}
115+
uint32_t bytes = 0;
116+
size_t offset;
117+
AsyncWrap* wrap;
118+
WriteWrap* req_wrap;
119+
int err;
131120

132-
if (storage_size > INT_MAX)
133-
return UV_ENOBUFS;
121+
if (!all_buffers) {
122+
// Determine storage size first
123+
for (size_t i = 0; i < count; i++) {
124+
storage_size = ROUND_UP(storage_size, WriteWrap::kAlignSize);
134125

135-
AsyncWrap* wrap = GetAsyncWrap();
136-
CHECK_NE(wrap, nullptr);
137-
env->set_init_trigger_id(wrap->get_id());
138-
WriteWrap* req_wrap = WriteWrap::New(env,
139-
req_wrap_obj,
140-
this,
141-
AfterWrite,
142-
storage_size);
126+
Local<Value> chunk = chunks->Get(i * 2);
143127

144-
uint32_t bytes = 0;
145-
size_t offset = 0;
146-
for (size_t i = 0; i < count; i++) {
147-
Local<Value> chunk = chunks->Get(i * 2);
128+
if (Buffer::HasInstance(chunk))
129+
continue;
130+
// Buffer chunk, no additional storage required
131+
132+
// String chunk
133+
Local<String> string = chunk->ToString(env->isolate());
134+
enum encoding encoding = ParseEncoding(env->isolate(),
135+
chunks->Get(i * 2 + 1));
136+
size_t chunk_size;
137+
if (encoding == UTF8 && string->Length() > 65535)
138+
chunk_size = StringBytes::Size(env->isolate(), string, encoding);
139+
else
140+
chunk_size = StringBytes::StorageSize(env->isolate(), string, encoding);
148141

149-
// Write buffer
150-
if (Buffer::HasInstance(chunk)) {
142+
storage_size += chunk_size;
143+
}
144+
145+
if (storage_size > INT_MAX)
146+
return UV_ENOBUFS;
147+
} else {
148+
for (size_t i = 0; i < count; i++) {
149+
Local<Value> chunk = chunks->Get(i);
151150
bufs[i].base = Buffer::Data(chunk);
152151
bufs[i].len = Buffer::Length(chunk);
153152
bytes += bufs[i].len;
154-
continue;
155153
}
156154

157-
// Write string
158-
offset = ROUND_UP(offset, WriteWrap::kAlignSize);
159-
CHECK_LE(offset, storage_size);
160-
char* str_storage = req_wrap->Extra(offset);
161-
size_t str_size = storage_size - offset;
162-
163-
Local<String> string = chunk->ToString(env->isolate());
164-
enum encoding encoding = ParseEncoding(env->isolate(),
165-
chunks->Get(i * 2 + 1));
166-
str_size = StringBytes::Write(env->isolate(),
167-
str_storage,
168-
str_size,
169-
string,
170-
encoding);
171-
bufs[i].base = str_storage;
172-
bufs[i].len = str_size;
173-
offset += str_size;
174-
bytes += str_size;
155+
// Try writing immediately without allocation
156+
err = DoTryWrite(&buf_list, &count);
157+
if (err != 0 || count == 0)
158+
goto done;
175159
}
176160

177-
int err = DoWrite(req_wrap, *bufs, count, nullptr);
161+
wrap = GetAsyncWrap();
162+
CHECK_NE(wrap, nullptr);
163+
env->set_init_trigger_id(wrap->get_id());
164+
req_wrap = WriteWrap::New(env, req_wrap_obj, this, AfterWrite, storage_size);
178165

166+
offset = 0;
167+
if (!all_buffers) {
168+
for (size_t i = 0; i < count; i++) {
169+
Local<Value> chunk = chunks->Get(i * 2);
170+
171+
// Write buffer
172+
if (Buffer::HasInstance(chunk)) {
173+
bufs[i].base = Buffer::Data(chunk);
174+
bufs[i].len = Buffer::Length(chunk);
175+
bytes += bufs[i].len;
176+
continue;
177+
}
178+
179+
// Write string
180+
offset = ROUND_UP(offset, WriteWrap::kAlignSize);
181+
CHECK_LE(offset, storage_size);
182+
char* str_storage = req_wrap->Extra(offset);
183+
size_t str_size = storage_size - offset;
184+
185+
Local<String> string = chunk->ToString(env->isolate());
186+
enum encoding encoding = ParseEncoding(env->isolate(),
187+
chunks->Get(i * 2 + 1));
188+
str_size = StringBytes::Write(env->isolate(),
189+
str_storage,
190+
str_size,
191+
string,
192+
encoding);
193+
bufs[i].base = str_storage;
194+
bufs[i].len = str_size;
195+
offset += str_size;
196+
bytes += str_size;
197+
}
198+
}
199+
200+
err = DoWrite(req_wrap, buf_list, count, nullptr);
179201
req_wrap_obj->Set(env->async(), True(env->isolate()));
180-
req_wrap_obj->Set(env->bytes_string(), Number::New(env->isolate(), bytes));
202+
203+
if (err)
204+
req_wrap->Dispose();
205+
206+
done:
181207
const char* msg = Error();
182208
if (msg != nullptr) {
183209
req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg));
184210
ClearError();
185211
}
186-
187-
if (err)
188-
req_wrap->Dispose();
212+
req_wrap_obj->Set(env->bytes_string(), Number::New(env->isolate(), bytes));
189213

190214
return err;
191215
}

0 commit comments

Comments
 (0)