Skip to content

Commit 9158666

Browse files
stream: switch _writableState.buffer to queue
In cases where many small writes are made to a stream lacking _writev, the array data structure backing the WriteReq buffer would greatly increase GC pressure. Specifically, in the fs.WriteStream case, the clearBuffer routine would only clear a single WriteReq from the buffer before exiting, but would cause the entire backing array to be GC'd. Switching to [].shift lessened pressure, but still the bulk of the time was spent in memcpy. This replaces that structure with a linked list-backed queue so that adding and removing from the queue is O(1). In the _writev case, collecting the buffer requires an O(N) loop over the buffer, but that was already being performed to collect callbacks, so slowdown should be neglible. PR-URL: nodejs/node-v0.x-archive#8826 Reviewed-by: Timothy J Fontaine <[email protected]> Reviewed-by: Trevor Norris <[email protected]>
1 parent 93533e9 commit 9158666

File tree

3 files changed

+50
-22
lines changed

3 files changed

+50
-22
lines changed

lib/_stream_writable.js

+48-20
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,15 @@ Writable.WritableState = WritableState;
2828

2929
var util = require('util');
3030
var Stream = require('stream');
31+
var debug = util.debuglog('stream');
3132

3233
util.inherits(Writable, Stream);
3334

3435
function WriteReq(chunk, encoding, cb) {
3536
this.chunk = chunk;
3637
this.encoding = encoding;
3738
this.callback = cb;
39+
this.next = null;
3840
}
3941

4042
function WritableState(options, stream) {
@@ -109,7 +111,8 @@ function WritableState(options, stream) {
109111
// the amount that is being written when _write is called.
110112
this.writelen = 0;
111113

112-
this.buffer = [];
114+
this.bufferedRequest = null;
115+
this.lastBufferedRequest = null;
113116

114117
// number of pending user-supplied write callbacks
115118
// this must be 0 before 'finish' can be emitted
@@ -123,6 +126,23 @@ function WritableState(options, stream) {
123126
this.errorEmitted = false;
124127
}
125128

129+
WritableState.prototype.getBuffer = function writableStateGetBuffer() {
130+
var current = this.bufferedRequest;
131+
var out = [];
132+
while (current) {
133+
out.push(current);
134+
current = current.next;
135+
}
136+
return out;
137+
};
138+
139+
Object.defineProperty(WritableState.prototype, 'buffer', {
140+
get: util.deprecate(function() {
141+
return this.getBuffer();
142+
}, '_writableState.buffer is deprecated. Use ' +
143+
'_writableState.getBuffer() instead.')
144+
});
145+
126146
function Writable(options) {
127147
// Writable ctor is applied to Duplexes, though they're not
128148
// instanceof Writable, they're instanceof Readable.
@@ -216,7 +236,7 @@ Writable.prototype.uncork = function() {
216236
!state.corked &&
217237
!state.finished &&
218238
!state.bufferProcessing &&
219-
state.buffer.length)
239+
state.bufferedRequest)
220240
clearBuffer(this, state);
221241
}
222242
};
@@ -255,8 +275,15 @@ function writeOrBuffer(stream, state, chunk, encoding, cb) {
255275
if (!ret)
256276
state.needDrain = true;
257277

258-
if (state.writing || state.corked)
259-
state.buffer.push(new WriteReq(chunk, encoding, cb));
278+
if (state.writing || state.corked) {
279+
var last = state.lastBufferedRequest;
280+
state.lastBufferedRequest = new WriteReq(chunk, encoding, cb);
281+
if (last) {
282+
last.next = state.lastBufferedRequest;
283+
} else {
284+
state.bufferedRequest = state.lastBufferedRequest;
285+
}
286+
}
260287
else
261288
doWrite(stream, state, false, len, chunk, encoding, cb);
262289

@@ -313,7 +340,7 @@ function onwrite(stream, er) {
313340
if (!finished &&
314341
!state.corked &&
315342
!state.bufferProcessing &&
316-
state.buffer.length) {
343+
state.bufferedRequest) {
317344
clearBuffer(stream, state);
318345
}
319346

@@ -349,52 +376,53 @@ function onwriteDrain(stream, state) {
349376
// if there's something in the buffer waiting, then process it
350377
function clearBuffer(stream, state) {
351378
state.bufferProcessing = true;
379+
var entry = state.bufferedRequest;
352380

353-
if (stream._writev && state.buffer.length > 1) {
381+
if (stream._writev && entry && entry.next) {
354382
// Fast case, write everything using _writev()
383+
var buffer = [];
355384
var cbs = [];
356-
for (var c = 0; c < state.buffer.length; c++)
357-
cbs.push(state.buffer[c].callback);
385+
while (entry) {
386+
cbs.push(entry.callback);
387+
buffer.push(entry);
388+
entry = entry.next;
389+
}
358390

359391
// count the one we are adding, as well.
360392
// TODO(isaacs) clean this up
361393
state.pendingcb++;
362-
doWrite(stream, state, true, state.length, state.buffer, '', function(err) {
394+
state.lastBufferedRequest = null;
395+
doWrite(stream, state, true, state.length, buffer, '', function(err) {
363396
for (var i = 0; i < cbs.length; i++) {
364397
state.pendingcb--;
365398
cbs[i](err);
366399
}
367400
});
368401

369402
// Clear buffer
370-
state.buffer = [];
371403
} else {
372404
// Slow case, write chunks one-by-one
373-
for (var c = 0; c < state.buffer.length; c++) {
374-
var entry = state.buffer[c];
405+
while (entry) {
375406
var chunk = entry.chunk;
376407
var encoding = entry.encoding;
377408
var cb = entry.callback;
378409
var len = state.objectMode ? 1 : chunk.length;
379410

380411
doWrite(stream, state, false, len, chunk, encoding, cb);
381-
412+
entry = entry.next;
382413
// if we didn't call the onwrite immediately, then
383414
// it means that we need to wait until it does.
384415
// also, that means that the chunk and cb are currently
385416
// being processed, so move the buffer counter past them.
386417
if (state.writing) {
387-
c++;
388418
break;
389419
}
390420
}
391421

392-
if (c < state.buffer.length)
393-
state.buffer = state.buffer.slice(c);
394-
else
395-
state.buffer.length = 0;
422+
if (entry === null)
423+
state.lastBufferedRequest = null;
396424
}
397-
425+
state.bufferedRequest = entry;
398426
state.bufferProcessing = false;
399427
}
400428

@@ -435,7 +463,7 @@ Writable.prototype.end = function(chunk, encoding, cb) {
435463
function needFinish(stream, state) {
436464
return (state.ending &&
437465
state.length === 0 &&
438-
state.buffer.length === 0 &&
466+
state.bufferedRequest === null &&
439467
!state.finished &&
440468
!state.writing);
441469
}

lib/net.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -732,7 +732,7 @@ Socket.prototype.__defineGetter__('bytesWritten', function() {
732732
data = this._pendingData,
733733
encoding = this._pendingEncoding;
734734

735-
state.buffer.forEach(function(el) {
735+
state.getBuffer().forEach(function(el) {
736736
if (util.isBuffer(el.chunk))
737737
bytes += el.chunk.length;
738738
else

test/simple/test-stream2-transform.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ test('writable side consumption', function(t) {
8181
t.equal(tx._readableState.length, 10);
8282
t.equal(transformed, 10);
8383
t.equal(tx._transformState.writechunk.length, 5);
84-
t.same(tx._writableState.buffer.map(function(c) {
84+
t.same(tx._writableState.getBuffer().map(function(c) {
8585
return c.chunk.length;
8686
}), [6, 7, 8, 9, 10]);
8787

0 commit comments

Comments
 (0)