Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix ToObservable operator can throw unhandled exception #2172

Open
wants to merge 2 commits into
base: main
Choose a base branch
from

Conversation

fedeAlterio
Copy link

@fedeAlterio fedeAlterio commented Oct 11, 2024

Fixes #1677
The issue was that exceptions thrown by GetAsyncEnumerator() were not being propagated to observer.OnError(), resulting in unhandled exceptions. This has been addressed by wrapping the GetAsyncEnumerator() call in a try-catch block

The same issue appears also on DisposeAsync(). This is more complex, because we cannot rely on the observer anymore, because it is already completed at that point.
Currently the exceptioncauses the process to crash directly. In my opinion this is bad since there is nothing that warns the caller about that, and also it does not offer any solution.

Observable.FromAsync already "solves" this problem, so I used the same approach here, creating an overload where is possible to specify if suppress the exception or not. If not, the exception would be routed to TaskScheduler.UnobservedTaskException, rather that be rethrown on the ThreadPool / Synchronization Context directly (causing with 99% probability the process to crash)

@fedeAlterio
Copy link
Author

@dotnet-policy-service agree

@fedeAlterio
Copy link
Author

Hi @idg10,

Apologies for the ping! Could you take a look at the fix when you have a chance? This bug is quite severe as it's causing the process to completely shut down. We’ve implemented internally a custom alternative as a .ToObservableSafe(), but it’s challenging to enforce this version consistently due to the presence of .ToObservable within System.Linq that seems really authoritative.
Thank you very much for your help!

@idg10
Copy link
Collaborator

idg10 commented Dec 6, 2024

Sorry for the delay - many of us at endjin have been unwell, and since nobody pays us to maintain Rx (and we only maintain System.Linq.Async because for historical reasons, it lives in the same repo) it tends to drop to the back of the queue under these circumstances.

Looking at this, I think there are 4 obvious options when GetAsyncEnumerator throws:

  1. terminate the process abruptly (not good, but apparently the current behaviour)
  2. throw the exception out of the call to Subscribe
  3. forward the exception to the subscriber's OnError
  4. quietly swallow the exceptions.

Although 2 is arguably logically like the right thing to do (because this is a case of failing even to get started) in practice this would involve sync-over-async, so we can't do that. For AsyncRx, SubscribeAsync could actually do this, but then you'd have a problem in which the two libraries ended up handling the same error quite differently.

So it does seem like 3 is the only reasonable option left if we want to report the exception. I see you've also implemented 4, allowing people to say they'd like exceptions to be swallowed. That's the one part of this I'm not sure about. I'm glad you've not made it the default, but I'm not sure whether we really want this at all. It seems to enable a bad practice.

What was your rationale for allowing ignoreExceptionsAfterUnsubscribe: true?

@fedeAlterio
Copy link
Author

Thank you for your answer.
I was trying to be consistent with Observable.FromAsync() behavior, but maybe i misunderstood it: https://github.com/dotnet/reactive/blob/7259cabf5456a21462caea8cb81da8e5d5405db2/Rx.NET/Source/src/System.Reactive/Concurrency/TaskObservationOptions.cs#L27C4-L50C28

Maybe we could just handle DisposeAsync exceptions like other exceptions? So invoking observer.OnError if the cancellation token is not canceled yet?

@idg10
Copy link
Collaborator

idg10 commented Dec 10, 2024

Ah yes, I remember that FromAsync change now. With that one, we added the flag so that the existing behaviour would continue to occur unless you asked for the new behaviour - it was essentially there as a backwards compatibility feature.

So firstly, I think that in the case where a subscriber is still attached we definitely want to report it to OnError. (And I believe that's what you've done.)

I'm now looking at this code and I'm a bit confused:

try
{
    e = _source.GetAsyncEnumerator(ctd.Token);
}
catch (Exception ex)
{
    if (!ctd.Token.IsCancellationRequested)
    {
        observer.OnError(ex);
    }

    return;
}

If if block. of ctd.Token.IsCancellationRequested is true, that means the called has unsubscribed. So in that case, we are definitely "after unsubscription" and it looks like we just swallow the exception in that case. But I'm not sure this can ever actually occur: this async method hasn't done an await yet, and GetAsyncEnumerator itself is not async, so I don't think there's any way we can have returned the CancellableDisposable to the called by the time we reach that if can we?

So I think we'd want to make the call to OnError unconditional wouldn't we?

But if I'm wrong about that, that implies that this is a second "after unsubscription" case, in which case we'd need to make this also pay attention to that 'ignore' flag.

@fedeAlterio
Copy link
Author

I agree that the check for unsubscription is useless if the exception is thrown synchronously. Thank you!

About that ignoreExceptionsAfterUnsubscribe flag, I think there are 2 paths:

  • We can remove it completely, swallowing any exception in case the unsubscription already happened (This would be similar to the current implementation)
  • We can add it

If we choose the latter, in the case the subscription is already disposed:

  • If the flag is falsemaybe we should rethrow all the exceptions (throw instead return in every catch clause). The Core method should also be a ValueTask so that exceptions will not cause the process to crash, but will be routed to TaskScheduler.UnobservedTaskException
  • if it is true, we should just return (similarly at what happens now)

Consider also that following that approach, if false is the default, we would be inserting a breaking change (TaskScheduler.UnobservedTaskException will now be called)

Also note with the current implementation, the process crashes also if any exception is thrown by DisposeAsync()

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

The ToObservable operator can throw unhandled exception
2 participants