Skip to content

Commit 7af9329

Browse files
committed
Auto merge of #95621 - saethlin:remove-mpsc-transmute, r=RalfJung
Remove ptr-int transmute in std::sync::mpsc Since #95340 landed, Miri with `-Zmiri-check-number-validity` produces an error on the test suites of some crates which implement concurrency tools<sup>*</sup>, because it seems like such crates tend to use `std::sync::mpsc` in their tests. This fixes the problem by storing pointer bytes in a pointer. <sup>*</sup> I have so far seen errors in the test suites of `once_cell`, `parking_lot`, and `crossbeam-utils`. (just updating the list for fun, idk) Also `threadpool`, `async-lock`, `futures-timer`, `fragile`, `scoped_threadpool`, `procfs`, `slog-async`, `scheduled-thread-pool`, `tokio-threadpool`, `mac`, `futures-cpupool`, `ntest`, `actix`, `zbus`, `jsonrpc-client-transports`, `fail`, `libp2p-gossipsub`, `parity-send-wrapper`, `async-broadcast,` `libp2p-relay`, `http-client`, `mockito`, `simple-mutex`, `surf`, `pollster`, and `pulse`. Then I turned the bot off.
2 parents 341883d + dec73f5 commit 7af9329

File tree

4 files changed

+50
-49
lines changed

4 files changed

+50
-49
lines changed

library/std/src/sync/mpsc/blocking.rs

+6-7
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
//! Generic support for building blocking abstractions.
22
3-
use crate::mem;
43
use crate::sync::atomic::{AtomicBool, Ordering};
54
use crate::sync::Arc;
65
use crate::thread::{self, Thread};
@@ -47,18 +46,18 @@ impl SignalToken {
4746
wake
4847
}
4948

50-
/// Converts to an unsafe usize value. Useful for storing in a pipe's state
49+
/// Converts to an unsafe raw pointer. Useful for storing in a pipe's state
5150
/// flag.
5251
#[inline]
53-
pub unsafe fn cast_to_usize(self) -> usize {
54-
mem::transmute(self.inner)
52+
pub unsafe fn to_raw(self) -> *mut u8 {
53+
Arc::into_raw(self.inner) as *mut u8
5554
}
5655

57-
/// Converts from an unsafe usize value. Useful for retrieving a pipe's state
56+
/// Converts from an unsafe raw pointer. Useful for retrieving a pipe's state
5857
/// flag.
5958
#[inline]
60-
pub unsafe fn cast_from_usize(signal_ptr: usize) -> SignalToken {
61-
SignalToken { inner: mem::transmute(signal_ptr) }
59+
pub unsafe fn from_raw(signal_ptr: *mut u8) -> SignalToken {
60+
SignalToken { inner: Arc::from_raw(signal_ptr as *mut Inner) }
6261
}
6362
}
6463

library/std/src/sync/mpsc/oneshot.rs

+12-12
Original file line numberDiff line numberDiff line change
@@ -27,15 +27,15 @@ pub use self::UpgradeResult::*;
2727

2828
use crate::cell::UnsafeCell;
2929
use crate::ptr;
30-
use crate::sync::atomic::{AtomicUsize, Ordering};
30+
use crate::sync::atomic::{AtomicPtr, Ordering};
3131
use crate::sync::mpsc::blocking::{self, SignalToken};
3232
use crate::sync::mpsc::Receiver;
3333
use crate::time::Instant;
3434

3535
// Various states you can find a port in.
36-
const EMPTY: usize = 0; // initial state: no data, no blocked receiver
37-
const DATA: usize = 1; // data ready for receiver to take
38-
const DISCONNECTED: usize = 2; // channel is disconnected OR upgraded
36+
const EMPTY: *mut u8 = ptr::invalid_mut::<u8>(0); // initial state: no data, no blocked receiver
37+
const DATA: *mut u8 = ptr::invalid_mut::<u8>(1); // data ready for receiver to take
38+
const DISCONNECTED: *mut u8 = ptr::invalid_mut::<u8>(2); // channel is disconnected OR upgraded
3939
// Any other value represents a pointer to a SignalToken value. The
4040
// protocol ensures that when the state moves *to* a pointer,
4141
// ownership of the token is given to the packet, and when the state
@@ -44,7 +44,7 @@ const DISCONNECTED: usize = 2; // channel is disconnected OR upgraded
4444

4545
pub struct Packet<T> {
4646
// Internal state of the chan/port pair (stores the blocked thread as well)
47-
state: AtomicUsize,
47+
state: AtomicPtr<u8>,
4848
// One-shot data slot location
4949
data: UnsafeCell<Option<T>>,
5050
// when used for the second time, a oneshot channel must be upgraded, and
@@ -75,7 +75,7 @@ impl<T> Packet<T> {
7575
Packet {
7676
data: UnsafeCell::new(None),
7777
upgrade: UnsafeCell::new(NothingSent),
78-
state: AtomicUsize::new(EMPTY),
78+
state: AtomicPtr::new(EMPTY),
7979
}
8080
}
8181

@@ -108,7 +108,7 @@ impl<T> Packet<T> {
108108
// There is a thread waiting on the other end. We leave the 'DATA'
109109
// state inside so it'll pick it up on the other end.
110110
ptr => {
111-
SignalToken::cast_from_usize(ptr).signal();
111+
SignalToken::from_raw(ptr).signal();
112112
Ok(())
113113
}
114114
}
@@ -126,7 +126,7 @@ impl<T> Packet<T> {
126126
// like we're not empty, then immediately go through to `try_recv`.
127127
if self.state.load(Ordering::SeqCst) == EMPTY {
128128
let (wait_token, signal_token) = blocking::tokens();
129-
let ptr = unsafe { signal_token.cast_to_usize() };
129+
let ptr = unsafe { signal_token.to_raw() };
130130

131131
// race with senders to enter the blocking state
132132
if self.state.compare_exchange(EMPTY, ptr, Ordering::SeqCst, Ordering::SeqCst).is_ok() {
@@ -142,7 +142,7 @@ impl<T> Packet<T> {
142142
}
143143
} else {
144144
// drop the signal token, since we never blocked
145-
drop(unsafe { SignalToken::cast_from_usize(ptr) });
145+
drop(unsafe { SignalToken::from_raw(ptr) });
146146
}
147147
}
148148

@@ -218,7 +218,7 @@ impl<T> Packet<T> {
218218
}
219219

220220
// If someone's waiting, we gotta wake them up
221-
ptr => UpWoke(SignalToken::cast_from_usize(ptr)),
221+
ptr => UpWoke(SignalToken::from_raw(ptr)),
222222
}
223223
}
224224
}
@@ -229,7 +229,7 @@ impl<T> Packet<T> {
229229

230230
// If someone's waiting, we gotta wake them up
231231
ptr => unsafe {
232-
SignalToken::cast_from_usize(ptr).signal();
232+
SignalToken::from_raw(ptr).signal();
233233
},
234234
}
235235
}
@@ -301,7 +301,7 @@ impl<T> Packet<T> {
301301

302302
// We woke ourselves up from select.
303303
ptr => unsafe {
304-
drop(SignalToken::cast_from_usize(ptr));
304+
drop(SignalToken::from_raw(ptr));
305305
Ok(false)
306306
},
307307
}

library/std/src/sync/mpsc/shared.rs

+16-15
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use core::intrinsics::abort;
1515

1616
use crate::cell::UnsafeCell;
1717
use crate::ptr;
18-
use crate::sync::atomic::{AtomicBool, AtomicIsize, AtomicUsize, Ordering};
18+
use crate::sync::atomic::{AtomicBool, AtomicIsize, AtomicPtr, AtomicUsize, Ordering};
1919
use crate::sync::mpsc::blocking::{self, SignalToken};
2020
use crate::sync::mpsc::mpsc_queue as mpsc;
2121
use crate::sync::{Mutex, MutexGuard};
@@ -29,12 +29,13 @@ const MAX_REFCOUNT: usize = (isize::MAX) as usize;
2929
const MAX_STEALS: isize = 5;
3030
#[cfg(not(test))]
3131
const MAX_STEALS: isize = 1 << 20;
32+
const EMPTY: *mut u8 = ptr::null_mut(); // initial state: no data, no blocked receiver
3233

3334
pub struct Packet<T> {
3435
queue: mpsc::Queue<T>,
3536
cnt: AtomicIsize, // How many items are on this channel
3637
steals: UnsafeCell<isize>, // How many times has a port received without blocking?
37-
to_wake: AtomicUsize, // SignalToken for wake up
38+
to_wake: AtomicPtr<u8>, // SignalToken for wake up
3839

3940
// The number of channels which are currently using this packet.
4041
channels: AtomicUsize,
@@ -68,7 +69,7 @@ impl<T> Packet<T> {
6869
queue: mpsc::Queue::new(),
6970
cnt: AtomicIsize::new(0),
7071
steals: UnsafeCell::new(0),
71-
to_wake: AtomicUsize::new(0),
72+
to_wake: AtomicPtr::new(EMPTY),
7273
channels: AtomicUsize::new(2),
7374
port_dropped: AtomicBool::new(false),
7475
sender_drain: AtomicIsize::new(0),
@@ -93,8 +94,8 @@ impl<T> Packet<T> {
9394
pub fn inherit_blocker(&self, token: Option<SignalToken>, guard: MutexGuard<'_, ()>) {
9495
if let Some(token) = token {
9596
assert_eq!(self.cnt.load(Ordering::SeqCst), 0);
96-
assert_eq!(self.to_wake.load(Ordering::SeqCst), 0);
97-
self.to_wake.store(unsafe { token.cast_to_usize() }, Ordering::SeqCst);
97+
assert_eq!(self.to_wake.load(Ordering::SeqCst), EMPTY);
98+
self.to_wake.store(unsafe { token.to_raw() }, Ordering::SeqCst);
9899
self.cnt.store(-1, Ordering::SeqCst);
99100

100101
// This store is a little sketchy. What's happening here is that
@@ -250,10 +251,10 @@ impl<T> Packet<T> {
250251
unsafe {
251252
assert_eq!(
252253
self.to_wake.load(Ordering::SeqCst),
253-
0,
254+
EMPTY,
254255
"This is a known bug in the Rust standard library. See https://github.com/rust-lang/rust/issues/39364"
255256
);
256-
let ptr = token.cast_to_usize();
257+
let ptr = token.to_raw();
257258
self.to_wake.store(ptr, Ordering::SeqCst);
258259

259260
let steals = ptr::replace(self.steals.get(), 0);
@@ -272,8 +273,8 @@ impl<T> Packet<T> {
272273
}
273274
}
274275

275-
self.to_wake.store(0, Ordering::SeqCst);
276-
drop(SignalToken::cast_from_usize(ptr));
276+
self.to_wake.store(EMPTY, Ordering::SeqCst);
277+
drop(SignalToken::from_raw(ptr));
277278
Abort
278279
}
279280
}
@@ -415,9 +416,9 @@ impl<T> Packet<T> {
415416
// Consumes ownership of the 'to_wake' field.
416417
fn take_to_wake(&self) -> SignalToken {
417418
let ptr = self.to_wake.load(Ordering::SeqCst);
418-
self.to_wake.store(0, Ordering::SeqCst);
419-
assert!(ptr != 0);
420-
unsafe { SignalToken::cast_from_usize(ptr) }
419+
self.to_wake.store(EMPTY, Ordering::SeqCst);
420+
assert!(ptr != EMPTY);
421+
unsafe { SignalToken::from_raw(ptr) }
421422
}
422423

423424
////////////////////////////////////////////////////////////////////////////
@@ -462,15 +463,15 @@ impl<T> Packet<T> {
462463
let prev = self.bump(steals + 1);
463464

464465
if prev == DISCONNECTED {
465-
assert_eq!(self.to_wake.load(Ordering::SeqCst), 0);
466+
assert_eq!(self.to_wake.load(Ordering::SeqCst), EMPTY);
466467
true
467468
} else {
468469
let cur = prev + steals + 1;
469470
assert!(cur >= 0);
470471
if prev < 0 {
471472
drop(self.take_to_wake());
472473
} else {
473-
while self.to_wake.load(Ordering::SeqCst) != 0 {
474+
while self.to_wake.load(Ordering::SeqCst) != EMPTY {
474475
thread::yield_now();
475476
}
476477
}
@@ -494,7 +495,7 @@ impl<T> Drop for Packet<T> {
494495
// `to_wake`, so this assert cannot be removed with also removing
495496
// the `to_wake` assert.
496497
assert_eq!(self.cnt.load(Ordering::SeqCst), DISCONNECTED);
497-
assert_eq!(self.to_wake.load(Ordering::SeqCst), 0);
498+
assert_eq!(self.to_wake.load(Ordering::SeqCst), EMPTY);
498499
assert_eq!(self.channels.load(Ordering::SeqCst), 0);
499500
}
500501
}

library/std/src/sync/mpsc/stream.rs

+16-15
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use crate::ptr;
1717
use crate::thread;
1818
use crate::time::Instant;
1919

20-
use crate::sync::atomic::{AtomicBool, AtomicIsize, AtomicUsize, Ordering};
20+
use crate::sync::atomic::{AtomicBool, AtomicIsize, AtomicPtr, Ordering};
2121
use crate::sync::mpsc::blocking::{self, SignalToken};
2222
use crate::sync::mpsc::spsc_queue as spsc;
2323
use crate::sync::mpsc::Receiver;
@@ -27,15 +27,16 @@ const DISCONNECTED: isize = isize::MIN;
2727
const MAX_STEALS: isize = 5;
2828
#[cfg(not(test))]
2929
const MAX_STEALS: isize = 1 << 20;
30+
const EMPTY: *mut u8 = ptr::null_mut(); // initial state: no data, no blocked receiver
3031

3132
pub struct Packet<T> {
3233
// internal queue for all messages
3334
queue: spsc::Queue<Message<T>, ProducerAddition, ConsumerAddition>,
3435
}
3536

3637
struct ProducerAddition {
37-
cnt: AtomicIsize, // How many items are on this channel
38-
to_wake: AtomicUsize, // SignalToken for the blocked thread to wake up
38+
cnt: AtomicIsize, // How many items are on this channel
39+
to_wake: AtomicPtr<u8>, // SignalToken for the blocked thread to wake up
3940

4041
port_dropped: AtomicBool, // flag if the channel has been destroyed.
4142
}
@@ -71,7 +72,7 @@ impl<T> Packet<T> {
7172
128,
7273
ProducerAddition {
7374
cnt: AtomicIsize::new(0),
74-
to_wake: AtomicUsize::new(0),
75+
to_wake: AtomicPtr::new(EMPTY),
7576

7677
port_dropped: AtomicBool::new(false),
7778
},
@@ -147,17 +148,17 @@ impl<T> Packet<T> {
147148
// Consumes ownership of the 'to_wake' field.
148149
fn take_to_wake(&self) -> SignalToken {
149150
let ptr = self.queue.producer_addition().to_wake.load(Ordering::SeqCst);
150-
self.queue.producer_addition().to_wake.store(0, Ordering::SeqCst);
151-
assert!(ptr != 0);
152-
unsafe { SignalToken::cast_from_usize(ptr) }
151+
self.queue.producer_addition().to_wake.store(EMPTY, Ordering::SeqCst);
152+
assert!(ptr != EMPTY);
153+
unsafe { SignalToken::from_raw(ptr) }
153154
}
154155

155156
// Decrements the count on the channel for a sleeper, returning the sleeper
156157
// back if it shouldn't sleep. Note that this is the location where we take
157158
// steals into account.
158159
fn decrement(&self, token: SignalToken) -> Result<(), SignalToken> {
159-
assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), 0);
160-
let ptr = unsafe { token.cast_to_usize() };
160+
assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), EMPTY);
161+
let ptr = unsafe { token.to_raw() };
161162
self.queue.producer_addition().to_wake.store(ptr, Ordering::SeqCst);
162163

163164
let steals = unsafe { ptr::replace(self.queue.consumer_addition().steals.get(), 0) };
@@ -176,8 +177,8 @@ impl<T> Packet<T> {
176177
}
177178
}
178179

179-
self.queue.producer_addition().to_wake.store(0, Ordering::SeqCst);
180-
Err(unsafe { SignalToken::cast_from_usize(ptr) })
180+
self.queue.producer_addition().to_wake.store(EMPTY, Ordering::SeqCst);
181+
Err(unsafe { SignalToken::from_raw(ptr) })
181182
}
182183

183184
pub fn recv(&self, deadline: Option<Instant>) -> Result<T, Failure<T>> {
@@ -376,7 +377,7 @@ impl<T> Packet<T> {
376377
// of time until the data is actually sent.
377378
if was_upgrade {
378379
assert_eq!(unsafe { *self.queue.consumer_addition().steals.get() }, 0);
379-
assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), 0);
380+
assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), EMPTY);
380381
return Ok(true);
381382
}
382383

@@ -389,7 +390,7 @@ impl<T> Packet<T> {
389390
// If we were previously disconnected, then we know for sure that there
390391
// is no thread in to_wake, so just keep going
391392
let has_data = if prev == DISCONNECTED {
392-
assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), 0);
393+
assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), EMPTY);
393394
true // there is data, that data is that we're disconnected
394395
} else {
395396
let cur = prev + steals + 1;
@@ -412,7 +413,7 @@ impl<T> Packet<T> {
412413
if prev < 0 {
413414
drop(self.take_to_wake());
414415
} else {
415-
while self.queue.producer_addition().to_wake.load(Ordering::SeqCst) != 0 {
416+
while self.queue.producer_addition().to_wake.load(Ordering::SeqCst) != EMPTY {
416417
thread::yield_now();
417418
}
418419
}
@@ -451,6 +452,6 @@ impl<T> Drop for Packet<T> {
451452
// `to_wake`, so this assert cannot be removed with also removing
452453
// the `to_wake` assert.
453454
assert_eq!(self.queue.producer_addition().cnt.load(Ordering::SeqCst), DISCONNECTED);
454-
assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), 0);
455+
assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), EMPTY);
455456
}
456457
}

0 commit comments

Comments
 (0)