Skip to content

Commit 6fd82f1

Browse files
committed
std: implement the once_wait feature
1 parent ddff2b6 commit 6fd82f1

File tree

5 files changed

+247
-94
lines changed

5 files changed

+247
-94
lines changed

std/src/sync/once.rs

+41
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,47 @@ impl Once {
264264
self.inner.is_completed()
265265
}
266266

267+
/// Blocks the current thread until initialization has completed.
268+
///
269+
/// # Example
270+
///
271+
/// ```rust
272+
/// #![feature(once_wait)]
273+
///
274+
/// use std::sync::Once;
275+
/// use std::thread;
276+
///
277+
/// static READY: Once = Once::new();
278+
///
279+
/// let thread = thread::spawn(|| {
280+
/// READY.wait();
281+
/// println!("everything is ready");
282+
/// });
283+
///
284+
/// READY.call_once(|| println!("performing setup"));
285+
/// ```
286+
///
287+
/// # Panics
288+
///
289+
/// If this [`Once`] has been poisoned because an initialization closure has
290+
/// panicked, this method will also panic. Use [`wait_force`](Self::wait_force)
291+
/// if this behaviour is not desired.
292+
#[unstable(feature = "once_wait", issue = "127527")]
293+
pub fn wait(&self) {
294+
if !self.inner.is_completed() {
295+
self.inner.wait(false);
296+
}
297+
}
298+
299+
/// Blocks the current thread until initialization has completed, ignoring
300+
/// poisoning.
301+
#[unstable(feature = "once_wait", issue = "127527")]
302+
pub fn wait_force(&self) {
303+
if !self.inner.is_completed() {
304+
self.inner.wait(true);
305+
}
306+
}
307+
267308
/// Returns the current state of the `Once` instance.
268309
///
269310
/// Since this takes a mutable reference, no initialization can currently

std/src/sync/once_lock.rs

+28
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,34 @@ impl<T> OnceLock<T> {
167167
}
168168
}
169169

170+
/// Blocks the current thread until the cell is initialized.
171+
///
172+
/// # Example
173+
///
174+
/// Waiting for a computation on another thread to finish:
175+
/// ```rust
176+
/// #![feature(once_wait)]
177+
///
178+
/// use std::thread;
179+
/// use std::sync::OnceLock;
180+
///
181+
/// let value = OnceLock::new();
182+
///
183+
/// thread::scope(|s| {
184+
/// s.spawn(|| value.set(1 + 1));
185+
///
186+
/// let result = value.wait();
187+
/// assert_eq!(result, &2);
188+
/// })
189+
/// ```
190+
#[inline]
191+
#[unstable(feature = "once_wait", issue = "127527")]
192+
pub fn wait(&self) -> &T {
193+
self.once.wait_force();
194+
195+
unsafe { self.get_unchecked() }
196+
}
197+
170198
/// Sets the contents of this cell to `value`.
171199
///
172200
/// May block if another thread is currently attempting to initialize the cell. The cell is

std/src/sys/sync/once/futex.rs

+88-36
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use crate::sync::once::ExclusiveState;
66
use crate::sys::futex::{futex_wait, futex_wake_all};
77

88
// On some platforms, the OS is very nice and handles the waiter queue for us.
9-
// This means we only need one atomic value with 5 states:
9+
// This means we only need one atomic value with 4 states:
1010

1111
/// No initialization has run yet, and no thread is currently using the Once.
1212
const INCOMPLETE: u32 = 0;
@@ -17,16 +17,20 @@ const POISONED: u32 = 1;
1717
/// Some thread is currently attempting to run initialization. It may succeed,
1818
/// so all future threads need to wait for it to finish.
1919
const RUNNING: u32 = 2;
20-
/// Some thread is currently attempting to run initialization and there are threads
21-
/// waiting for it to finish.
22-
const QUEUED: u32 = 3;
2320
/// Initialization has completed and all future calls should finish immediately.
24-
const COMPLETE: u32 = 4;
21+
const COMPLETE: u32 = 3;
2522

26-
// Threads wait by setting the state to QUEUED and calling `futex_wait` on the state
23+
// An additional bit indicates whether there are waiting threads:
24+
25+
/// May only be set if the state is not COMPLETE.
26+
const QUEUED: u32 = 4;
27+
28+
// Threads wait by setting the QUEUED bit and calling `futex_wait` on the state
2729
// variable. When the running thread finishes, it will wake all waiting threads using
2830
// `futex_wake_all`.
2931

32+
const STATE_MASK: u32 = 0b11;
33+
3034
pub struct OnceState {
3135
poisoned: bool,
3236
set_state_to: Cell<u32>,
@@ -45,7 +49,7 @@ impl OnceState {
4549
}
4650

4751
struct CompletionGuard<'a> {
48-
state: &'a AtomicU32,
52+
state_and_queued: &'a AtomicU32,
4953
set_state_on_drop_to: u32,
5054
}
5155

@@ -54,64 +58,106 @@ impl<'a> Drop for CompletionGuard<'a> {
5458
// Use release ordering to propagate changes to all threads checking
5559
// up on the Once. `futex_wake_all` does its own synchronization, hence
5660
// we do not need `AcqRel`.
57-
if self.state.swap(self.set_state_on_drop_to, Release) == QUEUED {
58-
futex_wake_all(self.state);
61+
if self.state_and_queued.swap(self.set_state_on_drop_to, Release) & QUEUED != 0 {
62+
futex_wake_all(self.state_and_queued);
5963
}
6064
}
6165
}
6266

6367
pub struct Once {
64-
state: AtomicU32,
68+
state_and_queued: AtomicU32,
6569
}
6670

6771
impl Once {
6872
#[inline]
6973
pub const fn new() -> Once {
70-
Once { state: AtomicU32::new(INCOMPLETE) }
74+
Once { state_and_queued: AtomicU32::new(INCOMPLETE) }
7175
}
7276

7377
#[inline]
7478
pub fn is_completed(&self) -> bool {
7579
// Use acquire ordering to make all initialization changes visible to the
7680
// current thread.
77-
self.state.load(Acquire) == COMPLETE
81+
self.state_and_queued.load(Acquire) == COMPLETE
7882
}
7983

8084
#[inline]
8185
pub(crate) fn state(&mut self) -> ExclusiveState {
82-
match *self.state.get_mut() {
86+
match *self.state_and_queued.get_mut() {
8387
INCOMPLETE => ExclusiveState::Incomplete,
8488
POISONED => ExclusiveState::Poisoned,
8589
COMPLETE => ExclusiveState::Complete,
8690
_ => unreachable!("invalid Once state"),
8791
}
8892
}
8993

90-
// This uses FnMut to match the API of the generic implementation. As this
91-
// implementation is quite light-weight, it is generic over the closure and
92-
// so avoids the cost of dynamic dispatch.
9394
#[cold]
9495
#[track_caller]
95-
pub fn call(&self, ignore_poisoning: bool, f: &mut impl FnMut(&public::OnceState)) {
96-
let mut state = self.state.load(Acquire);
96+
pub fn wait(&self, ignore_poisoning: bool) {
97+
let mut state_and_queued = self.state_and_queued.load(Acquire);
9798
loop {
99+
let state = state_and_queued & STATE_MASK;
100+
let queued = state_and_queued & QUEUED != 0;
98101
match state {
102+
COMPLETE => return,
103+
POISONED if !ignore_poisoning => {
104+
// Panic to propagate the poison.
105+
panic!("Once instance has previously been poisoned");
106+
}
107+
_ => {
108+
// Set the QUEUED bit if it has not already been set.
109+
if !queued {
110+
state_and_queued += QUEUED;
111+
if let Err(new) = self.state_and_queued.compare_exchange_weak(
112+
state,
113+
state_and_queued,
114+
Relaxed,
115+
Acquire,
116+
) {
117+
state_and_queued = new;
118+
continue;
119+
}
120+
}
121+
122+
futex_wait(&self.state_and_queued, state_and_queued, None);
123+
state_and_queued = self.state_and_queued.load(Acquire);
124+
}
125+
}
126+
}
127+
}
128+
129+
#[cold]
130+
#[track_caller]
131+
pub fn call(&self, ignore_poisoning: bool, f: &mut dyn FnMut(&public::OnceState)) {
132+
let mut state_and_queued = self.state_and_queued.load(Acquire);
133+
loop {
134+
let state = state_and_queued & STATE_MASK;
135+
let queued = state_and_queued & QUEUED != 0;
136+
match state {
137+
COMPLETE => return,
99138
POISONED if !ignore_poisoning => {
100139
// Panic to propagate the poison.
101140
panic!("Once instance has previously been poisoned");
102141
}
103142
INCOMPLETE | POISONED => {
104143
// Try to register the current thread as the one running.
105-
if let Err(new) =
106-
self.state.compare_exchange_weak(state, RUNNING, Acquire, Acquire)
107-
{
108-
state = new;
144+
let next = RUNNING + if queued { QUEUED } else { 0 };
145+
if let Err(new) = self.state_and_queued.compare_exchange_weak(
146+
state_and_queued,
147+
next,
148+
Acquire,
149+
Acquire,
150+
) {
151+
state_and_queued = new;
109152
continue;
110153
}
154+
111155
// `waiter_queue` will manage other waiting threads, and
112156
// wake them up on drop.
113-
let mut waiter_queue =
114-
CompletionGuard { state: &self.state, set_state_on_drop_to: POISONED };
157+
let mut waiter_queue = CompletionGuard {
158+
state_and_queued: &self.state_and_queued,
159+
set_state_on_drop_to: POISONED,
160+
};
115161
// Run the function, letting it know if we're poisoned or not.
116162
let f_state = public::OnceState {
117163
inner: OnceState {
@@ -123,21 +169,27 @@ impl Once {
123169
waiter_queue.set_state_on_drop_to = f_state.inner.set_state_to.get();
124170
return;
125171
}
126-
RUNNING | QUEUED => {
127-
// Set the state to QUEUED if it is not already.
128-
if state == RUNNING
129-
&& let Err(new) =
130-
self.state.compare_exchange_weak(RUNNING, QUEUED, Relaxed, Acquire)
131-
{
132-
state = new;
133-
continue;
172+
_ => {
173+
// All other values must be RUNNING.
174+
assert!(state == RUNNING);
175+
176+
// Set the QUEUED bit if it is not already set.
177+
if !queued {
178+
state_and_queued += QUEUED;
179+
if let Err(new) = self.state_and_queued.compare_exchange_weak(
180+
state,
181+
state_and_queued,
182+
Relaxed,
183+
Acquire,
184+
) {
185+
state_and_queued = new;
186+
continue;
187+
}
134188
}
135189

136-
futex_wait(&self.state, QUEUED, None);
137-
state = self.state.load(Acquire);
190+
futex_wait(&self.state_and_queued, state_and_queued, None);
191+
state_and_queued = self.state_and_queued.load(Acquire);
138192
}
139-
COMPLETE => return,
140-
_ => unreachable!("state is never set to invalid values"),
141193
}
142194
}
143195
}

std/src/sys/sync/once/no_threads.rs

+6
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,12 @@ impl Once {
5555
}
5656
}
5757

58+
#[cold]
59+
#[track_caller]
60+
pub fn wait(&self, _ignore_poisoning: bool) {
61+
panic!("not implementable on this target");
62+
}
63+
5864
#[cold]
5965
#[track_caller]
6066
pub fn call(&self, ignore_poisoning: bool, f: &mut impl FnMut(&public::OnceState)) {

0 commit comments

Comments
 (0)