Skip to content

Commit 39e3ec1

Browse files
committed
operators: add withLatest(from:)
1 parent 68c8dc2 commit 39e3ec1

7 files changed

+1008
-0
lines changed
+161
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
# Feature name
2+
3+
* Proposal: [NNNN](NNNN-filename.md)
4+
* Authors: [Thibault Wittemberg](https://github.com/twittemb)
5+
* Review Manager: TBD
6+
* Status: **Awaiting implementation**
7+
8+
*During the review process, add the following fields as needed:*
9+
10+
* Implementation: [apple/swift-async-algorithms#NNNNN](https://github.com/apple/swift-async-algorithms/pull/NNNNN)
11+
* Decision Notes: [Rationale](https://forums.swift.org/), [Additional Commentary](https://forums.swift.org/)
12+
* Bugs: [NNNN](https://github.com/apple/swift-async-algorithms/issues)
13+
14+
## Introduction
15+
16+
There are several strategies when it comes to combining several sequences of events each having their own temporality. This proposal describes an operator that combines an async sequence values with the latest known values from other ones.
17+
18+
Swift forums thread: [[Pitch] withLatestFrom](https://forums.swift.org/t/pitch-withlatestfrom/56487/28)
19+
20+
## Motivation
21+
22+
Being able to combine values happening over time is a common practice in software engineering. The goal is to synchronize events from several sources by applying some strategies.
23+
24+
This is an area where reactive programming frameworks are particularly suited. Whether it is [Combine](https://developer.apple.com/documentation/combine), [RxSwift](https://github.com/ReactiveX/RxSwift) or [ReactiveCocoa](https://github.com/ReactiveCocoa/ReactiveSwift), they all provide operators that combine streams of events using some common patterns.
25+
26+
The field of possibilities is generally summarized by `zip` and `combineLatest`.
27+
28+
### zip
29+
30+
`zip` combines elements from several streams and delivers groups of elements. The returned stream waits until all upstream streams have produced an element, then delivers the latest elements from each stream as a tuple.
31+
32+
That kind of operator can be used to synchronize elements from several concurrent works. A common usecase is to synchronize values coming from concurrent network calls.
33+
34+
The following example from the [zip guide](https://github.com/apple/swift-async-algorithms/blob/main/Guides/Zip.md) illustrates the synchronization mechanism in the case of two streams of stock values:
35+
36+
37+
| Timestamp | appleFeed | nasdaqFeed | combined output |
38+
| ----------- | --------- | ---------- | ----------------------------- |
39+
| 11:40 AM | 173.91 | | |
40+
| 12:25 AM | | 14236.78 | AAPL: 173.91 NASDAQ: 14236.78 |
41+
| 12:40 AM | | 14218.34 | |
42+
| 1:15 PM | 173.00 | | AAPL: 173.00 NASDAQ: 14218.34 |
43+
44+
### combineLatest
45+
46+
The `combineLatest` operator behaves in a similar way to `zip`, but while `zip` produces elements only when each of the zipped streams have produced an element, `combineLatest` produces an element whenever any of the source stream produces one.
47+
48+
The following example from the [combineLatest guide](https://github.com/apple/swift-async-algorithms/blob/main/Guides/CombineLatest.md) illustrates the synchronization mechanism in the case of two streams of stock values:
49+
50+
51+
| Timestamp | appleFeed | nasdaqFeed | combined output |
52+
| ----------- | --------- | ---------- | ----------------------------- |
53+
| 11:40 AM | 173.91 | | |
54+
| 12:25 AM | | 14236.78 | AAPL: 173.91 NASDAQ: 14236.78 |
55+
| 12:40 AM | | 14218.34 | AAPL: 173.91 NASDAQ: 14218.34 |
56+
| 1:15 PM | 173.00 | | AAPL: 173.00 NASDAQ: 14218.34 |
57+
58+
59+
### When self should impose its pace!
60+
61+
With `zip` and `combineLatest` all streams have equal weight in the aggregation algorithm that forms the tuples. Input streams can be interchanged without changing the operator's behavior. We can see `zip` as an `AND` boolean operator and `combineLatest` as an `OR` boolean operator: in boolean algebra they are commutative properties.
62+
63+
There can be usecases where a particular stream should impose its pace to the others.
64+
65+
What if we want a new value of the tuple (`AAPL`, `NASDAQ`) to be produced **ONLY WHEN** the `appleFeed` produces an element?
66+
67+
Although `combineLatest` is close to the desired behavior, it is not exactly it: a new tuple will be produced also when `nasdaqFeed` produces a new element.
68+
69+
Following the stock example, the desired behavior would be:
70+
71+
| Timestamp | appleFeed | nasdaqFeed | combined output |
72+
| ----------- | --------- | ---------- | ----------------------------- |
73+
| 11:40 AM | 173.91 | | |
74+
| 12:25 AM | | 14236.78 | |
75+
| 12:40 AM | | 14218.34 | |
76+
| 1:15 PM | 173.00 | | AAPL: 173.00 NASDAQ: 14218.34 |
77+
78+
Unlike `zip` and `combineLatest`, we cannot interchange the 2 feeds without changing the awaited behavior.
79+
80+
## Proposed solution
81+
82+
We propose to introduce an new operator that applies to `self` (self being an `AsyncSequence`), and that takes other AsyncSequences as parameters.
83+
84+
The temporary name for this operator is: `.withLatest(from:)`.
85+
86+
`.withLatest(from:)` combines elements from `self` with elements from other asynchronous sequences and delivers groups of elements as tuples. The returned `AsyncSequence` produces elements when `self` produces an element and groups it with the latest known elements from the other sequences to form the output tuples.
87+
88+
89+
## Detailed design
90+
91+
This function family and the associated family of return types are prime candidates for variadic generics. Until that proposal is accepted, these will be implemented in terms of two- and three-base sequence cases.
92+
93+
```swift
94+
public extension AsyncSequence {
95+
func withLatest<Other: AsyncSequence>(from other: Other) -> AsyncWithLatestFromSequence<Self, Other> {
96+
AsyncWithLatestFromSequence(self, other)
97+
}
98+
99+
func withLatest<Other1: AsyncSequence, Other2: AsyncSequence>(from other1: Other1, _ other2: Other2) -> AsyncWithLatestFrom2Sequence<Self, Other> {
100+
AsyncWithLatestFrom2Sequence(self, other1, other2)
101+
}
102+
}
103+
104+
public struct AsyncWithLatestFromSequence<Base: AsyncSequence, Other: AsyncSequence> {
105+
public typealias Element = (Base.Element, Other.Element)
106+
public typealias AsyncIterator = Iterator
107+
108+
public struct Iterator: AsyncIteratorProtocol {
109+
public mutating func next() async rethrows -> Element?
110+
}
111+
112+
public func makeAsyncIterator() -> Iterator
113+
}
114+
115+
public struct AsyncWithLatestFrom2Sequence<Base: AsyncSequence, Other1: AsyncSequence, Other2: AsyncSequence> {
116+
public typealias Element = (Base.Element, Other1.Element, Other2.Element)
117+
public typealias AsyncIterator = Iterator
118+
119+
public struct Iterator: AsyncIteratorProtocol {
120+
public mutating func next() async rethrows -> Element?
121+
}
122+
123+
public func makeAsyncIterator() -> Iterator
124+
}
125+
```
126+
127+
The `withLatest(from:...)` function takes one or two asynchronous sequences as arguments and produces an `AsyncWithLatestFromSequence`/`AsyncWithLatestFrom2Sequence` which is an asynchronous sequence.
128+
129+
As we must know the latest elements from `others` to form the output tuple when `self` produces a new element, we must iterate over `others` asynchronously using Tasks.
130+
131+
For the first iteration of `AsyncWithLatestFromSequence` to produce an element, `AsyncWithLatestFromSequence` will wait for `self` and `others` to produce a first element.
132+
133+
Each subsequent iteration of an `AsyncWithLatestFromSequence` will wait for `self` to produce an element.
134+
135+
If self` terminates by returning nil from its iteration, the `AsyncWithLatestFromSequence` iteration is immediately considered unsatisfiable and returns nil and all iterations of other bases will be cancelled.
136+
137+
If `others` terminates by returning nil from their iteration, the `AsyncWithLatestFromSequence` iteration continues by agregating elements from `self` and last known elements from `others`.
138+
139+
If any iteration of `self` or `others` throws an error, then the `others` iterations are cancelled and the produced error is rethrown, terminating the iteration.
140+
141+
The source of throwing of `AsyncWithLatestFromSequence` is determined by `Self` and `Others`. That means that if `self` or any `other` can throw an error then the iteration of the `AsyncWithLatestFromSequence` can throw. If `self` and `others` cannot throw, then the `AsyncWithLatestFromSequence` cannot throw.
142+
143+
## Effect on API resilience
144+
145+
None.
146+
147+
## Alternatives names
148+
149+
Those alternate names were suggested:
150+
151+
- `zip(sampling: other1, other2, atRateOf: self)`
152+
- `zip(other1, other2, elementOn: .newElementFrom(self))`
153+
- `self.zipWhen(other1, other2)`
154+
155+
## Comparison with other libraries
156+
157+
[RxSwift](https://github.com/ReactiveX/RxSwift/blob/main/RxSwift/Observables/WithLatestFrom.swift) provides an implementation of such an operator under the name `withLatestFrom` ([RxMarble](https://rxmarbles.com/#withLatestFrom))
158+
159+
## Acknowledgments
160+
161+
Thanks to everyone on the forum for giving great feedback.

README.md

+2
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ This package is the home for these APIs. Development and API design take place o
2525
- [`merge(_:...)`](https://github.com/apple/swift-async-algorithms/blob/main/Sources/AsyncAlgorithms/AsyncAlgorithms.docc/Guides/Merge.md): Merges two or more asynchronous sequence into a single asynchronous sequence producing the elements of all of the underlying asynchronous sequences.
2626
- [`zip(_:...)`](https://github.com/apple/swift-async-algorithms/blob/main/Sources/AsyncAlgorithms/AsyncAlgorithms.docc/Guides/Zip.md): Creates an asynchronous sequence of pairs built out of underlying asynchronous sequences.
2727
- [`joined(separator:)`](https://github.com/apple/swift-async-algorithms/blob/main/Sources/AsyncAlgorithms/AsyncAlgorithms.docc/Guides/Joined.md): Concatenated elements of an asynchronous sequence of asynchronous sequences, inserting the given separator between each element.
28+
- [`zipLatest(from:)`](https://github.com/twittemb/swift-async-algorithms/blob/zipLatestFrom/Evolution/0000-implement-zipLatestFrom.md): Combines self with another AsyncSequence into a single AsyncSequence
29+
wher$
2830

2931
#### Creating asynchronous sequences
3032

0 commit comments

Comments
 (0)