Skip to content

Commit ce71929

Browse files
daeyeonRafaelGSS
authored andcommitted
stream: handle a pending pull request from a released reader
In order to meet the specification, this includes mainly the followings: - Adding the 'release steps' to ReadableStreamController - Responding to a pull request from a released reader in ReadableByteStreamController Signed-off-by: Daeyeon Jeong [email protected] PR-URL: #44702 Refs: https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontroller-releasesteps Refs: https://streams.spec.whatwg.org/#readable-byte-stream-controller-respond-in-readable-state Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: Minwoo Jung <[email protected]>
1 parent e353bf7 commit ce71929

File tree

3 files changed

+86
-8
lines changed

3 files changed

+86
-8
lines changed

doc/api/webstreams.md

+4
Original file line numberDiff line numberDiff line change
@@ -651,6 +651,10 @@ Signals an error that causes the {ReadableStream} to error and close.
651651
652652
<!-- YAML
653653
added: v16.5.0
654+
changes:
655+
- version: REPLACEME
656+
pr-url: https://github.com/nodejs/node/pull/44702
657+
description: Support handling a BYOB pull request from a released reader.
654658
-->
655659
656660
Every {ReadableStream} has a controller that is responsible for

lib/internal/webstreams/readablestream.js

+82
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@ const kClose = Symbol('kClose');
139139
const kChunk = Symbol('kChunk');
140140
const kError = Symbol('kError');
141141
const kPull = Symbol('kPull');
142+
const kRelease = Symbol('kRelease');
142143

143144
/**
144145
* @typedef {import('../abort_controller').AbortSignal} AbortSignal
@@ -1019,6 +1020,8 @@ class ReadableStreamDefaultController {
10191020
readableStreamDefaultControllerPullSteps(this, readRequest);
10201021
}
10211022

1023+
[kRelease]() {}
1024+
10221025
[kInspect](depth, options) {
10231026
return customInspect(depth, options, this[kType], { });
10241027
}
@@ -1143,6 +1146,17 @@ class ReadableByteStreamController {
11431146
readableByteStreamControllerPullSteps(this, readRequest);
11441147
}
11451148

1149+
[kRelease]() {
1150+
const {
1151+
pendingPullIntos,
1152+
} = this[kState];
1153+
if (pendingPullIntos.length > 0) {
1154+
const firstPendingPullInto = pendingPullIntos[0];
1155+
firstPendingPullInto.type = 'none';
1156+
this[kState].pendingPullIntos = [firstPendingPullInto];
1157+
}
1158+
}
1159+
11461160
[kInspect](depth, options) {
11471161
return customInspect(depth, options, this[kType], { });
11481162
}
@@ -2060,6 +2074,9 @@ function readableStreamReaderGenericRelease(reader) {
20602074
};
20612075
}
20622076
setPromiseHandled(reader[kState].close.promise);
2077+
2078+
stream[kState].controller[kRelease]();
2079+
20632080
stream[kState].reader = undefined;
20642081
reader[kState].stream = undefined;
20652082
}
@@ -2365,6 +2382,8 @@ function readableByteStreamControllerClose(controller) {
23652382

23662383
function readableByteStreamControllerCommitPullIntoDescriptor(stream, desc) {
23672384
assert(stream[kState].state !== 'errored');
2385+
assert(desc.type !== 'none');
2386+
23682387
let done = false;
23692388
if (stream[kState].state === 'closed') {
23702389
desc.bytesFilled = 0;
@@ -2574,6 +2593,9 @@ function readableByteStreamControllerRespond(controller, bytesWritten) {
25742593

25752594
function readableByteStreamControllerRespondInClosedState(controller, desc) {
25762595
assert(!desc.bytesFilled);
2596+
if (desc.type === 'none') {
2597+
readableByteStreamControllerShiftPendingPullInto(controller);
2598+
}
25772599
const {
25782600
stream,
25792601
} = controller[kState];
@@ -2663,6 +2685,31 @@ function readableByteStreamControllerEnqueue(controller, chunk) {
26632685
readableByteStreamControllerCallPullIfNeeded(controller);
26642686
}
26652687

2688+
function readableByteStreamControllerEnqueueClonedChunkToQueue(
2689+
controller,
2690+
buffer,
2691+
byteOffset,
2692+
byteLength
2693+
) {
2694+
let cloneResult;
2695+
try {
2696+
cloneResult = ArrayBufferPrototypeSlice(
2697+
buffer,
2698+
byteOffset,
2699+
byteOffset + byteLength
2700+
);
2701+
} catch (error) {
2702+
readableByteStreamControllerError(controller, error);
2703+
throw error;
2704+
}
2705+
readableByteStreamControllerEnqueueChunkToQueue(
2706+
controller,
2707+
cloneResult,
2708+
0,
2709+
byteLength
2710+
);
2711+
}
2712+
26662713
function readableByteStreamControllerEnqueueChunkToQueue(
26672714
controller,
26682715
buffer,
@@ -2678,6 +2725,29 @@ function readableByteStreamControllerEnqueueChunkToQueue(
26782725
controller[kState].queueTotalSize += byteLength;
26792726
}
26802727

2728+
function readableByteStreamControllerEnqueueDetachedPullIntoToQueue(
2729+
controller,
2730+
desc
2731+
) {
2732+
const {
2733+
buffer,
2734+
byteOffset,
2735+
bytesFilled,
2736+
type,
2737+
} = desc;
2738+
assert(type === 'none');
2739+
2740+
if (bytesFilled > 0) {
2741+
readableByteStreamControllerEnqueueClonedChunkToQueue(
2742+
controller,
2743+
buffer,
2744+
byteOffset,
2745+
bytesFilled
2746+
);
2747+
}
2748+
readableByteStreamControllerShiftPendingPullInto(controller);
2749+
}
2750+
26812751
function readableByteStreamControllerFillPullIntoDescriptorFromQueue(
26822752
controller,
26832753
desc) {
@@ -2773,6 +2843,7 @@ function readableByteStreamControllerRespondInReadableState(
27732843
buffer,
27742844
bytesFilled,
27752845
byteLength,
2846+
type,
27762847
} = desc;
27772848

27782849
if (bytesFilled + bytesWritten > byteLength)
@@ -2783,6 +2854,17 @@ function readableByteStreamControllerRespondInReadableState(
27832854
bytesWritten,
27842855
desc);
27852856

2857+
if (type === 'none') {
2858+
readableByteStreamControllerEnqueueDetachedPullIntoToQueue(
2859+
controller,
2860+
desc
2861+
);
2862+
readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(
2863+
controller
2864+
);
2865+
return;
2866+
}
2867+
27862868
if (desc.bytesFilled < desc.elementSize)
27872869
return;
27882870

test/wpt/status/streams.json

-8
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,9 @@
1616
"fail": {
1717
"expected": [
1818
"ReadableStream with byte source: enqueue() discards auto-allocated BYOB request",
19-
"ReadableStream with byte source: releaseLock() with pending read(view), read(view) on second reader, respond()",
20-
"ReadableStream with byte source: releaseLock() with pending read(view), read(view) on second reader with 1 element Uint16Array, respond(1)",
21-
"ReadableStream with byte source: releaseLock() with pending read(view), read(view) on second reader with 2 element Uint8Array, respond(3)",
22-
"ReadableStream with byte source: releaseLock() with pending read(view), read(view) on second reader, respondWithNewView()",
2319
"ReadableStream with byte source: releaseLock() with pending read(view), read(view) on second reader, enqueue()",
24-
"ReadableStream with byte source: releaseLock() with pending read(view), read(view) on second reader, close(), respond(0)",
25-
"ReadableStream with byte source: autoAllocateChunkSize, releaseLock() with pending read(), read() on second reader, respond()",
2620
"ReadableStream with byte source: autoAllocateChunkSize, releaseLock() with pending read(), read() on second reader, enqueue()",
27-
"ReadableStream with byte source: autoAllocateChunkSize, releaseLock() with pending read(), read(view) on second reader, respond()",
2821
"ReadableStream with byte source: autoAllocateChunkSize, releaseLock() with pending read(), read(view) on second reader, enqueue()",
29-
"ReadableStream with byte source: read(view) with 1 element Uint16Array, respond(1), releaseLock(), read(view) on second reader with 1 element Uint16Array, respond(1)",
3022
"ReadableStream with byte source: read(view) with 1 element Uint16Array, respond(1), releaseLock(), read() on second reader, enqueue()"
3123
]
3224
}

0 commit comments

Comments
 (0)