Skip to content

Commit baaa3b6

Browse files
committed
Auto merge of #96393 - joboet:pthread_parker, r=thomcc
std: directly use pthread in UNIX parker implementation `Mutex` and `Condvar` are being replaced by more efficient implementations, which need thread parking themselves (see #93740). Therefore we should use the `pthread` synchronization primitives directly. Also, we can avoid allocating the mutex and condition variable because the `Parker` struct is being placed in an `Arc` anyways. This basically is just a copy of the current `Mutex` and `Condvar` code, which will however be removed (again, see #93740). An alternative implementation could be to use dedicated private `OsMutex` and `OsCondvar` types, but all the other platforms supported by std actually have their own thread parking primitives. I used `Pin` to guarantee a stable address for the `Parker` struct, while the current implementation does not, rather using extra unsafe declaration. Since the thread struct is shared anyways, I assumed this would not add too much clutter while being clearer.
2 parents e85edd9 + 1285fb7 commit baaa3b6

File tree

9 files changed

+337
-31
lines changed

9 files changed

+337
-31
lines changed

library/std/src/sys/unix/mod.rs

+1
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ pub mod stdio;
3939
pub mod thread;
4040
pub mod thread_local_dtor;
4141
pub mod thread_local_key;
42+
pub mod thread_parker;
4243
pub mod time;
4344

4445
#[cfg(target_os = "espidf")]
+265
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,265 @@
1+
//! Thread parking without `futex` using the `pthread` synchronization primitives.
2+
3+
#![cfg(not(any(
4+
target_os = "linux",
5+
target_os = "android",
6+
all(target_os = "emscripten", target_feature = "atomics")
7+
)))]
8+
9+
use crate::cell::UnsafeCell;
10+
use crate::marker::PhantomPinned;
11+
use crate::pin::Pin;
12+
use crate::ptr::addr_of_mut;
13+
use crate::sync::atomic::AtomicUsize;
14+
use crate::sync::atomic::Ordering::SeqCst;
15+
use crate::time::Duration;
16+
17+
const EMPTY: usize = 0;
18+
const PARKED: usize = 1;
19+
const NOTIFIED: usize = 2;
20+
21+
unsafe fn lock(lock: *mut libc::pthread_mutex_t) {
22+
let r = libc::pthread_mutex_lock(lock);
23+
debug_assert_eq!(r, 0);
24+
}
25+
26+
unsafe fn unlock(lock: *mut libc::pthread_mutex_t) {
27+
let r = libc::pthread_mutex_unlock(lock);
28+
debug_assert_eq!(r, 0);
29+
}
30+
31+
unsafe fn notify_one(cond: *mut libc::pthread_cond_t) {
32+
let r = libc::pthread_cond_signal(cond);
33+
debug_assert_eq!(r, 0);
34+
}
35+
36+
unsafe fn wait(cond: *mut libc::pthread_cond_t, lock: *mut libc::pthread_mutex_t) {
37+
let r = libc::pthread_cond_wait(cond, lock);
38+
debug_assert_eq!(r, 0);
39+
}
40+
41+
const TIMESPEC_MAX: libc::timespec =
42+
libc::timespec { tv_sec: <libc::time_t>::MAX, tv_nsec: 1_000_000_000 - 1 };
43+
44+
unsafe fn wait_timeout(
45+
cond: *mut libc::pthread_cond_t,
46+
lock: *mut libc::pthread_mutex_t,
47+
dur: Duration,
48+
) {
49+
// Use the system clock on systems that do not support pthread_condattr_setclock.
50+
// This unfortunately results in problems when the system time changes.
51+
#[cfg(any(target_os = "macos", target_os = "ios", target_os = "espidf"))]
52+
let (now, dur) = {
53+
use super::time::SystemTime;
54+
use crate::cmp::min;
55+
56+
// OSX implementation of `pthread_cond_timedwait` is buggy
57+
// with super long durations. When duration is greater than
58+
// 0x100_0000_0000_0000 seconds, `pthread_cond_timedwait`
59+
// in macOS Sierra return error 316.
60+
//
61+
// This program demonstrates the issue:
62+
// https://gist.github.com/stepancheg/198db4623a20aad2ad7cddb8fda4a63c
63+
//
64+
// To work around this issue, and possible bugs of other OSes, timeout
65+
// is clamped to 1000 years, which is allowable per the API of `park_timeout`
66+
// because of spurious wakeups.
67+
let dur = min(dur, Duration::from_secs(1000 * 365 * 86400));
68+
let now = SystemTime::now().t;
69+
(now, dur)
70+
};
71+
// Use the monotonic clock on other systems.
72+
#[cfg(not(any(target_os = "macos", target_os = "ios", target_os = "espidf")))]
73+
let (now, dur) = {
74+
use super::time::Timespec;
75+
76+
(Timespec::now(libc::CLOCK_MONOTONIC), dur)
77+
};
78+
79+
let timeout = now.checked_add_duration(&dur).map(|t| t.t).unwrap_or(TIMESPEC_MAX);
80+
let r = libc::pthread_cond_timedwait(cond, lock, &timeout);
81+
debug_assert!(r == libc::ETIMEDOUT || r == 0);
82+
}
83+
84+
pub struct Parker {
85+
state: AtomicUsize,
86+
lock: UnsafeCell<libc::pthread_mutex_t>,
87+
cvar: UnsafeCell<libc::pthread_cond_t>,
88+
// The `pthread` primitives require a stable address, so make this struct `!Unpin`.
89+
_pinned: PhantomPinned,
90+
}
91+
92+
impl Parker {
93+
/// Construct the UNIX parker in-place.
94+
///
95+
/// # Safety
96+
/// The constructed parker must never be moved.
97+
pub unsafe fn new(parker: *mut Parker) {
98+
// Use the default mutex implementation to allow for simpler initialization.
99+
// This could lead to undefined behaviour when deadlocking. This is avoided
100+
// by not deadlocking. Note in particular the unlocking operation before any
101+
// panic, as code after the panic could try to park again.
102+
addr_of_mut!((*parker).state).write(AtomicUsize::new(EMPTY));
103+
addr_of_mut!((*parker).lock).write(UnsafeCell::new(libc::PTHREAD_MUTEX_INITIALIZER));
104+
105+
cfg_if::cfg_if! {
106+
if #[cfg(any(
107+
target_os = "macos",
108+
target_os = "ios",
109+
target_os = "l4re",
110+
target_os = "android",
111+
target_os = "redox"
112+
))] {
113+
addr_of_mut!((*parker).cvar).write(UnsafeCell::new(libc::PTHREAD_COND_INITIALIZER));
114+
} else if #[cfg(target_os = "espidf")] {
115+
let r = libc::pthread_cond_init(addr_of_mut!((*parker).cvar).cast(), crate::ptr::null());
116+
assert_eq!(r, 0);
117+
} else {
118+
use crate::mem::MaybeUninit;
119+
let mut attr = MaybeUninit::<libc::pthread_condattr_t>::uninit();
120+
let r = libc::pthread_condattr_init(attr.as_mut_ptr());
121+
assert_eq!(r, 0);
122+
let r = libc::pthread_condattr_setclock(attr.as_mut_ptr(), libc::CLOCK_MONOTONIC);
123+
assert_eq!(r, 0);
124+
let r = libc::pthread_cond_init(addr_of_mut!((*parker).cvar).cast(), attr.as_ptr());
125+
assert_eq!(r, 0);
126+
let r = libc::pthread_condattr_destroy(attr.as_mut_ptr());
127+
assert_eq!(r, 0);
128+
}
129+
}
130+
}
131+
132+
// This implementation doesn't require `unsafe`, but other implementations
133+
// may assume this is only called by the thread that owns the Parker.
134+
pub unsafe fn park(self: Pin<&Self>) {
135+
// If we were previously notified then we consume this notification and
136+
// return quickly.
137+
if self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst).is_ok() {
138+
return;
139+
}
140+
141+
// Otherwise we need to coordinate going to sleep
142+
lock(self.lock.get());
143+
match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
144+
Ok(_) => {}
145+
Err(NOTIFIED) => {
146+
// We must read here, even though we know it will be `NOTIFIED`.
147+
// This is because `unpark` may have been called again since we read
148+
// `NOTIFIED` in the `compare_exchange` above. We must perform an
149+
// acquire operation that synchronizes with that `unpark` to observe
150+
// any writes it made before the call to unpark. To do that we must
151+
// read from the write it made to `state`.
152+
let old = self.state.swap(EMPTY, SeqCst);
153+
154+
unlock(self.lock.get());
155+
156+
assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
157+
return;
158+
} // should consume this notification, so prohibit spurious wakeups in next park.
159+
Err(_) => {
160+
unlock(self.lock.get());
161+
162+
panic!("inconsistent park state")
163+
}
164+
}
165+
166+
loop {
167+
wait(self.cvar.get(), self.lock.get());
168+
169+
match self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst) {
170+
Ok(_) => break, // got a notification
171+
Err(_) => {} // spurious wakeup, go back to sleep
172+
}
173+
}
174+
175+
unlock(self.lock.get());
176+
}
177+
178+
// This implementation doesn't require `unsafe`, but other implementations
179+
// may assume this is only called by the thread that owns the Parker. Use
180+
// `Pin` to guarantee a stable address for the mutex and condition variable.
181+
pub unsafe fn park_timeout(self: Pin<&Self>, dur: Duration) {
182+
// Like `park` above we have a fast path for an already-notified thread, and
183+
// afterwards we start coordinating for a sleep.
184+
// return quickly.
185+
if self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst).is_ok() {
186+
return;
187+
}
188+
189+
lock(self.lock.get());
190+
match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
191+
Ok(_) => {}
192+
Err(NOTIFIED) => {
193+
// We must read again here, see `park`.
194+
let old = self.state.swap(EMPTY, SeqCst);
195+
unlock(self.lock.get());
196+
197+
assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
198+
return;
199+
} // should consume this notification, so prohibit spurious wakeups in next park.
200+
Err(_) => {
201+
unlock(self.lock.get());
202+
panic!("inconsistent park_timeout state")
203+
}
204+
}
205+
206+
// Wait with a timeout, and if we spuriously wake up or otherwise wake up
207+
// from a notification we just want to unconditionally set the state back to
208+
// empty, either consuming a notification or un-flagging ourselves as
209+
// parked.
210+
wait_timeout(self.cvar.get(), self.lock.get(), dur);
211+
212+
match self.state.swap(EMPTY, SeqCst) {
213+
NOTIFIED => unlock(self.lock.get()), // got a notification, hurray!
214+
PARKED => unlock(self.lock.get()), // no notification, alas
215+
n => {
216+
unlock(self.lock.get());
217+
panic!("inconsistent park_timeout state: {n}")
218+
}
219+
}
220+
}
221+
222+
pub fn unpark(self: Pin<&Self>) {
223+
// To ensure the unparked thread will observe any writes we made
224+
// before this call, we must perform a release operation that `park`
225+
// can synchronize with. To do that we must write `NOTIFIED` even if
226+
// `state` is already `NOTIFIED`. That is why this must be a swap
227+
// rather than a compare-and-swap that returns if it reads `NOTIFIED`
228+
// on failure.
229+
match self.state.swap(NOTIFIED, SeqCst) {
230+
EMPTY => return, // no one was waiting
231+
NOTIFIED => return, // already unparked
232+
PARKED => {} // gotta go wake someone up
233+
_ => panic!("inconsistent state in unpark"),
234+
}
235+
236+
// There is a period between when the parked thread sets `state` to
237+
// `PARKED` (or last checked `state` in the case of a spurious wake
238+
// up) and when it actually waits on `cvar`. If we were to notify
239+
// during this period it would be ignored and then when the parked
240+
// thread went to sleep it would never wake up. Fortunately, it has
241+
// `lock` locked at this stage so we can acquire `lock` to wait until
242+
// it is ready to receive the notification.
243+
//
244+
// Releasing `lock` before the call to `notify_one` means that when the
245+
// parked thread wakes it doesn't get woken only to have to wait for us
246+
// to release `lock`.
247+
unsafe {
248+
lock(self.lock.get());
249+
unlock(self.lock.get());
250+
notify_one(self.cvar.get());
251+
}
252+
}
253+
}
254+
255+
impl Drop for Parker {
256+
fn drop(&mut self) {
257+
unsafe {
258+
libc::pthread_cond_destroy(self.cvar.get_mut());
259+
libc::pthread_mutex_destroy(self.lock.get_mut());
260+
}
261+
}
262+
}
263+
264+
unsafe impl Sync for Parker {}
265+
unsafe impl Send for Parker {}

library/std/src/sys/unix/time.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ mod inner {
132132

133133
#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
134134
pub struct SystemTime {
135-
t: Timespec,
135+
pub(in crate::sys::unix) t: Timespec,
136136
}
137137

138138
pub const UNIX_EPOCH: SystemTime = SystemTime { t: Timespec::zero() };
@@ -279,7 +279,7 @@ mod inner {
279279

280280
#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
281281
pub struct SystemTime {
282-
t: Timespec,
282+
pub(in crate::sys::unix) t: Timespec,
283283
}
284284

285285
pub const UNIX_EPOCH: SystemTime = SystemTime { t: Timespec::zero() };

library/std/src/sys/windows/thread_parker.rs

+13-7
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
// [4]: Windows Internals, Part 1, ISBN 9780735671300
5959

6060
use crate::convert::TryFrom;
61+
use crate::pin::Pin;
6162
use crate::ptr;
6263
use crate::sync::atomic::{
6364
AtomicI8, AtomicPtr,
@@ -95,13 +96,16 @@ const NOTIFIED: i8 = 1;
9596
// Ordering::Release when writing NOTIFIED (the 'token') in unpark(), and using
9697
// Ordering::Acquire when reading this state in park() after waking up.
9798
impl Parker {
98-
pub fn new() -> Self {
99-
Self { state: AtomicI8::new(EMPTY) }
99+
/// Construct the Windows parker. The UNIX parker implementation
100+
/// requires this to happen in-place.
101+
pub unsafe fn new(parker: *mut Parker) {
102+
parker.write(Self { state: AtomicI8::new(EMPTY) });
100103
}
101104

102105
// Assumes this is only called by the thread that owns the Parker,
103-
// which means that `self.state != PARKED`.
104-
pub unsafe fn park(&self) {
106+
// which means that `self.state != PARKED`. This implementation doesn't require `Pin`,
107+
// but other implementations do.
108+
pub unsafe fn park(self: Pin<&Self>) {
105109
// Change NOTIFIED=>EMPTY or EMPTY=>PARKED, and directly return in the
106110
// first case.
107111
if self.state.fetch_sub(1, Acquire) == NOTIFIED {
@@ -132,8 +136,9 @@ impl Parker {
132136
}
133137

134138
// Assumes this is only called by the thread that owns the Parker,
135-
// which means that `self.state != PARKED`.
136-
pub unsafe fn park_timeout(&self, timeout: Duration) {
139+
// which means that `self.state != PARKED`. This implementation doesn't require `Pin`,
140+
// but other implementations do.
141+
pub unsafe fn park_timeout(self: Pin<&Self>, timeout: Duration) {
137142
// Change NOTIFIED=>EMPTY or EMPTY=>PARKED, and directly return in the
138143
// first case.
139144
if self.state.fetch_sub(1, Acquire) == NOTIFIED {
@@ -184,7 +189,8 @@ impl Parker {
184189
}
185190
}
186191

187-
pub fn unpark(&self) {
192+
// This implementation doesn't require `Pin`, but other implementations do.
193+
pub fn unpark(self: Pin<&Self>) {
188194
// Change PARKED=>NOTIFIED, EMPTY=>NOTIFIED, or NOTIFIED=>NOTIFIED, and
189195
// wake the thread in the first case.
190196
//

library/std/src/sys_common/thread_parker/futex.rs

+11-7
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use crate::pin::Pin;
12
use crate::sync::atomic::AtomicU32;
23
use crate::sync::atomic::Ordering::{Acquire, Release};
34
use crate::sys::futex::{futex_wait, futex_wake};
@@ -32,14 +33,15 @@ pub struct Parker {
3233
// Ordering::Release when writing NOTIFIED (the 'token') in unpark(), and using
3334
// Ordering::Acquire when checking for this state in park().
3435
impl Parker {
35-
#[inline]
36-
pub const fn new() -> Self {
37-
Parker { state: AtomicU32::new(EMPTY) }
36+
/// Construct the futex parker. The UNIX parker implementation
37+
/// requires this to happen in-place.
38+
pub unsafe fn new(parker: *mut Parker) {
39+
parker.write(Self { state: AtomicU32::new(EMPTY) });
3840
}
3941

4042
// Assumes this is only called by the thread that owns the Parker,
4143
// which means that `self.state != PARKED`.
42-
pub unsafe fn park(&self) {
44+
pub unsafe fn park(self: Pin<&Self>) {
4345
// Change NOTIFIED=>EMPTY or EMPTY=>PARKED, and directly return in the
4446
// first case.
4547
if self.state.fetch_sub(1, Acquire) == NOTIFIED {
@@ -58,8 +60,9 @@ impl Parker {
5860
}
5961

6062
// Assumes this is only called by the thread that owns the Parker,
61-
// which means that `self.state != PARKED`.
62-
pub unsafe fn park_timeout(&self, timeout: Duration) {
63+
// which means that `self.state != PARKED`. This implementation doesn't
64+
// require `Pin`, but other implementations do.
65+
pub unsafe fn park_timeout(self: Pin<&Self>, timeout: Duration) {
6366
// Change NOTIFIED=>EMPTY or EMPTY=>PARKED, and directly return in the
6467
// first case.
6568
if self.state.fetch_sub(1, Acquire) == NOTIFIED {
@@ -78,8 +81,9 @@ impl Parker {
7881
}
7982
}
8083

84+
// This implementation doesn't require `Pin`, but other implementations do.
8185
#[inline]
82-
pub fn unpark(&self) {
86+
pub fn unpark(self: Pin<&Self>) {
8387
// Change PARKED=>NOTIFIED, EMPTY=>NOTIFIED, or NOTIFIED=>NOTIFIED, and
8488
// wake the thread in the first case.
8589
//

0 commit comments

Comments
 (0)