Skip to content

Commit 8575097

Browse files
bors[bot]faern
andauthored
Merge #191
191: Add initial parking_lot_core tests r=Amanieu a=faern This library could use more tests, to be more robust. This was one of the things pointed out in the review of the integration into std in rust-lang/rust#56410 `WTF::ParkingLot` has a number of tests we can take inspiration from. And this is somewhat of a port of some of those tests. However, when I ported it 1:1 I found a race condition, so I had to implement the semaphore a bit differently. I also got away without the tests relying on a working mutex or condvar implementation. Because we can't make the `parking_lot_core` tests depend on the higher level `parking_lot`, nor do we want to depend on locking primitives in std if we aim to become the base for those implementations one day. Co-authored-by: Linus Färnstrand <[email protected]>
2 parents d2881a0 + e716214 commit 8575097

File tree

1 file changed

+273
-0
lines changed

1 file changed

+273
-0
lines changed

core/src/parking_lot.rs

+273
Original file line numberDiff line numberDiff line change
@@ -1389,3 +1389,276 @@ mod deadlock_impl {
13891389
cycles.iter().cloned().collect()
13901390
}
13911391
}
1392+
1393+
#[cfg(test)]
1394+
mod tests {
1395+
use super::{ThreadData, DEFAULT_PARK_TOKEN, DEFAULT_UNPARK_TOKEN};
1396+
use std::{
1397+
ptr,
1398+
sync::{
1399+
atomic::{AtomicIsize, AtomicPtr, AtomicUsize, Ordering},
1400+
Arc,
1401+
},
1402+
thread,
1403+
time::Duration,
1404+
};
1405+
1406+
/// Calls a closure for every `ThreadData` currently parked on a given key
1407+
fn for_each(key: usize, mut f: impl FnMut(&ThreadData)) {
1408+
let bucket = super::lock_bucket(key);
1409+
1410+
let mut current: *const ThreadData = bucket.queue_head.get();
1411+
while !current.is_null() {
1412+
let current_ref = unsafe { &*current };
1413+
if current_ref.key.load(Ordering::Relaxed) == key {
1414+
f(current_ref);
1415+
}
1416+
current = current_ref.next_in_queue.get();
1417+
}
1418+
1419+
// SAFETY: We hold the lock here, as required
1420+
unsafe { bucket.mutex.unlock() };
1421+
}
1422+
1423+
macro_rules! test {
1424+
( $( $name:ident(
1425+
repeats: $repeats:expr,
1426+
latches: $latches:expr,
1427+
delay: $delay:expr,
1428+
threads: $threads:expr,
1429+
single_unparks: $single_unparks:expr);
1430+
)* ) => {
1431+
$(#[test]
1432+
fn $name() {
1433+
let delay = Duration::from_micros($delay);
1434+
for _ in 0..$repeats {
1435+
run_parking_test($latches, delay, $threads, $single_unparks);
1436+
}
1437+
})*
1438+
};
1439+
}
1440+
1441+
test! {
1442+
unpark_all_one_fast(
1443+
repeats: 10000, latches: 1, delay: 0, threads: 1, single_unparks: 0
1444+
);
1445+
unpark_all_hundred_fast(
1446+
repeats: 100, latches: 1, delay: 0, threads: 100, single_unparks: 0
1447+
);
1448+
unpark_one_one_fast(
1449+
repeats: 1000, latches: 1, delay: 0, threads: 1, single_unparks: 1
1450+
);
1451+
unpark_one_hundred_fast(
1452+
repeats: 20, latches: 1, delay: 0, threads: 100, single_unparks: 100
1453+
);
1454+
unpark_one_fifty_then_fifty_all_fast(
1455+
repeats: 50, latches: 1, delay: 0, threads: 100, single_unparks: 50
1456+
);
1457+
unpark_all_one(
1458+
repeats: 100, latches: 1, delay: 10000, threads: 1, single_unparks: 0
1459+
);
1460+
unpark_all_hundred(
1461+
repeats: 100, latches: 1, delay: 10000, threads: 100, single_unparks: 0
1462+
);
1463+
unpark_one_one(
1464+
repeats: 10, latches: 1, delay: 10000, threads: 1, single_unparks: 1
1465+
);
1466+
unpark_one_fifty(
1467+
repeats: 1, latches: 1, delay: 10000, threads: 50, single_unparks: 50
1468+
);
1469+
unpark_one_fifty_then_fifty_all(
1470+
repeats: 2, latches: 1, delay: 10000, threads: 100, single_unparks: 50
1471+
);
1472+
hundred_unpark_all_one_fast(
1473+
repeats: 100, latches: 100, delay: 0, threads: 1, single_unparks: 0
1474+
);
1475+
hundred_unpark_all_one(
1476+
repeats: 1, latches: 100, delay: 10000, threads: 1, single_unparks: 0
1477+
);
1478+
}
1479+
1480+
fn run_parking_test(
1481+
num_latches: usize,
1482+
delay: Duration,
1483+
num_threads: usize,
1484+
num_single_unparks: usize,
1485+
) {
1486+
let mut tests = Vec::with_capacity(num_latches);
1487+
1488+
for _ in 0..num_latches {
1489+
let test = Arc::new(SingleLatchTest::new(num_threads));
1490+
let mut threads = Vec::with_capacity(num_threads);
1491+
for _ in 0..num_threads {
1492+
let test = test.clone();
1493+
threads.push(thread::spawn(move || test.run()));
1494+
}
1495+
tests.push((test, threads));
1496+
}
1497+
1498+
for unpark_index in 0..num_single_unparks {
1499+
thread::sleep(delay);
1500+
for (test, _) in &tests {
1501+
test.unpark_one(unpark_index);
1502+
}
1503+
}
1504+
1505+
for (test, threads) in tests {
1506+
test.finish(num_single_unparks);
1507+
for thread in threads {
1508+
thread.join().expect("Test thread panic");
1509+
}
1510+
}
1511+
}
1512+
1513+
struct SingleLatchTest {
1514+
semaphore: AtomicIsize,
1515+
num_awake: AtomicUsize,
1516+
/// Holds the pointer to the last *unprocessed* woken up thread.
1517+
last_awoken: AtomicPtr<ThreadData>,
1518+
/// Total number of threads participating in this test.
1519+
num_threads: usize,
1520+
}
1521+
1522+
impl SingleLatchTest {
1523+
pub fn new(num_threads: usize) -> Self {
1524+
Self {
1525+
// This implements a fair (FIFO) semaphore, and it starts out unavailable.
1526+
semaphore: AtomicIsize::new(0),
1527+
num_awake: AtomicUsize::new(0),
1528+
last_awoken: AtomicPtr::new(ptr::null_mut()),
1529+
num_threads,
1530+
}
1531+
}
1532+
1533+
pub fn run(&self) {
1534+
// Get one slot from the semaphore
1535+
self.down();
1536+
1537+
// Report back to the test verification code that this thread woke up
1538+
let this_thread_ptr = super::with_thread_data(|t| t as *const _ as *mut _);
1539+
self.last_awoken.store(this_thread_ptr, Ordering::SeqCst);
1540+
self.num_awake.fetch_add(1, Ordering::SeqCst);
1541+
}
1542+
1543+
pub fn unpark_one(&self, single_unpark_index: usize) {
1544+
// last_awoken should be null at all times except between self.up() and at the bottom
1545+
// of this method where it's reset to null again
1546+
assert!(self.last_awoken.load(Ordering::SeqCst).is_null());
1547+
1548+
let mut queue: Vec<*mut ThreadData> = Vec::with_capacity(self.num_threads);
1549+
for_each(self.semaphore_addr(), |thread_data| {
1550+
queue.push(thread_data as *const _ as *mut _);
1551+
});
1552+
assert!(queue.len() <= self.num_threads - single_unpark_index);
1553+
1554+
let num_awake_before_up = self.num_awake.load(Ordering::SeqCst);
1555+
1556+
self.up();
1557+
1558+
// Wait for a parked thread to wake up and update num_awake + last_awoken.
1559+
while self.num_awake.load(Ordering::SeqCst) != num_awake_before_up + 1 {
1560+
thread::yield_now();
1561+
}
1562+
1563+
// At this point the other thread should have set last_awoken inside the run() method
1564+
let last_awoken = self.last_awoken.load(Ordering::SeqCst);
1565+
assert!(!last_awoken.is_null());
1566+
if !queue.is_empty() && queue[0] != last_awoken {
1567+
panic!(
1568+
"Woke up wrong thread:\n\tqueue: {:?}\n\tlast awoken: {:?}",
1569+
queue, last_awoken
1570+
);
1571+
}
1572+
self.last_awoken.store(ptr::null_mut(), Ordering::SeqCst);
1573+
}
1574+
1575+
pub fn finish(&self, num_single_unparks: usize) {
1576+
// The amount of threads not unparked via unpark_one
1577+
let mut num_threads_left = self.num_threads.checked_sub(num_single_unparks).unwrap();
1578+
1579+
// Wake remaining threads up with unpark_all. Has to be in a loop, because there might
1580+
// still be threads that has not yet parked.
1581+
while num_threads_left > 0 {
1582+
let mut num_waiting_on_address = 0;
1583+
for_each(self.semaphore_addr(), |_thread_data| {
1584+
num_waiting_on_address += 1;
1585+
});
1586+
assert!(num_waiting_on_address <= num_threads_left);
1587+
1588+
let num_awake_before_unpark = self.num_awake.load(Ordering::SeqCst);
1589+
1590+
let num_unparked =
1591+
unsafe { super::unpark_all(self.semaphore_addr(), DEFAULT_UNPARK_TOKEN) };
1592+
assert!(num_unparked >= num_waiting_on_address);
1593+
assert!(num_unparked <= num_threads_left);
1594+
1595+
// Wait for all unparked threads to wake up and update num_awake + last_awoken.
1596+
while self.num_awake.load(Ordering::SeqCst)
1597+
!= num_awake_before_unpark + num_unparked
1598+
{
1599+
thread::yield_now()
1600+
}
1601+
1602+
num_threads_left = num_threads_left.checked_sub(num_unparked).unwrap();
1603+
}
1604+
// By now, all threads should have been woken up
1605+
assert_eq!(self.num_awake.load(Ordering::SeqCst), self.num_threads);
1606+
1607+
// Make sure no thread is parked on our semaphore address
1608+
let mut num_waiting_on_address = 0;
1609+
for_each(self.semaphore_addr(), |_thread_data| {
1610+
num_waiting_on_address += 1;
1611+
});
1612+
assert_eq!(num_waiting_on_address, 0);
1613+
}
1614+
1615+
pub fn down(&self) {
1616+
let old_semaphore_value = self.semaphore.fetch_sub(1, Ordering::SeqCst);
1617+
1618+
if old_semaphore_value > 0 {
1619+
// We acquired the semaphore. Done.
1620+
return;
1621+
}
1622+
1623+
// We need to wait.
1624+
let validate = || true;
1625+
let before_sleep = || {};
1626+
let timed_out = |_, _| {};
1627+
unsafe {
1628+
super::park(
1629+
self.semaphore_addr(),
1630+
validate,
1631+
before_sleep,
1632+
timed_out,
1633+
DEFAULT_PARK_TOKEN,
1634+
None,
1635+
);
1636+
}
1637+
}
1638+
1639+
pub fn up(&self) {
1640+
let old_semaphore_value = self.semaphore.fetch_add(1, Ordering::SeqCst);
1641+
1642+
// Check if anyone was waiting on the semaphore. If they were, then pass ownership to them.
1643+
if old_semaphore_value < 0 {
1644+
// We need to continue until we have actually unparked someone. It might be that
1645+
// the thread we want to pass ownership to has decremented the semaphore counter,
1646+
// but not yet parked.
1647+
loop {
1648+
match unsafe {
1649+
super::unpark_one(self.semaphore_addr(), |_| DEFAULT_UNPARK_TOKEN)
1650+
.unparked_threads
1651+
} {
1652+
1 => break,
1653+
0 => (),
1654+
i => panic!("Should not wake up {} threads", i),
1655+
}
1656+
}
1657+
}
1658+
}
1659+
1660+
fn semaphore_addr(&self) -> usize {
1661+
&self.semaphore as *const _ as usize
1662+
}
1663+
}
1664+
}

0 commit comments

Comments
 (0)