Skip to content

Commit f706cb0

Browse files
mcollinarvagg
authored andcommitted
streams: 5% throughput gain when sending small chunks
Improves the performance when moving small buffers by 5%, and it adds a benchmark to avoid regression in that area. In all other cases it is equally performant to current master. Full performance results available at: https://gist.github.com/mcollina/717c35ad07d15710b6b9. PR-URL: #4354 Reviewed-By: James M Snell <[email protected]>
1 parent 5c54d49 commit f706cb0

File tree

2 files changed

+144
-15
lines changed

2 files changed

+144
-15
lines changed

benchmark/net/net-c2s-cork.js

+96
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
// test the speed of .pipe() with sockets
2+
3+
var common = require('../common.js');
4+
var PORT = common.PORT;
5+
6+
var bench = common.createBenchmark(main, {
7+
len: [4, 8, 16, 32, 64, 128, 512, 1024],
8+
type: ['buf'],
9+
dur: [5],
10+
});
11+
12+
var dur;
13+
var len;
14+
var type;
15+
var chunk;
16+
var encoding;
17+
18+
function main(conf) {
19+
dur = +conf.dur;
20+
len = +conf.len;
21+
type = conf.type;
22+
23+
switch (type) {
24+
case 'buf':
25+
chunk = new Buffer(len);
26+
chunk.fill('x');
27+
break;
28+
case 'utf':
29+
encoding = 'utf8';
30+
chunk = new Array(len / 2 + 1).join('ü');
31+
break;
32+
case 'asc':
33+
encoding = 'ascii';
34+
chunk = new Array(len + 1).join('x');
35+
break;
36+
default:
37+
throw new Error('invalid type: ' + type);
38+
break;
39+
}
40+
41+
server();
42+
}
43+
44+
var net = require('net');
45+
46+
function Writer() {
47+
this.received = 0;
48+
this.writable = true;
49+
}
50+
51+
Writer.prototype.write = function(chunk, encoding, cb) {
52+
this.received += chunk.length;
53+
54+
if (typeof encoding === 'function')
55+
encoding();
56+
else if (typeof cb === 'function')
57+
cb();
58+
59+
return true;
60+
};
61+
62+
// doesn't matter, never emits anything.
63+
Writer.prototype.on = function() {};
64+
Writer.prototype.once = function() {};
65+
Writer.prototype.emit = function() {};
66+
67+
function server() {
68+
var writer = new Writer();
69+
70+
// the actual benchmark.
71+
var server = net.createServer(function(socket) {
72+
socket.pipe(writer);
73+
});
74+
75+
server.listen(PORT, function() {
76+
var socket = net.connect(PORT);
77+
socket.on('connect', function() {
78+
bench.start();
79+
80+
socket.on('drain', send)
81+
send()
82+
83+
setTimeout(function() {
84+
var bytes = writer.received;
85+
var gbits = (bytes * 8) / (1024 * 1024 * 1024);
86+
bench.end(gbits);
87+
}, dur * 1000);
88+
89+
function send() {
90+
socket.cork();
91+
while(socket.write(chunk, encoding)) {}
92+
socket.uncork();
93+
}
94+
});
95+
});
96+
}

lib/_stream_writable.js

+48-15
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,14 @@ function WritableState(options, stream) {
108108

109109
// True if the error was already emitted and should not be thrown again
110110
this.errorEmitted = false;
111+
112+
// count buffered requests
113+
this.bufferedRequestCount = 0;
114+
115+
// create the two objects needed to store the corked requests
116+
// they are not a linked list, as no new elements are inserted in there
117+
this.corkedRequestsFree = new CorkedRequest(this);
118+
this.corkedRequestsFree.next = new CorkedRequest(this);
111119
}
112120

113121
WritableState.prototype.getBuffer = function writableStateGetBuffer() {
@@ -274,6 +282,7 @@ function writeOrBuffer(stream, state, chunk, encoding, cb) {
274282
} else {
275283
state.bufferedRequest = state.lastBufferedRequest;
276284
}
285+
state.bufferedRequestCount += 1;
277286
} else {
278287
doWrite(stream, state, false, len, chunk, encoding, cb);
279288
}
@@ -357,34 +366,33 @@ function onwriteDrain(stream, state) {
357366
}
358367
}
359368

360-
361369
// if there's something in the buffer waiting, then process it
362370
function clearBuffer(stream, state) {
363371
state.bufferProcessing = true;
364372
var entry = state.bufferedRequest;
365373

366374
if (stream._writev && entry && entry.next) {
367375
// Fast case, write everything using _writev()
368-
var buffer = [];
369-
var cbs = [];
376+
var l = state.bufferedRequestCount;
377+
var buffer = new Array(l);
378+
var holder = state.corkedRequestsFree;
379+
holder.entry = entry;
380+
381+
var count = 0;
370382
while (entry) {
371-
cbs.push(entry.callback);
372-
buffer.push(entry);
383+
buffer[count] = entry;
373384
entry = entry.next;
385+
count += 1;
374386
}
375387

376-
// count the one we are adding, as well.
377-
// TODO(isaacs) clean this up
388+
doWrite(stream, state, true, state.length, buffer, '', holder.finish);
389+
390+
// doWrite is always async, defer these to save a bit of time
391+
// as the hot path ends with doWrite
378392
state.pendingcb++;
379393
state.lastBufferedRequest = null;
380-
doWrite(stream, state, true, state.length, buffer, '', function(err) {
381-
for (var i = 0; i < cbs.length; i++) {
382-
state.pendingcb--;
383-
cbs[i](err);
384-
}
385-
});
386-
387-
// Clear buffer
394+
state.corkedRequestsFree = holder.next;
395+
holder.next = null;
388396
} else {
389397
// Slow case, write chunks one-by-one
390398
while (entry) {
@@ -407,6 +415,8 @@ function clearBuffer(stream, state) {
407415
if (entry === null)
408416
state.lastBufferedRequest = null;
409417
}
418+
419+
state.bufferedRequestCount = 0;
410420
state.bufferedRequest = entry;
411421
state.bufferProcessing = false;
412422
}
@@ -485,3 +495,26 @@ function endWritable(stream, state, cb) {
485495
state.ended = true;
486496
stream.writable = false;
487497
}
498+
499+
// It seems a linked list but it is not
500+
// there will be only 2 of these for each stream
501+
function CorkedRequest(state) {
502+
this.next = null;
503+
this.entry = null;
504+
505+
this.finish = (err) => {
506+
var entry = this.entry;
507+
this.entry = null;
508+
while (entry) {
509+
var cb = entry.callback;
510+
state.pendingcb--;
511+
cb(err);
512+
entry = entry.next;
513+
}
514+
if (state.corkedRequestsFree) {
515+
state.corkedRequestsFree.next = this;
516+
} else {
517+
state.corkedRequestsFree = this;
518+
}
519+
};
520+
}

0 commit comments

Comments
 (0)