Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

streams: performance improvements for clearBuffer in Writable #3751

Closed
wants to merge 2 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 39 additions & 11 deletions lib/_stream_writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
module.exports = Writable;
Writable.WritableState = WritableState;

const assert = require('assert');
const util = require('util');
const internalUtil = require('internal/util');
const Stream = require('stream');
Expand Down Expand Up @@ -108,6 +109,26 @@ function WritableState(options, stream) {

// True if the error was already emitted and should not be thrown again
this.errorEmitted = false;

// count buffered requests
this.bufferedRequestCount = 0;

// the requests that needs to be called by uncork
this.corkedCbs = new Array(2);
this.corkedCbsId = 0;

// call all the corked requests
this.afterCorkedWrite = function afterCorkedWrite(err) {
var state = stream._writableState;
var id = state.corkedCbsId === 1 ? 0 : 1;
var cbs = state.corkedCbs[id];
state.corkedCbs[id] = null;

for (var i = 0; i < cbs.length; i++) {
state.pendingcb--;
cbs[i](err);
}
};
}

WritableState.prototype.getBuffer = function writableStateGetBuffer() {
Expand Down Expand Up @@ -274,6 +295,7 @@ function writeOrBuffer(stream, state, chunk, encoding, cb) {
} else {
state.bufferedRequest = state.lastBufferedRequest;
}
state.bufferedRequestCount++;
} else {
doWrite(stream, state, false, len, chunk, encoding, cb);
}
Expand Down Expand Up @@ -362,27 +384,33 @@ function onwriteDrain(stream, state) {
function clearBuffer(stream, state) {
state.bufferProcessing = true;
var entry = state.bufferedRequest;
var bufferedRequestCount = state.bufferedRequestCount;

state.bufferedRequestCount = 0;

if (stream._writev && entry && entry.next) {
// Fast case, write everything using _writev()
var buffer = [];
var cbs = [];
var buffer = new Array(bufferedRequestCount);
var cbs = new Array(bufferedRequestCount);
var count = 0;

while (entry) {
cbs.push(entry.callback);
buffer.push(entry);
cbs[count] = entry.callback;
buffer[count] = entry;
count++;
entry = entry.next;
}

// count the one we are adding, as well.
// TODO(isaacs) clean this up
state.pendingcb++;
state.lastBufferedRequest = null;
doWrite(stream, state, true, state.length, buffer, '', function(err) {
for (var i = 0; i < cbs.length; i++) {
state.pendingcb--;
cbs[i](err);
}
});

assert(!state.corkedCbs[state.corkedCbsId], 'only two sets of callbacks');
state.corkedCbs[state.corkedCbsId] = cbs;
state.corkedCbsId = ++state.corkedCbsId % 2;

doWrite(stream, state, true, state.length, buffer, '',
state.afterCorkedWrite);

// Clear buffer
} else {
Expand Down