Skip to content

Commit c257fa5

Browse files
feat(transducers-async): add SyncOpts, update sync()
- add tests
1 parent 8a799ea commit c257fa5

File tree

3 files changed

+122
-32
lines changed

3 files changed

+122
-32
lines changed

Diff for: packages/transducers-async/src/merge.ts

+1-2
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,7 @@ export async function* merge<T>(
1616
let n = iters.length;
1717
const $remove = (id: number) => {
1818
iters.splice(id, 1);
19-
n--;
20-
if (!n) return true;
19+
if (!--n) return true;
2120
for (let i = id; i < n; i++) iters[i].id--;
2221
};
2322
// array of in-flight promises

Diff for: packages/transducers-async/src/sync.ts

+79-30
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,26 @@ export type SyncSources<T extends Record<NumOrString, any>> = {
44
[id in keyof T]: AsyncIterable<T[id]>;
55
};
66

7+
export interface SyncOpts {
8+
/**
9+
* If true, {@link sync} waits for new values from *all* remaining inputs
10+
* before a new tuple is produced. If false, that synchronization only
11+
* happens for the very first tuple and afterwards any changed input will
12+
* trigger a tuple update.
13+
*
14+
* @defaultValue false
15+
*/
16+
reset: boolean;
17+
/**
18+
* Only used if {@link SyncOpts.reset} is disabled. If true (default: false)
19+
* *no* input synchronization (waiting for values) is applied and
20+
* {@link sync} will emit potentially partially populated tuple objects for
21+
* each received input value. However, as with the default behavior, tuples
22+
* will retain the most recent consumed value from other inputs.
23+
*/
24+
mergeOnly: boolean;
25+
}
26+
727
/**
828
* Async iterator version of [thi.ng/rstream's sync()
929
* construct](https://docs.thi.ng/umbrella/rstream/functions/sync.html).
@@ -12,10 +32,20 @@ export type SyncSources<T extends Record<NumOrString, any>> = {
1232
* Also see {@link merge} for an alternative way of merging.
1333
*
1434
* @param src
35+
* @param opts
1536
*/
37+
export function sync<T extends Record<NumOrString, any>>(
38+
src: SyncSources<T>,
39+
opts?: Partial<SyncOpts> & { mergeOnly: true }
40+
): AsyncIterableIterator<Partial<T>>;
41+
export function sync<T extends Record<NumOrString, any>>(
42+
src: SyncSources<T>,
43+
opts?: Partial<SyncOpts>
44+
): AsyncIterableIterator<T>;
1645
export async function* sync<T extends Record<NumOrString, any>>(
17-
src: SyncSources<T>
18-
): AsyncIterableIterator<T> {
46+
src: SyncSources<T>,
47+
opts?: Partial<SyncOpts>
48+
) {
1949
let iters = <{ key: keyof T; id: number; iter: AsyncIterator<any> }[]>(
2050
Object.entries(src).map(([key, v], id) => ({
2151
key,
@@ -26,38 +56,57 @@ export async function* sync<T extends Record<NumOrString, any>>(
2656
let n = iters.length;
2757
const $remove = (id: number) => {
2858
iters.splice(id, 1);
29-
n--;
30-
if (!n) return true;
59+
if (!--n) return true;
3160
for (let i = id; i < n; i++) iters[i].id--;
3261
};
33-
// wait for all sources
34-
const initial = await Promise.all(iters.map(({ iter }) => iter.next()));
35-
// keep active iterators only, update successive IDs
36-
for (let i = 0; i < n; ) {
37-
if (initial[i].done) {
38-
initial.splice(i, 1);
39-
if ($remove(i)) return;
40-
} else i++;
41-
}
42-
// build initial tuple
43-
const tuple = initial.reduce(
44-
(acc, x, i) => ((acc[iters[i].key] = x.value), acc),
45-
<T>{}
46-
);
47-
yield { ...tuple };
48-
// array of in-flight promises
49-
const promises = iters.map((iter) =>
50-
iter.iter.next().then((res) => ({ iter, res }))
51-
);
52-
while (true) {
53-
const { iter, res } = await Promise.any(promises);
54-
if (res.done) {
55-
promises.splice(iter.id, 1);
56-
if ($remove(iter.id)) return;
62+
const $initial = async () => {
63+
// wait for all sources
64+
const res = await Promise.all(iters.map(({ iter }) => iter.next()));
65+
// keep active iterators only, update successive IDs
66+
for (let i = 0; i < n; ) {
67+
if (res[i].done) {
68+
res.splice(i, 1);
69+
if ($remove(i)) return;
70+
} else i++;
71+
}
72+
// build tuple
73+
return res.reduce(
74+
(acc, x, i) => ((acc[iters[i].key] = x.value), acc),
75+
<T>{}
76+
);
77+
};
78+
if (opts?.reset) {
79+
let tuple: T | undefined;
80+
let curr: T | undefined;
81+
while ((curr = await $initial())) {
82+
tuple = { ...tuple, ...curr };
83+
yield tuple;
84+
}
85+
} else {
86+
let tuple: T | undefined;
87+
if (opts?.mergeOnly) {
88+
tuple = <T>{};
5789
} else {
58-
tuple[iter.key] = res.value;
90+
tuple = await $initial();
91+
if (!tuple) return;
5992
yield { ...tuple };
60-
promises[iter.id] = iter.iter.next().then((res) => ({ res, iter }));
93+
}
94+
// array of in-flight promises
95+
const promises = iters.map((iter) =>
96+
iter.iter.next().then((res) => ({ iter, res }))
97+
);
98+
while (true) {
99+
const { iter, res } = await Promise.any(promises);
100+
if (res.done) {
101+
promises.splice(iter.id, 1);
102+
if ($remove(iter.id)) return;
103+
} else {
104+
tuple[iter.key] = res.value;
105+
yield { ...tuple };
106+
promises[iter.id] = iter.iter
107+
.next()
108+
.then((res) => ({ res, iter }));
109+
}
61110
}
62111
}
63112
}

Diff for: packages/transducers-async/test/main.test.ts

+42
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,48 @@ test(
240240
{ retry: 5 }
241241
);
242242

243+
test("sync (reset)", async (done) => {
244+
expect(
245+
await push(
246+
sync(
247+
{
248+
a: repeatedly((i) => i, 2),
249+
b: repeatedly((i) => i, 4),
250+
},
251+
{ reset: true }
252+
)
253+
)
254+
).toEqual([
255+
{ a: 0, b: 0 },
256+
{ a: 1, b: 1 },
257+
{ a: 1, b: 2 },
258+
{ a: 1, b: 3 },
259+
]);
260+
done();
261+
});
262+
263+
test("sync (merge only)", async (done) => {
264+
expect(
265+
await push(
266+
sync(
267+
{
268+
a: repeatedly((i) => i, 2),
269+
b: repeatedly((i) => i, 4),
270+
},
271+
{ mergeOnly: true }
272+
)
273+
)
274+
).toEqual([
275+
{ a: 0 },
276+
{ a: 0, b: 0 },
277+
{ a: 1, b: 0 },
278+
{ a: 1, b: 1 },
279+
{ a: 1, b: 2 },
280+
{ a: 1, b: 3 },
281+
]);
282+
done();
283+
});
284+
243285
test("take", async (done) => {
244286
expect(
245287
await transduce(

0 commit comments

Comments
 (0)