Skip to content

Commit 3f7b05e

Browse files
committed
stream: improve tee perf by reduce ReflectConstruct usages
also added more webstream creation benchmarks PR-URL: nodejs#49546 Reviewed-By: Yagiz Nizipli <[email protected]> Reviewed-By: Matteo Collina <[email protected]>
1 parent 022f1b7 commit 3f7b05e

File tree

2 files changed

+96
-35
lines changed

2 files changed

+96
-35
lines changed

benchmark/webstreams/creation.js

+60-8
Original file line numberDiff line numberDiff line change
@@ -2,47 +2,99 @@
22
const common = require('../common.js');
33
const {
44
ReadableStream,
5+
ReadableStreamDefaultReader,
6+
ReadableStreamBYOBReader,
57
TransformStream,
68
WritableStream,
79
} = require('node:stream/web');
810
const assert = require('assert');
911

1012
const bench = common.createBenchmark(main, {
1113
n: [50e3],
12-
kind: ['ReadableStream', 'TransformStream', 'WritableStream'],
14+
kind: [
15+
'ReadableStream',
16+
'TransformStream',
17+
'WritableStream',
18+
19+
'ReadableStreamDefaultReader',
20+
'ReadableStreamBYOBReader',
21+
22+
'ReadableStream.tee',
23+
],
1324
});
1425

15-
let rs, ws, ts;
26+
let readableStream;
27+
let transformStream;
28+
let writableStream;
29+
let readableStreamDefaultReader;
30+
let readableStreamBYOBReader;
31+
let teeResult;
1632

1733
function main({ n, kind }) {
1834
switch (kind) {
1935
case 'ReadableStream':
2036
bench.start();
2137
for (let i = 0; i < n; ++i)
22-
rs = new ReadableStream();
38+
readableStream = new ReadableStream();
2339
bench.end(n);
2440

2541
// Avoid V8 deadcode (elimination)
26-
assert.ok(rs);
42+
assert.ok(readableStream);
2743
break;
2844
case 'WritableStream':
2945
bench.start();
3046
for (let i = 0; i < n; ++i)
31-
ws = new WritableStream();
47+
writableStream = new WritableStream();
3248
bench.end(n);
3349

3450
// Avoid V8 deadcode (elimination)
35-
assert.ok(ws);
51+
assert.ok(writableStream);
3652
break;
3753
case 'TransformStream':
3854
bench.start();
3955
for (let i = 0; i < n; ++i)
40-
ts = new TransformStream();
56+
transformStream = new TransformStream();
57+
bench.end(n);
58+
59+
// Avoid V8 deadcode (elimination)
60+
assert.ok(transformStream);
61+
break;
62+
case 'ReadableStreamDefaultReader': {
63+
const readers = Array.from({ length: n }, () => new ReadableStream());
64+
65+
bench.start();
66+
for (let i = 0; i < n; ++i)
67+
readableStreamDefaultReader = new ReadableStreamDefaultReader(readers[i]);
68+
bench.end(n);
69+
70+
// Avoid V8 deadcode (elimination)
71+
assert.ok(readableStreamDefaultReader);
72+
break;
73+
}
74+
case 'ReadableStreamBYOBReader': {
75+
const readers = Array.from({ length: n }, () => new ReadableStream({ type: 'bytes' }));
76+
77+
bench.start();
78+
for (let i = 0; i < n; ++i)
79+
readableStreamBYOBReader = new ReadableStreamBYOBReader(readers[i]);
80+
bench.end(n);
81+
82+
// Avoid V8 deadcode (elimination)
83+
assert.ok(readableStreamBYOBReader);
84+
break;
85+
}
86+
case 'ReadableStream.tee': {
87+
const streams = Array.from({ length: n }, () => new ReadableStream());
88+
89+
bench.start();
90+
for (let i = 0; i < n; ++i)
91+
teeResult = streams[i].tee();
4192
bench.end(n);
4293

4394
// Avoid V8 deadcode (elimination)
44-
assert.ok(ts);
95+
assert.ok(teeResult);
4596
break;
97+
}
4698
default:
4799
throw new Error('Invalid kind');
48100
}

lib/internal/webstreams/readablestream.js

+36-27
Original file line numberDiff line numberDiff line change
@@ -1200,34 +1200,43 @@ ObjectDefineProperties(ReadableByteStreamController.prototype, {
12001200
[SymbolToStringTag]: getNonWritablePropertyDescriptor(ReadableByteStreamController.name),
12011201
});
12021202

1203+
function TeeReadableStream(start, pull, cancel) {
1204+
this[kType] = 'ReadableStream';
1205+
this[kState] = {
1206+
disturbed: false,
1207+
state: 'readable',
1208+
storedError: undefined,
1209+
stream: undefined,
1210+
transfer: {
1211+
writable: undefined,
1212+
port: undefined,
1213+
promise: undefined,
1214+
},
1215+
};
1216+
this[kIsClosedPromise] = createDeferredPromise();
1217+
setupReadableStreamDefaultControllerFromSource(
1218+
this,
1219+
ObjectCreate(null, {
1220+
start: { __proto__: null, value: start },
1221+
pull: { __proto__: null, value: pull },
1222+
cancel: { __proto__: null, value: cancel },
1223+
}),
1224+
1,
1225+
() => 1);
1226+
1227+
1228+
return makeTransferable(this);
1229+
}
1230+
1231+
ObjectSetPrototypeOf(TeeReadableStream.prototype, ReadableStream.prototype);
1232+
ObjectSetPrototypeOf(TeeReadableStream, ReadableStream);
1233+
12031234
function createTeeReadableStream(start, pull, cancel) {
1204-
return ReflectConstruct(
1205-
function() {
1206-
this[kType] = 'ReadableStream';
1207-
this[kState] = {
1208-
disturbed: false,
1209-
state: 'readable',
1210-
storedError: undefined,
1211-
stream: undefined,
1212-
transfer: {
1213-
writable: undefined,
1214-
port: undefined,
1215-
promise: undefined,
1216-
},
1217-
};
1218-
this[kIsClosedPromise] = createDeferredPromise();
1219-
setupReadableStreamDefaultControllerFromSource(
1220-
this,
1221-
ObjectCreate(null, {
1222-
start: { __proto__: null, value: start },
1223-
pull: { __proto__: null, value: pull },
1224-
cancel: { __proto__: null, value: cancel },
1225-
}),
1226-
1,
1227-
() => 1);
1228-
return makeTransferable(this);
1229-
}, [], ReadableStream,
1230-
);
1235+
const tee = new TeeReadableStream(start, pull, cancel);
1236+
1237+
// For spec compliance the Tee must be a ReadableStream
1238+
tee.constructor = ReadableStream;
1239+
return tee;
12311240
}
12321241

12331242
const isReadableStream =

0 commit comments

Comments
 (0)