Skip to content

Commit 348707f

Browse files
jasnelldanielleadams
authored andcommitted
lib: make AbortSignal cloneable/transferable
Allows for using `AbortSignal` across worker threads and contexts. ```js const ac = new AbortController(); const mc = new MessageChannel(); mc.port1.onmessage = ({ data }) => { data.addEventListener('abort', () => { console.log('aborted!'); }); }; mc.port2.postMessage(ac.signal, [ac.signal]); ``` Signed-off-by: James M Snell <[email protected]> PR-URL: #41050 Refs: whatwg/dom#948 Reviewed-By: Anna Henningsen <[email protected]> Reviewed-By: Robert Nagy <[email protected]>
1 parent 28761de commit 348707f

File tree

2 files changed

+169
-5
lines changed

2 files changed

+169
-5
lines changed

lib/internal/abort_controller.js

+91-5
Original file line numberDiff line numberDiff line change
@@ -47,13 +47,37 @@ const {
4747
setTimeout,
4848
} = require('timers');
4949

50-
const kAborted = Symbol('kAborted');
51-
const kReason = Symbol('kReason');
52-
const kTimeout = Symbol('kTimeout');
50+
const {
51+
messaging_deserialize_symbol: kDeserialize,
52+
messaging_transfer_symbol: kTransfer,
53+
messaging_transfer_list_symbol: kTransferList
54+
} = internalBinding('symbols');
5355

54-
const timeOutSignals = new SafeSet();
56+
let _MessageChannel;
57+
let makeTransferable;
58+
59+
// Loading the MessageChannel and makeTransferable have to be done lazily
60+
// because otherwise we'll end up with a require cycle that ends up with
61+
// an incomplete initialization of abort_controller.
62+
63+
function lazyMessageChannel() {
64+
_MessageChannel ??= require('internal/worker/io').MessageChannel;
65+
return new _MessageChannel();
66+
}
67+
68+
function lazyMakeTransferable(obj) {
69+
makeTransferable ??=
70+
require('internal/worker/js_transferable').makeTransferable;
71+
return makeTransferable(obj);
72+
}
5573

5674
const clearTimeoutRegistry = new SafeFinalizationRegistry(clearTimeout);
75+
const timeOutSignals = new SafeSet();
76+
77+
const kAborted = Symbol('kAborted');
78+
const kReason = Symbol('kReason');
79+
const kCloneData = Symbol('kCloneData');
80+
const kTimeout = Symbol('kTimeout');
5781

5882
function customInspect(self, obj, depth, options) {
5983
if (depth < 0)
@@ -172,7 +196,68 @@ class AbortSignal extends EventTarget {
172196
timeOutSignals.delete(this);
173197
}
174198
}
199+
200+
[kTransfer]() {
201+
validateAbortSignal(this);
202+
const aborted = this.aborted;
203+
if (aborted) {
204+
const reason = this.reason;
205+
return {
206+
data: { aborted, reason },
207+
deserializeInfo: 'internal/abort_controller:ClonedAbortSignal',
208+
};
209+
}
210+
211+
const { port1, port2 } = this[kCloneData];
212+
this[kCloneData] = undefined;
213+
214+
this.addEventListener('abort', () => {
215+
port1.postMessage(this.reason);
216+
port1.close();
217+
}, { once: true });
218+
219+
return {
220+
data: { port: port2 },
221+
deserializeInfo: 'internal/abort_controller:ClonedAbortSignal',
222+
};
223+
}
224+
225+
[kTransferList]() {
226+
if (!this.aborted) {
227+
const { port1, port2 } = lazyMessageChannel();
228+
port1.unref();
229+
port2.unref();
230+
this[kCloneData] = {
231+
port1,
232+
port2,
233+
};
234+
return [port2];
235+
}
236+
return [];
237+
}
238+
239+
[kDeserialize]({ aborted, reason, port }) {
240+
if (aborted) {
241+
this[kAborted] = aborted;
242+
this[kReason] = reason;
243+
return;
244+
}
245+
246+
port.onmessage = ({ data }) => {
247+
abortSignal(this, data);
248+
port.close();
249+
port.onmessage = undefined;
250+
};
251+
// The receiving port, by itself, should never keep the event loop open.
252+
// The unref() has to be called *after* setting the onmessage handler.
253+
port.unref();
254+
}
255+
}
256+
257+
function ClonedAbortSignal() {
258+
return createAbortSignal();
175259
}
260+
ClonedAbortSignal.prototype[kDeserialize] = () => {};
176261

177262
ObjectDefineProperties(AbortSignal.prototype, {
178263
aborted: { enumerable: true }
@@ -192,7 +277,7 @@ function createAbortSignal(aborted = false, reason = undefined) {
192277
ObjectSetPrototypeOf(signal, AbortSignal.prototype);
193278
signal[kAborted] = aborted;
194279
signal[kReason] = reason;
195-
return signal;
280+
return lazyMakeTransferable(signal);
196281
}
197282

198283
function abortSignal(signal, reason) {
@@ -259,4 +344,5 @@ module.exports = {
259344
kAborted,
260345
AbortController,
261346
AbortSignal,
347+
ClonedAbortSignal,
262348
};
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const { ok, strictEqual } = require('assert');
5+
const { setImmediate: pause } = require('timers/promises');
6+
7+
function deferred() {
8+
let res;
9+
const promise = new Promise((resolve) => res = resolve);
10+
return { res, promise };
11+
}
12+
13+
(async () => {
14+
const ac = new AbortController();
15+
const mc = new MessageChannel();
16+
17+
const deferred1 = deferred();
18+
const deferred2 = deferred();
19+
const resolvers = [deferred1, deferred2];
20+
21+
mc.port1.onmessage = common.mustCall(({ data }) => {
22+
data.addEventListener('abort', common.mustCall(() => {
23+
strictEqual(data.reason, 'boom');
24+
}));
25+
resolvers.shift().res();
26+
}, 2);
27+
28+
mc.port2.postMessage(ac.signal, [ac.signal]);
29+
30+
// Can be cloned/transferd multiple times and they all still work
31+
mc.port2.postMessage(ac.signal, [ac.signal]);
32+
33+
mc.port2.close();
34+
35+
// Although we're using transfer semantics, the local AbortSignal
36+
// is still usable locally.
37+
ac.signal.addEventListener('abort', common.mustCall(() => {
38+
strictEqual(ac.signal.reason, 'boom');
39+
}));
40+
41+
await Promise.all([ deferred1.promise, deferred2.promise ]);
42+
43+
ac.abort('boom');
44+
45+
// Because the postMessage used by the underlying AbortSignal
46+
// takes at least one turn of the event loop to be processed,
47+
// and because it is unref'd, it won't, by itself, keep the
48+
// event loop open long enough for the test to complete, so
49+
// we schedule two back to back turns of the event to ensure
50+
// the loop runs long enough for the test to complete.
51+
await pause();
52+
await pause();
53+
54+
})().then(common.mustCall());
55+
56+
{
57+
const signal = AbortSignal.abort('boom');
58+
ok(signal.aborted);
59+
strictEqual(signal.reason, 'boom');
60+
const mc = new MessageChannel();
61+
mc.port1.onmessage = common.mustCall(({ data }) => {
62+
ok(data instanceof AbortSignal);
63+
ok(data.aborted);
64+
strictEqual(data.reason, 'boom');
65+
mc.port1.close();
66+
});
67+
mc.port2.postMessage(signal, [signal]);
68+
}
69+
70+
{
71+
// The cloned AbortSignal does not keep the event loop open
72+
// waiting for the abort to be triggered.
73+
const ac = new AbortController();
74+
const mc = new MessageChannel();
75+
mc.port1.onmessage = common.mustCall();
76+
mc.port2.postMessage(ac.signal, [ac.signal]);
77+
mc.port2.close();
78+
}

0 commit comments

Comments
 (0)