From 1819b6f199c87ac8a08e3e126be4549193080b70 Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Tue, 10 Nov 2015 10:20:58 +0000 Subject: [PATCH 1/2] streams: perf improvments for clearBuffer in Writable. This commits removes a function allocation inside the clearBuffer function. Moreover, it adds counting of buffered chunks to instantiate a fixed-sized Array. The performance improvements are in the range 5-50%, depending on the usecase. --- lib/_stream_writable.js | 59 +++++++++++++++++++++++++++++++++-------- 1 file changed, 48 insertions(+), 11 deletions(-) diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index 10fcae04f5553e..a4ecf8a6828e75 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -7,6 +7,7 @@ module.exports = Writable; Writable.WritableState = WritableState; +const assert = require('assert'); const util = require('util'); const internalUtil = require('internal/util'); const Stream = require('stream'); @@ -108,6 +109,31 @@ function WritableState(options, stream) { // True if the error was already emitted and should not be thrown again this.errorEmitted = false; + + // count buffered requests + this.bufferedRequestCount = 0; + + // the requests that needs to be called by uncork + this.corkedCbs = null; + + // call all the corked requests + this.afterCorkedWrite = function afterCorkedWrite(err) { + var state = stream._writableState; + var entry = state.corkedCbs; + var cbs = entry.cbs; + + state.corkedCbs = entry.next; + + for (var i = 0; i < cbs.length; i++) { + state.pendingcb--; + cbs[i](err); + } + }; +} + +function CorkedCbs(cbs) { + this.cbs = cbs; + this.next = null; } WritableState.prototype.getBuffer = function writableStateGetBuffer() { @@ -274,6 +300,7 @@ function writeOrBuffer(stream, state, chunk, encoding, cb) { } else { state.bufferedRequest = state.lastBufferedRequest; } + state.bufferedRequestCount++; } else { doWrite(stream, state, false, len, chunk, encoding, cb); } @@ -362,27 +389,37 @@ function onwriteDrain(stream, state) { function clearBuffer(stream, state) { state.bufferProcessing = true; var entry = state.bufferedRequest; + var bufferedRequestCount = state.bufferedRequestCount; + + state.bufferedRequestCount = 0; if (stream._writev && entry && entry.next) { // Fast case, write everything using _writev() - var buffer = []; - var cbs = []; + var buffer = new Array(bufferedRequestCount); + var cbs = new Array(bufferedRequestCount); + var count = 0; + while (entry) { - cbs.push(entry.callback); - buffer.push(entry); + cbs[count] = entry.callback; + buffer[count] = entry; + count++; entry = entry.next; } // count the one we are adding, as well. - // TODO(isaacs) clean this up state.pendingcb++; state.lastBufferedRequest = null; - doWrite(stream, state, true, state.length, buffer, '', function(err) { - for (var i = 0; i < cbs.length; i++) { - state.pendingcb--; - cbs[i](err); - } - }); + + if (state.corkedCbs) { + // only two corkedCbs objects are supported + assert(!state.corkedCbs.next); + state.corkedCbs.next = new CorkedCbs(cbs); + } else { + state.corkedCbs = new CorkedCbs(cbs); + } + + doWrite(stream, state, true, state.length, buffer, '', + state.afterCorkedWrite); // Clear buffer } else { From 1353631ad70f87bf8762e71e63194d4035a82b79 Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Tue, 15 Dec 2015 02:23:04 +0100 Subject: [PATCH 2/2] simplified logic for the new clearBuffer. --- lib/_stream_writable.js | 25 ++++++++----------------- 1 file changed, 8 insertions(+), 17 deletions(-) diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index a4ecf8a6828e75..d9be0976a21ee5 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -114,15 +114,15 @@ function WritableState(options, stream) { this.bufferedRequestCount = 0; // the requests that needs to be called by uncork - this.corkedCbs = null; + this.corkedCbs = new Array(2); + this.corkedCbsId = 0; // call all the corked requests this.afterCorkedWrite = function afterCorkedWrite(err) { var state = stream._writableState; - var entry = state.corkedCbs; - var cbs = entry.cbs; - - state.corkedCbs = entry.next; + var id = state.corkedCbsId === 1 ? 0 : 1; + var cbs = state.corkedCbs[id]; + state.corkedCbs[id] = null; for (var i = 0; i < cbs.length; i++) { state.pendingcb--; @@ -131,11 +131,6 @@ function WritableState(options, stream) { }; } -function CorkedCbs(cbs) { - this.cbs = cbs; - this.next = null; -} - WritableState.prototype.getBuffer = function writableStateGetBuffer() { var current = this.bufferedRequest; var out = []; @@ -410,13 +405,9 @@ function clearBuffer(stream, state) { state.pendingcb++; state.lastBufferedRequest = null; - if (state.corkedCbs) { - // only two corkedCbs objects are supported - assert(!state.corkedCbs.next); - state.corkedCbs.next = new CorkedCbs(cbs); - } else { - state.corkedCbs = new CorkedCbs(cbs); - } + assert(!state.corkedCbs[state.corkedCbsId], 'only two sets of callbacks'); + state.corkedCbs[state.corkedCbsId] = cbs; + state.corkedCbsId = ++state.corkedCbsId % 2; doWrite(stream, state, true, state.length, buffer, '', state.afterCorkedWrite);