diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index 10fcae04f5553e..d9be0976a21ee5 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -7,6 +7,7 @@ module.exports = Writable; Writable.WritableState = WritableState; +const assert = require('assert'); const util = require('util'); const internalUtil = require('internal/util'); const Stream = require('stream'); @@ -108,6 +109,26 @@ function WritableState(options, stream) { // True if the error was already emitted and should not be thrown again this.errorEmitted = false; + + // count buffered requests + this.bufferedRequestCount = 0; + + // the requests that needs to be called by uncork + this.corkedCbs = new Array(2); + this.corkedCbsId = 0; + + // call all the corked requests + this.afterCorkedWrite = function afterCorkedWrite(err) { + var state = stream._writableState; + var id = state.corkedCbsId === 1 ? 0 : 1; + var cbs = state.corkedCbs[id]; + state.corkedCbs[id] = null; + + for (var i = 0; i < cbs.length; i++) { + state.pendingcb--; + cbs[i](err); + } + }; } WritableState.prototype.getBuffer = function writableStateGetBuffer() { @@ -274,6 +295,7 @@ function writeOrBuffer(stream, state, chunk, encoding, cb) { } else { state.bufferedRequest = state.lastBufferedRequest; } + state.bufferedRequestCount++; } else { doWrite(stream, state, false, len, chunk, encoding, cb); } @@ -362,27 +384,33 @@ function onwriteDrain(stream, state) { function clearBuffer(stream, state) { state.bufferProcessing = true; var entry = state.bufferedRequest; + var bufferedRequestCount = state.bufferedRequestCount; + + state.bufferedRequestCount = 0; if (stream._writev && entry && entry.next) { // Fast case, write everything using _writev() - var buffer = []; - var cbs = []; + var buffer = new Array(bufferedRequestCount); + var cbs = new Array(bufferedRequestCount); + var count = 0; + while (entry) { - cbs.push(entry.callback); - buffer.push(entry); + cbs[count] = entry.callback; + buffer[count] = entry; + count++; entry = entry.next; } // count the one we are adding, as well. - // TODO(isaacs) clean this up state.pendingcb++; state.lastBufferedRequest = null; - doWrite(stream, state, true, state.length, buffer, '', function(err) { - for (var i = 0; i < cbs.length; i++) { - state.pendingcb--; - cbs[i](err); - } - }); + + assert(!state.corkedCbs[state.corkedCbsId], 'only two sets of callbacks'); + state.corkedCbs[state.corkedCbsId] = cbs; + state.corkedCbsId = ++state.corkedCbsId % 2; + + doWrite(stream, state, true, state.length, buffer, '', + state.afterCorkedWrite); // Clear buffer } else {