Skip to content

Commit a0b7a76

Browse files
authored
Condition/RecursiveLock: add ability to handle threads (#30061)
This extends Condition to assert that it may only be used in the single-threaded case (co-operatively scheduled), and then adds a thread-safe version of the same: `Threads.Condition`. Additionally, it also upgrades ReentrantLock, etc. to be thread-safe.
1 parent 046755c commit a0b7a76

23 files changed

+523
-465
lines changed

NEWS.md

+6
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,12 @@ New language features
77
* The `extrema` function now accepts a function argument in the same manner as `minimum` and
88
`maximum` ([#30323]).
99

10+
Multi-threading changes
11+
-----------------------
12+
13+
* The `Condition` type now has a thread-safe replacement, accessed as `Threads.Condition`.
14+
With that addition, task scheduling primitives such as `ReentrantLock` are now thread-safe ([#30061]).
15+
1016
Language changes
1117
----------------
1218

base/event.jl

+99-18
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,75 @@
11
# This file is a part of Julia. License is MIT: https://julialang.org/license
22

3+
## thread/task locking abstraction
4+
5+
"""
6+
AbstractLock
7+
8+
Abstract supertype describing types that
9+
implement the synchronization primitives:
10+
[`lock`](@ref), [`trylock`](@ref), [`unlock`](@ref), and [`islocked`](@ref).
11+
"""
12+
abstract type AbstractLock end
13+
function lock end
14+
function unlock end
15+
function trylock end
16+
function islocked end
17+
unlockall(l::AbstractLock) = unlock(l) # internal function for implementing `wait`
18+
relockall(l::AbstractLock, token::Nothing) = lock(l) # internal function for implementing `wait`
19+
assert_havelock(l::AbstractLock) = assert_havelock(l, Threads.threadid())
20+
assert_havelock(l::AbstractLock, tid::Integer) =
21+
(islocked(l) && tid == Threads.threadid()) ? nothing : error("concurrency violation detected")
22+
assert_havelock(l::AbstractLock, tid::Task) =
23+
(islocked(l) && tid === current_task()) ? nothing : error("concurrency violation detected")
24+
assert_havelock(l::AbstractLock, tid::Nothing) = error("concurrency violation detected")
25+
26+
"""
27+
AlwaysLockedST
28+
29+
This struct does not implement a real lock, but instead
30+
pretends to be always locked on the original thread it was allocated on,
31+
and simply ignores all other interactions.
32+
It also does not synchronize tasks; for that use a real lock such as [`RecursiveLock`](@ref).
33+
This can be used in the place of a real lock to, instead, simply and cheaply assert
34+
that the operation is only occurring on a single cooperatively-scheduled thread.
35+
It is thus functionally equivalent to allocating a real, recursive, task-unaware lock
36+
immediately calling `lock` on it, and then never calling a matching `unlock`,
37+
except that calling `lock` from another thread will throw a concurrency violation exception.
38+
"""
39+
struct AlwaysLockedST <: AbstractLock
40+
ownertid::Int16
41+
AlwaysLockedST() = new(Threads.threadid())
42+
end
43+
assert_havelock(l::AlwaysLockedST) = assert_havelock(l, l.ownertid)
44+
lock(l::AlwaysLockedST) = assert_havelock(l)
45+
unlock(l::AlwaysLockedST) = assert_havelock(l)
46+
trylock(l::AlwaysLockedST) = l.ownertid == Threads.threadid()
47+
islocked(::AlwaysLockedST) = true
48+
49+
350
## condition variables
451

552
"""
6-
Condition()
53+
GenericCondition
754
8-
Create an edge-triggered event source that tasks can wait for. Tasks that call [`wait`](@ref) on a
9-
`Condition` are suspended and queued. Tasks are woken up when [`notify`](@ref) is later called on
10-
the `Condition`. Edge triggering means that only tasks waiting at the time [`notify`](@ref) is
11-
called can be woken up. For level-triggered notifications, you must keep extra state to keep
12-
track of whether a notification has happened. The [`Channel`](@ref) type does
13-
this, and so can be used for level-triggered events.
55+
Abstract implementation of a condition object
56+
for synchonizing tasks objects with a given lock.
1457
"""
15-
mutable struct Condition
58+
struct GenericCondition{L<:AbstractLock}
1659
waitq::Vector{Any}
60+
lock::L
1761

18-
Condition() = new([])
62+
GenericCondition{L}() where {L<:AbstractLock} = new{L}([], L())
63+
GenericCondition{L}(l::L) where {L<:AbstractLock} = new{L}([], l)
64+
GenericCondition(l::AbstractLock) = new{typeof(l)}([], l)
1965
end
2066

67+
assert_havelock(c::GenericCondition) = assert_havelock(c.lock)
68+
lock(c::GenericCondition) = lock(c.lock)
69+
unlock(c::GenericCondition) = unlock(c.lock)
70+
trylock(c::GenericCondition) = trylock(c.lock)
71+
islocked(c::GenericCondition) = islocked(c.lock)
72+
2173
"""
2274
wait([x])
2375
@@ -37,16 +89,19 @@ restarted by an explicit call to [`schedule`](@ref) or [`yieldto`](@ref).
3789
Often `wait` is called within a `while` loop to ensure a waited-for condition is met before
3890
proceeding.
3991
"""
40-
function wait(c::Condition)
92+
function wait(c::GenericCondition)
4193
ct = current_task()
42-
94+
assert_havelock(c)
4395
push!(c.waitq, ct)
96+
token = unlockall(c.lock)
4497

4598
try
4699
return wait()
47100
catch
48101
filter!(x->x!==ct, c.waitq)
49102
rethrow()
103+
finally
104+
relockall(c.lock, token)
50105
end
51106
end
52107

@@ -59,26 +114,52 @@ is raised as an exception in the woken tasks.
59114
60115
Return the count of tasks woken up. Return 0 if no tasks are waiting on `condition`.
61116
"""
62-
notify(c::Condition, @nospecialize(arg = nothing); all=true, error=false) = notify(c, arg, all, error)
63-
function notify(c::Condition, arg, all, error)
117+
notify(c::GenericCondition, @nospecialize(arg = nothing); all=true, error=false) = notify(c, arg, all, error)
118+
function notify(c::GenericCondition, @nospecialize(arg), all, error)
119+
assert_havelock(c)
64120
cnt = 0
65121
if all
66122
cnt = length(c.waitq)
67123
for t in c.waitq
68-
error ? schedule(t, arg, error=error) : schedule(t, arg)
124+
schedule(t, arg, error=error)
69125
end
70126
empty!(c.waitq)
71127
elseif !isempty(c.waitq)
72128
cnt = 1
73129
t = popfirst!(c.waitq)
74-
error ? schedule(t, arg, error=error) : schedule(t, arg)
130+
schedule(t, arg, error=error)
75131
end
76-
cnt
132+
return cnt
77133
end
78134

79-
notify_error(c::Condition, err) = notify(c, err, true, true)
135+
notify_error(c::GenericCondition, err) = notify(c, err, true, true)
136+
137+
n_waiters(c::GenericCondition) = length(c.waitq)
138+
139+
"""
140+
isempty(condition)
141+
142+
Return `true` if no tasks are waiting on the condition, `false` otherwise.
143+
"""
144+
isempty(c::GenericCondition) = isempty(c.waitq)
145+
146+
147+
# default (Julia v1.0) is currently single-threaded
148+
# (although it uses MT-safe versions, when possible)
149+
"""
150+
Condition()
151+
152+
Create an edge-triggered event source that tasks can wait for. Tasks that call [`wait`](@ref) on a
153+
`Condition` are suspended and queued. Tasks are woken up when [`notify`](@ref) is later called on
154+
the `Condition`. Edge triggering means that only tasks waiting at the time [`notify`](@ref) is
155+
called can be woken up. For level-triggered notifications, you must keep extra state to keep
156+
track of whether a notification has happened. The [`Channel`](@ref) and [`Event`](@ref) types do
157+
this, and can be used for level-triggered events.
158+
159+
This object is NOT thread-safe. See [`Threads.Condition`](@ref) for a thread-safe version.
160+
"""
161+
const Condition = GenericCondition{AlwaysLockedST}
80162

81-
n_waiters(c::Condition) = length(c.waitq)
82163

83164
## scheduler and work queue
84165

base/exports.jl

+1
Original file line numberDiff line numberDiff line change
@@ -641,6 +641,7 @@ export
641641

642642
# tasks and conditions
643643
Condition,
644+
Event,
644645
current_task,
645646
islocked,
646647
istaskdone,

0 commit comments

Comments
 (0)