Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bpo-43352: Add a Barrier object in asyncio lib #24903

Merged
merged 101 commits into from
Mar 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
101 commits
Select commit Hold shift + click to select a range
2cd5ce0
Initial commit
YvesDup Mar 15, 2021
7dfa8ef
make patchcheck
YvesDup Mar 15, 2021
f19ea24
make patchcheck
YvesDup Mar 15, 2021
f4bb33a
Update test_locks.py
YvesDup Mar 16, 2021
5f14a7d
Update locks.py
YvesDup Mar 16, 2021
214fd35
Update test_locks.py
YvesDup Mar 16, 2021
619f60a
Update from patch
YvesDup Mar 16, 2021
04d4f69
remove cpython/doc/*.rst.bak
YvesDup Mar 31, 2021
c2e438a
📜🤖 Added by blurb_it.
blurb-it[bot] Mar 31, 2021
97598f3
Update 2021-03-31-15-22-45.bpo-43352.nSjMuE.rst
YvesDup Apr 2, 2021
dfea4c2
Update documentation about add of Barrier object
YvesDup Apr 11, 2021
8a7d7fa
Merge branch 'fix-issue-43352' of https://github.com/YvesDup/cpython …
YvesDup Apr 11, 2021
44fe563
Update asyncio-sync.rst
YvesDup Apr 30, 2021
e23c860
Update locks.py
YvesDup May 17, 2021
6a51dc6
Update test_locks.py
YvesDup May 17, 2021
04f72a3
Update asyncio-sync.rst
YvesDup May 17, 2021
7a00453
Update asyncio-sync.rst
YvesDup May 17, 2021
3517e33
Update asyncio-sync.rst
YvesDup May 18, 2021
c56e7c6
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup May 19, 2021
8715b34
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup May 19, 2021
d15b2a6
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup May 19, 2021
ed1b361
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup May 19, 2021
7504a8f
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup May 19, 2021
e3b23c2
Initial commit
YvesDup Mar 15, 2021
bc3d17a
make patchcheck
YvesDup Mar 15, 2021
a8b7b1a
make patchcheck
YvesDup Mar 15, 2021
32cbefb
Update test_locks.py
YvesDup Mar 16, 2021
e5002bc
Update locks.py
YvesDup Mar 16, 2021
b12599c
Update test_locks.py
YvesDup Mar 16, 2021
f12123e
Update from patch
YvesDup Mar 16, 2021
9ef7109
remove cpython/doc/*.rst.bak
YvesDup Mar 31, 2021
7b65fbc
📜🤖 Added by blurb_it.
blurb-it[bot] Mar 31, 2021
9b60d07
Update 2021-03-31-15-22-45.bpo-43352.nSjMuE.rst
YvesDup Apr 2, 2021
576cd53
Update documentation about add of Barrier object
YvesDup Apr 11, 2021
6106f21
Update asyncio-sync.rst
YvesDup Apr 30, 2021
0384561
Update locks.py
YvesDup May 17, 2021
16515ef
Update test_locks.py
YvesDup May 17, 2021
fa217fd
Update asyncio-sync.rst
YvesDup May 17, 2021
a97f4a3
Update asyncio-sync.rst
YvesDup May 17, 2021
b598470
Update asyncio-sync.rst
YvesDup May 18, 2021
3098105
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup May 19, 2021
2f55acc
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup May 19, 2021
f056865
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup May 19, 2021
d9008f9
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup May 19, 2021
71b923c
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup May 19, 2021
41bbbee
Merge remote-tracking branch 'upstream/main' into fix-issue-43352
YvesDup May 29, 2021
606a898
Merge branch 'fix-issue-43352' of github.com:YvesDup/cpython into fix…
YvesDup May 29, 2021
8e302d7
Merge remote-tracking branch 'upstream/main' into fix-issue-43352
YvesDup May 29, 2021
58226e0
Update Doc/library/asyncio-sync.rst
YvesDup Sep 9, 2021
0096cdc
Update Doc/library/asyncio-sync.rst
YvesDup Sep 9, 2021
f275a83
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup Dec 3, 2021
e90d556
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup Dec 3, 2021
408512d
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup Dec 3, 2021
069ed28
Merge branch 'python:main' into fix-issue-43352
YvesDup Feb 18, 2022
b7bb530
Merge branch 'python:main' into fix-issue-43352
YvesDup Feb 21, 2022
49de223
bpo-43352: Add a Barrier object to asyncio synchronized primitives (…
YvesDup Feb 21, 2022
fa4623c
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup Feb 22, 2022
20fb88e
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup Feb 22, 2022
809de71
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup Feb 22, 2022
28fc263
Merge branch 'python:main' into fix-issue-43352
YvesDup Feb 22, 2022
f66cd2c
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup Feb 22, 2022
62272b3
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup Feb 22, 2022
f7dbf9b
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup Feb 25, 2022
cc64b69
Merge branch 'python:main' into fix-issue-43352
YvesDup Mar 7, 2022
28eba9b
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup Mar 16, 2022
cca2918
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup Mar 16, 2022
6d3806f
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup Mar 16, 2022
a2467f5
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup Mar 16, 2022
90e5d04
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup Mar 16, 2022
bea7e7f
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup Mar 16, 2022
e97d417
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup Mar 17, 2022
b60a931
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup Mar 17, 2022
34cfc75
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup Mar 17, 2022
c7bbebe
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup Mar 18, 2022
fe20bdc
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup Mar 18, 2022
edfd7d4
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup Mar 18, 2022
27e1768
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup Mar 18, 2022
892f389
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup Mar 18, 2022
5b8962a
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup Mar 21, 2022
63e0688
Update Doc/library/asybpo-43352: Add a Barrier object to asyncio sync…
YvesDup Mar 21, 2022
eb42246
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup Mar 21, 2022
e29d0c0
Update Doc/library/asyncio-sync.rstbpo-43352: Add a Barrier object to…
YvesDup Mar 21, 2022
6857cf2
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup Mar 21, 2022
d4d5b55
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup Mar 21, 2022
5bb6d7e
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup Mar 21, 2022
66f023a
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup Mar 21, 2022
e5d229c
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup Mar 22, 2022
b0d17da
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup Mar 22, 2022
1c16176
po-43352: Add a Barrier object to asyncio synchronized primitives (GH…
YvesDup Mar 22, 2022
151ca5d
po-43352: Add a Barrier object to asyncio synchronized primitives (GH…
YvesDup Mar 22, 2022
37efb91
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup Mar 23, 2022
337189e
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup Mar 23, 2022
ee59e3a
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup Mar 23, 2022
df5bd42
Rewrite repr(), use enum for state, drop _count_blocked used by tests…
asvetlov Mar 24, 2022
f1acd93
Inline method
asvetlov Mar 24, 2022
1bb3297
Move BrokenBarrierError to exceptions.py
asvetlov Mar 24, 2022
d82f1a5
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup Mar 25, 2022
f89dc5f
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup Mar 25, 2022
896e274
Tune docs
asvetlov Mar 25, 2022
b7bdf40
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup Mar 25, 2022
3c30290
Bump
asvetlov Mar 25, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions Doc/library/asyncio-api-index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -186,11 +186,16 @@ Threading-like synchronization primitives that can be used in Tasks.
* - :class:`BoundedSemaphore`
- A bounded semaphore.

* - :class:`Barrier`
- A barrier object.


.. rubric:: Examples

* :ref:`Using asyncio.Event <asyncio_example_sync_event>`.

* :ref:`Using asyncio.Barrier <asyncio_example_barrier>`.

* See also the documentation of asyncio
:ref:`synchronization primitives <asyncio-sync>`.

Expand All @@ -206,6 +211,9 @@ Exceptions
* - :exc:`asyncio.CancelledError`
- Raised when a Task is cancelled. See also :meth:`Task.cancel`.

* - :exc:`asyncio.BrokenBarrierError`
- Raised when a Barrier is broken. See also :meth:`Barrier.wait`.


.. rubric:: Examples

Expand Down
110 changes: 110 additions & 0 deletions Doc/library/asyncio-sync.rst
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ asyncio has the following basic synchronization primitives:
* :class:`Condition`
* :class:`Semaphore`
* :class:`BoundedSemaphore`
* :class:`Barrier`


---------
Expand Down Expand Up @@ -340,6 +341,115 @@ BoundedSemaphore
.. versionchanged:: 3.10
Removed the *loop* parameter.


Barrier
=======

.. class:: Barrier(parties, action=None)

A barrier object. Not thread-safe.

A barrier is a simple synchronization primitive that allows to block until
*parties* number of tasks are waiting on it.
Tasks can wait on the :meth:`~Barrier.wait` method and would be blocked until
the specified number of tasks end up waiting on :meth:`~Barrier.wait`.
At that point all of the waiting tasks would unblock simultaneously.

:keyword:`async with` can be used as an alternative to awaiting on
:meth:`~Barrier.wait`.

The barrier can be reused any number of times.

.. _asyncio_example_barrier:

Example::

async def example_barrier():
# barrier with 3 parties
b = asyncio.Barrier(3)

# create 2 new waiting tasks
asyncio.create_task(b.wait())
asyncio.create_task(b.wait())

await asyncio.sleep(0)
print(b)

# The third .wait() call passes the barrier
await b.wait()
print(b)
print("barrier passed")

await asyncio.sleep(0)
print(b)

asyncio.run(example_barrier())

Result of this example is::

<asyncio.locks.Barrier object at 0x... [filling, waiters:2/3]>
<asyncio.locks.Barrier object at 0x... [draining, waiters:0/3]>
barrier passed
<asyncio.locks.Barrier object at 0x... [filling, waiters:0/3]>

.. versionadded:: 3.11

.. coroutinemethod:: wait()

Pass the barrier. When all the tasks party to the barrier have called
this function, they are all unblocked simultaneously.

When a waiting or blocked task in the barrier is cancelled,
this task exits the barrier which stays in the same state.
If the state of the barrier is "filling", the number of waiting task
decreases by 1.

The return value is an integer in the range of 0 to ``parties-1``, different
for each task. This can be used to select a task to do some special
housekeeping, e.g.::

...
async with barrier as position:
if position == 0:
# Only one task print this
print('End of *draining phasis*')

This method may raise a :class:`BrokenBarrierError` exception if the
barrier is broken or reset while a task is waiting.
It could raise a :exc:`CancelledError` if a task is cancelled.

.. coroutinemethod:: reset()

Return the barrier to the default, empty state. Any tasks waiting on it
will receive the :class:`BrokenBarrierError` exception.

If a barrier is broken it may be better to just leave it and create a new one.

.. coroutinemethod:: abort()

Put the barrier into a broken state. This causes any active or future
calls to :meth:`wait` to fail with the :class:`BrokenBarrierError`.
Use this for example if one of the taks needs to abort, to avoid infinite
waiting tasks.

.. attribute:: parties

The number of tasks required to pass the barrier.

.. attribute:: n_waiting

The number of tasks currently waiting in the barrier while filling.

.. attribute:: broken

A boolean that is ``True`` if the barrier is in the broken state.


.. exception:: BrokenBarrierError

This exception, a subclass of :exc:`RuntimeError`, is raised when the
:class:`Barrier` object is reset or broken.

---------


Expand Down
7 changes: 6 additions & 1 deletion Lib/asyncio/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
"""asyncio exceptions."""


__all__ = ('CancelledError', 'InvalidStateError', 'TimeoutError',
__all__ = ('BrokenBarrierError',
'CancelledError', 'InvalidStateError', 'TimeoutError',
'IncompleteReadError', 'LimitOverrunError',
'SendfileNotAvailableError')

Expand Down Expand Up @@ -55,3 +56,7 @@ def __init__(self, message, consumed):

def __reduce__(self):
return type(self), (self.args[0], self.consumed)


class BrokenBarrierError(RuntimeError):
"""Barrier is broken by barrier.abort() call."""
157 changes: 155 additions & 2 deletions Lib/asyncio/locks.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
"""Synchronization primitives."""

__all__ = ('Lock', 'Event', 'Condition', 'Semaphore', 'BoundedSemaphore')
__all__ = ('Lock', 'Event', 'Condition', 'Semaphore',
'BoundedSemaphore', 'Barrier')

import collections
import enum

from . import exceptions
from . import mixins


class _ContextManagerMixin:
async def __aenter__(self):
await self.acquire()
Expand Down Expand Up @@ -412,3 +413,155 @@ def release(self):
if self._value >= self._bound_value:
raise ValueError('BoundedSemaphore released too many times')
super().release()



class _BarrierState(enum.Enum):
FILLING = 'filling'
DRAINING = 'draining'
RESETTING = 'resetting'
BROKEN = 'broken'


class Barrier(mixins._LoopBoundMixin):
"""Asyncio equivalent to threading.Barrier

Implements a Barrier primitive.
Useful for synchronizing a fixed number of tasks at known synchronization
points. Tasks block on 'wait()' and are simultaneously awoken once they
have all made their call.
"""

def __init__(self, parties):
"""Create a barrier, initialised to 'parties' tasks."""
if parties < 1:
raise ValueError('parties must be > 0')

self._cond = Condition() # notify all tasks when state changes

self._parties = parties
self._state = _BarrierState.FILLING
self._count = 0 # count tasks in Barrier

def __repr__(self):
res = super().__repr__()
extra = f'{self._state.value}'
if not self.broken:
extra += f', waiters:{self.n_waiting}/{self.parties}'
return f'<{res[1:-1]} [{extra}]>'

async def __aenter__(self):
# wait for the barrier reaches the parties number
# when start draining release and return index of waited task
return await self.wait()

async def __aexit__(self, *args):
pass

async def wait(self):
"""Wait for the barrier.

When the specified number of tasks have started waiting, they are all
simultaneously awoken.
Returns an unique and individual index number from 0 to 'parties-1'.
"""
async with self._cond:
await self._block() # Block while the barrier drains or resets.
try:
index = self._count
self._count += 1
if index + 1 == self._parties:
# We release the barrier
await self._release()
else:
await self._wait()
return index
finally:
self._count -= 1
# Wake up any tasks waiting for barrier to drain.
self._exit()

async def _block(self):
# Block until the barrier is ready for us,
# or raise an exception if it is broken.
#
# It is draining or resetting, wait until done
# unless a CancelledError occurs
await self._cond.wait_for(
lambda: self._state not in (
_BarrierState.DRAINING, _BarrierState.RESETTING
)
)

# see if the barrier is in a broken state
if self._state is _BarrierState.BROKEN:
raise exceptions.BrokenBarrierError("Barrier aborted")

async def _release(self):
# Release the tasks waiting in the barrier.

# Enter draining state.
# Next waiting tasks will be blocked until the end of draining.
self._state = _BarrierState.DRAINING
self._cond.notify_all()

async def _wait(self):
# Wait in the barrier until we are released. Raise an exception
# if the barrier is reset or broken.

# wait for end of filling
# unless a CancelledError occurs
await self._cond.wait_for(lambda: self._state is not _BarrierState.FILLING)

if self._state in (_BarrierState.BROKEN, _BarrierState.RESETTING):
raise exceptions.BrokenBarrierError("Abort or reset of barrier")

def _exit(self):
# If we are the last tasks to exit the barrier, signal any tasks
# waiting for the barrier to drain.
if self._count == 0:
if self._state in (_BarrierState.RESETTING, _BarrierState.DRAINING):
self._state = _BarrierState.FILLING
self._cond.notify_all()

async def reset(self):
"""Reset the barrier to the initial state.

Any tasks currently waiting will get the BrokenBarrier exception
raised.
"""
async with self._cond:
if self._count > 0:
if self._state is not _BarrierState.RESETTING:
#reset the barrier, waking up tasks
self._state = _BarrierState.RESETTING
else:
self._state = _BarrierState.FILLING
self._cond.notify_all()

async def abort(self):
"""Place the barrier into a 'broken' state.

Useful in case of error. Any currently waiting tasks and tasks
attempting to 'wait()' will have BrokenBarrierError raised.
"""
async with self._cond:
self._state = _BarrierState.BROKEN
self._cond.notify_all()

@property
def parties(self):
"""Return the number of tasks required to trip the barrier."""
return self._parties

@property
def n_waiting(self):
"""Return the number of tasks currently waiting at the barrier."""
if self._state is _BarrierState.FILLING:
return self._count
return 0

@property
def broken(self):
"""Return True if the barrier is in a broken state."""
return self._state is _BarrierState.BROKEN
Loading