Skip to content

Commit d03acd7

Browse files
YvesDup1st1asvetlov
authored
bpo-43352: Add a Barrier object in asyncio lib (GH-24903)
Co-authored-by: Yury Selivanov <[email protected]> Co-authored-by: Andrew Svetlov <[email protected]>
1 parent 20e6e56 commit d03acd7

File tree

6 files changed

+856
-5
lines changed

6 files changed

+856
-5
lines changed

Doc/library/asyncio-api-index.rst

+8
Original file line numberDiff line numberDiff line change
@@ -186,11 +186,16 @@ Threading-like synchronization primitives that can be used in Tasks.
186186
* - :class:`BoundedSemaphore`
187187
- A bounded semaphore.
188188

189+
* - :class:`Barrier`
190+
- A barrier object.
191+
189192

190193
.. rubric:: Examples
191194

192195
* :ref:`Using asyncio.Event <asyncio_example_sync_event>`.
193196

197+
* :ref:`Using asyncio.Barrier <asyncio_example_barrier>`.
198+
194199
* See also the documentation of asyncio
195200
:ref:`synchronization primitives <asyncio-sync>`.
196201

@@ -206,6 +211,9 @@ Exceptions
206211
* - :exc:`asyncio.CancelledError`
207212
- Raised when a Task is cancelled. See also :meth:`Task.cancel`.
208213

214+
* - :exc:`asyncio.BrokenBarrierError`
215+
- Raised when a Barrier is broken. See also :meth:`Barrier.wait`.
216+
209217

210218
.. rubric:: Examples
211219

Doc/library/asyncio-sync.rst

+110
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ asyncio has the following basic synchronization primitives:
2828
* :class:`Condition`
2929
* :class:`Semaphore`
3030
* :class:`BoundedSemaphore`
31+
* :class:`Barrier`
3132

3233

3334
---------
@@ -340,6 +341,115 @@ BoundedSemaphore
340341
.. versionchanged:: 3.10
341342
Removed the *loop* parameter.
342343

344+
345+
Barrier
346+
=======
347+
348+
.. class:: Barrier(parties, action=None)
349+
350+
A barrier object. Not thread-safe.
351+
352+
A barrier is a simple synchronization primitive that allows to block until
353+
*parties* number of tasks are waiting on it.
354+
Tasks can wait on the :meth:`~Barrier.wait` method and would be blocked until
355+
the specified number of tasks end up waiting on :meth:`~Barrier.wait`.
356+
At that point all of the waiting tasks would unblock simultaneously.
357+
358+
:keyword:`async with` can be used as an alternative to awaiting on
359+
:meth:`~Barrier.wait`.
360+
361+
The barrier can be reused any number of times.
362+
363+
.. _asyncio_example_barrier:
364+
365+
Example::
366+
367+
async def example_barrier():
368+
# barrier with 3 parties
369+
b = asyncio.Barrier(3)
370+
371+
# create 2 new waiting tasks
372+
asyncio.create_task(b.wait())
373+
asyncio.create_task(b.wait())
374+
375+
await asyncio.sleep(0)
376+
print(b)
377+
378+
# The third .wait() call passes the barrier
379+
await b.wait()
380+
print(b)
381+
print("barrier passed")
382+
383+
await asyncio.sleep(0)
384+
print(b)
385+
386+
asyncio.run(example_barrier())
387+
388+
Result of this example is::
389+
390+
<asyncio.locks.Barrier object at 0x... [filling, waiters:2/3]>
391+
<asyncio.locks.Barrier object at 0x... [draining, waiters:0/3]>
392+
barrier passed
393+
<asyncio.locks.Barrier object at 0x... [filling, waiters:0/3]>
394+
395+
.. versionadded:: 3.11
396+
397+
.. coroutinemethod:: wait()
398+
399+
Pass the barrier. When all the tasks party to the barrier have called
400+
this function, they are all unblocked simultaneously.
401+
402+
When a waiting or blocked task in the barrier is cancelled,
403+
this task exits the barrier which stays in the same state.
404+
If the state of the barrier is "filling", the number of waiting task
405+
decreases by 1.
406+
407+
The return value is an integer in the range of 0 to ``parties-1``, different
408+
for each task. This can be used to select a task to do some special
409+
housekeeping, e.g.::
410+
411+
...
412+
async with barrier as position:
413+
if position == 0:
414+
# Only one task print this
415+
print('End of *draining phasis*')
416+
417+
This method may raise a :class:`BrokenBarrierError` exception if the
418+
barrier is broken or reset while a task is waiting.
419+
It could raise a :exc:`CancelledError` if a task is cancelled.
420+
421+
.. coroutinemethod:: reset()
422+
423+
Return the barrier to the default, empty state. Any tasks waiting on it
424+
will receive the :class:`BrokenBarrierError` exception.
425+
426+
If a barrier is broken it may be better to just leave it and create a new one.
427+
428+
.. coroutinemethod:: abort()
429+
430+
Put the barrier into a broken state. This causes any active or future
431+
calls to :meth:`wait` to fail with the :class:`BrokenBarrierError`.
432+
Use this for example if one of the taks needs to abort, to avoid infinite
433+
waiting tasks.
434+
435+
.. attribute:: parties
436+
437+
The number of tasks required to pass the barrier.
438+
439+
.. attribute:: n_waiting
440+
441+
The number of tasks currently waiting in the barrier while filling.
442+
443+
.. attribute:: broken
444+
445+
A boolean that is ``True`` if the barrier is in the broken state.
446+
447+
448+
.. exception:: BrokenBarrierError
449+
450+
This exception, a subclass of :exc:`RuntimeError`, is raised when the
451+
:class:`Barrier` object is reset or broken.
452+
343453
---------
344454

345455

Lib/asyncio/exceptions.py

+6-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
"""asyncio exceptions."""
22

33

4-
__all__ = ('CancelledError', 'InvalidStateError', 'TimeoutError',
4+
__all__ = ('BrokenBarrierError',
5+
'CancelledError', 'InvalidStateError', 'TimeoutError',
56
'IncompleteReadError', 'LimitOverrunError',
67
'SendfileNotAvailableError')
78

@@ -55,3 +56,7 @@ def __init__(self, message, consumed):
5556

5657
def __reduce__(self):
5758
return type(self), (self.args[0], self.consumed)
59+
60+
61+
class BrokenBarrierError(RuntimeError):
62+
"""Barrier is broken by barrier.abort() call."""

Lib/asyncio/locks.py

+155-2
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
11
"""Synchronization primitives."""
22

3-
__all__ = ('Lock', 'Event', 'Condition', 'Semaphore', 'BoundedSemaphore')
3+
__all__ = ('Lock', 'Event', 'Condition', 'Semaphore',
4+
'BoundedSemaphore', 'Barrier')
45

56
import collections
7+
import enum
68

79
from . import exceptions
810
from . import mixins
911
from . import tasks
1012

11-
1213
class _ContextManagerMixin:
1314
async def __aenter__(self):
1415
await self.acquire()
@@ -416,3 +417,155 @@ def release(self):
416417
if self._value >= self._bound_value:
417418
raise ValueError('BoundedSemaphore released too many times')
418419
super().release()
420+
421+
422+
423+
class _BarrierState(enum.Enum):
424+
FILLING = 'filling'
425+
DRAINING = 'draining'
426+
RESETTING = 'resetting'
427+
BROKEN = 'broken'
428+
429+
430+
class Barrier(mixins._LoopBoundMixin):
431+
"""Asyncio equivalent to threading.Barrier
432+
433+
Implements a Barrier primitive.
434+
Useful for synchronizing a fixed number of tasks at known synchronization
435+
points. Tasks block on 'wait()' and are simultaneously awoken once they
436+
have all made their call.
437+
"""
438+
439+
def __init__(self, parties):
440+
"""Create a barrier, initialised to 'parties' tasks."""
441+
if parties < 1:
442+
raise ValueError('parties must be > 0')
443+
444+
self._cond = Condition() # notify all tasks when state changes
445+
446+
self._parties = parties
447+
self._state = _BarrierState.FILLING
448+
self._count = 0 # count tasks in Barrier
449+
450+
def __repr__(self):
451+
res = super().__repr__()
452+
extra = f'{self._state.value}'
453+
if not self.broken:
454+
extra += f', waiters:{self.n_waiting}/{self.parties}'
455+
return f'<{res[1:-1]} [{extra}]>'
456+
457+
async def __aenter__(self):
458+
# wait for the barrier reaches the parties number
459+
# when start draining release and return index of waited task
460+
return await self.wait()
461+
462+
async def __aexit__(self, *args):
463+
pass
464+
465+
async def wait(self):
466+
"""Wait for the barrier.
467+
468+
When the specified number of tasks have started waiting, they are all
469+
simultaneously awoken.
470+
Returns an unique and individual index number from 0 to 'parties-1'.
471+
"""
472+
async with self._cond:
473+
await self._block() # Block while the barrier drains or resets.
474+
try:
475+
index = self._count
476+
self._count += 1
477+
if index + 1 == self._parties:
478+
# We release the barrier
479+
await self._release()
480+
else:
481+
await self._wait()
482+
return index
483+
finally:
484+
self._count -= 1
485+
# Wake up any tasks waiting for barrier to drain.
486+
self._exit()
487+
488+
async def _block(self):
489+
# Block until the barrier is ready for us,
490+
# or raise an exception if it is broken.
491+
#
492+
# It is draining or resetting, wait until done
493+
# unless a CancelledError occurs
494+
await self._cond.wait_for(
495+
lambda: self._state not in (
496+
_BarrierState.DRAINING, _BarrierState.RESETTING
497+
)
498+
)
499+
500+
# see if the barrier is in a broken state
501+
if self._state is _BarrierState.BROKEN:
502+
raise exceptions.BrokenBarrierError("Barrier aborted")
503+
504+
async def _release(self):
505+
# Release the tasks waiting in the barrier.
506+
507+
# Enter draining state.
508+
# Next waiting tasks will be blocked until the end of draining.
509+
self._state = _BarrierState.DRAINING
510+
self._cond.notify_all()
511+
512+
async def _wait(self):
513+
# Wait in the barrier until we are released. Raise an exception
514+
# if the barrier is reset or broken.
515+
516+
# wait for end of filling
517+
# unless a CancelledError occurs
518+
await self._cond.wait_for(lambda: self._state is not _BarrierState.FILLING)
519+
520+
if self._state in (_BarrierState.BROKEN, _BarrierState.RESETTING):
521+
raise exceptions.BrokenBarrierError("Abort or reset of barrier")
522+
523+
def _exit(self):
524+
# If we are the last tasks to exit the barrier, signal any tasks
525+
# waiting for the barrier to drain.
526+
if self._count == 0:
527+
if self._state in (_BarrierState.RESETTING, _BarrierState.DRAINING):
528+
self._state = _BarrierState.FILLING
529+
self._cond.notify_all()
530+
531+
async def reset(self):
532+
"""Reset the barrier to the initial state.
533+
534+
Any tasks currently waiting will get the BrokenBarrier exception
535+
raised.
536+
"""
537+
async with self._cond:
538+
if self._count > 0:
539+
if self._state is not _BarrierState.RESETTING:
540+
#reset the barrier, waking up tasks
541+
self._state = _BarrierState.RESETTING
542+
else:
543+
self._state = _BarrierState.FILLING
544+
self._cond.notify_all()
545+
546+
async def abort(self):
547+
"""Place the barrier into a 'broken' state.
548+
549+
Useful in case of error. Any currently waiting tasks and tasks
550+
attempting to 'wait()' will have BrokenBarrierError raised.
551+
"""
552+
async with self._cond:
553+
self._state = _BarrierState.BROKEN
554+
self._cond.notify_all()
555+
556+
@property
557+
def parties(self):
558+
"""Return the number of tasks required to trip the barrier."""
559+
return self._parties
560+
561+
@property
562+
def n_waiting(self):
563+
"""Return the number of tasks currently waiting at the barrier."""
564+
if self._state is _BarrierState.FILLING:
565+
return self._count
566+
return 0
567+
568+
@property
569+
def broken(self):
570+
"""Return True if the barrier is in a broken state."""
571+
return self._state is _BarrierState.BROKEN

0 commit comments

Comments
 (0)