Skip to content

Fix memory streams incorrectly raising cancelled when *_nowait() is called immediately after cancelling send()/receive() #729

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions docs/versionhistory.rst
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,20 @@ This library adheres to `Semantic Versioning 2.0 <http://semver.org/>`_.
- Emit a ``ResourceWarning`` for ``MemoryObjectReceiveStream`` and
``MemoryObjectSendStream`` that were garbage collected without being closed (PR by
Andrey Kazantcev)
- Fixed memory object stream operations incorrectly raising cancelled under certain
conditions where it is too late to do so:

- Fixed memory object streams dropping items when
``MemoryObjectSendStream.send_nowait()`` was called immediately after cancelling the
scope of an ``await MemoryObjectReceiveStream.receive()`` call (`#728
<https://github.com/agronholm/anyio/issues/728>`_)

- Fixed ``MemoryObjectSendStream.send()`` raising cancelled despite succeeding when
``MemoryObjectReceiveStream.receive_nowait()`` is called immediately after
cancelling the scope of the ``MemoryObjectSendStream.send()`` call (`#729
<https://github.com/agronholm/anyio/issues/729>`_)

(PR by Ganden Schaffner)

**4.3.0**

Expand Down
12 changes: 12 additions & 0 deletions src/anyio/streams/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
ClosedResourceError,
EndOfStream,
WouldBlock,
get_cancelled_exc_class,
)
from ..abc import Event, ObjectReceiveStream, ObjectSendStream
from ..lowlevel import checkpoint
Expand Down Expand Up @@ -104,6 +105,11 @@ async def receive(self) -> T_co:

try:
await receive_event.wait()
except get_cancelled_exc_class():
# Ignore the immediate cancellation if we already received an item, so
# as not to lose it
if not container:
raise
finally:
self._state.waiting_receivers.pop(receive_event, None)

Expand Down Expand Up @@ -230,6 +236,12 @@ async def send(self, item: T_contra) -> None:
self._state.waiting_senders[send_event] = item
try:
await send_event.wait()
except get_cancelled_exc_class():
# Ignore the immediate cancellation if we already sent the item, so as
# to not indicate failure despite success
if send_event in self._state.waiting_senders:
del self._state.waiting_senders[send_event]
raise
Comment on lines +240 to +244
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would be fine when the cancellation originates from a cancel scope, but not when a task was natively cancelled (Task.cancel(), because the originator of that cancellation expects the cancellation to be acted on and not ignored). Doing this will lead to hangs.

except BaseException:
self._state.waiting_senders.pop(send_event, None)
raise
Expand Down
128 changes: 112 additions & 16 deletions tests/streams/test_memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
fail_after,
wait_all_tasks_blocked,
)
from anyio.abc import ObjectReceiveStream, ObjectSendStream
from anyio.abc import ObjectReceiveStream, ObjectSendStream, TaskStatus
from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream

if sys.version_info < (3, 11):
Expand Down Expand Up @@ -298,34 +298,130 @@ async def receiver() -> None:
receive.close()


async def test_cancel_during_receive() -> None:
async def test_cancel_during_receive_after_send_nowait() -> None:
"""
Test that cancelling a pending receive() operation does not cause an item in the
stream to be lost.
Test that cancelling a pending receive() operation immediately after an item has
been sent to that receiver does not cause the item to be lost.

"""
receiver_scope = None

async def scoped_receiver() -> None:
nonlocal receiver_scope
async def scoped_receiver(task_status: TaskStatus[CancelScope]) -> None:
with CancelScope() as receiver_scope:
task_status.started(receiver_scope)
received.append(await receive.receive())

assert receiver_scope.cancel_called

received: list[str] = []
send, receive = create_memory_object_stream[str]()
async with create_task_group() as tg:
tg.start_soon(scoped_receiver)
await wait_all_tasks_blocked()
send.send_nowait("hello")
assert receiver_scope is not None
receiver_scope.cancel()
with send, receive:
async with create_task_group() as tg:
receiver_scope = await tg.start(scoped_receiver)
await wait_all_tasks_blocked()
send.send_nowait("hello")
receiver_scope.cancel()

assert received == ["hello"]
assert received == ["hello"]

send.close()
receive.close()

async def test_cancel_during_receive_before_send_nowait() -> None:
"""
Test that cancelling a pending receive() operation immediately before an item is
sent to that receiver does not cause the item to be lost.

Note: AnyIO's memory stream behavior here currently differs slightly from Trio's
memory channel behavior. Neither will lose items in this case, but Trio's memory
channels use abort_fn to have an extra stage during cancellation delivery, so with a
Trio memory channel send_nowait() will raise WouldBlock even if the receive()
operation has not raised Cancelled yet. This test is intended only as a regression
test for the bug where AnyIO dropped items in this situation; addressing the
(possible) issue where AnyIO behaves slightly differently from Trio in this
situation (in terms of when cancellation is delivered) will involve modifying this
test. See #728.

"""

async def scoped_receiver(task_status: TaskStatus[CancelScope]) -> None:
with CancelScope() as receiver_scope:
task_status.started(receiver_scope)
received.append(await receive.receive())

assert receiver_scope.cancel_called

received: list[str] = []
send, receive = create_memory_object_stream[str]()
with send, receive:
async with create_task_group() as tg:
receiver_scope = await tg.start(scoped_receiver)
await wait_all_tasks_blocked()
receiver_scope.cancel()
send.send_nowait("hello")

assert received == ["hello"]


async def test_cancel_during_send_after_receive_nowait() -> None:
"""
Test that cancelling a pending send() operation immediately after its item has been
received does not cause send() to raise cancelled after successfully sending the
item.

"""
sender_woke = False

async def scoped_sender(task_status: TaskStatus[CancelScope]) -> None:
nonlocal sender_woke
with CancelScope() as sender_scope:
task_status.started(sender_scope)
await send.send("hello")
sender_woke = True

send, receive = create_memory_object_stream[str]()
with send, receive:
async with create_task_group() as tg:
sender_scope = await tg.start(scoped_sender)
await wait_all_tasks_blocked()
assert receive.receive_nowait() == "hello"
sender_scope.cancel()

assert sender_woke


async def test_cancel_during_send_before_receive_nowait() -> None:
"""
Test that cancelling a pending send() operation immediately before its item is
received does not cause send() to raise cancelled after successfully sending the
item.

Note: AnyIO's memory stream behavior here currently differs slightly from Trio's
memory channel behavior. Neither will allow send() to successfully send an item but
still raise cancelled after, but Trio's memory channels use abort_fn to have an
extra stage during cancellation delivery, so with a Trio memory channel
receive_nowait() will raise WouldBlock even if the send() operation has not raised
Cancelled yet. This test is intended only as a regression test for the bug where
send() incorrectly raised cancelled in this situation; addressing the (possible)
issue where AnyIO behaves slightly differently from Trio in this situation (in terms
of when cancellation is delivered) will involve modifying this test. See #728.

"""
sender_woke = False

async def scoped_sender(task_status: TaskStatus[CancelScope]) -> None:
nonlocal sender_woke
with CancelScope() as sender_scope:
task_status.started(sender_scope)
await send.send("hello")
sender_woke = True

send, receive = create_memory_object_stream[str]()
with send, receive:
async with create_task_group() as tg:
sender_scope = await tg.start(scoped_sender)
await wait_all_tasks_blocked()
sender_scope.cancel()
assert receive.receive_nowait() == "hello"

assert sender_woke


async def test_close_receive_after_send() -> None:
Expand Down
19 changes: 19 additions & 0 deletions tests/test_synchronization.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,25 @@ async def setter() -> None:
assert setter_started
assert waiter_woke

async def test_event_wait_before_cancel_before_set(self) -> None:
setter_started = waiter_woke = False

async def setter() -> None:
nonlocal setter_started
setter_started = True
assert not event.is_set()
tg.cancel_scope.cancel()
event.set()

event = Event()
async with create_task_group() as tg:
tg.start_soon(setter)
await event.wait()
waiter_woke = True

assert setter_started
assert not waiter_woke

async def test_statistics(self) -> None:
async def waiter() -> None:
await event.wait()
Expand Down
Loading