Skip to content

Commit 6b0a1f3

Browse files
Added an alternate fix for MemoryObjectReceiveStream.receive()` on asyncio (agronholm#595)
Co-authored-by: Ganden Schaffner <[email protected]>
1 parent 6307392 commit 6b0a1f3

File tree

4 files changed

+28
-8
lines changed

4 files changed

+28
-8
lines changed

Diff for: docs/versionhistory.rst

+5
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ This library adheres to `Semantic Versioning 2.0 <http://semver.org/>`_.
1616
tasks were spawned and an outer cancellation scope had been cancelled before
1717
- Ensured that exiting a ``TaskGroup`` always hits a yield point, regardless of
1818
whether there are running child tasks to be waited on
19+
- On asyncio, cancel scopes will defer cancelling tasks that are scheduled to resume
20+
with a finished future
1921
- Task groups on all backends now raise a single cancellation exception when an outer
2022
cancel scope is cancelled, and no exceptions other than cancellation exceptions are
2123
raised in the group
@@ -40,6 +42,9 @@ This library adheres to `Semantic Versioning 2.0 <http://semver.org/>`_.
4042
scope suppressed a cancellation exception
4143
- Fixed ``fail_after()`` raising an unwarranted ``TimeoutError`` when the cancel scope
4244
was cancelled before reaching its deadline
45+
- Fixed ``MemoryObjectReceiveStream.receive()`` causing the receiving task on asyncio to
46+
remain in a cancelled state if the operation was cancelled after an item was queued to
47+
be received by the task (but before the task could actually receive the item)
4348
- Removed unnecessary extra waiting cycle in ``Event.wait()`` on asyncio in the case
4449
where the event was not yet set
4550

Diff for: src/anyio/_backends/_asyncio.py

+4-2
Original file line numberDiff line numberDiff line change
@@ -468,8 +468,10 @@ def _deliver_cancellation(self) -> None:
468468
if task is not current and (
469469
task is self._host_task or _task_started(task)
470470
):
471-
self._cancel_calls += 1
472-
task.cancel()
471+
waiter = task._fut_waiter # type: ignore[attr-defined]
472+
if not isinstance(waiter, asyncio.Future) or not waiter.done():
473+
self._cancel_calls += 1
474+
task.cancel()
473475

474476
# Schedule another callback if there are still tasks left
475477
if should_retry:

Diff for: src/anyio/streams/memory.py

-6
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
ClosedResourceError,
1111
EndOfStream,
1212
WouldBlock,
13-
get_cancelled_exc_class,
1413
)
1514
from ..abc import Event, ObjectReceiveStream, ObjectSendStream
1615
from ..lowlevel import checkpoint
@@ -104,11 +103,6 @@ async def receive(self) -> T_co:
104103

105104
try:
106105
await receive_event.wait()
107-
except get_cancelled_exc_class():
108-
# Ignore the immediate cancellation if we already received an item, so
109-
# as not to lose it
110-
if not container:
111-
raise
112106
finally:
113107
self._state.waiting_receivers.pop(receive_event, None)
114108

Diff for: tests/test_synchronization.py

+19
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,25 @@ async def task() -> None:
172172
assert task_started
173173
assert not event_set
174174

175+
async def test_event_wait_before_set_before_cancel(self) -> None:
176+
setter_started = waiter_woke = False
177+
178+
async def setter() -> None:
179+
nonlocal setter_started
180+
setter_started = True
181+
assert not event.is_set()
182+
event.set()
183+
tg.cancel_scope.cancel()
184+
185+
event = Event()
186+
async with create_task_group() as tg:
187+
tg.start_soon(setter)
188+
await event.wait()
189+
waiter_woke = True
190+
191+
assert setter_started
192+
assert waiter_woke
193+
175194
async def test_statistics(self) -> None:
176195
async def waiter() -> None:
177196
await event.wait()

0 commit comments

Comments
 (0)