Skip to content

Commit a85e418

Browse files
authored
stream: reduce overhead of transfer
PR-URL: #50107 Reviewed-By: Yagiz Nizipli <[email protected]> Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: Stephen Belanger <[email protected]>
1 parent 760b5dd commit a85e418

File tree

4 files changed

+163
-85
lines changed

4 files changed

+163
-85
lines changed

benchmark/webstreams/js_transfer.js

+52
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
'use strict';
2+
3+
const common = require('../common.js');
4+
5+
const { MessageChannel } = require('worker_threads');
6+
const { WritableStream, TransformStream, ReadableStream } = require('stream/web');
7+
8+
const bench = common.createBenchmark(main, {
9+
payload: ['WritableStream', 'ReadableStream', 'TransformStream'],
10+
n: [1e4],
11+
});
12+
13+
function main({ n, payload: payloadType }) {
14+
let createPayload;
15+
let messages = 0;
16+
17+
switch (payloadType) {
18+
case 'WritableStream':
19+
createPayload = () => new WritableStream();
20+
break;
21+
case 'ReadableStream':
22+
createPayload = () => new ReadableStream();
23+
break;
24+
case 'TransformStream':
25+
createPayload = () => new TransformStream();
26+
break;
27+
default:
28+
throw new Error('Unsupported payload type');
29+
}
30+
31+
const { port1, port2 } = new MessageChannel();
32+
33+
port2.onmessage = onMessage;
34+
35+
function onMessage() {
36+
if (messages++ === n) {
37+
bench.end(n);
38+
port1.close();
39+
} else {
40+
send();
41+
}
42+
}
43+
44+
function send() {
45+
const stream = createPayload();
46+
47+
port1.postMessage(stream, [stream]);
48+
}
49+
50+
bench.start();
51+
send();
52+
}

lib/internal/webstreams/readablestream.js

+29-19
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ const {
1717
PromisePrototypeThen,
1818
PromiseResolve,
1919
PromiseReject,
20-
ReflectConstruct,
2120
SafePromiseAll,
2221
Symbol,
2322
SymbolAsyncIterator,
@@ -642,26 +641,37 @@ ObjectDefineProperties(ReadableStream.prototype, {
642641
[SymbolToStringTag]: getNonWritablePropertyDescriptor(ReadableStream.name),
643642
});
644643

645-
function TransferredReadableStream() {
646-
return ReflectConstruct(
647-
function() {
648-
markTransferMode(this, false, true);
649-
this[kType] = 'ReadableStream';
650-
this[kState] = {
651-
disturbed: false,
652-
state: 'readable',
653-
storedError: undefined,
654-
stream: undefined,
655-
transfer: {
656-
writable: undefined,
657-
port: undefined,
658-
promise: undefined,
659-
},
660-
};
661-
this[kIsClosedPromise] = createDeferredPromise();
644+
function InternalTransferredReadableStream() {
645+
markTransferMode(this, false, true);
646+
this[kType] = 'ReadableStream';
647+
this[kState] = {
648+
disturbed: false,
649+
reader: undefined,
650+
state: 'readable',
651+
storedError: undefined,
652+
stream: undefined,
653+
transfer: {
654+
writable: undefined,
655+
port1: undefined,
656+
port2: undefined,
657+
promise: undefined,
662658
},
663-
[], ReadableStream);
659+
};
660+
661+
this[kIsClosedPromise] = createDeferredPromise();
664662
}
663+
664+
ObjectSetPrototypeOf(InternalTransferredReadableStream.prototype, ReadableStream.prototype);
665+
ObjectSetPrototypeOf(InternalTransferredReadableStream, ReadableStream);
666+
667+
function TransferredReadableStream() {
668+
const stream = new InternalTransferredReadableStream();
669+
670+
stream.constructor = ReadableStream;
671+
672+
return stream;
673+
}
674+
665675
TransferredReadableStream.prototype[kDeserialize] = () => {};
666676

667677
class ReadableStreamBYOBRequest {

lib/internal/webstreams/transformstream.js

+26-18
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,9 @@ const {
44
FunctionPrototypeBind,
55
FunctionPrototypeCall,
66
ObjectDefineProperties,
7+
ObjectSetPrototypeOf,
78
PromisePrototypeThen,
89
PromiseResolve,
9-
ReflectConstruct,
1010
SymbolToStringTag,
1111
Symbol,
1212
} = primordials;
@@ -247,25 +247,33 @@ ObjectDefineProperties(TransformStream.prototype, {
247247
[SymbolToStringTag]: getNonWritablePropertyDescriptor(TransformStream.name),
248248
});
249249

250-
function TransferredTransformStream() {
251-
return ReflectConstruct(
252-
function() {
253-
markTransferMode(this, false, true);
254-
this[kType] = 'TransformStream';
255-
this[kState] = {
256-
readable: undefined,
257-
writable: undefined,
258-
backpressure: undefined,
259-
backpressureChange: {
260-
promise: undefined,
261-
resolve: undefined,
262-
reject: undefined,
263-
},
264-
controller: undefined,
265-
};
250+
function InternalTransferredTransformStream() {
251+
markTransferMode(this, false, true);
252+
this[kType] = 'TransformStream';
253+
this[kState] = {
254+
readable: undefined,
255+
writable: undefined,
256+
backpressure: undefined,
257+
backpressureChange: {
258+
promise: undefined,
259+
resolve: undefined,
260+
reject: undefined,
266261
},
267-
[], TransformStream);
262+
controller: undefined,
263+
};
268264
}
265+
266+
ObjectSetPrototypeOf(InternalTransferredTransformStream.prototype, TransformStream.prototype);
267+
ObjectSetPrototypeOf(InternalTransferredTransformStream, TransformStream);
268+
269+
function TransferredTransformStream() {
270+
const stream = new InternalTransferredTransformStream();
271+
272+
stream.constructor = TransformStream;
273+
274+
return stream;
275+
}
276+
269277
TransferredTransformStream.prototype[kDeserialize] = () => {};
270278

271279
class TransformStreamDefaultController {

lib/internal/webstreams/writablestream.js

+56-48
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,10 @@ const {
66
FunctionPrototypeBind,
77
FunctionPrototypeCall,
88
ObjectDefineProperties,
9+
ObjectSetPrototypeOf,
910
PromisePrototypeThen,
1011
PromiseResolve,
1112
PromiseReject,
12-
ReflectConstruct,
1313
Symbol,
1414
SymbolToStringTag,
1515
} = primordials;
@@ -326,55 +326,63 @@ ObjectDefineProperties(WritableStream.prototype, {
326326
[SymbolToStringTag]: getNonWritablePropertyDescriptor(WritableStream.name),
327327
});
328328

329-
function TransferredWritableStream() {
330-
return ReflectConstruct(
331-
function() {
332-
markTransferMode(this, false, true);
333-
this[kType] = 'WritableStream';
334-
this[kState] = {
335-
close: createDeferredPromise(),
336-
closeRequest: {
337-
promise: undefined,
338-
resolve: undefined,
339-
reject: undefined,
340-
},
341-
inFlightWriteRequest: {
342-
promise: undefined,
343-
resolve: undefined,
344-
reject: undefined,
345-
},
346-
inFlightCloseRequest: {
347-
promise: undefined,
348-
resolve: undefined,
349-
reject: undefined,
350-
},
351-
pendingAbortRequest: {
352-
abort: {
353-
promise: undefined,
354-
resolve: undefined,
355-
reject: undefined,
356-
},
357-
reason: undefined,
358-
wasAlreadyErroring: false,
359-
},
360-
backpressure: false,
361-
controller: undefined,
362-
state: 'writable',
363-
storedError: undefined,
364-
writeRequests: [],
365-
writer: undefined,
366-
transfer: {
367-
promise: undefined,
368-
port1: undefined,
369-
port2: undefined,
370-
readable: undefined,
371-
},
372-
};
373-
this[kIsClosedPromise] = createDeferredPromise();
374-
this[kControllerErrorFunction] = () => {};
329+
function InternalTransferredWritableStream() {
330+
markTransferMode(this, false, true);
331+
this[kType] = 'WritableStream';
332+
this[kState] = {
333+
close: createDeferredPromise(),
334+
closeRequest: {
335+
promise: undefined,
336+
resolve: undefined,
337+
reject: undefined,
338+
},
339+
inFlightWriteRequest: {
340+
promise: undefined,
341+
resolve: undefined,
342+
reject: undefined,
375343
},
376-
[], WritableStream);
344+
inFlightCloseRequest: {
345+
promise: undefined,
346+
resolve: undefined,
347+
reject: undefined,
348+
},
349+
pendingAbortRequest: {
350+
abort: {
351+
promise: undefined,
352+
resolve: undefined,
353+
reject: undefined,
354+
},
355+
reason: undefined,
356+
wasAlreadyErroring: false,
357+
},
358+
backpressure: false,
359+
controller: undefined,
360+
state: 'writable',
361+
storedError: undefined,
362+
writeRequests: [],
363+
writer: undefined,
364+
transfer: {
365+
readable: undefined,
366+
port1: undefined,
367+
port2: undefined,
368+
promise: undefined,
369+
},
370+
};
371+
372+
this[kIsClosedPromise] = createDeferredPromise();
377373
}
374+
375+
ObjectSetPrototypeOf(InternalTransferredWritableStream.prototype, WritableStream.prototype);
376+
ObjectSetPrototypeOf(InternalTransferredWritableStream, WritableStream);
377+
378+
function TransferredWritableStream() {
379+
const stream = new InternalTransferredWritableStream();
380+
381+
stream.constructor = WritableStream;
382+
383+
return stream;
384+
}
385+
378386
TransferredWritableStream.prototype[kDeserialize] = () => {};
379387

380388
class WritableStreamDefaultWriter {

0 commit comments

Comments
 (0)