Skip to content

Commit 4a79b25

Browse files
addaleaxtargos
authored andcommitted
src: improve StreamBase write throughput
Improve performance by transferring information about write status to JS through an `AliasedBuffer`, rather than object properties set from C++. PR-URL: #23843 Reviewed-By: James M Snell <[email protected]> Reviewed-By: Anatoli Papirovski <[email protected]>
1 parent 22bbece commit 4a79b25

11 files changed

+41
-37
lines changed

benchmark/net/net-c2s.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ const net = require('net');
66
const PORT = common.PORT;
77

88
const bench = common.createBenchmark(main, {
9-
len: [102400, 1024 * 1024 * 16],
9+
len: [64, 102400, 1024 * 1024 * 16],
1010
type: ['utf', 'asc', 'buf'],
1111
dur: [5],
1212
});

benchmark/net/net-pipe.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ const net = require('net');
66
const PORT = common.PORT;
77

88
const bench = common.createBenchmark(main, {
9-
len: [102400, 1024 * 1024 * 16],
9+
len: [64, 102400, 1024 * 1024 * 16],
1010
type: ['utf', 'asc', 'buf'],
1111
dur: [5],
1212
});

benchmark/net/net-s2c.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ const common = require('../common.js');
55
const PORT = common.PORT;
66

77
const bench = common.createBenchmark(main, {
8-
len: [102400, 1024 * 1024 * 16],
8+
len: [64, 102400, 1024 * 1024 * 16],
99
type: ['utf', 'asc', 'buf'],
1010
dur: [5]
1111
});

benchmark/net/net-wrap-js-stream-passthrough.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ const common = require('../common.js');
55
const { PassThrough } = require('stream');
66

77
const bench = common.createBenchmark(main, {
8-
len: [102400, 1024 * 1024 * 16],
8+
len: [64, 102400, 1024 * 1024 * 16],
99
type: ['utf', 'asc', 'buf'],
1010
dur: [5],
1111
}, {

lib/internal/child_process.js

+3-2
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ const {
2626
WriteWrap,
2727
kReadBytesOrError,
2828
kArrayBufferOffset,
29+
kLastWriteWasAsync,
2930
streamBaseState
3031
} = internalBinding('stream_wrap');
3132
const { Pipe, constants: PipeConstants } = internalBinding('pipe_wrap');
@@ -717,10 +718,10 @@ function setupChannel(target, channel) {
717718
}
718719

719720
var req = new WriteWrap();
720-
req.async = false;
721721

722722
var string = JSON.stringify(message) + '\n';
723723
var err = channel.writeUtf8String(req, string, handle);
724+
var wasAsyncWrite = streamBaseState[kLastWriteWasAsync];
724725

725726
if (err === 0) {
726727
if (handle) {
@@ -730,7 +731,7 @@ function setupChannel(target, channel) {
730731
obj.postSend(message, handle, options, callback, target);
731732
}
732733

733-
if (req.async) {
734+
if (wasAsyncWrite) {
734735
req.oncomplete = function() {
735736
control.unref();
736737
if (typeof callback === 'function')

lib/internal/stream_base_commons.js

+20-2
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ const {
66
WriteWrap,
77
kReadBytesOrError,
88
kArrayBufferOffset,
9+
kBytesWritten,
10+
kLastWriteWasAsync,
911
streamBaseState
1012
} = internalBinding('stream_wrap');
1113
const { UV_EOF } = internalBinding('uv');
@@ -20,7 +22,12 @@ function handleWriteReq(req, data, encoding) {
2022

2123
switch (encoding) {
2224
case 'buffer':
23-
return handle.writeBuffer(req, data);
25+
{
26+
const ret = handle.writeBuffer(req, data);
27+
if (streamBaseState[kLastWriteWasAsync])
28+
req.buffer = data;
29+
return ret;
30+
}
2431
case 'latin1':
2532
case 'binary':
2633
return handle.writeLatin1String(req, data);
@@ -35,7 +42,13 @@ function handleWriteReq(req, data, encoding) {
3542
case 'utf-16le':
3643
return handle.writeUcs2String(req, data);
3744
default:
38-
return handle.writeBuffer(req, Buffer.from(data, encoding));
45+
{
46+
const buffer = Buffer.from(data, encoding);
47+
const ret = handle.writeBuffer(req, buffer);
48+
if (streamBaseState[kLastWriteWasAsync])
49+
req.buffer = buffer;
50+
return ret;
51+
}
3952
}
4053
}
4154

@@ -45,6 +58,8 @@ function createWriteWrap(handle, oncomplete) {
4558
req.handle = handle;
4659
req.oncomplete = oncomplete;
4760
req.async = false;
61+
req.bytes = 0;
62+
req.buffer = null;
4863

4964
return req;
5065
}
@@ -80,6 +95,9 @@ function writeGeneric(self, req, data, encoding, cb) {
8095
}
8196

8297
function afterWriteDispatched(self, req, err, cb) {
98+
req.bytes = streamBaseState[kBytesWritten];
99+
req.async = !!streamBaseState[kLastWriteWasAsync];
100+
83101
if (err !== 0)
84102
return self.destroy(errnoException(err, 'write', req.error), cb);
85103

src/env.h

-2
Original file line numberDiff line numberDiff line change
@@ -125,10 +125,8 @@ struct PackageConfig {
125125
V(address_string, "address") \
126126
V(aliases_string, "aliases") \
127127
V(args_string, "args") \
128-
V(async, "async") \
129128
V(async_ids_stack_string, "async_ids_stack") \
130129
V(buffer_string, "buffer") \
131-
V(bytes_string, "bytes") \
132130
V(bytes_parsed_string, "bytesParsed") \
133131
V(bytes_read_string, "bytesRead") \
134132
V(bytes_written_string, "bytesWritten") \

src/stream_base.cc

+7-26
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,11 @@ namespace node {
1818

1919
using v8::Array;
2020
using v8::ArrayBuffer;
21-
using v8::Boolean;
2221
using v8::Context;
2322
using v8::FunctionCallbackInfo;
2423
using v8::HandleScope;
2524
using v8::Integer;
2625
using v8::Local;
27-
using v8::Number;
2826
using v8::Object;
2927
using v8::String;
3028
using v8::Value;
@@ -56,18 +54,9 @@ int StreamBase::Shutdown(const FunctionCallbackInfo<Value>& args) {
5654
return Shutdown(req_wrap_obj);
5755
}
5856

59-
inline void SetWriteResultPropertiesOnWrapObject(
60-
Environment* env,
61-
Local<Object> req_wrap_obj,
62-
const StreamWriteResult& res) {
63-
req_wrap_obj->Set(
64-
env->context(),
65-
env->bytes_string(),
66-
Number::New(env->isolate(), res.bytes)).FromJust();
67-
req_wrap_obj->Set(
68-
env->context(),
69-
env->async(),
70-
Boolean::New(env->isolate(), res.async)).FromJust();
57+
void StreamBase::SetWriteResult(const StreamWriteResult& res) {
58+
env_->stream_base_state()[kBytesWritten] = res.bytes;
59+
env_->stream_base_state()[kLastWriteWasAsync] = res.async;
7160
}
7261

7362
int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
@@ -160,7 +149,7 @@ int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
160149
}
161150

162151
StreamWriteResult res = Write(*bufs, count, nullptr, req_wrap_obj);
163-
SetWriteResultPropertiesOnWrapObject(env, req_wrap_obj, res);
152+
SetWriteResult(res);
164153
if (res.wrap != nullptr && storage_size > 0) {
165154
res.wrap->SetAllocatedStorage(storage.release(), storage_size);
166155
}
@@ -185,10 +174,7 @@ int StreamBase::WriteBuffer(const FunctionCallbackInfo<Value>& args) {
185174
buf.len = Buffer::Length(args[1]);
186175

187176
StreamWriteResult res = Write(&buf, 1, nullptr, req_wrap_obj);
188-
189-
if (res.async)
190-
req_wrap_obj->Set(env->context(), env->buffer_string(), args[1]).FromJust();
191-
SetWriteResultPropertiesOnWrapObject(env, req_wrap_obj, res);
177+
SetWriteResult(res);
192178

193179
return res.err;
194180
}
@@ -247,12 +233,7 @@ int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
247233

248234
// Immediate failure or success
249235
if (err != 0 || count == 0) {
250-
req_wrap_obj->Set(env->context(), env->async(), False(env->isolate()))
251-
.FromJust();
252-
req_wrap_obj->Set(env->context(),
253-
env->bytes_string(),
254-
Integer::NewFromUnsigned(env->isolate(), data_size))
255-
.FromJust();
236+
SetWriteResult(StreamWriteResult { false, err, nullptr, data_size });
256237
return err;
257238
}
258239

@@ -295,7 +276,7 @@ int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
295276
StreamWriteResult res = Write(&buf, 1, send_handle, req_wrap_obj);
296277
res.bytes += synchronously_written;
297278

298-
SetWriteResultPropertiesOnWrapObject(env, req_wrap_obj, res);
279+
SetWriteResult(res);
299280
if (res.wrap != nullptr) {
300281
res.wrap->SetAllocatedStorage(data.release(), data_size);
301282
}

src/stream_base.h

+4
Original file line numberDiff line numberDiff line change
@@ -332,13 +332,17 @@ class StreamBase : public StreamResource {
332332
enum StreamBaseStateFields {
333333
kReadBytesOrError,
334334
kArrayBufferOffset,
335+
kBytesWritten,
336+
kLastWriteWasAsync,
335337
kNumStreamBaseStateFields
336338
};
337339

338340
private:
339341
Environment* env_;
340342
EmitToJSStreamListener default_listener_;
341343

344+
void SetWriteResult(const StreamWriteResult& res);
345+
342346
friend class WriteWrap;
343347
friend class ShutdownWrap;
344348
friend class Environment; // For kNumStreamBaseStateFields.

src/stream_wrap.cc

+2
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,8 @@ void LibuvStreamWrap::Initialize(Local<Object> target,
8383

8484
NODE_DEFINE_CONSTANT(target, kReadBytesOrError);
8585
NODE_DEFINE_CONSTANT(target, kArrayBufferOffset);
86+
NODE_DEFINE_CONSTANT(target, kBytesWritten);
87+
NODE_DEFINE_CONSTANT(target, kLastWriteWasAsync);
8688
target->Set(context, FIXED_ONE_BYTE_STRING(env->isolate(), "streamBaseState"),
8789
env->stream_base_state().GetJSArray()).FromJust();
8890
}

test/sequential/test-async-wrap-getasyncid.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,7 @@ if (common.hasCrypto) { // eslint-disable-line node-core/crypto-check
239239
const err = handle.writeLatin1String(wreq, 'hi'.repeat(100000));
240240
if (err)
241241
throw new Error(`write failed: ${getSystemErrorName(err)}`);
242-
if (!wreq.async) {
242+
if (!stream_wrap.streamBaseState[stream_wrap.kLastWriteWasAsync]) {
243243
testUninitialized(wreq, 'WriteWrap');
244244
// Synchronous finish. Write more data until we hit an
245245
// asynchronous write.

0 commit comments

Comments
 (0)