Skip to content

Commit 4a3ecbf

Browse files
MattiasBuelensmarco-ippolito
authored andcommitted
stream: implement min option for ReadableStreamBYOBReader.read
PR-URL: #50888 Backport-PR-URL: #54044 Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: Benjamin Gruenbaum <[email protected]> Reviewed-By: Debadree Chatterjee <[email protected]> Reviewed-By: Rafael Gonzaga <[email protected]>
1 parent 7625dc4 commit 4a3ecbf

15 files changed

+968
-114
lines changed

doc/api/webstreams.md

+12-3
Original file line numberDiff line numberDiff line change
@@ -488,7 +488,7 @@ added: v16.5.0
488488
-->
489489

490490
* Returns: A promise fulfilled with an object:
491-
* `value` {ArrayBuffer}
491+
* `value` {any}
492492
* `done` {boolean}
493493

494494
Requests the next chunk of data from the underlying {ReadableStream}
@@ -613,15 +613,24 @@ added: v16.5.0
613613
{ReadableStream} is closed or rejected if the stream errors or the reader's
614614
lock is released before the stream finishes closing.
615615
616-
#### `readableStreamBYOBReader.read(view)`
616+
#### `readableStreamBYOBReader.read(view[, options])`
617617
618618
<!-- YAML
619619
added: v16.5.0
620+
changes:
621+
- version: REPLACEME
622+
pr-url: https://github.com/nodejs/node/pull/54044
623+
description: Added `min` option.
620624
-->
621625
622626
* `view` {Buffer|TypedArray|DataView}
627+
* `options` {Object}
628+
* `min` {number} When set, the returned promise will only be
629+
fulfilled as soon as `min` number of elements are available.
630+
When not set, the promise fulfills when at least one element
631+
is available.
623632
* Returns: A promise fulfilled with an object:
624-
* `value` {ArrayBuffer}
633+
* `value` {TypedArray|DataView}
625634
* `done` {boolean}
626635
627636
Requests the next chunk of data from the underlying {ReadableStream}

lib/internal/encoding.js

+1-7
Original file line numberDiff line numberDiff line change
@@ -47,9 +47,7 @@ const {
4747
const {
4848
validateString,
4949
validateObject,
50-
kValidateObjectAllowNullable,
51-
kValidateObjectAllowArray,
52-
kValidateObjectAllowFunction,
50+
kValidateObjectAllowObjectsAndNull,
5351
} = require('internal/validators');
5452
const binding = internalBinding('encoding_binding');
5553
const {
@@ -393,10 +391,6 @@ const TextDecoder =
393391
makeTextDecoderICU() :
394392
makeTextDecoderJS();
395393

396-
const kValidateObjectAllowObjectsAndNull = kValidateObjectAllowNullable |
397-
kValidateObjectAllowArray |
398-
kValidateObjectAllowFunction;
399-
400394
function makeTextDecoderICU() {
401395
const {
402396
decode: _decode,

lib/internal/validators.js

+7
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,11 @@ const kValidateObjectNone = 0;
222222
const kValidateObjectAllowNullable = 1 << 0;
223223
const kValidateObjectAllowArray = 1 << 1;
224224
const kValidateObjectAllowFunction = 1 << 2;
225+
const kValidateObjectAllowObjects = kValidateObjectAllowArray |
226+
kValidateObjectAllowFunction;
227+
const kValidateObjectAllowObjectsAndNull = kValidateObjectAllowNullable |
228+
kValidateObjectAllowArray |
229+
kValidateObjectAllowFunction;
225230

226231
/**
227232
* @callback validateObject
@@ -583,6 +588,8 @@ module.exports = {
583588
kValidateObjectAllowNullable,
584589
kValidateObjectAllowArray,
585590
kValidateObjectAllowFunction,
591+
kValidateObjectAllowObjects,
592+
kValidateObjectAllowObjectsAndNull,
586593
validateOneOf,
587594
validatePlainFunction,
588595
validatePort,

lib/internal/webstreams/readablestream.js

+67-40
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ const {
2323
SymbolAsyncIterator,
2424
SymbolDispose,
2525
SymbolToStringTag,
26+
TypedArrayPrototypeGetLength,
2627
Uint8Array,
2728
} = primordials;
2829

@@ -34,6 +35,7 @@ const {
3435
ERR_INVALID_ARG_TYPE,
3536
ERR_INVALID_STATE,
3637
ERR_INVALID_THIS,
38+
ERR_OUT_OF_RANGE,
3739
},
3840
} = require('internal/errors');
3941

@@ -59,8 +61,8 @@ const {
5961
validateAbortSignal,
6062
validateBuffer,
6163
validateObject,
62-
kValidateObjectAllowNullable,
63-
kValidateObjectAllowFunction,
64+
kValidateObjectAllowObjects,
65+
kValidateObjectAllowObjectsAndNull,
6466
} = require('internal/validators');
6567

6668
const {
@@ -247,9 +249,9 @@ class ReadableStream {
247249
* @param {UnderlyingSource} [source]
248250
* @param {QueuingStrategy} [strategy]
249251
*/
250-
constructor(source = {}, strategy = kEmptyObject) {
251-
if (source === null)
252-
throw new ERR_INVALID_ARG_VALUE('source', 'Object', source);
252+
constructor(source = kEmptyObject, strategy = kEmptyObject) {
253+
validateObject(source, 'source', kValidateObjectAllowObjects);
254+
validateObject(strategy, 'strategy', kValidateObjectAllowObjectsAndNull);
253255
this[kState] = createReadableStreamState();
254256

255257
this[kIsClosedPromise] = createDeferredPromise();
@@ -335,7 +337,7 @@ class ReadableStream {
335337
getReader(options = kEmptyObject) {
336338
if (!isReadableStream(this))
337339
throw new ERR_INVALID_THIS('ReadableStream');
338-
validateObject(options, 'options', kValidateObjectAllowNullable | kValidateObjectAllowFunction);
340+
validateObject(options, 'options', kValidateObjectAllowObjectsAndNull);
339341
const mode = options?.mode;
340342

341343
if (mode === undefined)
@@ -373,6 +375,7 @@ class ReadableStream {
373375

374376
// The web platform tests require that these be handled one at a
375377
// time and in a specific order. options can be null or undefined.
378+
validateObject(options, 'options', kValidateObjectAllowObjectsAndNull);
376379
const preventAbort = options?.preventAbort;
377380
const preventCancel = options?.preventCancel;
378381
const preventClose = options?.preventClose;
@@ -415,6 +418,7 @@ class ReadableStream {
415418
destination);
416419
}
417420

421+
validateObject(options, 'options', kValidateObjectAllowObjectsAndNull);
418422
const preventAbort = options?.preventAbort;
419423
const preventCancel = options?.preventCancel;
420424
const preventClose = options?.preventClose;
@@ -459,10 +463,8 @@ class ReadableStream {
459463
values(options = kEmptyObject) {
460464
if (!isReadableStream(this))
461465
throw new ERR_INVALID_THIS('ReadableStream');
462-
validateObject(options, 'options');
463-
const {
464-
preventCancel = false,
465-
} = options;
466+
validateObject(options, 'options', kValidateObjectAllowObjectsAndNull);
467+
const preventCancel = !!(options?.preventCancel);
466468

467469
// eslint-disable-next-line no-use-before-define
468470
const reader = new ReadableStreamDefaultReader(this);
@@ -926,47 +928,62 @@ class ReadableStreamBYOBReader {
926928

927929
/**
928930
* @param {ArrayBufferView} view
931+
* @param {{
932+
* min? : number
933+
* }} [options]
929934
* @returns {Promise<{
930-
* view : ArrayBufferView,
935+
* value : ArrayBufferView,
931936
* done : boolean,
932937
* }>}
933938
*/
934-
read(view) {
939+
async read(view, options = kEmptyObject) {
935940
if (!isReadableStreamBYOBReader(this))
936-
return PromiseReject(new ERR_INVALID_THIS('ReadableStreamBYOBReader'));
941+
throw new ERR_INVALID_THIS('ReadableStreamBYOBReader');
937942
if (!isArrayBufferView(view)) {
938-
return PromiseReject(
939-
new ERR_INVALID_ARG_TYPE(
940-
'view',
941-
[
942-
'Buffer',
943-
'TypedArray',
944-
'DataView',
945-
],
946-
view));
943+
throw new ERR_INVALID_ARG_TYPE(
944+
'view',
945+
[
946+
'Buffer',
947+
'TypedArray',
948+
'DataView',
949+
],
950+
view,
951+
);
947952
}
953+
validateObject(options, 'options', kValidateObjectAllowObjectsAndNull);
948954

949955
const viewByteLength = ArrayBufferViewGetByteLength(view);
950956
const viewBuffer = ArrayBufferViewGetBuffer(view);
951957
const viewBufferByteLength = ArrayBufferPrototypeGetByteLength(viewBuffer);
952958

953959
if (viewByteLength === 0 || viewBufferByteLength === 0) {
954-
return PromiseReject(
955-
new ERR_INVALID_STATE.TypeError(
956-
'View or Viewed ArrayBuffer is zero-length or detached',
957-
),
958-
);
960+
throw new ERR_INVALID_STATE.TypeError(
961+
'View or Viewed ArrayBuffer is zero-length or detached');
959962
}
960963

961964
// Supposed to assert here that the view's buffer is not
962965
// detached, but there's no API available to use to check that.
966+
967+
const min = options?.min ?? 1;
968+
if (typeof min !== 'number')
969+
throw new ERR_INVALID_ARG_TYPE('options.min', 'number', min);
970+
if (!NumberIsInteger(min))
971+
throw new ERR_INVALID_ARG_VALUE('options.min', min, 'must be an integer');
972+
if (min <= 0)
973+
throw new ERR_INVALID_ARG_VALUE('options.min', min, 'must be greater than 0');
974+
if (!isDataView(view)) {
975+
if (min > TypedArrayPrototypeGetLength(view)) {
976+
throw new ERR_OUT_OF_RANGE('options.min', '<= view.length', min);
977+
}
978+
} else if (min > viewByteLength) {
979+
throw new ERR_OUT_OF_RANGE('options.min', '<= view.byteLength', min);
980+
}
981+
963982
if (this[kState].stream === undefined) {
964-
return PromiseReject(
965-
new ERR_INVALID_STATE.TypeError(
966-
'The reader is not attached to a stream'));
983+
throw new ERR_INVALID_STATE.TypeError('The reader is not attached to a stream');
967984
}
968985
const readIntoRequest = new ReadIntoRequest();
969-
readableStreamBYOBReaderRead(this, view, readIntoRequest);
986+
readableStreamBYOBReaderRead(this, view, min, readIntoRequest);
970987
return readIntoRequest.promise;
971988
}
972989

@@ -1880,7 +1897,7 @@ function readableByteStreamTee(stream) {
18801897
reading = false;
18811898
},
18821899
};
1883-
readableStreamBYOBReaderRead(reader, view, readIntoRequest);
1900+
readableStreamBYOBReaderRead(reader, view, 1, readIntoRequest);
18841901
}
18851902

18861903
function pull1Algorithm() {
@@ -2207,7 +2224,7 @@ function readableStreamReaderGenericRelease(reader) {
22072224
reader[kState].stream = undefined;
22082225
}
22092226

2210-
function readableStreamBYOBReaderRead(reader, view, readIntoRequest) {
2227+
function readableStreamBYOBReaderRead(reader, view, min, readIntoRequest) {
22112228
const {
22122229
stream,
22132230
} = reader[kState];
@@ -2220,6 +2237,7 @@ function readableStreamBYOBReaderRead(reader, view, readIntoRequest) {
22202237
readableByteStreamControllerPullInto(
22212238
stream[kState].controller,
22222239
view,
2240+
min,
22232241
readIntoRequest);
22242242
}
22252243

@@ -2492,7 +2510,7 @@ function readableByteStreamControllerClose(controller) {
24922510

24932511
if (pendingPullIntos.length) {
24942512
const firstPendingPullInto = pendingPullIntos[0];
2495-
if (firstPendingPullInto.bytesFilled > 0) {
2513+
if (firstPendingPullInto.bytesFilled % firstPendingPullInto.elementSize !== 0) {
24962514
const error = new ERR_INVALID_STATE.TypeError('Partial read');
24972515
readableByteStreamControllerError(controller, error);
24982516
throw error;
@@ -2509,7 +2527,7 @@ function readableByteStreamControllerCommitPullIntoDescriptor(stream, desc) {
25092527

25102528
let done = false;
25112529
if (stream[kState].state === 'closed') {
2512-
desc.bytesFilled = 0;
2530+
assert(desc.bytesFilled % desc.elementSize === 0);
25132531
done = true;
25142532
}
25152533

@@ -2598,6 +2616,7 @@ function readableByteStreamControllerHandleQueueDrain(controller) {
25982616
function readableByteStreamControllerPullInto(
25992617
controller,
26002618
view,
2619+
min,
26012620
readIntoRequest) {
26022621
const {
26032622
closeRequested,
@@ -2610,6 +2629,11 @@ function readableByteStreamControllerPullInto(
26102629
elementSize = view.constructor.BYTES_PER_ELEMENT;
26112630
ctor = view.constructor;
26122631
}
2632+
2633+
const minimumFill = min * elementSize;
2634+
assert(minimumFill >= elementSize && minimumFill <= view.byteLength);
2635+
assert(minimumFill % elementSize === 0);
2636+
26132637
const buffer = ArrayBufferViewGetBuffer(view);
26142638
const byteOffset = ArrayBufferViewGetByteOffset(view);
26152639
const byteLength = ArrayBufferViewGetByteLength(view);
@@ -2628,6 +2652,7 @@ function readableByteStreamControllerPullInto(
26282652
byteOffset,
26292653
byteLength,
26302654
bytesFilled: 0,
2655+
minimumFill,
26312656
elementSize,
26322657
ctor,
26332658
type: 'byob',
@@ -2715,7 +2740,7 @@ function readableByteStreamControllerRespond(controller, bytesWritten) {
27152740
}
27162741

27172742
function readableByteStreamControllerRespondInClosedState(controller, desc) {
2718-
assert(!desc.bytesFilled);
2743+
assert(desc.bytesFilled % desc.elementSize === 0);
27192744
if (desc.type === 'none') {
27202745
readableByteStreamControllerShiftPendingPullInto(controller);
27212746
}
@@ -2892,17 +2917,18 @@ function readableByteStreamControllerFillPullIntoDescriptorFromQueue(
28922917
byteLength,
28932918
byteOffset,
28942919
bytesFilled,
2920+
minimumFill,
28952921
elementSize,
28962922
} = desc;
2897-
const currentAlignedBytes = bytesFilled - (bytesFilled % elementSize);
28982923
const maxBytesToCopy = MathMin(
28992924
controller[kState].queueTotalSize,
29002925
byteLength - bytesFilled);
29012926
const maxBytesFilled = bytesFilled + maxBytesToCopy;
29022927
const maxAlignedBytes = maxBytesFilled - (maxBytesFilled % elementSize);
29032928
let totalBytesToCopyRemaining = maxBytesToCopy;
29042929
let ready = false;
2905-
if (maxAlignedBytes > currentAlignedBytes) {
2930+
assert(bytesFilled < minimumFill);
2931+
if (maxAlignedBytes >= minimumFill) {
29062932
totalBytesToCopyRemaining = maxAlignedBytes - bytesFilled;
29072933
ready = true;
29082934
}
@@ -2945,7 +2971,7 @@ function readableByteStreamControllerFillPullIntoDescriptorFromQueue(
29452971
if (!ready) {
29462972
assert(!controller[kState].queueTotalSize);
29472973
assert(desc.bytesFilled > 0);
2948-
assert(desc.bytesFilled < elementSize);
2974+
assert(desc.bytesFilled < minimumFill);
29492975
}
29502976
return ready;
29512977
}
@@ -3001,7 +3027,7 @@ function readableByteStreamControllerRespondInReadableState(
30013027
return;
30023028
}
30033029

3004-
if (desc.bytesFilled < desc.elementSize)
3030+
if (desc.bytesFilled < desc.minimumFill)
30053031
return;
30063032

30073033
readableByteStreamControllerShiftPendingPullInto(controller);
@@ -3186,6 +3212,7 @@ function readableByteStreamControllerPullSteps(controller, readRequest) {
31863212
byteOffset: 0,
31873213
byteLength: autoAllocateChunkSize,
31883214
bytesFilled: 0,
3215+
minimumFill: 1,
31893216
elementSize: 1,
31903217
ctor: Uint8Array,
31913218
type: 'default',

lib/internal/webstreams/transformstream.js

+10-1
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,12 @@ const {
2929
kEnumerableProperty,
3030
} = require('internal/util');
3131

32+
const {
33+
validateObject,
34+
kValidateObjectAllowObjects,
35+
kValidateObjectAllowObjectsAndNull,
36+
} = require('internal/validators');
37+
3238
const {
3339
kDeserialize,
3440
kTransfer,
@@ -119,9 +125,12 @@ class TransformStream {
119125
* @param {QueuingStrategy} [readableStrategy]
120126
*/
121127
constructor(
122-
transformer = null,
128+
transformer = kEmptyObject,
123129
writableStrategy = kEmptyObject,
124130
readableStrategy = kEmptyObject) {
131+
validateObject(transformer, 'transformer', kValidateObjectAllowObjects);
132+
validateObject(writableStrategy, 'writableStrategy', kValidateObjectAllowObjectsAndNull);
133+
validateObject(readableStrategy, 'readableStrategy', kValidateObjectAllowObjectsAndNull);
125134
const readableType = transformer?.readableType;
126135
const writableType = transformer?.writableType;
127136
const start = transformer?.start;

0 commit comments

Comments
 (0)