Skip to content

Commit a057510

Browse files
Warkanlockdanielleadams
authored andcommitted
stream: initial approach to include strategy options on Readable.toWeb()
PR-URL: #43515 Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: Benjamin Gruenbaum <[email protected]> Reviewed-By: James M Snell <[email protected]>
1 parent f32aec8 commit a057510

File tree

4 files changed

+109
-12
lines changed

4 files changed

+109
-12
lines changed

doc/api/stream.md

+5-1
Original file line numberDiff line numberDiff line change
@@ -2789,7 +2789,7 @@ added:
27892789

27902790
Returns whether the stream is readable.
27912791

2792-
### `stream.Readable.toWeb(streamReadable)`
2792+
### `stream.Readable.toWeb(streamReadable[, options])`
27932793

27942794
<!-- YAML
27952795
added: v17.0.0
@@ -2798,6 +2798,10 @@ added: v17.0.0
27982798
> Stability: 1 - Experimental
27992799
28002800
* `streamReadable` {stream.Readable}
2801+
* `options` {Object}
2802+
* `strategy` {Object}
2803+
* `highWaterMark` {number}
2804+
* `size` {Function}
28012805
* Returns: {ReadableStream}
28022806

28032807
### `stream.Writable.fromWeb(writableStream[, options])`

lib/internal/streams/readable.js

+4-2
Original file line numberDiff line numberDiff line change
@@ -1405,8 +1405,10 @@ Readable.fromWeb = function(readableStream, options) {
14051405
options);
14061406
};
14071407

1408-
Readable.toWeb = function(streamReadable) {
1409-
return lazyWebStreams().newReadableStreamFromStreamReadable(streamReadable);
1408+
Readable.toWeb = function(streamReadable, options) {
1409+
return lazyWebStreams().newReadableStreamFromStreamReadable(
1410+
streamReadable,
1411+
options);
14101412
};
14111413

14121414
Readable.wrap = function(src, options) {

lib/internal/webstreams/adapters.js

+25-9
Original file line numberDiff line numberDiff line change
@@ -359,10 +359,14 @@ function newStreamWritableFromWritableStream(writableStream, options = kEmptyObj
359359
}
360360

361361
/**
362+
* @typedef {import('./queuingstrategies').QueuingStrategy} QueuingStrategy
362363
* @param {Readable} streamReadable
364+
* @param {{
365+
* strategy : QueuingStrategy
366+
* }} [options]
363367
* @returns {ReadableStream}
364368
*/
365-
function newReadableStreamFromStreamReadable(streamReadable) {
369+
function newReadableStreamFromStreamReadable(streamReadable, options = kEmptyObject) {
366370
// Not using the internal/streams/utils isReadableNodeStream utility
367371
// here because it will return false if streamReadable is a Duplex
368372
// whose readable option is false. For a Duplex that is not readable,
@@ -382,14 +386,26 @@ function newReadableStreamFromStreamReadable(streamReadable) {
382386

383387
const objectMode = streamReadable.readableObjectMode;
384388
const highWaterMark = streamReadable.readableHighWaterMark;
385-
// When not running in objectMode explicitly, we just fall
386-
// back to a minimal strategy that just specifies the highWaterMark
387-
// and no size algorithm. Using a ByteLengthQueuingStrategy here
388-
// is unnecessary.
389-
const strategy =
390-
objectMode ?
391-
new CountQueuingStrategy({ highWaterMark }) :
392-
{ highWaterMark };
389+
390+
const evaluateStrategyOrFallback = (strategy) => {
391+
// If there is a strategy available, use it
392+
if (strategy)
393+
return strategy;
394+
395+
if (objectMode) {
396+
// When running in objectMode explicitly but no strategy, we just fall
397+
// back to CountQueuingStrategy
398+
return new CountQueuingStrategy({ highWaterMark });
399+
}
400+
401+
// When not running in objectMode explicitly, we just fall
402+
// back to a minimal strategy that just specifies the highWaterMark
403+
// and no size algorithm. Using a ByteLengthQueuingStrategy here
404+
// is unnecessary.
405+
return { highWaterMark };
406+
};
407+
408+
const strategy = evaluateStrategyOrFallback(options?.strategy);
393409

394410
let controller;
395411

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
'use strict';
2+
const common = require('../common');
3+
const { Readable } = require('stream');
4+
const assert = require('assert');
5+
const { strictEqual } = require('assert');
6+
7+
{
8+
// Strategy 2
9+
const streamData = ['a', 'b', 'c', null];
10+
11+
// Fulfill a Readable object
12+
const readable = new Readable({
13+
read: common.mustCall(() => {
14+
process.nextTick(() => {
15+
readable.push(streamData.shift());
16+
});
17+
}, streamData.length),
18+
});
19+
20+
// Use helper to convert it to a Web ReadableStream using ByteLength strategy
21+
const readableStream = Readable.toWeb(readable, {
22+
strategy: new ByteLengthQueuingStrategy({ highWaterMark: 1 }),
23+
});
24+
25+
assert(!readableStream.locked);
26+
readableStream.getReader().read().then(common.mustCall());
27+
}
28+
29+
{
30+
// Strategy 2
31+
const streamData = ['a', 'b', 'c', null];
32+
33+
// Fulfill a Readable object
34+
const readable = new Readable({
35+
read: common.mustCall(() => {
36+
process.nextTick(() => {
37+
readable.push(streamData.shift());
38+
});
39+
}, streamData.length),
40+
});
41+
42+
// Use helper to convert it to a Web ReadableStream using Count strategy
43+
const readableStream = Readable.toWeb(readable, {
44+
strategy: new CountQueuingStrategy({ highWaterMark: 1 }),
45+
});
46+
47+
assert(!readableStream.locked);
48+
readableStream.getReader().read().then(common.mustCall());
49+
}
50+
51+
{
52+
const desireSizeExpected = 2;
53+
54+
const stringStream = new ReadableStream(
55+
{
56+
start(controller) {
57+
// Check if the strategy is being assigned on the init of the ReadableStream
58+
strictEqual(controller.desiredSize, desireSizeExpected);
59+
controller.enqueue('a');
60+
controller.enqueue('b');
61+
controller.close();
62+
},
63+
},
64+
new CountQueuingStrategy({ highWaterMark: desireSizeExpected })
65+
);
66+
67+
const reader = stringStream.getReader();
68+
69+
reader.read().then(common.mustCall());
70+
reader.read().then(common.mustCall());
71+
reader.read().then(({ value, done }) => {
72+
strictEqual(value, undefined);
73+
strictEqual(done, true);
74+
});
75+
}

0 commit comments

Comments
 (0)