Skip to content

Commit 55319fe

Browse files
indutnyMyles Borins
authored and
Myles Borins
committed
stream_base: expose bytesRead getter
This will provide `bytesRead` data on consumed sockets. Fix: #3021 PR-URL: #6284 Reviewed-By: Ben Noordhuis <[email protected]>
1 parent e3f78eb commit 55319fe

File tree

5 files changed

+79
-7
lines changed

5 files changed

+79
-7
lines changed

lib/net.js

+13-5
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,6 @@ exports._normalizeConnectArgs = normalizeConnectArgs;
9797
// called when creating new Socket, or when re-using a closed Socket
9898
function initSocketHandle(self) {
9999
self.destroyed = false;
100-
self.bytesRead = 0;
101100
self._bytesDispatched = 0;
102101
self._sockname = null;
103102

@@ -112,6 +111,10 @@ function initSocketHandle(self) {
112111
}
113112
}
114113

114+
115+
const BYTES_READ = Symbol('bytesRead');
116+
117+
115118
function Socket(options) {
116119
if (!(this instanceof Socket)) return new Socket(options);
117120

@@ -179,6 +182,9 @@ function Socket(options) {
179182
// Reserve properties
180183
this.server = null;
181184
this._server = null;
185+
186+
// Used after `.destroy()`
187+
this[BYTES_READ] = 0;
182188
}
183189
util.inherits(Socket, stream.Duplex);
184190

@@ -472,6 +478,9 @@ Socket.prototype._destroy = function(exception, cb) {
472478
if (this !== process.stderr)
473479
debug('close handle');
474480
var isException = exception ? true : false;
481+
// `bytesRead` should be accessible after `.destroy()`
482+
this[BYTES_READ] = this._handle.bytesRead;
483+
475484
this._handle.close(function() {
476485
debug('emit close');
477486
self.emit('close', isException);
@@ -523,10 +532,6 @@ function onread(nread, buffer) {
523532
// will prevent this from being called again until _read() gets
524533
// called again.
525534

526-
// if it's not enough data, we'll just call handle.readStart()
527-
// again right away.
528-
self.bytesRead += nread;
529-
530535
// Optimization: emit the original buffer with end points
531536
var ret = self.push(buffer);
532537

@@ -582,6 +587,9 @@ Socket.prototype._getpeername = function() {
582587
return this._peername;
583588
};
584589

590+
Socket.prototype.__defineGetter__('bytesRead', function() {
591+
return this._handle ? this._handle.bytesRead : this[BYTES_READ];
592+
});
585593

586594
Socket.prototype.__defineGetter__('remoteAddress', function() {
587595
return this._getpeername().address;

src/env.h

+1
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ namespace node {
5454
V(buffer_string, "buffer") \
5555
V(bytes_string, "bytes") \
5656
V(bytes_parsed_string, "bytesParsed") \
57+
V(bytes_read_string, "bytesRead") \
5758
V(callback_string, "callback") \
5859
V(change_string, "change") \
5960
V(oncertcb_string, "oncertcb") \

src/stream_base-inl.h

+17
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,13 @@ void StreamBase::AddMethods(Environment* env,
4343
v8::DEFAULT,
4444
attributes);
4545

46+
t->InstanceTemplate()->SetAccessor(env->bytes_read_string(),
47+
GetBytesRead<Base>,
48+
nullptr,
49+
env->as_external(),
50+
v8::DEFAULT,
51+
attributes);
52+
4653
env->SetProtoMethod(t, "readStart", JSMethod<Base, &StreamBase::ReadStart>);
4754
env->SetProtoMethod(t, "readStop", JSMethod<Base, &StreamBase::ReadStop>);
4855
if ((flags & kFlagNoShutdown) == 0)
@@ -79,6 +86,16 @@ void StreamBase::GetFD(Local<String> key,
7986
}
8087

8188

89+
template <class Base>
90+
void StreamBase::GetBytesRead(Local<String> key,
91+
const PropertyCallbackInfo<Value>& args) {
92+
StreamBase* wrap = Unwrap<Base>(args.Holder());
93+
94+
// uint64_t -> double. 53bits is enough for all real cases.
95+
args.GetReturnValue().Set(static_cast<double>(wrap->bytes_read_));
96+
}
97+
98+
8299
template <class Base>
83100
void StreamBase::GetExternal(Local<String> key,
84101
const PropertyCallbackInfo<Value>& args) {

src/stream_base.h

+11-2
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ class StreamResource {
136136
uv_handle_type pending,
137137
void* ctx);
138138

139-
StreamResource() {
139+
StreamResource() : bytes_read_(0) {
140140
}
141141
virtual ~StreamResource() = default;
142142

@@ -160,9 +160,11 @@ class StreamResource {
160160
alloc_cb_.fn(size, buf, alloc_cb_.ctx);
161161
}
162162

163-
inline void OnRead(size_t nread,
163+
inline void OnRead(ssize_t nread,
164164
const uv_buf_t* buf,
165165
uv_handle_type pending = UV_UNKNOWN_HANDLE) {
166+
if (nread > 0)
167+
bytes_read_ += static_cast<uint64_t>(nread);
166168
if (!read_cb_.is_empty())
167169
read_cb_.fn(nread, buf, pending, read_cb_.ctx);
168170
}
@@ -182,6 +184,9 @@ class StreamResource {
182184
Callback<AfterWriteCb> after_write_cb_;
183185
Callback<AllocCb> alloc_cb_;
184186
Callback<ReadCb> read_cb_;
187+
uint64_t bytes_read_;
188+
189+
friend class StreamBase;
185190
};
186191

187192
class StreamBase : public StreamResource {
@@ -249,6 +254,10 @@ class StreamBase : public StreamResource {
249254
static void GetExternal(v8::Local<v8::String> key,
250255
const v8::PropertyCallbackInfo<v8::Value>& args);
251256

257+
template <class Base>
258+
static void GetBytesRead(v8::Local<v8::String> key,
259+
const v8::PropertyCallbackInfo<v8::Value>& args);
260+
252261
template <class Base,
253262
int (StreamBase::*Method)( // NOLINT(whitespace/parens)
254263
const v8::FunctionCallbackInfo<v8::Value>& args)>

test/parallel/test-net-bytes-read.js

+37
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const assert = require('assert');
5+
const net = require('net');
6+
7+
const big = Buffer(1024 * 1024);
8+
9+
const server = net.createServer((socket) => {
10+
socket.end(big);
11+
server.close();
12+
}).listen(common.PORT, () => {
13+
let prev = 0;
14+
15+
function checkRaise(value) {
16+
assert(value > prev);
17+
prev = value;
18+
}
19+
20+
const socket = net.connect(common.PORT, () => {
21+
socket.on('data', (chunk) => {
22+
checkRaise(socket.bytesRead);
23+
});
24+
25+
socket.on('end', common.mustCall(() => {
26+
assert.equal(socket.bytesRead, prev);
27+
assert.equal(big.length, prev);
28+
}));
29+
30+
socket.on('close', common.mustCall(() => {
31+
assert(!socket._handle);
32+
assert.equal(socket.bytesRead, prev);
33+
assert.equal(big.length, prev);
34+
}));
35+
});
36+
socket.end();
37+
});

0 commit comments

Comments
 (0)