Skip to content

Commit c60816a

Browse files
debadree25danielleadams
authored andcommitted
stream: add abort signal for ReadableStream and WritableStream
Refs: #39316 PR-URL: #46273 Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: Benjamin Gruenbaum <[email protected]> Reviewed-By: James M Snell <[email protected]>
1 parent 1395e36 commit c60816a

File tree

6 files changed

+233
-12
lines changed

6 files changed

+233
-12
lines changed

doc/api/stream.md

+40-2
Original file line numberDiff line numberDiff line change
@@ -3191,17 +3191,24 @@ readable.getReader().read().then((result) => {
31913191

31923192
<!-- YAML
31933193
added: v15.4.0
3194+
changes:
3195+
- version: REPLACEME
3196+
pr-url: https://github.com/nodejs/node/pull/46273
3197+
description: Added support for `ReadableStream` and
3198+
`WritableStream`.
31943199
-->
31953200

31963201
* `signal` {AbortSignal} A signal representing possible cancellation
3197-
* `stream` {Stream} a stream to attach a signal to
3202+
* `stream` {Stream|ReadableStream|WritableStream}
3203+
3204+
A stream to attach a signal to.
31983205

31993206
Attaches an AbortSignal to a readable or writeable stream. This lets code
32003207
control stream destruction using an `AbortController`.
32013208

32023209
Calling `abort` on the `AbortController` corresponding to the passed
32033210
`AbortSignal` will behave the same way as calling `.destroy(new AbortError())`
3204-
on the stream.
3211+
on the stream, and `controller.error(new AbortError())` for webstreams.
32053212

32063213
```js
32073214
const fs = require('node:fs');
@@ -3239,6 +3246,37 @@ const stream = addAbortSignal(
32393246
})();
32403247
```
32413248

3249+
Or using an `AbortSignal` with a ReadableStream:
3250+
3251+
```js
3252+
const controller = new AbortController();
3253+
const rs = new ReadableStream({
3254+
start(controller) {
3255+
controller.enqueue('hello');
3256+
controller.enqueue('world');
3257+
controller.close();
3258+
},
3259+
});
3260+
3261+
addAbortSignal(controller.signal, rs);
3262+
3263+
finished(rs, (err) => {
3264+
if (err) {
3265+
if (err.name === 'AbortError') {
3266+
// The operation was cancelled
3267+
}
3268+
}
3269+
});
3270+
3271+
const reader = rs.getReader();
3272+
3273+
reader.read().then(({ value, done }) => {
3274+
console.log(value); // hello
3275+
console.log(done); // false
3276+
controller.abort();
3277+
});
3278+
```
3279+
32423280
## API for stream implementers
32433281

32443282
<!--type=misc-->

lib/internal/streams/add-abort-signal.js

+16-9
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,12 @@ const {
55
codes,
66
} = require('internal/errors');
77

8+
const {
9+
isNodeStream,
10+
isWebStream,
11+
kControllerErrorFunction,
12+
} = require('internal/streams/utils');
13+
814
const eos = require('internal/streams/end-of-stream');
915
const { ERR_INVALID_ARG_TYPE } = codes;
1016

@@ -18,24 +24,25 @@ const validateAbortSignal = (signal, name) => {
1824
}
1925
};
2026

21-
function isNodeStream(obj) {
22-
return !!(obj && typeof obj.pipe === 'function');
23-
}
24-
2527
module.exports.addAbortSignal = function addAbortSignal(signal, stream) {
2628
validateAbortSignal(signal, 'signal');
27-
if (!isNodeStream(stream)) {
28-
throw new ERR_INVALID_ARG_TYPE('stream', 'stream.Stream', stream);
29+
if (!isNodeStream(stream) && !isWebStream(stream)) {
30+
throw new ERR_INVALID_ARG_TYPE('stream', ['ReadableStream', 'WritableStream', 'Stream'], stream);
2931
}
3032
return module.exports.addAbortSignalNoValidate(signal, stream);
3133
};
34+
3235
module.exports.addAbortSignalNoValidate = function(signal, stream) {
3336
if (typeof signal !== 'object' || !('aborted' in signal)) {
3437
return stream;
3538
}
36-
const onAbort = () => {
37-
stream.destroy(new AbortError(undefined, { cause: signal.reason }));
38-
};
39+
const onAbort = isNodeStream(stream) ?
40+
() => {
41+
stream.destroy(new AbortError(undefined, { cause: signal.reason }));
42+
} :
43+
() => {
44+
stream[kControllerErrorFunction](new AbortError(undefined, { cause: signal.reason }));
45+
};
3946
if (signal.aborted) {
4047
onAbort();
4148
} else {

lib/internal/streams/utils.js

+2
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ const kIsReadable = Symbol('kIsReadable');
1313
const kIsDisturbed = Symbol('kIsDisturbed');
1414

1515
const kIsClosedPromise = SymbolFor('nodejs.webstream.isClosedPromise');
16+
const kControllerErrorFunction = SymbolFor('nodejs.webstream.controllerErrorFunction');
1617

1718
function isReadableNodeStream(obj, strict = false) {
1819
return !!(
@@ -305,6 +306,7 @@ module.exports = {
305306
isReadable,
306307
kIsReadable,
307308
kIsClosedPromise,
309+
kControllerErrorFunction,
308310
isClosed,
309311
isDestroyed,
310312
isDuplexNodeStream,

lib/internal/webstreams/readablestream.js

+3-1
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ const {
8383
kIsErrored,
8484
kIsReadable,
8585
kIsClosedPromise,
86+
kControllerErrorFunction,
8687
} = require('internal/streams/utils');
8788

8889
const {
@@ -260,6 +261,7 @@ class ReadableStream {
260261
};
261262

262263
this[kIsClosedPromise] = createDeferredPromise();
264+
this[kControllerErrorFunction] = () => {};
263265

264266
// The spec requires handling of the strategy first
265267
// here. Specifically, if getting the size and
@@ -1891,7 +1893,6 @@ function readableStreamClose(stream) {
18911893
assert(stream[kState].state === 'readable');
18921894
stream[kState].state = 'closed';
18931895
stream[kIsClosedPromise].resolve();
1894-
18951896
const {
18961897
reader,
18971898
} = stream[kState];
@@ -2330,6 +2331,7 @@ function setupReadableStreamDefaultController(
23302331
stream,
23312332
};
23322333
stream[kState].controller = controller;
2334+
stream[kControllerErrorFunction] = FunctionPrototypeBind(controller.error, controller);
23332335

23342336
const startResult = startAlgorithm();
23352337

lib/internal/webstreams/writablestream.js

+4
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ const {
7171

7272
const {
7373
kIsClosedPromise,
74+
kControllerErrorFunction,
7475
} = require('internal/streams/utils');
7576

7677
const {
@@ -199,6 +200,7 @@ class WritableStream {
199200
};
200201

201202
this[kIsClosedPromise] = createDeferredPromise();
203+
this[kControllerErrorFunction] = () => {};
202204

203205
const size = extractSizeAlgorithm(strategy?.size);
204206
const highWaterMark = extractHighWaterMark(strategy?.highWaterMark, 1);
@@ -370,6 +372,7 @@ function TransferredWritableStream() {
370372
},
371373
};
372374
this[kIsClosedPromise] = createDeferredPromise();
375+
this[kControllerErrorFunction] = () => {};
373376
},
374377
[], WritableStream));
375378
}
@@ -1282,6 +1285,7 @@ function setupWritableStreamDefaultController(
12821285
writeAlgorithm,
12831286
};
12841287
stream[kState].controller = controller;
1288+
stream[kControllerErrorFunction] = FunctionPrototypeBind(controller.error, controller);
12851289

12861290
writableStreamUpdateBackpressure(
12871291
stream,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const { finished, addAbortSignal } = require('stream');
5+
const { ReadableStream, WritableStream } = require('stream/web');
6+
const assert = require('assert');
7+
8+
function createTestReadableStream() {
9+
return new ReadableStream({
10+
start(controller) {
11+
controller.enqueue('a');
12+
controller.enqueue('b');
13+
controller.enqueue('c');
14+
controller.close();
15+
}
16+
});
17+
}
18+
19+
function createTestWritableStream(values) {
20+
return new WritableStream({
21+
write(chunk) {
22+
values.push(chunk);
23+
}
24+
});
25+
}
26+
27+
{
28+
const rs = createTestReadableStream();
29+
30+
const reader = rs.getReader();
31+
32+
const ac = new AbortController();
33+
34+
addAbortSignal(ac.signal, rs);
35+
36+
finished(rs, common.mustCall((err) => {
37+
assert.strictEqual(err.name, 'AbortError');
38+
assert.rejects(reader.read(), /AbortError/).then(common.mustCall());
39+
assert.rejects(reader.closed, /AbortError/).then(common.mustCall());
40+
}));
41+
42+
reader.read().then(common.mustCall((result) => {
43+
assert.strictEqual(result.value, 'a');
44+
ac.abort();
45+
}));
46+
}
47+
48+
{
49+
const rs = createTestReadableStream();
50+
51+
const ac = new AbortController();
52+
53+
addAbortSignal(ac.signal, rs);
54+
55+
assert.rejects((async () => {
56+
for await (const chunk of rs) {
57+
if (chunk === 'b') {
58+
ac.abort();
59+
}
60+
}
61+
})(), /AbortError/).then(common.mustCall());
62+
}
63+
64+
{
65+
const rs1 = createTestReadableStream();
66+
67+
const rs2 = createTestReadableStream();
68+
69+
const ac = new AbortController();
70+
71+
addAbortSignal(ac.signal, rs1);
72+
addAbortSignal(ac.signal, rs2);
73+
74+
const reader1 = rs1.getReader();
75+
const reader2 = rs2.getReader();
76+
77+
finished(rs1, common.mustCall((err) => {
78+
assert.strictEqual(err.name, 'AbortError');
79+
assert.rejects(reader1.read(), /AbortError/).then(common.mustCall());
80+
assert.rejects(reader1.closed, /AbortError/).then(common.mustCall());
81+
}));
82+
83+
finished(rs2, common.mustCall((err) => {
84+
assert.strictEqual(err.name, 'AbortError');
85+
assert.rejects(reader2.read(), /AbortError/).then(common.mustCall());
86+
assert.rejects(reader2.closed, /AbortError/).then(common.mustCall());
87+
}));
88+
89+
ac.abort();
90+
}
91+
92+
{
93+
const rs = createTestReadableStream();
94+
95+
const { 0: rs1, 1: rs2 } = rs.tee();
96+
97+
const ac = new AbortController();
98+
99+
addAbortSignal(ac.signal, rs);
100+
101+
const reader1 = rs1.getReader();
102+
const reader2 = rs2.getReader();
103+
104+
finished(rs1, common.mustCall((err) => {
105+
assert.strictEqual(err.name, 'AbortError');
106+
assert.rejects(reader1.read(), /AbortError/).then(common.mustCall());
107+
assert.rejects(reader1.closed, /AbortError/).then(common.mustCall());
108+
}));
109+
110+
finished(rs2, common.mustCall((err) => {
111+
assert.strictEqual(err.name, 'AbortError');
112+
assert.rejects(reader2.read(), /AbortError/).then(common.mustCall());
113+
assert.rejects(reader2.closed, /AbortError/).then(common.mustCall());
114+
}));
115+
116+
ac.abort();
117+
}
118+
119+
{
120+
const values = [];
121+
const ws = createTestWritableStream(values);
122+
123+
const ac = new AbortController();
124+
125+
addAbortSignal(ac.signal, ws);
126+
127+
const writer = ws.getWriter();
128+
129+
finished(ws, common.mustCall((err) => {
130+
assert.strictEqual(err.name, 'AbortError');
131+
assert.deepStrictEqual(values, ['a']);
132+
assert.rejects(writer.write('b'), /AbortError/).then(common.mustCall());
133+
assert.rejects(writer.closed, /AbortError/).then(common.mustCall());
134+
}));
135+
136+
writer.write('a').then(() => {
137+
ac.abort();
138+
});
139+
}
140+
141+
{
142+
const values = [];
143+
144+
const ws1 = createTestWritableStream(values);
145+
const ws2 = createTestWritableStream(values);
146+
147+
const ac = new AbortController();
148+
149+
addAbortSignal(ac.signal, ws1);
150+
addAbortSignal(ac.signal, ws2);
151+
152+
const writer1 = ws1.getWriter();
153+
const writer2 = ws2.getWriter();
154+
155+
finished(ws1, common.mustCall((err) => {
156+
assert.strictEqual(err.name, 'AbortError');
157+
assert.rejects(writer1.write('a'), /AbortError/).then(common.mustCall());
158+
assert.rejects(writer1.closed, /AbortError/).then(common.mustCall());
159+
}));
160+
161+
finished(ws2, common.mustCall((err) => {
162+
assert.strictEqual(err.name, 'AbortError');
163+
assert.rejects(writer2.write('a'), /AbortError/).then(common.mustCall());
164+
assert.rejects(writer2.closed, /AbortError/).then(common.mustCall());
165+
}));
166+
167+
ac.abort();
168+
}

0 commit comments

Comments
 (0)