@@ -17,7 +17,7 @@ use crate::ptr;
17
17
use crate :: thread;
18
18
use crate :: time:: Instant ;
19
19
20
- use crate :: sync:: atomic:: { AtomicBool , AtomicIsize , AtomicUsize , Ordering } ;
20
+ use crate :: sync:: atomic:: { AtomicBool , AtomicIsize , AtomicPtr , Ordering } ;
21
21
use crate :: sync:: mpsc:: blocking:: { self , SignalToken } ;
22
22
use crate :: sync:: mpsc:: spsc_queue as spsc;
23
23
use crate :: sync:: mpsc:: Receiver ;
@@ -27,15 +27,16 @@ const DISCONNECTED: isize = isize::MIN;
27
27
const MAX_STEALS : isize = 5 ;
28
28
#[ cfg( not( test) ) ]
29
29
const MAX_STEALS : isize = 1 << 20 ;
30
+ const EMPTY : * mut u8 = ptr:: null_mut ( ) ; // initial state: no data, no blocked receiver
30
31
31
32
pub struct Packet < T > {
32
33
// internal queue for all messages
33
34
queue : spsc:: Queue < Message < T > , ProducerAddition , ConsumerAddition > ,
34
35
}
35
36
36
37
struct ProducerAddition {
37
- cnt : AtomicIsize , // How many items are on this channel
38
- to_wake : AtomicUsize , // SignalToken for the blocked thread to wake up
38
+ cnt : AtomicIsize , // How many items are on this channel
39
+ to_wake : AtomicPtr < u8 > , // SignalToken for the blocked thread to wake up
39
40
40
41
port_dropped : AtomicBool , // flag if the channel has been destroyed.
41
42
}
@@ -71,7 +72,7 @@ impl<T> Packet<T> {
71
72
128 ,
72
73
ProducerAddition {
73
74
cnt : AtomicIsize :: new ( 0 ) ,
74
- to_wake : AtomicUsize :: new ( 0 ) ,
75
+ to_wake : AtomicPtr :: new ( EMPTY ) ,
75
76
76
77
port_dropped : AtomicBool :: new ( false ) ,
77
78
} ,
@@ -147,17 +148,17 @@ impl<T> Packet<T> {
147
148
// Consumes ownership of the 'to_wake' field.
148
149
fn take_to_wake ( & self ) -> SignalToken {
149
150
let ptr = self . queue . producer_addition ( ) . to_wake . load ( Ordering :: SeqCst ) ;
150
- self . queue . producer_addition ( ) . to_wake . store ( 0 , Ordering :: SeqCst ) ;
151
- assert ! ( ptr != 0 ) ;
152
- unsafe { SignalToken :: cast_from_usize ( ptr) }
151
+ self . queue . producer_addition ( ) . to_wake . store ( EMPTY , Ordering :: SeqCst ) ;
152
+ assert ! ( ptr != EMPTY ) ;
153
+ unsafe { SignalToken :: from_raw ( ptr) }
153
154
}
154
155
155
156
// Decrements the count on the channel for a sleeper, returning the sleeper
156
157
// back if it shouldn't sleep. Note that this is the location where we take
157
158
// steals into account.
158
159
fn decrement ( & self , token : SignalToken ) -> Result < ( ) , SignalToken > {
159
- assert_eq ! ( self . queue. producer_addition( ) . to_wake. load( Ordering :: SeqCst ) , 0 ) ;
160
- let ptr = unsafe { token. cast_to_usize ( ) } ;
160
+ assert_eq ! ( self . queue. producer_addition( ) . to_wake. load( Ordering :: SeqCst ) , EMPTY ) ;
161
+ let ptr = unsafe { token. to_raw ( ) } ;
161
162
self . queue . producer_addition ( ) . to_wake . store ( ptr, Ordering :: SeqCst ) ;
162
163
163
164
let steals = unsafe { ptr:: replace ( self . queue . consumer_addition ( ) . steals . get ( ) , 0 ) } ;
@@ -176,8 +177,8 @@ impl<T> Packet<T> {
176
177
}
177
178
}
178
179
179
- self . queue . producer_addition ( ) . to_wake . store ( 0 , Ordering :: SeqCst ) ;
180
- Err ( unsafe { SignalToken :: cast_from_usize ( ptr) } )
180
+ self . queue . producer_addition ( ) . to_wake . store ( EMPTY , Ordering :: SeqCst ) ;
181
+ Err ( unsafe { SignalToken :: from_raw ( ptr) } )
181
182
}
182
183
183
184
pub fn recv ( & self , deadline : Option < Instant > ) -> Result < T , Failure < T > > {
@@ -376,7 +377,7 @@ impl<T> Packet<T> {
376
377
// of time until the data is actually sent.
377
378
if was_upgrade {
378
379
assert_eq ! ( unsafe { * self . queue. consumer_addition( ) . steals. get( ) } , 0 ) ;
379
- assert_eq ! ( self . queue. producer_addition( ) . to_wake. load( Ordering :: SeqCst ) , 0 ) ;
380
+ assert_eq ! ( self . queue. producer_addition( ) . to_wake. load( Ordering :: SeqCst ) , EMPTY ) ;
380
381
return Ok ( true ) ;
381
382
}
382
383
@@ -389,7 +390,7 @@ impl<T> Packet<T> {
389
390
// If we were previously disconnected, then we know for sure that there
390
391
// is no thread in to_wake, so just keep going
391
392
let has_data = if prev == DISCONNECTED {
392
- assert_eq ! ( self . queue. producer_addition( ) . to_wake. load( Ordering :: SeqCst ) , 0 ) ;
393
+ assert_eq ! ( self . queue. producer_addition( ) . to_wake. load( Ordering :: SeqCst ) , EMPTY ) ;
393
394
true // there is data, that data is that we're disconnected
394
395
} else {
395
396
let cur = prev + steals + 1 ;
@@ -412,7 +413,7 @@ impl<T> Packet<T> {
412
413
if prev < 0 {
413
414
drop ( self . take_to_wake ( ) ) ;
414
415
} else {
415
- while self . queue . producer_addition ( ) . to_wake . load ( Ordering :: SeqCst ) != 0 {
416
+ while self . queue . producer_addition ( ) . to_wake . load ( Ordering :: SeqCst ) != EMPTY {
416
417
thread:: yield_now ( ) ;
417
418
}
418
419
}
@@ -451,6 +452,6 @@ impl<T> Drop for Packet<T> {
451
452
// `to_wake`, so this assert cannot be removed with also removing
452
453
// the `to_wake` assert.
453
454
assert_eq ! ( self . queue. producer_addition( ) . cnt. load( Ordering :: SeqCst ) , DISCONNECTED ) ;
454
- assert_eq ! ( self . queue. producer_addition( ) . to_wake. load( Ordering :: SeqCst ) , 0 ) ;
455
+ assert_eq ! ( self . queue. producer_addition( ) . to_wake. load( Ordering :: SeqCst ) , EMPTY ) ;
455
456
}
456
457
}
0 commit comments