Skip to content

Commit 6ed3367

Browse files
daeyeonjuanarbol
authored andcommitted
stream: add ReadableByteStream.tee()
This supports teeing readable byte streams to meet the latest web streams standards. Signed-off-by: Daeyeon Jeong [email protected] PR-URL: #44505 Refs: https://streams.spec.whatwg.org/#readable-stream-tee Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: Benjamin Gruenbaum <[email protected]> Reviewed-By: Minwoo Jung <[email protected]> Reviewed-By: Antoine du Hamel <[email protected]>
1 parent 9d24c7a commit 6ed3367

File tree

5 files changed

+342
-12
lines changed

5 files changed

+342
-12
lines changed

doc/api/webstreams.md

+4
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,10 @@ is active.
303303

304304
<!-- YAML
305305
added: v16.5.0
306+
changes:
307+
- version: REPLACEME
308+
pr-url: https://github.com/nodejs/node/pull/44505
309+
description: Support teeing a readable byte stream.
306310
-->
307311

308312
* Returns: {ReadableStream\[]}

lib/internal/webstreams/readablestream.js

+291-11
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ const {
9393
ArrayBufferViewGetByteOffset,
9494
ArrayBufferGetByteLength,
9595
AsyncIterator,
96+
cloneAsUint8Array,
9697
copyArrayBuffer,
9798
customInspect,
9899
dequeueValue,
@@ -211,6 +212,7 @@ class ReadableStream {
211212
throw new ERR_INVALID_ARG_VALUE('source', 'Object', source);
212213
this[kState] = {
213214
disturbed: false,
215+
reader: undefined,
214216
state: 'readable',
215217
storedError: undefined,
216218
stream: undefined,
@@ -1103,7 +1105,6 @@ class ReadableByteStreamController {
11031105
chunk);
11041106
}
11051107
const chunkByteLength = ArrayBufferViewGetByteLength(chunk);
1106-
const chunkByteOffset = ArrayBufferViewGetByteOffset(chunk);
11071108
const chunkBuffer = ArrayBufferViewGetBuffer(chunk);
11081109
const chunkBufferByteLength = ArrayBufferGetByteLength(chunkBuffer);
11091110
if (chunkByteLength === 0 || chunkBufferByteLength === 0) {
@@ -1114,11 +1115,7 @@ class ReadableByteStreamController {
11141115
throw new ERR_INVALID_STATE.TypeError('Controller is already closed');
11151116
if (this[kState].stream[kState].state !== 'readable')
11161117
throw new ERR_INVALID_STATE.TypeError('ReadableStream is already closed');
1117-
readableByteStreamControllerEnqueue(
1118-
this,
1119-
chunkBuffer,
1120-
chunkByteLength,
1121-
chunkByteOffset);
1118+
readableByteStreamControllerEnqueue(this, chunk);
11221119
}
11231120

11241121
/**
@@ -1416,6 +1413,13 @@ function readableStreamPipeTo(
14161413
}
14171414

14181415
function readableStreamTee(stream, cloneForBranch2) {
1416+
if (isReadableByteStreamController(stream[kState].controller)) {
1417+
return readableByteStreamTee(stream);
1418+
}
1419+
return readableStreamDefaultTee(stream, cloneForBranch2);
1420+
}
1421+
1422+
function readableStreamDefaultTee(stream, cloneForBranch2) {
14191423
const reader = new ReadableStreamDefaultReader(stream);
14201424
let reading = false;
14211425
let canceled1 = false;
@@ -1510,6 +1514,282 @@ function readableStreamTee(stream, cloneForBranch2) {
15101514
return [branch1, branch2];
15111515
}
15121516

1517+
function readableByteStreamTee(stream) {
1518+
assert(isReadableStream(stream));
1519+
assert(isReadableByteStreamController(stream[kState].controller));
1520+
1521+
let reader = new ReadableStreamDefaultReader(stream);
1522+
let reading = false;
1523+
let readAgainForBranch1 = false;
1524+
let readAgainForBranch2 = false;
1525+
let canceled1 = false;
1526+
let canceled2 = false;
1527+
let reason1;
1528+
let reason2;
1529+
let branch1;
1530+
let branch2;
1531+
const cancelDeferred = createDeferredPromise();
1532+
1533+
function forwardReaderError(thisReader) {
1534+
PromisePrototypeThen(
1535+
thisReader[kState].close.promise,
1536+
undefined,
1537+
(error) => {
1538+
if (thisReader !== reader) {
1539+
return;
1540+
}
1541+
readableStreamDefaultControllerError(branch1[kState].controller, error);
1542+
readableStreamDefaultControllerError(branch2[kState].controller, error);
1543+
if (!canceled1 || !canceled2) {
1544+
cancelDeferred.resolve();
1545+
}
1546+
}
1547+
);
1548+
}
1549+
1550+
function pullWithDefaultReader() {
1551+
if (isReadableStreamBYOBReader(reader)) {
1552+
reader = new ReadableStreamDefaultReader(stream);
1553+
forwardReaderError(reader);
1554+
}
1555+
1556+
const readRequest = {
1557+
[kChunk](chunk) {
1558+
queueMicrotask(() => {
1559+
readAgainForBranch1 = false;
1560+
readAgainForBranch2 = false;
1561+
const chunk1 = chunk;
1562+
let chunk2 = chunk;
1563+
1564+
if (!canceled1 && !canceled2) {
1565+
try {
1566+
chunk2 = cloneAsUint8Array(chunk);
1567+
} catch (error) {
1568+
readableByteStreamControllerError(
1569+
branch1[kState].controller,
1570+
error
1571+
);
1572+
readableByteStreamControllerError(
1573+
branch2[kState].controller,
1574+
error
1575+
);
1576+
cancelDeferred.resolve(readableStreamCancel(stream, error));
1577+
return;
1578+
}
1579+
}
1580+
if (!canceled1) {
1581+
readableByteStreamControllerEnqueue(
1582+
branch1[kState].controller,
1583+
chunk1
1584+
);
1585+
}
1586+
if (!canceled2) {
1587+
readableByteStreamControllerEnqueue(
1588+
branch2[kState].controller,
1589+
chunk2
1590+
);
1591+
}
1592+
reading = false;
1593+
1594+
if (readAgainForBranch1) {
1595+
pull1Algorithm();
1596+
} else if (readAgainForBranch2) {
1597+
pull2Algorithm();
1598+
}
1599+
});
1600+
},
1601+
[kClose]() {
1602+
reading = false;
1603+
1604+
if (!canceled1) {
1605+
readableByteStreamControllerClose(branch1[kState].controller);
1606+
}
1607+
if (!canceled2) {
1608+
readableByteStreamControllerClose(branch2[kState].controller);
1609+
}
1610+
if (branch1[kState].controller[kState].pendingPullIntos.length > 0) {
1611+
readableByteStreamControllerRespond(branch1[kState].controller, 0);
1612+
}
1613+
if (branch2[kState].controller[kState].pendingPullIntos.length > 0) {
1614+
readableByteStreamControllerRespond(branch2[kState].controller, 0);
1615+
}
1616+
if (!canceled1 || !canceled2) {
1617+
cancelDeferred.resolve();
1618+
}
1619+
},
1620+
[kError]() {
1621+
reading = false;
1622+
},
1623+
};
1624+
1625+
readableStreamDefaultReaderRead(reader, readRequest);
1626+
}
1627+
1628+
function pullWithBYOBReader(view, forBranch2) {
1629+
if (isReadableStreamDefaultReader(reader)) {
1630+
reader = new ReadableStreamBYOBReader(stream);
1631+
forwardReaderError(reader);
1632+
}
1633+
1634+
const byobBranch = forBranch2 === true ? branch2 : branch1;
1635+
const otherBranch = forBranch2 === false ? branch2 : branch1;
1636+
const readIntoRequest = {
1637+
[kChunk](chunk) {
1638+
queueMicrotask(() => {
1639+
readAgainForBranch1 = false;
1640+
readAgainForBranch2 = false;
1641+
const byobCanceled = forBranch2 === true ? canceled2 : canceled1;
1642+
const otherCanceled = forBranch2 === false ? canceled2 : canceled1;
1643+
1644+
if (!otherCanceled) {
1645+
let clonedChunk;
1646+
1647+
try {
1648+
clonedChunk = cloneAsUint8Array(chunk);
1649+
} catch (error) {
1650+
readableByteStreamControllerError(
1651+
byobBranch[kState].controller,
1652+
error
1653+
);
1654+
readableByteStreamControllerError(
1655+
otherBranch[kState].controller,
1656+
error
1657+
);
1658+
cancelDeferred.resolve(readableStreamCancel(stream, error));
1659+
return;
1660+
}
1661+
if (!byobCanceled) {
1662+
readableByteStreamControllerRespondWithNewView(
1663+
byobBranch[kState].controller,
1664+
chunk
1665+
);
1666+
}
1667+
1668+
readableByteStreamControllerEnqueue(
1669+
otherBranch[kState].controller,
1670+
clonedChunk
1671+
);
1672+
} else if (!byobCanceled) {
1673+
readableByteStreamControllerRespondWithNewView(
1674+
byobBranch[kState].controller,
1675+
chunk
1676+
);
1677+
}
1678+
reading = false;
1679+
1680+
if (readAgainForBranch1) {
1681+
pull1Algorithm();
1682+
} else if (readAgainForBranch2) {
1683+
pull2Algorithm();
1684+
}
1685+
});
1686+
},
1687+
[kClose](chunk) {
1688+
reading = false;
1689+
1690+
const byobCanceled = forBranch2 === true ? canceled2 : canceled1;
1691+
const otherCanceled = forBranch2 === false ? canceled2 : canceled1;
1692+
1693+
if (!byobCanceled) {
1694+
readableByteStreamControllerClose(byobBranch[kState].controller);
1695+
}
1696+
if (!otherCanceled) {
1697+
readableByteStreamControllerClose(otherBranch[kState].controller);
1698+
}
1699+
if (chunk !== undefined) {
1700+
if (!byobCanceled) {
1701+
readableByteStreamControllerRespondWithNewView(
1702+
byobBranch[kState].controller,
1703+
chunk
1704+
);
1705+
}
1706+
if (
1707+
!otherCanceled &&
1708+
otherBranch[kState].controller[kState].pendingPullIntos.length > 0
1709+
) {
1710+
readableByteStreamControllerRespond(
1711+
otherBranch[kState].controller,
1712+
0
1713+
);
1714+
}
1715+
}
1716+
if (!byobCanceled || !otherCanceled) {
1717+
cancelDeferred.resolve();
1718+
}
1719+
},
1720+
[kError]() {
1721+
reading = false;
1722+
},
1723+
};
1724+
readableStreamBYOBReaderRead(reader, view, readIntoRequest);
1725+
}
1726+
1727+
function pull1Algorithm() {
1728+
if (reading) {
1729+
readAgainForBranch1 = true;
1730+
return PromiseResolve();
1731+
}
1732+
reading = true;
1733+
1734+
const byobRequest = branch1[kState].controller.byobRequest;
1735+
if (byobRequest === null) {
1736+
pullWithDefaultReader();
1737+
} else {
1738+
pullWithBYOBReader(byobRequest[kState].view, false);
1739+
}
1740+
return PromiseResolve();
1741+
}
1742+
1743+
function pull2Algorithm() {
1744+
if (reading) {
1745+
readAgainForBranch2 = true;
1746+
return PromiseResolve();
1747+
}
1748+
reading = true;
1749+
1750+
const byobRequest = branch2[kState].controller.byobRequest;
1751+
if (byobRequest === null) {
1752+
pullWithDefaultReader();
1753+
} else {
1754+
pullWithBYOBReader(byobRequest[kState].view, true);
1755+
}
1756+
return PromiseResolve();
1757+
}
1758+
1759+
function cancel1Algorithm(reason) {
1760+
canceled1 = true;
1761+
reason1 = reason;
1762+
if (canceled2) {
1763+
cancelDeferred.resolve(readableStreamCancel(stream, [reason1, reason2]));
1764+
}
1765+
return cancelDeferred.promise;
1766+
}
1767+
1768+
function cancel2Algorithm(reason) {
1769+
canceled2 = true;
1770+
reason2 = reason;
1771+
if (canceled1) {
1772+
cancelDeferred.resolve(readableStreamCancel(stream, [reason1, reason2]));
1773+
}
1774+
return cancelDeferred.promise;
1775+
}
1776+
1777+
branch1 = new ReadableStream({
1778+
type: 'bytes',
1779+
pull: pull1Algorithm,
1780+
cancel: cancel1Algorithm,
1781+
});
1782+
branch2 = new ReadableStream({
1783+
type: 'bytes',
1784+
pull: pull2Algorithm,
1785+
cancel: cancel2Algorithm,
1786+
});
1787+
1788+
forwardReaderError(reader);
1789+
1790+
return [branch1, branch2];
1791+
}
1792+
15131793
function readableByteStreamControllerConvertPullIntoDescriptor(desc) {
15141794
const {
15151795
buffer,
@@ -2273,18 +2553,18 @@ function readableByteStreamControllerFillHeadPullIntoDescriptor(
22732553
desc.bytesFilled += size;
22742554
}
22752555

2276-
function readableByteStreamControllerEnqueue(
2277-
controller,
2278-
buffer,
2279-
byteLength,
2280-
byteOffset) {
2556+
function readableByteStreamControllerEnqueue(controller, chunk) {
22812557
const {
22822558
closeRequested,
22832559
pendingPullIntos,
22842560
queue,
22852561
stream,
22862562
} = controller[kState];
22872563

2564+
const buffer = ArrayBufferViewGetBuffer(chunk);
2565+
const byteOffset = ArrayBufferViewGetByteOffset(chunk);
2566+
const byteLength = ArrayBufferViewGetByteLength(chunk);
2567+
22882568
if (closeRequested || stream[kState].state !== 'readable')
22892569
return;
22902570

0 commit comments

Comments
 (0)