Skip to content

Commit 05fecb9

Browse files
committed
std: allow after-main use of synchronization primitives
By creating an unnamed thread handle when the actual one has already been destroyed, synchronization primitives using thread parking can be used even outside the Rust runtime. This also fixes an inefficiency in the queue-based `RwLock`: if `thread::current` was not initialized yet, it will create a new handle on every parking attempt without initializing `thread::current`. The private `current_or_unnamed` function introduced here fixes this.
1 parent c1beb25 commit 05fecb9

File tree

10 files changed

+59
-26
lines changed

10 files changed

+59
-26
lines changed

std/src/lib.rs

-3
Original file line numberDiff line numberDiff line change
@@ -174,9 +174,6 @@
174174
//!
175175
//! - after-main use of thread-locals, which also affects additional features:
176176
//! - [`thread::current()`]
177-
//! - [`thread::scope()`]
178-
//! - [`sync::mpmc`]
179-
//! - [`sync::mpsc`]
180177
//! - before-main stdio file descriptors are not guaranteed to be open on unix platforms
181178
//!
182179
//!

std/src/sync/mpmc/array.rs

+4-2
Original file line numberDiff line numberDiff line change
@@ -346,7 +346,8 @@ impl<T> Channel<T> {
346346
}
347347

348348
// Block the current thread.
349-
let sel = cx.wait_until(deadline);
349+
// SAFETY: the context belongs to the current thread.
350+
let sel = unsafe { cx.wait_until(deadline) };
350351

351352
match sel {
352353
Selected::Waiting => unreachable!(),
@@ -397,7 +398,8 @@ impl<T> Channel<T> {
397398
}
398399

399400
// Block the current thread.
400-
let sel = cx.wait_until(deadline);
401+
// SAFETY: the context belongs to the current thread.
402+
let sel = unsafe { cx.wait_until(deadline) };
401403

402404
match sel {
403405
Selected::Waiting => unreachable!(),

std/src/sync/mpmc/context.rs

+9-4
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ impl Context {
6969
inner: Arc::new(Inner {
7070
select: AtomicUsize::new(Selected::Waiting.into()),
7171
packet: AtomicPtr::new(ptr::null_mut()),
72-
thread: thread::current(),
72+
thread: thread::current_or_unnamed(),
7373
thread_id: current_thread_id(),
7474
}),
7575
}
@@ -112,8 +112,11 @@ impl Context {
112112
/// Waits until an operation is selected and returns it.
113113
///
114114
/// If the deadline is reached, `Selected::Aborted` will be selected.
115+
///
116+
/// # Safety
117+
/// This may only be called from the thread this `Context` belongs to.
115118
#[inline]
116-
pub fn wait_until(&self, deadline: Option<Instant>) -> Selected {
119+
pub unsafe fn wait_until(&self, deadline: Option<Instant>) -> Selected {
117120
loop {
118121
// Check whether an operation has been selected.
119122
let sel = Selected::from(self.inner.select.load(Ordering::Acquire));
@@ -126,7 +129,8 @@ impl Context {
126129
let now = Instant::now();
127130

128131
if now < end {
129-
thread::park_timeout(end - now);
132+
// SAFETY: guaranteed by caller.
133+
unsafe { self.inner.thread.park_timeout(end - now) };
130134
} else {
131135
// The deadline has been reached. Try aborting select.
132136
return match self.try_select(Selected::Aborted) {
@@ -135,7 +139,8 @@ impl Context {
135139
};
136140
}
137141
} else {
138-
thread::park();
142+
// SAFETY: guaranteed by caller.
143+
unsafe { self.inner.thread.park() };
139144
}
140145
}
141146
}

std/src/sync/mpmc/list.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -444,7 +444,8 @@ impl<T> Channel<T> {
444444
}
445445

446446
// Block the current thread.
447-
let sel = cx.wait_until(deadline);
447+
// SAFETY: the context belongs to the current thread.
448+
let sel = unsafe { cx.wait_until(deadline) };
448449

449450
match sel {
450451
Selected::Waiting => unreachable!(),

std/src/sync/mpmc/zero.rs

+4-2
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,8 @@ impl<T> Channel<T> {
190190
drop(inner);
191191

192192
// Block the current thread.
193-
let sel = cx.wait_until(deadline);
193+
// SAFETY: the context belongs to the current thread.
194+
let sel = unsafe { cx.wait_until(deadline) };
194195

195196
match sel {
196197
Selected::Waiting => unreachable!(),
@@ -257,7 +258,8 @@ impl<T> Channel<T> {
257258
drop(inner);
258259

259260
// Block the current thread.
260-
let sel = cx.wait_until(deadline);
261+
// SAFETY: the context belongs to the current thread.
262+
let sel = unsafe { cx.wait_until(deadline) };
261263

262264
match sel {
263265
Selected::Waiting => unreachable!(),

std/src/sys/sync/once/queue.rs

+5-4
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ const QUEUE_MASK: usize = !STATE_MASK;
9393
// use interior mutability.
9494
#[repr(align(4))] // Ensure the two lower bits are free to use as state bits.
9595
struct Waiter {
96-
thread: Cell<Option<Thread>>,
96+
thread: Thread,
9797
signaled: AtomicBool,
9898
next: Cell<*const Waiter>,
9999
}
@@ -238,7 +238,7 @@ fn wait(
238238
return_on_poisoned: bool,
239239
) -> StateAndQueue {
240240
let node = &Waiter {
241-
thread: Cell::new(Some(thread::current())),
241+
thread: thread::current_or_unnamed(),
242242
signaled: AtomicBool::new(false),
243243
next: Cell::new(ptr::null()),
244244
};
@@ -277,7 +277,8 @@ fn wait(
277277
// can park ourselves, the result could be this thread never gets
278278
// unparked. Luckily `park` comes with the guarantee that if it got
279279
// an `unpark` just before on an unparked thread it does not park.
280-
thread::park();
280+
// SAFETY: we retrieved this handle on the current thread above.
281+
unsafe { node.thread.park() }
281282
}
282283

283284
return state_and_queue.load(Acquire);
@@ -309,7 +310,7 @@ impl Drop for WaiterQueue<'_> {
309310
let mut queue = to_queue(current);
310311
while !queue.is_null() {
311312
let next = (*queue).next.get();
312-
let thread = (*queue).thread.take().unwrap();
313+
let thread = (*queue).thread.clone();
313314
(*queue).signaled.store(true, Release);
314315
thread.unpark();
315316
queue = next;

std/src/sys/sync/rwlock/queue.rs

+2-4
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ use crate::mem;
118118
use crate::ptr::{self, NonNull, null_mut, without_provenance_mut};
119119
use crate::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};
120120
use crate::sync::atomic::{AtomicBool, AtomicPtr};
121-
use crate::thread::{self, Thread, ThreadId};
121+
use crate::thread::{self, Thread};
122122

123123
/// The atomic lock state.
124124
type AtomicState = AtomicPtr<()>;
@@ -217,9 +217,7 @@ impl Node {
217217
/// Prepare this node for waiting.
218218
fn prepare(&mut self) {
219219
// Fall back to creating an unnamed `Thread` handle to allow locking in TLS destructors.
220-
self.thread.get_or_init(|| {
221-
thread::try_current().unwrap_or_else(|| Thread::new_unnamed(ThreadId::new()))
222-
});
220+
self.thread.get_or_init(thread::current_or_unnamed);
223221
self.completed = AtomicBool::new(false);
224222
}
225223

std/src/thread/current.rs

+17
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,23 @@ pub(crate) fn try_current() -> Option<Thread> {
165165
}
166166
}
167167

168+
/// Gets a handle to the thread that invokes it. If the handle stored in thread-
169+
/// local storage was already destroyed, this creates a new unnamed temporary
170+
/// handle to allow thread parking in nearly all situations.
171+
pub(crate) fn current_or_unnamed() -> Thread {
172+
let current = CURRENT.get();
173+
if current > DESTROYED {
174+
unsafe {
175+
let current = ManuallyDrop::new(Thread::from_raw(current));
176+
(*current).clone()
177+
}
178+
} else if current == DESTROYED {
179+
Thread::new_unnamed(id::get_or_init())
180+
} else {
181+
init_current(current)
182+
}
183+
}
184+
168185
/// Gets a handle to the thread that invokes it.
169186
///
170187
/// # Examples

std/src/thread/mod.rs

+12-3
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,7 @@ mod current;
186186

187187
#[stable(feature = "rust1", since = "1.0.0")]
188188
pub use current::current;
189-
pub(crate) use current::{current_id, drop_current, set_current, try_current};
189+
pub(crate) use current::{current_id, current_or_unnamed, drop_current, set_current, try_current};
190190

191191
////////////////////////////////////////////////////////////////////////////////
192192
// Thread-local storage
@@ -1126,9 +1126,9 @@ pub fn park_timeout_ms(ms: u32) {
11261126
#[stable(feature = "park_timeout", since = "1.4.0")]
11271127
pub fn park_timeout(dur: Duration) {
11281128
let guard = PanicGuard;
1129-
// SAFETY: park_timeout is called on the parker owned by this thread.
1129+
// SAFETY: park_timeout is called on a handle owned by this thread.
11301130
unsafe {
1131-
current().0.parker().park_timeout(dur);
1131+
current().park_timeout(dur);
11321132
}
11331133
// No panic occurred, do not abort.
11341134
forget(guard);
@@ -1426,6 +1426,15 @@ impl Thread {
14261426
unsafe { self.0.parker().park() }
14271427
}
14281428

1429+
/// Like the public [`park_timeout`], but callable on any handle. This is
1430+
/// used to allow parking in TLS destructors.
1431+
///
1432+
/// # Safety
1433+
/// May only be called from the thread to which this handle belongs.
1434+
pub(crate) unsafe fn park_timeout(&self, dur: Duration) {
1435+
unsafe { self.0.parker().park_timeout(dur) }
1436+
}
1437+
14291438
/// Atomically makes the handle's token available if it is not already.
14301439
///
14311440
/// Every thread is equipped with some basic low-level blocking support, via

std/src/thread/scoped.rs

+4-3
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use super::{Builder, JoinInner, Result, Thread, current, park};
1+
use super::{Builder, JoinInner, Result, Thread, current_or_unnamed};
22
use crate::marker::PhantomData;
33
use crate::panic::{AssertUnwindSafe, catch_unwind, resume_unwind};
44
use crate::sync::Arc;
@@ -140,7 +140,7 @@ where
140140
let scope = Scope {
141141
data: Arc::new(ScopeData {
142142
num_running_threads: AtomicUsize::new(0),
143-
main_thread: current(),
143+
main_thread: current_or_unnamed(),
144144
a_thread_panicked: AtomicBool::new(false),
145145
}),
146146
env: PhantomData,
@@ -152,7 +152,8 @@ where
152152

153153
// Wait until all the threads are finished.
154154
while scope.data.num_running_threads.load(Ordering::Acquire) != 0 {
155-
park();
155+
// SAFETY: this is the main thread, the handle belongs to us.
156+
unsafe { scope.data.main_thread.park() };
156157
}
157158

158159
// Throw any panic from `f`, or the return value of `f` if no thread panicked.

0 commit comments

Comments
 (0)