Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 352cbd0

Browse files
meixgdanielleadams
authored andcommittedApr 24, 2022
stream: use .chunk when calling adapters's writev
Fix: #42157 PR-URL: #42161 Fixes: #42157 Reviewed-By: Benjamin Gruenbaum <[email protected]> Reviewed-By: Matteo Collina <[email protected]>
1 parent cd914a5 commit 352cbd0

File tree

3 files changed

+406
-4
lines changed

3 files changed

+406
-4
lines changed
 

‎lib/internal/webstreams/adapters.js

+6-4
Original file line numberDiff line numberDiff line change
@@ -228,8 +228,9 @@ function newStreamWritableFromWritableStream(writableStream, options = {}) {
228228

229229
writev(chunks, callback) {
230230
function done(error) {
231+
error = error.filter((e) => e);
231232
try {
232-
callback(error);
233+
callback(error.length === 0 ? undefined : error);
233234
} catch (error) {
234235
// In a next tick because this is happening within
235236
// a promise context, and if there are any errors
@@ -247,7 +248,7 @@ function newStreamWritableFromWritableStream(writableStream, options = {}) {
247248
PromiseAll(
248249
ArrayPrototypeMap(
249250
chunks,
250-
(chunk) => writer.write(chunk))),
251+
(data) => writer.write(data.chunk))),
251252
done,
252253
done);
253254
},
@@ -633,8 +634,9 @@ function newStreamDuplexFromReadableWritablePair(pair = {}, options = {}) {
633634

634635
writev(chunks, callback) {
635636
function done(error) {
637+
error = error.filter((e) => e);
636638
try {
637-
callback(error);
639+
callback(error.length === 0 ? undefined : error);
638640
} catch (error) {
639641
// In a next tick because this is happening within
640642
// a promise context, and if there are any errors
@@ -652,7 +654,7 @@ function newStreamDuplexFromReadableWritablePair(pair = {}, options = {}) {
652654
PromiseAll(
653655
ArrayPrototypeMap(
654656
chunks,
655-
(chunk) => writer.write(chunk))),
657+
(data) => writer.write(data.chunk))),
656658
done,
657659
done);
658660
},
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
// Flags: --no-warnings --expose-internals
2+
'use strict';
3+
4+
const common = require('../common');
5+
6+
const assert = require('assert');
7+
8+
const {
9+
TransformStream,
10+
} = require('stream/web');
11+
12+
const {
13+
newStreamDuplexFromReadableWritablePair,
14+
} = require('internal/webstreams/adapters');
15+
16+
const {
17+
finished,
18+
pipeline,
19+
Readable,
20+
Writable,
21+
} = require('stream');
22+
23+
const {
24+
kState,
25+
} = require('internal/webstreams/util');
26+
27+
{
28+
const transform = new TransformStream();
29+
const duplex = newStreamDuplexFromReadableWritablePair(transform);
30+
31+
assert(transform.readable.locked);
32+
assert(transform.writable.locked);
33+
34+
duplex.destroy();
35+
36+
duplex.on('close', common.mustCall(() => {
37+
assert.strictEqual(transform.readable[kState].state, 'closed');
38+
assert.strictEqual(transform.writable[kState].state, 'errored');
39+
}));
40+
}
41+
42+
{
43+
const error = new Error('boom');
44+
const transform = new TransformStream();
45+
const duplex = newStreamDuplexFromReadableWritablePair(transform);
46+
47+
assert(transform.readable.locked);
48+
assert(transform.writable.locked);
49+
50+
duplex.destroy(error);
51+
duplex.on('error', common.mustCall((reason) => {
52+
assert.strictEqual(reason, error);
53+
}));
54+
55+
duplex.on('close', common.mustCall(() => {
56+
assert.strictEqual(transform.readable[kState].state, 'closed');
57+
assert.strictEqual(transform.writable[kState].state, 'errored');
58+
assert.strictEqual(transform.writable[kState].storedError, error);
59+
}));
60+
}
61+
62+
{
63+
const transform = new TransformStream();
64+
const duplex = new newStreamDuplexFromReadableWritablePair(transform);
65+
66+
duplex.end();
67+
duplex.resume();
68+
69+
duplex.on('close', common.mustCall(() => {
70+
assert.strictEqual(transform.readable[kState].state, 'closed');
71+
assert.strictEqual(transform.writable[kState].state, 'closed');
72+
}));
73+
}
74+
75+
{
76+
const ec = new TextEncoder();
77+
const dc = new TextDecoder();
78+
const transform = new TransformStream({
79+
transform(chunk, controller) {
80+
const text = dc.decode(chunk);
81+
controller.enqueue(ec.encode(text.toUpperCase()));
82+
}
83+
});
84+
const duplex = new newStreamDuplexFromReadableWritablePair(transform, {
85+
encoding: 'utf8',
86+
});
87+
88+
duplex.end('hello');
89+
duplex.on('data', common.mustCall((chunk) => {
90+
assert.strictEqual(chunk, 'HELLO');
91+
}));
92+
duplex.on('end', common.mustCall());
93+
94+
duplex.on('close', common.mustCall(() => {
95+
assert.strictEqual(transform.readable[kState].state, 'closed');
96+
assert.strictEqual(transform.writable[kState].state, 'closed');
97+
}));
98+
}
99+
100+
{
101+
const ec = new TextEncoder();
102+
const dc = new TextDecoder();
103+
const transform = new TransformStream({
104+
transform: common.mustCall((chunk, controller) => {
105+
const text = dc.decode(chunk);
106+
controller.enqueue(ec.encode(text.toUpperCase()));
107+
})
108+
});
109+
const duplex = new newStreamDuplexFromReadableWritablePair(transform, {
110+
encoding: 'utf8',
111+
});
112+
113+
finished(duplex, common.mustCall());
114+
115+
duplex.end('hello');
116+
duplex.resume();
117+
}
118+
119+
{
120+
const ec = new TextEncoder();
121+
const dc = new TextDecoder();
122+
const transform = new TransformStream({
123+
transform: common.mustCall((chunk, controller) => {
124+
const text = dc.decode(chunk);
125+
controller.enqueue(ec.encode(text.toUpperCase()));
126+
})
127+
});
128+
const duplex = new newStreamDuplexFromReadableWritablePair(transform, {
129+
encoding: 'utf8',
130+
});
131+
132+
const readable = new Readable({
133+
read() {
134+
readable.push(Buffer.from('hello'));
135+
readable.push(null);
136+
}
137+
});
138+
139+
const writable = new Writable({
140+
write: common.mustCall((chunk, encoding, callback) => {
141+
assert.strictEqual(dc.decode(chunk), 'HELLO');
142+
assert.strictEqual(encoding, 'buffer');
143+
callback();
144+
})
145+
});
146+
147+
finished(duplex, common.mustCall());
148+
pipeline(readable, duplex, writable, common.mustCall());
149+
}
150+
151+
{
152+
const transform = new TransformStream();
153+
const duplex = newStreamDuplexFromReadableWritablePair(transform);
154+
duplex.setEncoding('utf-8');
155+
duplex.on('data', common.mustCall((data) => {
156+
assert.strictEqual(data, 'hello');
157+
}, 5));
158+
159+
duplex.write(Buffer.from('hello'));
160+
duplex.write(Buffer.from('hello'));
161+
duplex.write(Buffer.from('hello'));
162+
duplex.write(Buffer.from('hello'));
163+
duplex.write(Buffer.from('hello'));
164+
165+
duplex.end();
166+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,234 @@
1+
// Flags: --no-warnings --expose-internals
2+
'use strict';
3+
4+
const common = require('../common');
5+
6+
const assert = require('assert');
7+
8+
const {
9+
WritableStream,
10+
} = require('stream/web');
11+
12+
const {
13+
newStreamWritableFromWritableStream,
14+
} = require('internal/webstreams/adapters');
15+
16+
const {
17+
finished,
18+
pipeline,
19+
Readable,
20+
} = require('stream');
21+
22+
const {
23+
kState,
24+
} = require('internal/webstreams/util');
25+
26+
class TestSource {
27+
constructor() {
28+
this.chunks = [];
29+
}
30+
31+
start(c) {
32+
this.controller = c;
33+
this.started = true;
34+
}
35+
36+
write(chunk) {
37+
this.chunks.push(chunk);
38+
}
39+
40+
close() {
41+
this.closed = true;
42+
}
43+
44+
abort(reason) {
45+
this.abortReason = reason;
46+
}
47+
}
48+
49+
[1, {}, false, []].forEach((arg) => {
50+
assert.throws(() => newStreamWritableFromWritableStream(arg), {
51+
code: 'ERR_INVALID_ARG_TYPE',
52+
});
53+
});
54+
55+
{
56+
// Ending the stream.Writable should close the writableStream
57+
const source = new TestSource();
58+
const writableStream = new WritableStream(source);
59+
const writable = newStreamWritableFromWritableStream(writableStream);
60+
61+
assert(writableStream.locked);
62+
63+
writable.end('chunk');
64+
65+
writable.on('close', common.mustCall(() => {
66+
assert(writableStream.locked);
67+
assert.strictEqual(writableStream[kState].state, 'closed');
68+
assert.strictEqual(source.chunks.length, 1);
69+
assert.deepStrictEqual(source.chunks[0], Buffer.from('chunk'));
70+
}));
71+
}
72+
73+
{
74+
// Destroying the stream.Writable without an error should close
75+
// the writableStream with no error.
76+
const source = new TestSource();
77+
const writableStream = new WritableStream(source);
78+
const writable = newStreamWritableFromWritableStream(writableStream);
79+
80+
assert(writableStream.locked);
81+
82+
writable.destroy();
83+
84+
writable.on('close', common.mustCall(() => {
85+
assert(writableStream.locked);
86+
assert.strictEqual(writableStream[kState].state, 'closed');
87+
assert.strictEqual(source.chunks.length, 0);
88+
}));
89+
}
90+
91+
{
92+
// Destroying the stream.Writable with an error should error
93+
// the writableStream
94+
const error = new Error('boom');
95+
const source = new TestSource();
96+
const writableStream = new WritableStream(source);
97+
const writable = newStreamWritableFromWritableStream(writableStream);
98+
99+
assert(writableStream.locked);
100+
101+
writable.destroy(error);
102+
103+
writable.on('error', common.mustCall((reason) => {
104+
assert.strictEqual(reason, error);
105+
}));
106+
107+
writable.on('close', common.mustCall(() => {
108+
assert(writableStream.locked);
109+
assert.strictEqual(writableStream[kState].state, 'errored');
110+
assert.strictEqual(writableStream[kState].storedError, error);
111+
assert.strictEqual(source.chunks.length, 0);
112+
}));
113+
}
114+
115+
{
116+
// Attempting to close, abort, or getWriter on writableStream
117+
// should fail because it is locked. An internal error in
118+
// writableStream should error the writable.
119+
const error = new Error('boom');
120+
const source = new TestSource();
121+
const writableStream = new WritableStream(source);
122+
const writable = newStreamWritableFromWritableStream(writableStream);
123+
124+
assert(writableStream.locked);
125+
126+
assert.rejects(writableStream.close(), {
127+
code: 'ERR_INVALID_STATE',
128+
});
129+
130+
assert.rejects(writableStream.abort(), {
131+
code: 'ERR_INVALID_STATE',
132+
});
133+
134+
assert.throws(() => writableStream.getWriter(), {
135+
code: 'ERR_INVALID_STATE',
136+
});
137+
138+
writable.on('error', common.mustCall((reason) => {
139+
assert.strictEqual(error, reason);
140+
}));
141+
142+
source.controller.error(error);
143+
}
144+
145+
{
146+
const source = new TestSource();
147+
const writableStream = new WritableStream(source);
148+
const writable = newStreamWritableFromWritableStream(writableStream);
149+
150+
writable.on('error', common.mustNotCall());
151+
writable.on('finish', common.mustCall());
152+
writable.on('close', common.mustCall(() => {
153+
assert.strictEqual(source.chunks.length, 1);
154+
assert.deepStrictEqual(source.chunks[0], Buffer.from('hello'));
155+
}));
156+
157+
writable.write('hello', common.mustCall());
158+
writable.end();
159+
}
160+
161+
{
162+
const source = new TestSource();
163+
const writableStream = new WritableStream(source);
164+
const writable =
165+
newStreamWritableFromWritableStream(writableStream, {
166+
decodeStrings: false,
167+
});
168+
169+
writable.on('error', common.mustNotCall());
170+
writable.on('finish', common.mustCall());
171+
writable.on('close', common.mustCall(() => {
172+
assert.strictEqual(source.chunks.length, 1);
173+
assert.strictEqual(source.chunks[0], 'hello');
174+
}));
175+
176+
writable.write('hello', common.mustCall());
177+
writable.end();
178+
}
179+
180+
{
181+
const source = new TestSource();
182+
const writableStream = new WritableStream(source);
183+
const writable =
184+
newStreamWritableFromWritableStream(
185+
writableStream, {
186+
objectMode: true
187+
});
188+
assert(writable.writableObjectMode);
189+
190+
writable.on('error', common.mustNotCall());
191+
writable.on('finish', common.mustCall());
192+
writable.on('close', common.mustCall(() => {
193+
assert.strictEqual(source.chunks.length, 1);
194+
assert.strictEqual(source.chunks[0], 'hello');
195+
}));
196+
197+
writable.write('hello', common.mustCall());
198+
writable.end();
199+
}
200+
201+
{
202+
const writableStream = new WritableStream({
203+
write: common.mustCall(5),
204+
close: common.mustCall(),
205+
});
206+
const writable = newStreamWritableFromWritableStream(writableStream);
207+
208+
finished(writable, common.mustCall());
209+
210+
writable.write('hello');
211+
writable.write('hello');
212+
writable.write('hello');
213+
writable.write('world');
214+
writable.write('world');
215+
writable.end();
216+
}
217+
218+
{
219+
const writableStream = new WritableStream({
220+
write: common.mustCall(2),
221+
close: common.mustCall(),
222+
});
223+
const writable = newStreamWritableFromWritableStream(writableStream);
224+
225+
const readable = new Readable({
226+
read() {
227+
readable.push(Buffer.from('hello'));
228+
readable.push(Buffer.from('world'));
229+
readable.push(null);
230+
}
231+
});
232+
233+
pipeline(readable, writable, common.mustCall());
234+
}

0 commit comments

Comments
 (0)
Please sign in to comment.