[Channel] re-implement using a state machine #235

Merged
merged 1 commit into from
Jan 12, 2023

Contributor

@twittemb twittemb commented Nov 20, 2022

Hi

This PR suggests a new implementation for AsyncChannel. The idea was to:

  • streamline the implementation using the same paradigm (state machine/storage) as the recently remade operators (merge, zip, ...)
  • share some code between AsyncChannel and AsyncThrowingChannel since the state machine is the same
  • possibly improve performance (I've measured a 20% increase in throughput)

Some remarks:

  • I've added a throughput measurement
  • I think there is a bug in the old/current implementation of AsyncChannel: when I ran it with the new throughput measurement, it crashed from time to time
  • with this new implementation we could imagine splitting the producer from the consumer (the same as what @FranzBusch pitched here https://forums.swift.org/t/pitch-convenience-async-throwing-stream-makestream-methods/61030 for AsyncStream). We would just have to reference the same storage from a hypothetical AsyncChannel.Continuation and the AsyncChannel itself.
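
As a rough illustration of the last remark, here is a minimal sketch of a producer handle and a consumer handle that reference the same storage. Every name in it (SketchStorage, SketchChannel, Continuation, makeChannel, receive) is hypothetical and not part of the library. A plain lock-protected array stands in for the real state machine, so the sketch shows only the ownership structure, not the suspension and back-pressure behaviour of AsyncChannel.

```swift
import Foundation

// Hypothetical shared storage. In the PR the storage wraps the state machine;
// here a lock-protected array stands in for it to keep the sketch short.
final class SketchStorage<Element> {
  private let lock = NSLock()
  private var buffer: [Element] = []

  func enqueue(_ element: Element) {
    lock.lock(); defer { lock.unlock() }
    buffer.append(element)
  }

  func dequeue() -> Element? {
    lock.lock(); defer { lock.unlock() }
    return buffer.isEmpty ? nil : buffer.removeFirst()
  }
}

// Hypothetical consumer-side type.
struct SketchChannel<Element> {
  // Hypothetical producer-side handle referencing the same storage.
  struct Continuation {
    let storage: SketchStorage<Element>
    func send(_ element: Element) { storage.enqueue(element) }
  }

  let storage: SketchStorage<Element>

  // Creates both handles over one storage instance.
  static func makeChannel() -> (channel: SketchChannel<Element>, continuation: Continuation) {
    let storage = SketchStorage<Element>()
    return (SketchChannel(storage: storage), Continuation(storage: storage))
  }

  func receive() -> Element? { storage.dequeue() }
}

let (channel, continuation) = SketchChannel<Int>.makeChannel()
continuation.send(1)
let received = channel.receive()
let afterDrain = channel.receive()
```

Because both handles hold a reference to one storage object, a value sent through the continuation is visible to the channel without either handle knowing about the other.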

@phausler @FranzBusch what do you think?

@phausler
Member

The perf sounds fantastic! I will make sure to go over this with a fine-tooth comb Monday.

@twittemb
Contributor Author

twittemb commented Nov 21, 2022

For the record, I've used ad-hoc properties (hidden behind #if DEBUG directives) so we can test the suspend/resume mechanism more precisely, particularly for task cancellation. I know this is not an ideal solution, but I can't see anything better right now.

I guess this is a part of the resolution of #148

@FranzBusch
Member

Currently on vacation but will try to give this a look soon! I think @phausler has some ideas around testing that don't involve scattering debug-only code.

@eaigner

eaigner commented Dec 16, 2022

Just FYI, your bug is probably the generation += 1 increment. In Swift, integer arithmetic does not wrap around on overflow; it traps, so this will crash once generation reaches Int.max.
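
To make the overflow remark concrete, here is a small sketch of the standard-library alternatives to a trapping increment. The `generation` variable is only a stand-in for the counter mentioned above; how the real implementation uses it is not shown in this conversation.

```swift
// Stand-in for the counter mentioned above, started one below the maximum.
var generation = Int.max - 1

// Plain `+=` is checked: this increment is still in range.
generation += 1  // generation is now Int.max

// Another `generation += 1` here would trap at runtime.
// Non-trapping option 1: ask whether the addition overflowed.
let (wrappedValue, didOverflow) = generation.addingReportingOverflow(1)

// Non-trapping option 2: the wrapping operator, which rolls over to Int.min.
var wrapping = generation
wrapping &+= 1
```

Whether wrapping is acceptable depends on how the counter is used; if it only needs to produce distinct identifiers over a bounded window, `&+=` is a common choice.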

@twittemb twittemb closed this Dec 16, 2022
@twittemb twittemb reopened this Dec 16, 2022
@phausler
Member

Cancellation complicates things a decent amount; accounting for all the states and their cancellation counterparts ends up being roughly similar. The advantage of a state machine is that the states are well known, and the transitions each state can take are called out explicitly. So for subtle state manipulation that can cross task boundaries, it is useful to spend a few more lines of code (without much perf impact) to make the state transitions clear.

@FranzBusch
Member

Agree with @phausler here: LoC is not a good measurement. AsyncChannel has some inherent complexities, and state machines just surface the various transition edges way better.
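
The point about explicit transitions, including the cancellation counterparts, can be sketched roughly as follows. This is not the state machine from this PR: the type, its states, and its actions are illustrative assumptions, the element type is fixed to Int, and only the consumer side is modelled.

```swift
// Illustrative only: names and states are assumptions, not the PR's types.
struct SketchStateMachine {
  enum State: Equatable {
    case idle
    case consumersWaiting(ids: Set<Int>)
    case finished
  }

  enum Action: Equatable {
    case none
    case suspend
    case resumeConsumer(id: Int, element: Int?)
    case resumeConsumers(ids: Set<Int>)  // all resumed with nil
  }

  private(set) var state: State = .idle

  // A consumer asks for the next element.
  mutating func next(id: Int) -> Action {
    switch state {
    case .idle:
      state = .consumersWaiting(ids: [id])
      return .suspend
    case .consumersWaiting(var ids):
      ids.insert(id)
      state = .consumersWaiting(ids: ids)
      return .suspend
    case .finished:
      return .resumeConsumer(id: id, element: nil)
    }
  }

  // The cancellation counterpart of `next(id:)`.
  mutating func consumerCancelled(id: Int) -> Action {
    switch state {
    case .consumersWaiting(var ids) where ids.contains(id):
      ids.remove(id)
      state = ids.isEmpty ? .idle : .consumersWaiting(ids: ids)
      return .resumeConsumer(id: id, element: nil)
    case .idle, .consumersWaiting, .finished:
      // Nothing is suspended under this id, so cancellation is a no-op.
      return .none
    }
  }

  mutating func finish() -> Action {
    switch state {
    case .idle:
      state = .finished
      return .none
    case .consumersWaiting(let ids):
      state = .finished
      return .resumeConsumers(ids: ids)
    case .finished:
      return .none
    }
  }
}

var sketchMachine = SketchStateMachine()
let suspended = sketchMachine.next(id: 1)
let cancelled = sketchMachine.consumerCancelled(id: 1)
let lateCancel = sketchMachine.consumerCancelled(id: 1)
let finishAction = sketchMachine.finish()
let afterFinish = sketchMachine.next(id: 2)
```

Each event is an exhaustive switch over the states, so the compiler flags any state whose handling was forgotten. That is where the extra lines of code go.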

Member

@FranzBusch FranzBusch left a comment

Left some comments here! In general, I like the switch to a state machine; it makes it very clear what is going on.

@twittemb
Contributor Author

Left some comments here! In general, I like the switch to a state machine; it makes it very clear what is going on.

Thanks for your review… I'll check it ASAP.

@twittemb twittemb force-pushed the feature/async-channel branch 4 times, most recently from e4edbf0 to 1fcb631 Compare December 24, 2022 14:41
@twittemb
Contributor Author

Left some comments here! In general, I like the switch to a state machine; it makes it very clear what is going on.

Hi @FranzBusch I've pushed a version with pretty much all the comments addressed.

@twittemb twittemb force-pushed the feature/async-channel branch from 1fcb631 to de3eb0f Compare January 10, 2023 09:43
@twittemb twittemb requested review from FranzBusch and phausler and removed request for FranzBusch and phausler January 10, 2023 09:43
Member

@FranzBusch FranzBusch left a comment

Looks really good already. Left some more comments inline but we are getting close here! Thanks for all the work

case .terminated(.finished):
return .resumeConsumer(element: nil)

case .terminated(.failed(let error)):
Member

I don't think that this is aligned with the rule that next() should throw an error once and after that it should return nil. This code right now throws the same error over and over again. Implementing the expected behaviour is going to get interesting since it means that we have to keep track of each iterator and what they saw. Maybe we can achieve this by creating a single ID for an iterator that we keep inside the iterator and use for every next() call on the state machine instead of generating an ID per next call.

Contributor Author

That is not exactly what happens. When the state is .finished(.failure) and there are suspended consumers, they all receive the failure and the state is set to .terminated(.finished). A subsequent call to next then receives nil.
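
The "fail once, then nil" behaviour under discussion can be sketched as a transition from a failed terminal state to a finished one. The case names .terminated(.finished), .terminated(.failed) and .resumeConsumer(element:) are taken from the snippet under review; everything else (SampleFailure, TerminationSketch, .channeling, .resumeConsumerWithFailure) is a hypothetical stand-in. The sketch tracks the failure channel-wide rather than per iterator, which is only one possible reading of the rule.

```swift
// Hypothetical error type used only by this sketch.
struct SampleFailure: Error, Equatable {}

struct TerminationSketch {
  enum Termination: Equatable {
    case finished
    case failed(SampleFailure)
  }

  enum State: Equatable {
    case channeling
    case terminated(Termination)
  }

  enum NextAction: Equatable {
    case suspend
    case resumeConsumer(element: Int?)
    case resumeConsumerWithFailure(SampleFailure)
  }

  private(set) var state: State = .channeling

  mutating func fail(_ error: SampleFailure) {
    // Only an active channel can be failed; later calls are ignored.
    guard case .channeling = state else { return }
    state = .terminated(.failed(error))
  }

  mutating func next() -> NextAction {
    switch state {
    case .channeling:
      return .suspend
    case .terminated(.failed(let error)):
      // Deliver the failure once, then downgrade to a plain finish.
      state = .terminated(.finished)
      return .resumeConsumerWithFailure(error)
    case .terminated(.finished):
      return .resumeConsumer(element: nil)
    }
  }
}

var terminationSketch = TerminationSketch()
terminationSketch.fail(SampleFailure())
let firstNext = terminationSketch.next()
let secondNext = terminationSketch.next()
```

With this shape, the first call after the failure surfaces the error and every later call sees a finished channel.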

Contributor Author

@FranzBusch, @phausler before making any changes I'd like to be sure of the expected behaviour. I think that what I have now is aligned with the previous implementation of AsyncChannel.

Member

Maybe I am misreading the code here as well. If the behaviour is the same then it should be good!

Contributor Author

This is the unit test that checks it:

func test_asyncThrowingChannel_resumes_producers_and_discards_additional_elements_when_fail_is_called() async throws {
  // Given: an AsyncThrowingChannel
  let sut = AsyncThrowingChannel<Int, Error>()

  // Given: 2 suspended send operations
  let task1 = Task {
    await sut.send(1)
  }

  let task2 = Task {
    await sut.send(2)
  }

  // When: failing the channel
  sut.fail(Failure())

  // Then: the send operations are resumed
  _ = await (task1.value, task2.value)

  // When: sending an extra value
  await sut.send(3)

  // Then: the send operation is resumed
  // Then: the iteration is resumed with a failure
  var collected = [Int]()
  do {
    for try await element in sut {
      collected.append(element)
    }
  } catch {
    XCTAssertTrue(collected.isEmpty)
    XCTAssertEqual(error as? Failure, Failure())
  }

  // When: requesting a next value
  var iterator = sut.makeAsyncIterator()
  let pastFailure = try await iterator.next()

  // Then: the past failure is nil
  XCTAssertNil(pastFailure)
}

Contributor Author

@twittemb twittemb Jan 12, 2023

yep

func test_asyncThrowingChannel_resumes_consumers_when_fail_is_called() async throws {
  // Given: an AsyncThrowingChannel
  let sut = AsyncThrowingChannel<Int, Error>()

  // Given: 2 suspended iterations
  let task1 = Task<Int?, Error> {
    var iterator = sut.makeAsyncIterator()
    return try await iterator.next()
  }

  let task2 = Task<Int?, Error> {
    var iterator = sut.makeAsyncIterator()
    return try await iterator.next()
  }

  // When: failing the channel
  sut.fail(Failure())

  // Then: the iterations are resumed with the error
  do {
    _ = try await (task1.value, task2.value)
  } catch {
    XCTAssertEqual(error as? Failure, Failure())
  }

  // When: requesting a next value
  var iterator = sut.makeAsyncIterator()
  let pastFailure = try await iterator.next()

  // Then: the past failure is nil
  XCTAssertNil(pastFailure)
}

[UPDATE] Although I could improve it a bit by asserting that task1 and task2 each fail independently! Hold on...

Member

Hm, that's not really testing the same thing. I would like to see that test, but with the same iterator calling next twice. Like this:

  let task1 = Task<Int?, Error> {
    var iterator = sut.makeAsyncIterator()
    try await iterator.next() // Need to catch the values of both next and return them in an array to assert
    try await iterator.next()
  }

Contributor Author

ok.

Is that ok?

  func test_asyncThrowingChannel_resumes_consumer_when_fail_is_called() async throws {
    // Given: an AsyncThrowingChannel
    let sut = AsyncThrowingChannel<Int, Error>()

    // Given: suspended iteration
    let task = Task<Int?, Error> {
      var iterator = sut.makeAsyncIterator()

      do {
        _ = try await iterator.next()
      } catch {
        XCTAssertEqual(error as? Failure, Failure())
      }

      return try await iterator.next()
    }

    // When: failing the channel
    sut.fail(Failure())

    // Then: the iterations are resumed with the error and the next element is nil
    do {
      let collected = try await task.value
      XCTAssertNil(collected)
    } catch {
      XCTFail("The task should not fail, the past failure element should be nil, not a failure.")
    }
  }

Member

I think almost, just one slight change

  func test_asyncThrowingChannel_resumes_consumer_when_fail_is_called() async throws {
    // Given: an AsyncThrowingChannel
    let sut = AsyncThrowingChannel<Int, Error>()

    // Given: suspended iteration
    let task = Task<Int?, Error> {
      var iterator = sut.makeAsyncIterator()

      do {
        _ = try await iterator.next()
        XCTFail("We expect the above call to throw")
      } catch {
        XCTAssertEqual(error as? Failure, Failure())
      }

      return try await iterator.next()
    }

    // When: failing the channel
    sut.fail(Failure())

    // Then: the iterations are resumed with the error and the next element is nil
    do {
      let collected = try await task.value
      XCTAssertNil(collected)
    } catch {
      XCTFail("The task should not fail, the past failure element should be nil, not a failure.")
    }
  }

Would be great if you could add that test

Contributor Author

@twittemb twittemb Jan 12, 2023

I had to make a few tweaks to the state machine to make it pass... nice catch.
I've pushed the updated version.

@twittemb
Contributor Author

twittemb commented Jan 11, 2023

Looks really good already. Left some more comments inline but we are getting close here! Thanks for all the work

Thanks for the review. I'll address the comments later today or tomorrow.

Member

@phausler phausler left a comment

Overall this looks pretty good; let me know when you are ready to land it.

@twittemb
Contributor Author

Overall this looks pretty good; let me know when you are ready to land it.

Thanks for the review, I think I'll push something tomorrow.

@twittemb twittemb force-pushed the feature/async-channel branch from d7239ba to c52e06d Compare January 11, 2023 19:05
@twittemb twittemb force-pushed the feature/async-channel branch from c52e06d to c67b934 Compare January 12, 2023 09:41
@twittemb
Contributor Author

twittemb commented Jan 12, 2023

Overall this looks pretty good; let me know when you are ready to land it.

I've addressed all the comments (minus one for the throwing behaviour).

In the end I can confirm a 20% increase in performance plus a more reliable implementation (the current one crashes from time to time during the throughput measurement).

@twittemb twittemb force-pushed the feature/async-channel branch from c67b934 to 446e0a2 Compare January 12, 2023 10:28
@twittemb twittemb requested a review from FranzBusch January 12, 2023 10:53
@twittemb twittemb force-pushed the feature/async-channel branch from 446e0a2 to 6afcc8c Compare January 12, 2023 11:19
Member

@FranzBusch FranzBusch left a comment

One last nit, but looks good to me now! Thanks for all the work!

@twittemb twittemb force-pushed the feature/async-channel branch from 6afcc8c to 3b66bb6 Compare January 12, 2023 12:54
@twittemb
Contributor Author

twittemb commented Jan 12, 2023

One last nit, but looks good to me now! Thanks for all the work!

Thanks a lot for your review.

@phausler ready to merge 👍.

@phausler phausler merged commit 0ebc805 into apple:main Jan 12, 2023