Skip to content

Commit 48ed81f

Browse files
addaleaxtargos
authored andcommitted
src: improve StreamBase read throughput
Improve performance by providing JS with the raw ingridients for the read data, i.e. an `ArrayBuffer` + offset + length fields, instead of creating `Buffer` instances in C++ land. PR-URL: #23797 Reviewed-By: Tiancheng "Timothy" Gu <[email protected]> Reviewed-By: James M Snell <[email protected]>
1 parent 1cda41b commit 48ed81f

16 files changed

+115
-44
lines changed

benchmark/net/tcp-raw-c2s.js

+4-4
Original file line numberDiff line numberDiff line change
@@ -46,15 +46,15 @@ function main({ dur, len, type }) {
4646
process.exit(0);
4747
}, dur * 1000);
4848

49-
clientHandle.onread = function(nread, buffer) {
49+
clientHandle.onread = function(buffer) {
5050
// we're not expecting to ever get an EOF from the client.
5151
// just lots of data forever.
52-
if (nread < 0)
53-
fail(nread, 'read');
52+
if (!buffer)
53+
fail('read');
5454

5555
// don't slice the buffer. the point of this is to isolate, not
5656
// simulate real traffic.
57-
bytes += buffer.length;
57+
bytes += buffer.byteLength;
5858
};
5959

6060
clientHandle.readStart();

benchmark/net/tcp-raw-pipe.js

+8-8
Original file line numberDiff line numberDiff line change
@@ -43,15 +43,15 @@ function main({ dur, len, type }) {
4343
if (err)
4444
fail(err, 'connect');
4545

46-
clientHandle.onread = function(nread, buffer) {
46+
clientHandle.onread = function(buffer) {
4747
// we're not expecting to ever get an EOF from the client.
4848
// just lots of data forever.
49-
if (nread < 0)
50-
fail(nread, 'read');
49+
if (!buffer)
50+
fail('read');
5151

5252
const writeReq = new WriteWrap();
5353
writeReq.async = false;
54-
err = clientHandle.writeBuffer(writeReq, buffer);
54+
err = clientHandle.writeBuffer(writeReq, Buffer.from(buffer));
5555

5656
if (err)
5757
fail(err, 'write');
@@ -89,11 +89,11 @@ function main({ dur, len, type }) {
8989
if (err)
9090
fail(err, 'connect');
9191

92-
clientHandle.onread = function(nread, buffer) {
93-
if (nread < 0)
94-
fail(nread, 'read');
92+
clientHandle.onread = function(buffer) {
93+
if (!buffer)
94+
fail('read');
9595

96-
bytes += buffer.length;
96+
bytes += buffer.byteLength;
9797
};
9898

9999
connectReq.oncomplete = function(err) {

benchmark/net/tcp-raw-s2c.js

+4-4
Original file line numberDiff line numberDiff line change
@@ -109,15 +109,15 @@ function main({ dur, len, type }) {
109109

110110
connectReq.oncomplete = function() {
111111
var bytes = 0;
112-
clientHandle.onread = function(nread, buffer) {
112+
clientHandle.onread = function(buffer) {
113113
// we're not expecting to ever get an EOF from the client.
114114
// just lots of data forever.
115-
if (nread < 0)
116-
fail(nread, 'read');
115+
if (!buffer)
116+
fail('read');
117117

118118
// don't slice the buffer. the point of this is to isolate, not
119119
// simulate real traffic.
120-
bytes += buffer.length;
120+
bytes += buffer.byteLength;
121121
};
122122

123123
clientHandle.readStart();

lib/internal/child_process.js

+11-4
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,12 @@ const util = require('util');
2222
const assert = require('assert');
2323

2424
const { Process } = internalBinding('process_wrap');
25-
const { WriteWrap } = internalBinding('stream_wrap');
25+
const {
26+
WriteWrap,
27+
kReadBytesOrError,
28+
kArrayBufferOffset,
29+
streamBaseState
30+
} = internalBinding('stream_wrap');
2631
const { Pipe, constants: PipeConstants } = internalBinding('pipe_wrap');
2732
const { TCP } = internalBinding('tcp_wrap');
2833
const { TTY } = internalBinding('tty_wrap');
@@ -486,11 +491,13 @@ function setupChannel(target, channel) {
486491
var pendingHandle = null;
487492
channel.buffering = false;
488493
channel.pendingHandle = null;
489-
channel.onread = function(nread, pool) {
494+
channel.onread = function(arrayBuffer) {
490495
const recvHandle = channel.pendingHandle;
491496
channel.pendingHandle = null;
492-
// TODO(bnoordhuis) Check that nread > 0.
493-
if (pool) {
497+
if (arrayBuffer) {
498+
const nread = streamBaseState[kReadBytesOrError];
499+
const offset = streamBaseState[kArrayBufferOffset];
500+
const pool = new Uint8Array(arrayBuffer, offset, nread);
494501
if (recvHandle)
495502
pendingHandle = recvHandle;
496503

lib/internal/http2/core.js

+7-2
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,11 @@ const { isArrayBufferView } = require('internal/util/types');
120120

121121
const { FileHandle } = process.binding('fs');
122122
const binding = internalBinding('http2');
123-
const { ShutdownWrap } = internalBinding('stream_wrap');
123+
const {
124+
ShutdownWrap,
125+
kReadBytesOrError,
126+
streamBaseState
127+
} = internalBinding('stream_wrap');
124128
const { UV_EOF } = internalBinding('uv');
125129

126130
const { StreamPipe } = internalBinding('stream_pipe');
@@ -2043,7 +2047,8 @@ function onFileUnpipe() {
20432047

20442048
// This is only called once the pipe has returned back control, so
20452049
// it only has to handle errors and End-of-File.
2046-
function onPipedFileHandleRead(err) {
2050+
function onPipedFileHandleRead() {
2051+
const err = streamBaseState[kReadBytesOrError];
20472052
if (err < 0 && err !== UV_EOF) {
20482053
this.stream.close(NGHTTP2_INTERNAL_ERROR);
20492054
}

lib/internal/stream_base_commons.js

+12-2
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,13 @@
11
'use strict';
22

33
const { Buffer } = require('buffer');
4-
const { WriteWrap } = internalBinding('stream_wrap');
4+
const { FastBuffer } = require('internal/buffer');
5+
const {
6+
WriteWrap,
7+
kReadBytesOrError,
8+
kArrayBufferOffset,
9+
streamBaseState
10+
} = internalBinding('stream_wrap');
511
const { UV_EOF } = internalBinding('uv');
612
const { errnoException } = require('internal/errors');
713
const { owner_symbol } = require('internal/async_hooks').symbols;
@@ -84,13 +90,17 @@ function afterWriteDispatched(self, req, err, cb) {
8490
}
8591
}
8692

87-
function onStreamRead(nread, buf) {
93+
function onStreamRead(arrayBuffer) {
94+
const nread = streamBaseState[kReadBytesOrError];
95+
8896
const handle = this;
8997
const stream = this[owner_symbol];
9098

9199
stream[kUpdateTimer]();
92100

93101
if (nread > 0 && !stream.destroyed) {
102+
const offset = streamBaseState[kArrayBufferOffset];
103+
const buf = new FastBuffer(arrayBuffer, offset, nread);
94104
if (!stream.push(buf)) {
95105
handle.reading = false;
96106
if (!stream.destroyed) {

src/env-inl.h

+5
Original file line numberDiff line numberDiff line change
@@ -446,6 +446,11 @@ Environment::trace_category_state() {
446446
return trace_category_state_;
447447
}
448448

449+
inline AliasedBuffer<int32_t, v8::Int32Array>&
450+
Environment::stream_base_state() {
451+
return stream_base_state_;
452+
}
453+
449454
inline uint32_t Environment::get_next_module_id() {
450455
return module_id_counter_++;
451456
}

src/env.cc

+1
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,7 @@ Environment::Environment(IsolateData* isolate_data,
158158
makecallback_cntr_(0),
159159
should_abort_on_uncaught_toggle_(isolate_, 1),
160160
trace_category_state_(isolate_, kTraceCategoryCount),
161+
stream_base_state_(isolate_, StreamBase::kNumStreamBaseStateFields),
161162
http_parser_buffer_(nullptr),
162163
fs_stats_field_array_(isolate_, kFsStatsFieldsLength * 2),
163164
fs_stats_field_bigint_array_(isolate_, kFsStatsFieldsLength * 2),

src/env.h

+3
Original file line numberDiff line numberDiff line change
@@ -668,6 +668,7 @@ class Environment {
668668
should_abort_on_uncaught_toggle();
669669

670670
inline AliasedBuffer<uint8_t, v8::Uint8Array>& trace_category_state();
671+
inline AliasedBuffer<int32_t, v8::Int32Array>& stream_base_state();
671672

672673
// The necessary API for async_hooks.
673674
inline double new_async_id();
@@ -951,6 +952,8 @@ class Environment {
951952
AliasedBuffer<uint8_t, v8::Uint8Array> trace_category_state_;
952953
std::unique_ptr<TrackingTraceStateObserver> trace_state_observer_;
953954

955+
AliasedBuffer<int32_t, v8::Int32Array> stream_base_state_;
956+
954957
std::unique_ptr<performance::performance_state> performance_state_;
955958
std::unordered_map<std::string, uint64_t> performance_marks_;
956959

src/node_http2.cc

+1-4
Original file line numberDiff line numberDiff line change
@@ -1256,10 +1256,7 @@ void Http2StreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf) {
12561256
CHECK_LE(offset, session->stream_buf_.len);
12571257
CHECK_LE(offset + buf.len, session->stream_buf_.len);
12581258

1259-
Local<Object> buffer =
1260-
Buffer::New(env, session->stream_buf_ab_, offset, nread).ToLocalChecked();
1261-
1262-
stream->CallJSOnreadMethod(nread, buffer);
1259+
stream->CallJSOnreadMethod(nread, session->stream_buf_ab_, offset);
12631260
}
12641261

12651262

src/stream_base.cc

+25-8
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
namespace node {
1818

1919
using v8::Array;
20+
using v8::ArrayBuffer;
2021
using v8::Boolean;
2122
using v8::Context;
2223
using v8::FunctionCallbackInfo;
@@ -303,17 +304,29 @@ int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
303304
}
304305

305306

306-
void StreamBase::CallJSOnreadMethod(ssize_t nread, Local<Object> buf) {
307+
void StreamBase::CallJSOnreadMethod(ssize_t nread,
308+
Local<ArrayBuffer> ab,
309+
size_t offset) {
307310
Environment* env = env_;
308311

312+
#ifdef DEBUG
313+
CHECK_EQ(static_cast<int32_t>(nread), nread);
314+
CHECK_EQ(static_cast<int32_t>(offset), offset);
315+
316+
if (ab.IsEmpty()) {
317+
CHECK_EQ(offset, 0);
318+
CHECK_LE(nread, 0);
319+
} else {
320+
CHECK_GE(nread, 0);
321+
}
322+
#endif
323+
env->stream_base_state()[kReadBytesOrError] = nread;
324+
env->stream_base_state()[kArrayBufferOffset] = offset;
325+
309326
Local<Value> argv[] = {
310-
Integer::New(env->isolate(), nread),
311-
buf
327+
ab.IsEmpty() ? Undefined(env->isolate()).As<Value>() : ab.As<Value>()
312328
};
313329

314-
if (argv[1].IsEmpty())
315-
argv[1] = Undefined(env->isolate());
316-
317330
AsyncWrap* wrap = GetAsyncWrap();
318331
CHECK_NOT_NULL(wrap);
319332
wrap->MakeCallback(env->onread_string(), arraysize(argv), argv);
@@ -366,14 +379,18 @@ void EmitToJSStreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf) {
366379
if (nread <= 0) {
367380
free(buf.base);
368381
if (nread < 0)
369-
stream->CallJSOnreadMethod(nread, Local<Object>());
382+
stream->CallJSOnreadMethod(nread, Local<ArrayBuffer>());
370383
return;
371384
}
372385

373386
CHECK_LE(static_cast<size_t>(nread), buf.len);
374387
char* base = Realloc(buf.base, nread);
375388

376-
Local<Object> obj = Buffer::New(env, base, nread).ToLocalChecked();
389+
Local<ArrayBuffer> obj = ArrayBuffer::New(
390+
env->isolate(),
391+
base,
392+
nread,
393+
v8::ArrayBufferCreationMode::kInternalized); // Transfer ownership to V8.
377394
stream->CallJSOnreadMethod(nread, obj);
378395
}
379396

src/stream_base.h

+11-1
Original file line numberDiff line numberDiff line change
@@ -264,7 +264,9 @@ class StreamBase : public StreamResource {
264264
virtual bool IsIPCPipe();
265265
virtual int GetFD();
266266

267-
void CallJSOnreadMethod(ssize_t nread, v8::Local<v8::Object> buf);
267+
void CallJSOnreadMethod(ssize_t nread,
268+
v8::Local<v8::ArrayBuffer> ab,
269+
size_t offset = 0);
268270

269271
// This is named `stream_env` to avoid name clashes, because a lot of
270272
// subclasses are also `BaseObject`s.
@@ -326,12 +328,20 @@ class StreamBase : public StreamResource {
326328
const v8::FunctionCallbackInfo<v8::Value>& args)>
327329
static void JSMethod(const v8::FunctionCallbackInfo<v8::Value>& args);
328330

331+
// Internal, used only in StreamBase methods + env.cc.
332+
enum StreamBaseStateFields {
333+
kReadBytesOrError,
334+
kArrayBufferOffset,
335+
kNumStreamBaseStateFields
336+
};
337+
329338
private:
330339
Environment* env_;
331340
EmitToJSStreamListener default_listener_;
332341

333342
friend class WriteWrap;
334343
friend class ShutdownWrap;
344+
friend class Environment; // For kNumStreamBaseStateFields.
335345
};
336346

337347

src/stream_wrap.cc

+5
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,11 @@ void LibuvStreamWrap::Initialize(Local<Object> target,
8080
target->Set(writeWrapString,
8181
ww->GetFunction(env->context()).ToLocalChecked());
8282
env->set_write_wrap_template(ww->InstanceTemplate());
83+
84+
NODE_DEFINE_CONSTANT(target, kReadBytesOrError);
85+
NODE_DEFINE_CONSTANT(target, kArrayBufferOffset);
86+
target->Set(context, FIXED_ONE_BYTE_STRING(env->isolate(), "streamBaseState"),
87+
env->stream_base_state().GetJSArray()).FromJust();
8388
}
8489

8590

test/parallel/test-net-end-close.js

+5-1
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,15 @@ const net = require('net');
66

77
const { internalBinding } = require('internal/test/binding');
88
const { UV_EOF } = internalBinding('uv');
9+
const { streamBaseState, kReadBytesOrError } = internalBinding('stream_wrap');
910

1011
const s = new net.Socket({
1112
handle: {
1213
readStart: function() {
13-
setImmediate(() => this.onread(UV_EOF, null));
14+
setImmediate(() => {
15+
streamBaseState[kReadBytesOrError] = UV_EOF;
16+
this.onread();
17+
});
1418
},
1519
close: (cb) => setImmediate(cb)
1620
},

test/parallel/test-process-wrap.js

+2-3
Original file line numberDiff line numberDiff line change
@@ -44,11 +44,10 @@ p.onexit = function(exitCode, signal) {
4444
processExited = true;
4545
};
4646

47-
pipe.onread = function(err, b, off, len) {
47+
pipe.onread = function(arrayBuffer) {
4848
assert.ok(processExited);
49-
if (b) {
49+
if (arrayBuffer) {
5050
gotPipeData = true;
51-
console.log('read %d', len);
5251
} else {
5352
gotPipeEOF = true;
5453
pipe.close();

test/parallel/test-tcp-wrap-listen.js

+11-3
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,12 @@ const assert = require('assert');
55

66
const { internalBinding } = require('internal/test/binding');
77
const { TCP, constants: TCPConstants } = internalBinding('tcp_wrap');
8-
const { WriteWrap } = internalBinding('stream_wrap');
8+
const {
9+
WriteWrap,
10+
kReadBytesOrError,
11+
kArrayBufferOffset,
12+
streamBaseState
13+
} = internalBinding('stream_wrap');
914

1015
const server = new TCP(TCPConstants.SOCKET);
1116

@@ -30,8 +35,11 @@ server.onconnection = (err, client) => {
3035

3136
client.readStart();
3237
client.pendingWrites = [];
33-
client.onread = common.mustCall((err, buffer) => {
34-
if (buffer) {
38+
client.onread = common.mustCall((arrayBuffer) => {
39+
if (arrayBuffer) {
40+
const offset = streamBaseState[kArrayBufferOffset];
41+
const nread = streamBaseState[kReadBytesOrError];
42+
const buffer = Buffer.from(arrayBuffer, offset, nread);
3543
assert.ok(buffer.length > 0);
3644

3745
assert.strictEqual(client.writeQueueSize, 0);

0 commit comments

Comments
 (0)