diff --git a/lib/internal/child_process/serialization.js b/lib/internal/child_process/serialization.js index ec858f401bea9e..497bf233d77897 100644 --- a/lib/internal/child_process/serialization.js +++ b/lib/internal/child_process/serialization.js @@ -4,6 +4,7 @@ const { JSONParse, JSONStringify, StringPrototypeSplit, + ArrayPrototypePush, Symbol, TypedArrayPrototypeSubarray, } = primordials; @@ -15,6 +16,7 @@ const assert = require('internal/assert'); const { streamBaseState, kLastWriteWasAsync } = internalBinding('stream_wrap'); const kMessageBuffer = Symbol('kMessageBuffer'); +const kMessageBufferSize = Symbol('kMessageBufferSize'); const kJSONBuffer = Symbol('kJSONBuffer'); const kStringDecoder = Symbol('kStringDecoder'); @@ -51,47 +53,77 @@ class ChildProcessDeserializer extends v8.DefaultDeserializer { // (aka 'advanced') const advanced = { initMessageChannel(channel) { - channel[kMessageBuffer] = Buffer.alloc(0); + channel[kMessageBuffer] = []; + channel[kMessageBufferSize] = 0; channel.buffering = false; }, *parseChannelMessages(channel, readData) { if (readData.length === 0) return; - let messageBuffer = Buffer.concat([channel[kMessageBuffer], readData]); - while (messageBuffer.length > 4) { - const size = messageBuffer.readUInt32BE(); - if (messageBuffer.length < 4 + size) { - break; - } + ArrayPrototypePush(channel[kMessageBuffer], readData); + channel[kMessageBufferSize] += readData.length; + + // Index 0 should always be present because we just pushed data into it. + let messageBufferHead = channel[kMessageBuffer][0]; + while (messageBufferHead.length >= 4) { + // We call `readUInt32BE` manually here, because this is faster than first converting + // it to a buffer and using `readUInt32BE` on that. + const fullMessageSize = ( + messageBufferHead[0] << 24 | + messageBufferHead[1] << 16 | + messageBufferHead[2] << 8 | + messageBufferHead[3] + ) + 4; + + if (channel[kMessageBufferSize] < fullMessageSize) break; + + const concatenatedBuffer = channel[kMessageBuffer].length === 1 ? + channel[kMessageBuffer][0] : + Buffer.concat( + channel[kMessageBuffer], + channel[kMessageBufferSize] + ); const deserializer = new ChildProcessDeserializer( - TypedArrayPrototypeSubarray(messageBuffer, 4, 4 + size)); - messageBuffer = TypedArrayPrototypeSubarray(messageBuffer, 4 + size); + TypedArrayPrototypeSubarray(concatenatedBuffer, 4, fullMessageSize) + ); + + messageBufferHead = TypedArrayPrototypeSubarray(concatenatedBuffer, fullMessageSize); + channel[kMessageBufferSize] = messageBufferHead.length; + channel[kMessageBuffer] = + channel[kMessageBufferSize] !== 0 ? [messageBufferHead] : []; deserializer.readHeader(); yield deserializer.readValue(); } - channel[kMessageBuffer] = messageBuffer; - channel.buffering = messageBuffer.length > 0; + + channel.buffering = channel[kMessageBufferSize] > 0; }, writeChannelMessage(channel, req, message, handle) { const ser = new ChildProcessSerializer(); + // Add 4 bytes, to later populate with message length + ser.writeRawBytes(Buffer.allocUnsafe(4)); ser.writeHeader(); ser.writeValue(message); + const serializedMessage = ser.releaseBuffer(); - const sizeBuffer = Buffer.allocUnsafe(4); - sizeBuffer.writeUInt32BE(serializedMessage.length); - - const buffer = Buffer.concat([ - sizeBuffer, - serializedMessage, - ]); - const result = channel.writeBuffer(req, buffer, handle); + const serializedMessageLength = serializedMessage.length - 4; + + serializedMessage.set([ + serializedMessageLength >> 24 & 0xFF, + serializedMessageLength >> 16 & 0xFF, + serializedMessageLength >> 8 & 0xFF, + serializedMessageLength & 0xFF, + ], 0); + + const result = channel.writeBuffer(req, serializedMessage, handle); + // Mirror what stream_base_commons.js does for Buffer retention. if (streamBaseState[kLastWriteWasAsync]) - req.buffer = buffer; + req.buffer = serializedMessage; + return result; }, };