Skip to content

Commit 2b6aa94

Browse files
Ron KorvingFishrock123
Ron Korving
authored andcommitted
fs: implemented WriteStream#writev
Streams with writev allow many buffers to be pushed to underlying OS APIs in one batch, in this case improving write throughput by an order of magnitude. This is especially noticeable when writing many (small) buffers. PR-URL: #2167 Reviewed-By: Trevor Norris <[email protected]>
1 parent ff6d30d commit 2b6aa94

File tree

2 files changed

+99
-0
lines changed

2 files changed

+99
-0
lines changed

lib/fs.js

+44
Original file line numberDiff line numberDiff line change
@@ -1867,6 +1867,50 @@ WriteStream.prototype._write = function(data, encoding, cb) {
18671867
};
18681868

18691869

1870+
function writev(fd, chunks, position, callback) {
1871+
function wrapper(err, written) {
1872+
// Retain a reference to chunks so that they can't be GC'ed too soon.
1873+
callback(err, written || 0, chunks);
1874+
}
1875+
1876+
const req = new FSReqWrap();
1877+
req.oncomplete = wrapper;
1878+
binding.writeBuffers(fd, chunks, position, req);
1879+
}
1880+
1881+
1882+
WriteStream.prototype._writev = function(data, cb) {
1883+
if (typeof this.fd !== 'number')
1884+
return this.once('open', function() {
1885+
this._writev(data, cb);
1886+
});
1887+
1888+
const self = this;
1889+
const len = data.length;
1890+
const chunks = new Array(len);
1891+
var size = 0;
1892+
1893+
for (var i = 0; i < len; i++) {
1894+
var chunk = data[i].chunk;
1895+
1896+
chunks[i] = chunk;
1897+
size += chunk.length;
1898+
}
1899+
1900+
writev(this.fd, chunks, this.pos, function(er, bytes) {
1901+
if (er) {
1902+
self.destroy();
1903+
return cb(er);
1904+
}
1905+
self.bytesWritten += bytes;
1906+
cb();
1907+
});
1908+
1909+
if (this.pos !== undefined)
1910+
this.pos += size;
1911+
};
1912+
1913+
18701914
WriteStream.prototype.destroy = ReadStream.prototype.destroy;
18711915
WriteStream.prototype.close = ReadStream.prototype.close;
18721916

src/node_file.cc

+55
Original file line numberDiff line numberDiff line change
@@ -907,6 +907,60 @@ static void WriteBuffer(const FunctionCallbackInfo<Value>& args) {
907907
}
908908

909909

910+
// Wrapper for writev(2).
911+
//
912+
// bytesWritten = writev(fd, chunks, position, callback)
913+
// 0 fd integer. file descriptor
914+
// 1 chunks array of buffers to write
915+
// 2 position if integer, position to write at in the file.
916+
// if null, write from the current position
917+
static void WriteBuffers(const FunctionCallbackInfo<Value>& args) {
918+
Environment* env = Environment::GetCurrent(args);
919+
920+
CHECK(args[0]->IsInt32());
921+
CHECK(args[1]->IsArray());
922+
923+
int fd = args[0]->Int32Value();
924+
Local<Array> chunks = args[1].As<Array>();
925+
int64_t pos = GET_OFFSET(args[2]);
926+
Local<Value> req = args[3];
927+
928+
uint32_t chunkCount = chunks->Length();
929+
930+
uv_buf_t s_iovs[1024]; // use stack allocation when possible
931+
uv_buf_t* iovs;
932+
933+
if (chunkCount > ARRAY_SIZE(s_iovs))
934+
iovs = new uv_buf_t[chunkCount];
935+
else
936+
iovs = s_iovs;
937+
938+
for (uint32_t i = 0; i < chunkCount; i++) {
939+
Local<Value> chunk = chunks->Get(i);
940+
941+
if (!Buffer::HasInstance(chunk)) {
942+
if (iovs != s_iovs)
943+
delete[] iovs;
944+
return env->ThrowTypeError("Array elements all need to be buffers");
945+
}
946+
947+
iovs[i] = uv_buf_init(Buffer::Data(chunk), Buffer::Length(chunk));
948+
}
949+
950+
if (req->IsObject()) {
951+
ASYNC_CALL(write, req, fd, iovs, chunkCount, pos)
952+
if (iovs != s_iovs)
953+
delete[] iovs;
954+
return;
955+
}
956+
957+
SYNC_CALL(write, nullptr, fd, iovs, chunkCount, pos)
958+
if (iovs != s_iovs)
959+
delete[] iovs;
960+
args.GetReturnValue().Set(SYNC_RESULT);
961+
}
962+
963+
910964
// Wrapper for write(2).
911965
//
912966
// bytesWritten = write(fd, string, position, enc, callback)
@@ -1248,6 +1302,7 @@ void InitFs(Local<Object> target,
12481302
env->SetMethod(target, "readlink", ReadLink);
12491303
env->SetMethod(target, "unlink", Unlink);
12501304
env->SetMethod(target, "writeBuffer", WriteBuffer);
1305+
env->SetMethod(target, "writeBuffers", WriteBuffers);
12511306
env->SetMethod(target, "writeString", WriteString);
12521307

12531308
env->SetMethod(target, "chmod", Chmod);

0 commit comments

Comments
 (0)