Skip to content

Commit 4a77a11

Browse files
daeyeonRafaelGSS
authored andcommitted
stream: add ReadableByteStream.tee()
This supports teeing readable byte streams to meet the latest web streams standards. Signed-off-by: Daeyeon Jeong [email protected] PR-URL: #44505 Refs: https://streams.spec.whatwg.org/#readable-stream-tee Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: Benjamin Gruenbaum <[email protected]> Reviewed-By: Minwoo Jung <[email protected]> Reviewed-By: Antoine du Hamel <[email protected]>
1 parent 8752854 commit 4a77a11

File tree

6 files changed

+343
-53
lines changed

6 files changed

+343
-53
lines changed

doc/api/webstreams.md

+4
Original file line numberDiff line numberDiff line change
@@ -299,6 +299,10 @@ is active.
299299

300300
<!-- YAML
301301
added: v16.5.0
302+
changes:
303+
- version: REPLACEME
304+
pr-url: https://github.com/nodejs/node/pull/44505
305+
description: Support teeing a readable byte stream.
302306
-->
303307

304308
* Returns: {ReadableStream\[]}

lib/internal/webstreams/readablestream.js

+293-11
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ const {
9595
ArrayBufferViewGetByteOffset,
9696
ArrayBufferGetByteLength,
9797
AsyncIterator,
98+
cloneAsUint8Array,
9899
copyArrayBuffer,
99100
customInspect,
100101
dequeueValue,
@@ -215,6 +216,7 @@ class ReadableStream {
215216
throw new ERR_INVALID_ARG_VALUE('source', 'Object', source);
216217
this[kState] = {
217218
disturbed: false,
219+
reader: undefined,
218220
state: 'readable',
219221
storedError: undefined,
220222
stream: undefined,
@@ -1111,7 +1113,6 @@ class ReadableByteStreamController {
11111113
chunk);
11121114
}
11131115
const chunkByteLength = ArrayBufferViewGetByteLength(chunk);
1114-
const chunkByteOffset = ArrayBufferViewGetByteOffset(chunk);
11151116
const chunkBuffer = ArrayBufferViewGetBuffer(chunk);
11161117
const chunkBufferByteLength = ArrayBufferGetByteLength(chunkBuffer);
11171118
if (chunkByteLength === 0 || chunkBufferByteLength === 0) {
@@ -1122,11 +1123,7 @@ class ReadableByteStreamController {
11221123
throw new ERR_INVALID_STATE.TypeError('Controller is already closed');
11231124
if (this[kState].stream[kState].state !== 'readable')
11241125
throw new ERR_INVALID_STATE.TypeError('ReadableStream is already closed');
1125-
readableByteStreamControllerEnqueue(
1126-
this,
1127-
chunkBuffer,
1128-
chunkByteLength,
1129-
chunkByteOffset);
1126+
readableByteStreamControllerEnqueue(this, chunk);
11301127
}
11311128

11321129
/**
@@ -1430,6 +1427,13 @@ function readableStreamPipeTo(
14301427
}
14311428

14321429
function readableStreamTee(stream, cloneForBranch2) {
1430+
if (isReadableByteStreamController(stream[kState].controller)) {
1431+
return readableByteStreamTee(stream);
1432+
}
1433+
return readableStreamDefaultTee(stream, cloneForBranch2);
1434+
}
1435+
1436+
function readableStreamDefaultTee(stream, cloneForBranch2) {
14331437
const reader = new ReadableStreamDefaultReader(stream);
14341438
let reading = false;
14351439
let canceled1 = false;
@@ -1524,6 +1528,284 @@ function readableStreamTee(stream, cloneForBranch2) {
15241528
return [branch1, branch2];
15251529
}
15261530

1531+
function readableByteStreamTee(stream) {
1532+
assert(isReadableStream(stream));
1533+
assert(isReadableByteStreamController(stream[kState].controller));
1534+
1535+
let reader = new ReadableStreamDefaultReader(stream);
1536+
let reading = false;
1537+
let readAgainForBranch1 = false;
1538+
let readAgainForBranch2 = false;
1539+
let canceled1 = false;
1540+
let canceled2 = false;
1541+
let reason1;
1542+
let reason2;
1543+
let branch1;
1544+
let branch2;
1545+
const cancelDeferred = createDeferredPromise();
1546+
1547+
function forwardReaderError(thisReader) {
1548+
PromisePrototypeThen(
1549+
thisReader[kState].close.promise,
1550+
undefined,
1551+
(error) => {
1552+
if (thisReader !== reader) {
1553+
return;
1554+
}
1555+
readableStreamDefaultControllerError(branch1[kState].controller, error);
1556+
readableStreamDefaultControllerError(branch2[kState].controller, error);
1557+
if (!canceled1 || !canceled2) {
1558+
cancelDeferred.resolve();
1559+
}
1560+
}
1561+
);
1562+
}
1563+
1564+
function pullWithDefaultReader() {
1565+
if (isReadableStreamBYOBReader(reader)) {
1566+
readableStreamBYOBReaderRelease(reader);
1567+
reader = new ReadableStreamDefaultReader(stream);
1568+
forwardReaderError(reader);
1569+
}
1570+
1571+
const readRequest = {
1572+
[kChunk](chunk) {
1573+
queueMicrotask(() => {
1574+
readAgainForBranch1 = false;
1575+
readAgainForBranch2 = false;
1576+
const chunk1 = chunk;
1577+
let chunk2 = chunk;
1578+
1579+
if (!canceled1 && !canceled2) {
1580+
try {
1581+
chunk2 = cloneAsUint8Array(chunk);
1582+
} catch (error) {
1583+
readableByteStreamControllerError(
1584+
branch1[kState].controller,
1585+
error
1586+
);
1587+
readableByteStreamControllerError(
1588+
branch2[kState].controller,
1589+
error
1590+
);
1591+
cancelDeferred.resolve(readableStreamCancel(stream, error));
1592+
return;
1593+
}
1594+
}
1595+
if (!canceled1) {
1596+
readableByteStreamControllerEnqueue(
1597+
branch1[kState].controller,
1598+
chunk1
1599+
);
1600+
}
1601+
if (!canceled2) {
1602+
readableByteStreamControllerEnqueue(
1603+
branch2[kState].controller,
1604+
chunk2
1605+
);
1606+
}
1607+
reading = false;
1608+
1609+
if (readAgainForBranch1) {
1610+
pull1Algorithm();
1611+
} else if (readAgainForBranch2) {
1612+
pull2Algorithm();
1613+
}
1614+
});
1615+
},
1616+
[kClose]() {
1617+
reading = false;
1618+
1619+
if (!canceled1) {
1620+
readableByteStreamControllerClose(branch1[kState].controller);
1621+
}
1622+
if (!canceled2) {
1623+
readableByteStreamControllerClose(branch2[kState].controller);
1624+
}
1625+
if (branch1[kState].controller[kState].pendingPullIntos.length > 0) {
1626+
readableByteStreamControllerRespond(branch1[kState].controller, 0);
1627+
}
1628+
if (branch2[kState].controller[kState].pendingPullIntos.length > 0) {
1629+
readableByteStreamControllerRespond(branch2[kState].controller, 0);
1630+
}
1631+
if (!canceled1 || !canceled2) {
1632+
cancelDeferred.resolve();
1633+
}
1634+
},
1635+
[kError]() {
1636+
reading = false;
1637+
},
1638+
};
1639+
1640+
readableStreamDefaultReaderRead(reader, readRequest);
1641+
}
1642+
1643+
function pullWithBYOBReader(view, forBranch2) {
1644+
if (isReadableStreamDefaultReader(reader)) {
1645+
readableStreamDefaultReaderRelease(reader);
1646+
reader = new ReadableStreamBYOBReader(stream);
1647+
forwardReaderError(reader);
1648+
}
1649+
1650+
const byobBranch = forBranch2 === true ? branch2 : branch1;
1651+
const otherBranch = forBranch2 === false ? branch2 : branch1;
1652+
const readIntoRequest = {
1653+
[kChunk](chunk) {
1654+
queueMicrotask(() => {
1655+
readAgainForBranch1 = false;
1656+
readAgainForBranch2 = false;
1657+
const byobCanceled = forBranch2 === true ? canceled2 : canceled1;
1658+
const otherCanceled = forBranch2 === false ? canceled2 : canceled1;
1659+
1660+
if (!otherCanceled) {
1661+
let clonedChunk;
1662+
1663+
try {
1664+
clonedChunk = cloneAsUint8Array(chunk);
1665+
} catch (error) {
1666+
readableByteStreamControllerError(
1667+
byobBranch[kState].controller,
1668+
error
1669+
);
1670+
readableByteStreamControllerError(
1671+
otherBranch[kState].controller,
1672+
error
1673+
);
1674+
cancelDeferred.resolve(readableStreamCancel(stream, error));
1675+
return;
1676+
}
1677+
if (!byobCanceled) {
1678+
readableByteStreamControllerRespondWithNewView(
1679+
byobBranch[kState].controller,
1680+
chunk
1681+
);
1682+
}
1683+
1684+
readableByteStreamControllerEnqueue(
1685+
otherBranch[kState].controller,
1686+
clonedChunk
1687+
);
1688+
} else if (!byobCanceled) {
1689+
readableByteStreamControllerRespondWithNewView(
1690+
byobBranch[kState].controller,
1691+
chunk
1692+
);
1693+
}
1694+
reading = false;
1695+
1696+
if (readAgainForBranch1) {
1697+
pull1Algorithm();
1698+
} else if (readAgainForBranch2) {
1699+
pull2Algorithm();
1700+
}
1701+
});
1702+
},
1703+
[kClose](chunk) {
1704+
reading = false;
1705+
1706+
const byobCanceled = forBranch2 === true ? canceled2 : canceled1;
1707+
const otherCanceled = forBranch2 === false ? canceled2 : canceled1;
1708+
1709+
if (!byobCanceled) {
1710+
readableByteStreamControllerClose(byobBranch[kState].controller);
1711+
}
1712+
if (!otherCanceled) {
1713+
readableByteStreamControllerClose(otherBranch[kState].controller);
1714+
}
1715+
if (chunk !== undefined) {
1716+
if (!byobCanceled) {
1717+
readableByteStreamControllerRespondWithNewView(
1718+
byobBranch[kState].controller,
1719+
chunk
1720+
);
1721+
}
1722+
if (
1723+
!otherCanceled &&
1724+
otherBranch[kState].controller[kState].pendingPullIntos.length > 0
1725+
) {
1726+
readableByteStreamControllerRespond(
1727+
otherBranch[kState].controller,
1728+
0
1729+
);
1730+
}
1731+
}
1732+
if (!byobCanceled || !otherCanceled) {
1733+
cancelDeferred.resolve();
1734+
}
1735+
},
1736+
[kError]() {
1737+
reading = false;
1738+
},
1739+
};
1740+
readableStreamBYOBReaderRead(reader, view, readIntoRequest);
1741+
}
1742+
1743+
function pull1Algorithm() {
1744+
if (reading) {
1745+
readAgainForBranch1 = true;
1746+
return PromiseResolve();
1747+
}
1748+
reading = true;
1749+
1750+
const byobRequest = branch1[kState].controller.byobRequest;
1751+
if (byobRequest === null) {
1752+
pullWithDefaultReader();
1753+
} else {
1754+
pullWithBYOBReader(byobRequest[kState].view, false);
1755+
}
1756+
return PromiseResolve();
1757+
}
1758+
1759+
function pull2Algorithm() {
1760+
if (reading) {
1761+
readAgainForBranch2 = true;
1762+
return PromiseResolve();
1763+
}
1764+
reading = true;
1765+
1766+
const byobRequest = branch2[kState].controller.byobRequest;
1767+
if (byobRequest === null) {
1768+
pullWithDefaultReader();
1769+
} else {
1770+
pullWithBYOBReader(byobRequest[kState].view, true);
1771+
}
1772+
return PromiseResolve();
1773+
}
1774+
1775+
function cancel1Algorithm(reason) {
1776+
canceled1 = true;
1777+
reason1 = reason;
1778+
if (canceled2) {
1779+
cancelDeferred.resolve(readableStreamCancel(stream, [reason1, reason2]));
1780+
}
1781+
return cancelDeferred.promise;
1782+
}
1783+
1784+
function cancel2Algorithm(reason) {
1785+
canceled2 = true;
1786+
reason2 = reason;
1787+
if (canceled1) {
1788+
cancelDeferred.resolve(readableStreamCancel(stream, [reason1, reason2]));
1789+
}
1790+
return cancelDeferred.promise;
1791+
}
1792+
1793+
branch1 = new ReadableStream({
1794+
type: 'bytes',
1795+
pull: pull1Algorithm,
1796+
cancel: cancel1Algorithm,
1797+
});
1798+
branch2 = new ReadableStream({
1799+
type: 'bytes',
1800+
pull: pull2Algorithm,
1801+
cancel: cancel2Algorithm,
1802+
});
1803+
1804+
forwardReaderError(reader);
1805+
1806+
return [branch1, branch2];
1807+
}
1808+
15271809
function readableByteStreamControllerConvertPullIntoDescriptor(desc) {
15281810
const {
15291811
buffer,
@@ -2317,18 +2599,18 @@ function readableByteStreamControllerFillHeadPullIntoDescriptor(
23172599
desc.bytesFilled += size;
23182600
}
23192601

2320-
function readableByteStreamControllerEnqueue(
2321-
controller,
2322-
buffer,
2323-
byteLength,
2324-
byteOffset) {
2602+
function readableByteStreamControllerEnqueue(controller, chunk) {
23252603
const {
23262604
closeRequested,
23272605
pendingPullIntos,
23282606
queue,
23292607
stream,
23302608
} = controller[kState];
23312609

2610+
const buffer = ArrayBufferViewGetBuffer(chunk);
2611+
const byteOffset = ArrayBufferViewGetByteOffset(chunk);
2612+
const byteLength = ArrayBufferViewGetByteLength(chunk);
2613+
23322614
if (closeRequested || stream[kState].state !== 'readable')
23332615
return;
23342616

0 commit comments

Comments
 (0)