Skip to content

Commit dffe7d8

Browse files
debadree25danielleadams
authored andcommitted
stream: enable usage of webstreams on compose()
Refs: #39316 PR-URL: #46675 Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: Benjamin Gruenbaum <[email protected]> Reviewed-By: Robert Nagy <[email protected]> Reviewed-By: James M Snell <[email protected]>
1 parent 06e0dd3 commit dffe7d8

File tree

4 files changed

+619
-52
lines changed

4 files changed

+619
-52
lines changed

doc/api/stream.md

+6-1
Original file line numberDiff line numberDiff line change
@@ -2769,11 +2769,16 @@ const server = http.createServer((req, res) => {
27692769

27702770
<!-- YAML
27712771
added: v16.9.0
2772+
changes:
2773+
- version: REPLACEME
2774+
pr-url: https://github.com/nodejs/node/pull/46675
2775+
description: Added support for webstreams.
27722776
-->
27732777

27742778
> Stability: 1 - `stream.compose` is experimental.
27752779
2776-
* `streams` {Stream\[]|Iterable\[]|AsyncIterable\[]|Function\[]}
2780+
* `streams` {Stream\[]|Iterable\[]|AsyncIterable\[]|Function\[]|
2781+
ReadableStream\[]|WritableStream\[]|TransformStream\[]}
27772782
* Returns: {stream.Duplex}
27782783

27792784
Combines two or more streams into a `Duplex` stream that writes to the

lib/internal/streams/compose.js

+128-50
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@ const {
77
isNodeStream,
88
isReadable,
99
isWritable,
10+
isWebStream,
11+
isTransformStream,
12+
isWritableStream,
13+
isReadableStream,
1014
} = require('internal/streams/utils');
1115
const {
1216
AbortError,
@@ -15,6 +19,7 @@ const {
1519
ERR_MISSING_ARGS,
1620
},
1721
} = require('internal/errors');
22+
const eos = require('internal/streams/end-of-stream');
1823

1924
module.exports = function compose(...streams) {
2025
if (streams.length === 0) {
@@ -37,18 +42,32 @@ module.exports = function compose(...streams) {
3742
}
3843

3944
for (let n = 0; n < streams.length; ++n) {
40-
if (!isNodeStream(streams[n])) {
45+
if (!isNodeStream(streams[n]) && !isWebStream(streams[n])) {
4146
// TODO(ronag): Add checks for non streams.
4247
continue;
4348
}
44-
if (n < streams.length - 1 && !isReadable(streams[n])) {
49+
if (
50+
n < streams.length - 1 &&
51+
!(
52+
isReadable(streams[n]) ||
53+
isReadableStream(streams[n]) ||
54+
isTransformStream(streams[n])
55+
)
56+
) {
4557
throw new ERR_INVALID_ARG_VALUE(
4658
`streams[${n}]`,
4759
orgStreams[n],
4860
'must be readable',
4961
);
5062
}
51-
if (n > 0 && !isWritable(streams[n])) {
63+
if (
64+
n > 0 &&
65+
!(
66+
isWritable(streams[n]) ||
67+
isWritableStream(streams[n]) ||
68+
isTransformStream(streams[n])
69+
)
70+
) {
5271
throw new ERR_INVALID_ARG_VALUE(
5372
`streams[${n}]`,
5473
orgStreams[n],
@@ -79,8 +98,16 @@ module.exports = function compose(...streams) {
7998
const head = streams[0];
8099
const tail = pipeline(streams, onfinished);
81100

82-
const writable = !!isWritable(head);
83-
const readable = !!isReadable(tail);
101+
const writable = !!(
102+
isWritable(head) ||
103+
isWritableStream(head) ||
104+
isTransformStream(head)
105+
);
106+
const readable = !!(
107+
isReadable(tail) ||
108+
isReadableStream(tail) ||
109+
isTransformStream(tail)
110+
);
84111

85112
// TODO(ronag): Avoid double buffering.
86113
// Implement Writable/Readable/Duplex traits.
@@ -94,28 +121,55 @@ module.exports = function compose(...streams) {
94121
});
95122

96123
if (writable) {
97-
d._write = function(chunk, encoding, callback) {
98-
if (head.write(chunk, encoding)) {
99-
callback();
100-
} else {
101-
ondrain = callback;
102-
}
103-
};
104-
105-
d._final = function(callback) {
106-
head.end();
107-
onfinish = callback;
108-
};
124+
if (isNodeStream(head)) {
125+
d._write = function(chunk, encoding, callback) {
126+
if (head.write(chunk, encoding)) {
127+
callback();
128+
} else {
129+
ondrain = callback;
130+
}
131+
};
132+
133+
d._final = function(callback) {
134+
head.end();
135+
onfinish = callback;
136+
};
137+
138+
head.on('drain', function() {
139+
if (ondrain) {
140+
const cb = ondrain;
141+
ondrain = null;
142+
cb();
143+
}
144+
});
145+
} else if (isWebStream(head)) {
146+
const writable = isTransformStream(head) ? head.writable : head;
147+
const writer = writable.getWriter();
148+
149+
d._write = async function(chunk, encoding, callback) {
150+
try {
151+
await writer.ready;
152+
writer.write(chunk).catch(() => {});
153+
callback();
154+
} catch (err) {
155+
callback(err);
156+
}
157+
};
158+
159+
d._final = async function(callback) {
160+
try {
161+
await writer.ready;
162+
writer.close().catch(() => {});
163+
onfinish = callback;
164+
} catch (err) {
165+
callback(err);
166+
}
167+
};
168+
}
109169

110-
head.on('drain', function() {
111-
if (ondrain) {
112-
const cb = ondrain;
113-
ondrain = null;
114-
cb();
115-
}
116-
});
170+
const toRead = isTransformStream(tail) ? tail.readable : tail;
117171

118-
tail.on('finish', function() {
172+
eos(toRead, () => {
119173
if (onfinish) {
120174
const cb = onfinish;
121175
onfinish = null;
@@ -125,32 +179,54 @@ module.exports = function compose(...streams) {
125179
}
126180

127181
if (readable) {
128-
tail.on('readable', function() {
129-
if (onreadable) {
130-
const cb = onreadable;
131-
onreadable = null;
132-
cb();
133-
}
134-
});
135-
136-
tail.on('end', function() {
137-
d.push(null);
138-
});
139-
140-
d._read = function() {
141-
while (true) {
142-
const buf = tail.read();
143-
144-
if (buf === null) {
145-
onreadable = d._read;
146-
return;
182+
if (isNodeStream(tail)) {
183+
tail.on('readable', function() {
184+
if (onreadable) {
185+
const cb = onreadable;
186+
onreadable = null;
187+
cb();
147188
}
148-
149-
if (!d.push(buf)) {
150-
return;
189+
});
190+
191+
tail.on('end', function() {
192+
d.push(null);
193+
});
194+
195+
d._read = function() {
196+
while (true) {
197+
const buf = tail.read();
198+
if (buf === null) {
199+
onreadable = d._read;
200+
return;
201+
}
202+
203+
if (!d.push(buf)) {
204+
return;
205+
}
151206
}
152-
}
153-
};
207+
};
208+
} else if (isWebStream(tail)) {
209+
const readable = isTransformStream(tail) ? tail.readable : tail;
210+
const reader = readable.getReader();
211+
d._read = async function() {
212+
while (true) {
213+
try {
214+
const { value, done } = await reader.read();
215+
216+
if (!d.push(value)) {
217+
return;
218+
}
219+
220+
if (done) {
221+
d.push(null);
222+
return;
223+
}
224+
} catch {
225+
return;
226+
}
227+
}
228+
};
229+
}
154230
}
155231

156232
d._destroy = function(err, callback) {
@@ -166,7 +242,9 @@ module.exports = function compose(...streams) {
166242
callback(err);
167243
} else {
168244
onclose = callback;
169-
destroyer(tail, err);
245+
if (isNodeStream(tail)) {
246+
destroyer(tail, err);
247+
}
170248
}
171249
};
172250

lib/internal/streams/pipeline.js

+2-1
Original file line numberDiff line numberDiff line change
@@ -286,7 +286,7 @@ function pipelineImpl(streams, callback, opts) {
286286
throw new ERR_INVALID_RETURN_VALUE(
287287
'Iterable, AsyncIterable or Stream', 'source', ret);
288288
}
289-
} else if (isIterable(stream) || isReadableNodeStream(stream)) {
289+
} else if (isIterable(stream) || isReadableNodeStream(stream) || isTransformStream(stream)) {
290290
ret = stream;
291291
} else {
292292
ret = Duplex.from(stream);
@@ -385,6 +385,7 @@ function pipelineImpl(streams, callback, opts) {
385385
finishCount++;
386386
pumpToWeb(ret, stream, finish, { end });
387387
} else if (isTransformStream(ret)) {
388+
finishCount++;
388389
pumpToWeb(ret.readable, stream, finish, { end });
389390
} else {
390391
throw new ERR_INVALID_ARG_TYPE(

0 commit comments

Comments
 (0)