Skip to content

Commit b29d927

Browse files
rluvatonruyadorno
authored andcommitted
stream: improve readable webstream pipeTo
PR-URL: #49690 Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: Yagiz Nizipli <[email protected]> Reviewed-By: Moshe Atlow <[email protected]>
1 parent a304d1e commit b29d927

File tree

1 file changed

+34
-17
lines changed

1 file changed

+34
-17
lines changed

lib/internal/webstreams/readablestream.js

+34-17
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ const {
1414
ObjectCreate,
1515
ObjectDefineProperties,
1616
ObjectSetPrototypeOf,
17-
Promise,
1817
PromisePrototypeThen,
1918
PromiseResolve,
2019
PromiseReject,
@@ -1354,7 +1353,9 @@ function readableStreamPipeTo(
13541353

13551354
const promise = createDeferredPromise();
13561355

1357-
let currentWrite = PromiseResolve();
1356+
const state = {
1357+
currentWrite: PromiseResolve(),
1358+
};
13581359

13591360
// The error here can be undefined. The rejected arg
13601361
// tells us that the promise must be rejected even
@@ -1371,9 +1372,9 @@ function readableStreamPipeTo(
13711372
}
13721373

13731374
async function waitForCurrentWrite() {
1374-
const write = currentWrite;
1375+
const write = state.currentWrite;
13751376
await write;
1376-
if (write !== currentWrite)
1377+
if (write !== state.currentWrite)
13771378
await waitForCurrentWrite();
13781379
}
13791380

@@ -1464,20 +1465,14 @@ function readableStreamPipeTo(
14641465
async function step() {
14651466
if (shuttingDown)
14661467
return true;
1468+
14671469
await writer[kState].ready.promise;
1468-
return new Promise((resolve, reject) => {
1469-
readableStreamDefaultReaderRead(
1470-
reader,
1471-
{
1472-
[kChunk](chunk) {
1473-
currentWrite = writableStreamDefaultWriterWrite(writer, chunk);
1474-
setPromiseHandled(currentWrite);
1475-
resolve(false);
1476-
},
1477-
[kClose]: () => resolve(true),
1478-
[kError]: reject,
1479-
});
1480-
});
1470+
1471+
const promise = createDeferredPromise();
1472+
// eslint-disable-next-line no-use-before-define
1473+
readableStreamDefaultReaderRead(reader, new PipeToReadableStreamReadRequest(writer, state, promise));
1474+
1475+
return promise.promise;
14811476
}
14821477

14831478
async function run() {
@@ -1539,6 +1534,28 @@ function readableStreamPipeTo(
15391534
return promise.promise;
15401535
}
15411536

1537+
class PipeToReadableStreamReadRequest {
1538+
constructor(writer, state, promise) {
1539+
this.writer = writer;
1540+
this.state = state;
1541+
this.promise = promise;
1542+
}
1543+
1544+
[kChunk](chunk) {
1545+
this.state.currentWrite = writableStreamDefaultWriterWrite(this.writer, chunk);
1546+
setPromiseHandled(this.state.currentWrite);
1547+
this.promise.resolve(false);
1548+
}
1549+
1550+
[kClose]() {
1551+
this.promise.resolve(true);
1552+
}
1553+
1554+
[kError](error) {
1555+
this.promise.reject(error);
1556+
}
1557+
}
1558+
15421559
function readableStreamTee(stream, cloneForBranch2) {
15431560
if (isReadableByteStreamController(stream[kState].controller)) {
15441561
return readableByteStreamTee(stream);

0 commit comments

Comments
 (0)