Skip to content

Commit 538e194

Browse files
mcollinaaduh95
authored andcommitted
worker: refactor stdio to improve performance
Signed-off-by: Matteo Collina <[email protected]> PR-URL: #56630 Reviewed-By: Yagiz Nizipli <[email protected]> Reviewed-By: Robert Nagy <[email protected]> Reviewed-By: Paolo Insogna <[email protected]> Reviewed-By: Luigi Pinca <[email protected]>
1 parent 7347d34 commit 538e194

File tree

2 files changed

+26
-15
lines changed

2 files changed

+26
-15
lines changed

lib/internal/worker.js

+4-2
Original file line numberDiff line numberDiff line change
@@ -340,9 +340,11 @@ class Worker extends EventEmitter {
340340
{
341341
const { stream, chunks } = message;
342342
const readable = this[kParentSideStdio][stream];
343-
ArrayPrototypeForEach(chunks, ({ chunk, encoding }) => {
343+
// This is a hot path, use a for(;;) loop
344+
for (let i = 0; i < chunks.length; i++) {
345+
const { chunk, encoding } = chunks[i];
344346
readable.push(chunk, encoding);
345-
});
347+
}
346348
return;
347349
}
348350
case messageTypes.STDIO_WANTS_MORE_DATA:

lib/internal/worker/io.js

+22-13
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,7 @@
11
'use strict';
22

33
const {
4-
ArrayPrototypeForEach,
5-
ArrayPrototypeMap,
6-
ArrayPrototypePush,
4+
Array,
75
FunctionPrototypeBind,
86
FunctionPrototypeCall,
97
ObjectAssign,
@@ -77,7 +75,7 @@ const kOnMessage = Symbol('kOnMessage');
7775
const kOnMessageError = Symbol('kOnMessageError');
7876
const kPort = Symbol('kPort');
7977
const kWaitingStreams = Symbol('kWaitingStreams');
80-
const kWritableCallbacks = Symbol('kWritableCallbacks');
78+
const kWritableCallback = Symbol('kWritableCallback');
8179
const kStartedReading = Symbol('kStartedReading');
8280
const kStdioWantsMoreDataCallback = Symbol('kStdioWantsMoreDataCallback');
8381
const kCurrentlyReceivingPorts =
@@ -282,20 +280,29 @@ class WritableWorkerStdio extends Writable {
282280
super({ decodeStrings: false });
283281
this[kPort] = port;
284282
this[kName] = name;
285-
this[kWritableCallbacks] = [];
283+
this[kWritableCallback] = null;
286284
}
287285

288286
_writev(chunks, cb) {
287+
const toSend = new Array(chunks.length);
288+
289+
// We avoid .map() because it's a hot path
290+
for (let i = 0; i < chunks.length; i++) {
291+
const { chunk, encoding } = chunks[i];
292+
toSend[i] = { chunk, encoding };
293+
}
294+
289295
this[kPort].postMessage({
290296
type: messageTypes.STDIO_PAYLOAD,
291297
stream: this[kName],
292-
chunks: ArrayPrototypeMap(chunks,
293-
({ chunk, encoding }) => ({ chunk, encoding })),
298+
chunks: toSend,
294299
});
295300
if (process._exiting) {
296301
cb();
297302
} else {
298-
ArrayPrototypePush(this[kWritableCallbacks], cb);
303+
// Only one writev happens at any given time,
304+
// so we can safely overwrite the callback.
305+
this[kWritableCallback] = cb;
299306
if (this[kPort][kWaitingStreams]++ === 0)
300307
this[kPort].ref();
301308
}
@@ -311,11 +318,13 @@ class WritableWorkerStdio extends Writable {
311318
}
312319

313320
[kStdioWantsMoreDataCallback]() {
314-
const cbs = this[kWritableCallbacks];
315-
this[kWritableCallbacks] = [];
316-
ArrayPrototypeForEach(cbs, (cb) => cb());
317-
if ((this[kPort][kWaitingStreams] -= cbs.length) === 0)
318-
this[kPort].unref();
321+
const cb = this[kWritableCallback];
322+
if (cb) {
323+
this[kWritableCallback] = null;
324+
cb();
325+
if (--this[kPort][kWaitingStreams] === 0)
326+
this[kPort].unref();
327+
}
319328
}
320329
}
321330

0 commit comments

Comments
 (0)