Skip to content

Commit 07a0c1e

Browse files
authored
Remove copy and paste within MergeStorage (#275)
1 parent adb12bf commit 07a0c1e

File tree

1 file changed

+94
-254
lines changed

1 file changed

+94
-254
lines changed

Sources/AsyncAlgorithms/Merge/MergeStorage.swift

+94-254
Original file line numberDiff line numberDiff line change
@@ -147,262 +147,12 @@ final class MergeStorage<
147147
// sequences. We must store it to cancel it at the right times.
148148
let task = Task {
149149
await withThrowingTaskGroup(of: Void.self) { group in
150-
// For each upstream sequence we are adding a child task that
151-
// is consuming the upstream sequence
152-
group.addTask {
153-
var iterator1 = base1.makeAsyncIterator()
154-
155-
// This is our upstream consumption loop
156-
loop: while true {
157-
// We are creating a continuation before requesting the next
158-
// element from upstream. This continuation is only resumed
159-
// if the downstream consumer called `next` to signal his demand.
160-
try await withUnsafeThrowingContinuation { continuation in
161-
let action = self.lock.withLock {
162-
self.stateMachine.childTaskSuspended(continuation)
163-
}
164-
165-
switch action {
166-
case let .resumeContinuation(continuation):
167-
// This happens if there is outstanding demand
168-
// and we need to demand from upstream right away
169-
continuation.resume(returning: ())
170-
171-
case let .resumeContinuationWithError(continuation, error):
172-
// This happens if another upstream already failed or if
173-
// the task got cancelled.
174-
continuation.resume(throwing: error)
175-
176-
case .none:
177-
break
178-
}
179-
}
180-
181-
// We got signalled from the downstream that we have demand so let's
182-
// request a new element from the upstream
183-
if let element1 = try await iterator1.next() {
184-
let action = self.lock.withLock {
185-
self.stateMachine.elementProduced(element1)
186-
}
187-
188-
switch action {
189-
case let .resumeContinuation(continuation, element):
190-
// We had an outstanding demand and where the first
191-
// upstream to produce an element so we can forward it to
192-
// the downstream
193-
continuation.resume(returning: element)
194-
195-
case .none:
196-
break
197-
}
198-
199-
} else {
200-
// The upstream returned `nil` which indicates that it finished
201-
let action = self.lock.withLock {
202-
self.stateMachine.upstreamFinished()
203-
}
204-
205-
// All of this is mostly cleanup around the Task and the outstanding
206-
// continuations used for signalling.
207-
switch action {
208-
case let .resumeContinuationWithNilAndCancelTaskAndUpstreamContinuations(
209-
downstreamContinuation,
210-
task,
211-
upstreamContinuations
212-
):
213-
upstreamContinuations.forEach { $0.resume(throwing: CancellationError()) }
214-
task.cancel()
215-
216-
downstreamContinuation.resume(returning: nil)
217-
218-
break loop
219-
220-
case let .cancelTaskAndUpstreamContinuations(
221-
task,
222-
upstreamContinuations
223-
):
224-
upstreamContinuations.forEach { $0.resume(throwing: CancellationError()) }
225-
task.cancel()
226-
227-
break loop
228-
case .none:
229-
230-
break loop
231-
}
232-
}
233-
}
234-
}
235-
236-
// Copy from the above just using the base2 sequence
237-
group.addTask {
238-
var iterator2 = base2.makeAsyncIterator()
239-
240-
// This is our upstream consumption loop
241-
loop: while true {
242-
// We are creating a continuation before requesting the next
243-
// element from upstream. This continuation is only resumed
244-
// if the downstream consumer called `next` to signal his demand.
245-
try await withUnsafeThrowingContinuation { continuation in
246-
let action = self.lock.withLock {
247-
self.stateMachine.childTaskSuspended(continuation)
248-
}
249-
250-
switch action {
251-
case let .resumeContinuation(continuation):
252-
// This happens if there is outstanding demand
253-
// and we need to demand from upstream right away
254-
continuation.resume(returning: ())
255-
256-
case let .resumeContinuationWithError(continuation, error):
257-
// This happens if another upstream already failed or if
258-
// the task got cancelled.
259-
continuation.resume(throwing: error)
260-
261-
case .none:
262-
break
263-
}
264-
}
265-
266-
// We got signalled from the downstream that we have demand so let's
267-
// request a new element from the upstream
268-
if let element2 = try await iterator2.next() {
269-
let action = self.lock.withLock {
270-
self.stateMachine.elementProduced(element2)
271-
}
272-
273-
switch action {
274-
case let .resumeContinuation(continuation, element):
275-
// We had an outstanding demand and where the first
276-
// upstream to produce an element so we can forward it to
277-
// the downstream
278-
continuation.resume(returning: element)
279-
280-
case .none:
281-
break
282-
}
283-
284-
} else {
285-
// The upstream returned `nil` which indicates that it finished
286-
let action = self.lock.withLock {
287-
self.stateMachine.upstreamFinished()
288-
}
289-
290-
// All of this is mostly cleanup around the Task and the outstanding
291-
// continuations used for signalling.
292-
switch action {
293-
case let .resumeContinuationWithNilAndCancelTaskAndUpstreamContinuations(
294-
downstreamContinuation,
295-
task,
296-
upstreamContinuations
297-
):
298-
upstreamContinuations.forEach { $0.resume(throwing: CancellationError()) }
299-
task.cancel()
300-
301-
downstreamContinuation.resume(returning: nil)
302-
303-
break loop
304-
305-
case let .cancelTaskAndUpstreamContinuations(
306-
task,
307-
upstreamContinuations
308-
):
309-
upstreamContinuations.forEach { $0.resume(throwing: CancellationError()) }
310-
task.cancel()
311-
312-
break loop
313-
case .none:
314-
315-
break loop
316-
}
317-
}
318-
}
319-
}
150+
self.iterateAsyncSequence(base1, in: &group)
151+
self.iterateAsyncSequence(base2, in: &group)
320152

321153
// Copy from the above just using the base3 sequence
322154
if let base3 = base3 {
323-
group.addTask {
324-
var iterator3 = base3.makeAsyncIterator()
325-
326-
// This is our upstream consumption loop
327-
loop: while true {
328-
// We are creating a continuation before requesting the next
329-
// element from upstream. This continuation is only resumed
330-
// if the downstream consumer called `next` to signal his demand.
331-
try await withUnsafeThrowingContinuation { continuation in
332-
let action = self.lock.withLock {
333-
self.stateMachine.childTaskSuspended(continuation)
334-
}
335-
336-
switch action {
337-
case let .resumeContinuation(continuation):
338-
// This happens if there is outstanding demand
339-
// and we need to demand from upstream right away
340-
continuation.resume(returning: ())
341-
342-
case let .resumeContinuationWithError(continuation, error):
343-
// This happens if another upstream already failed or if
344-
// the task got cancelled.
345-
continuation.resume(throwing: error)
346-
347-
case .none:
348-
break
349-
}
350-
}
351-
352-
// We got signalled from the downstream that we have demand so let's
353-
// request a new element from the upstream
354-
if let element3 = try await iterator3.next() {
355-
let action = self.lock.withLock {
356-
self.stateMachine.elementProduced(element3)
357-
}
358-
359-
switch action {
360-
case let .resumeContinuation(continuation, element):
361-
// We had an outstanding demand and where the first
362-
// upstream to produce an element so we can forward it to
363-
// the downstream
364-
continuation.resume(returning: element)
365-
366-
case .none:
367-
break
368-
}
369-
370-
} else {
371-
// The upstream returned `nil` which indicates that it finished
372-
let action = self.lock.withLock {
373-
self.stateMachine.upstreamFinished()
374-
}
375-
376-
// All of this is mostly cleanup around the Task and the outstanding
377-
// continuations used for signalling.
378-
switch action {
379-
case let .resumeContinuationWithNilAndCancelTaskAndUpstreamContinuations(
380-
downstreamContinuation,
381-
task,
382-
upstreamContinuations
383-
):
384-
upstreamContinuations.forEach { $0.resume(throwing: CancellationError()) }
385-
task.cancel()
386-
387-
downstreamContinuation.resume(returning: nil)
388-
389-
break loop
390-
391-
case let .cancelTaskAndUpstreamContinuations(
392-
task,
393-
upstreamContinuations
394-
):
395-
upstreamContinuations.forEach { $0.resume(throwing: CancellationError()) }
396-
task.cancel()
397-
398-
break loop
399-
case .none:
400-
401-
break loop
402-
}
403-
}
404-
}
405-
}
155+
self.iterateAsyncSequence(base3, in: &group)
406156
}
407157

408158
while !group.isEmpty {
@@ -444,5 +194,95 @@ final class MergeStorage<
444194
// We need to inform our state machine that we started the Task
445195
stateMachine.taskStarted(task)
446196
}
447-
}
448197

198+
private func iterateAsyncSequence<AsyncSequence: _Concurrency.AsyncSequence>(
199+
_ base: AsyncSequence,
200+
in taskGroup: inout ThrowingTaskGroup<Void, Error>
201+
) where AsyncSequence.Element == Base1.Element {
202+
// For each upstream sequence we are adding a child task that
203+
// is consuming the upstream sequence
204+
taskGroup.addTask {
205+
var iterator = base.makeAsyncIterator()
206+
207+
// This is our upstream consumption loop
208+
loop: while true {
209+
// We are creating a continuation before requesting the next
210+
// element from upstream. This continuation is only resumed
211+
// if the downstream consumer called `next` to signal his demand.
212+
try await withUnsafeThrowingContinuation { continuation in
213+
let action = self.lock.withLock {
214+
self.stateMachine.childTaskSuspended(continuation)
215+
}
216+
217+
switch action {
218+
case let .resumeContinuation(continuation):
219+
// This happens if there is outstanding demand
220+
// and we need to demand from upstream right away
221+
continuation.resume(returning: ())
222+
223+
case let .resumeContinuationWithError(continuation, error):
224+
// This happens if another upstream already failed or if
225+
// the task got cancelled.
226+
continuation.resume(throwing: error)
227+
228+
case .none:
229+
break
230+
}
231+
}
232+
233+
// We got signalled from the downstream that we have demand so let's
234+
// request a new element from the upstream
235+
if let element1 = try await iterator.next() {
236+
let action = self.lock.withLock {
237+
self.stateMachine.elementProduced(element1)
238+
}
239+
240+
switch action {
241+
case let .resumeContinuation(continuation, element):
242+
// We had an outstanding demand and where the first
243+
// upstream to produce an element so we can forward it to
244+
// the downstream
245+
continuation.resume(returning: element)
246+
247+
case .none:
248+
break
249+
}
250+
251+
} else {
252+
// The upstream returned `nil` which indicates that it finished
253+
let action = self.lock.withLock {
254+
self.stateMachine.upstreamFinished()
255+
}
256+
257+
// All of this is mostly cleanup around the Task and the outstanding
258+
// continuations used for signalling.
259+
switch action {
260+
case let .resumeContinuationWithNilAndCancelTaskAndUpstreamContinuations(
261+
downstreamContinuation,
262+
task,
263+
upstreamContinuations
264+
):
265+
upstreamContinuations.forEach { $0.resume(throwing: CancellationError()) }
266+
task.cancel()
267+
268+
downstreamContinuation.resume(returning: nil)
269+
270+
break loop
271+
272+
case let .cancelTaskAndUpstreamContinuations(
273+
task,
274+
upstreamContinuations
275+
):
276+
upstreamContinuations.forEach { $0.resume(throwing: CancellationError()) }
277+
task.cancel()
278+
279+
break loop
280+
case .none:
281+
282+
break loop
283+
}
284+
}
285+
}
286+
}
287+
}
288+
}

0 commit comments

Comments
 (0)