Skip to content

Commit 5821331

Browse files
committed
Add a new ManualResetEvent synchronization primitive.
The primitive can be used to let one or multiple tasks wait for a certain event. Compared to the existing oneshot channel, the difference is that the ManualResetEvent doesn't carry a value, and is reusable. In addition to the thread-safe ManualResetEvent a non-allocating LocalManualResetEvent variant is provided. This can be e.g. used as a cancellation token in nested asynchronous functions.
1 parent 7aa83f6 commit 5821331

File tree

5 files changed

+648
-1
lines changed

5 files changed

+648
-1
lines changed

futures-channel/src/lib.rs

+2
Original file line numberDiff line numberDiff line change
@@ -18,3 +18,5 @@ mod lock;
1818
pub mod mpsc;
1919
#[cfg(feature = "std")]
2020
pub mod oneshot;
21+
#[cfg(feature = "std")]
22+
pub mod manual_reset_event;
+348
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,348 @@
1+
//! An event for signalization between tasks
2+
3+
use futures_core::future::{Future, FusedFuture};
4+
use futures_core::task::{LocalWaker, Poll, Waker};
5+
use std::marker::{Pinned};
6+
use std::pin::Pin;
7+
use std::sync::Arc;
8+
use std::sync::Mutex;
9+
use std::ptr::null_mut;
10+
use std::cell::RefCell;
11+
12+
#[derive(Debug, PartialEq)]
13+
enum PollState {
14+
New,
15+
Waiting,
16+
Done,
17+
}
18+
19+
/// Tracks the WaitHandle registration state.
20+
/// Access to this struct is synchronized through the mutex in the Event.
21+
#[derive(Debug)]
22+
struct WaitHandleRegistration {
23+
/// The task handle of the waiting task
24+
task: Option<Waker>,
25+
/// Next WaitHandleRegistration in the intrusive list
26+
next: *mut WaitHandleRegistration,
27+
/// Current polling state
28+
state: PollState,
29+
/// Whether WaitHandle has been polled to completion
30+
terminated: bool,
31+
/// Prevents the WaitHandleRegistration from being moved.
32+
/// This is important, since the address of the WaitHandleRegistration must be stable once polled.
33+
_pin: Pinned,
34+
}
35+
36+
impl WaitHandleRegistration {
37+
/// Creates a new WaitHandleRegistration
38+
fn new() -> WaitHandleRegistration {
39+
WaitHandleRegistration {
40+
task: None,
41+
next: null_mut(),
42+
state: PollState::New,
43+
terminated: false,
44+
_pin: Pinned,
45+
}
46+
}
47+
}
48+
49+
#[derive(Debug)]
50+
struct WaitHandle {
51+
/// The ManualResetEvent this is associated with this WaitHandle
52+
event: Arc<Mutex<InnerEventState>>,
53+
/// Registration at the event
54+
reg: WaitHandleRegistration,
55+
}
56+
57+
#[derive(Debug)]
58+
struct LocalWaitHandle<'a> {
59+
/// The LocalManualResetEvent this is associated with this WaitHandle
60+
event: &'a RefCell<InnerEventState>,
61+
/// Registration at the event
62+
reg: WaitHandleRegistration,
63+
}
64+
65+
/// A synchronization primitive which can be either in the set or reset state.
66+
///
67+
/// Tasks can wait for the event to get set by obtaining a Future via poll_set.
68+
/// This Future will get fulfilled when the event had been set.
69+
#[derive(Debug, Clone)]
70+
pub struct ManualResetEvent {
71+
inner: Arc<Mutex<InnerEventState>>,
72+
}
73+
74+
// Automatic derive doesn't work due to the unsafe pointer in WaitHandleRegistration
75+
unsafe impl Send for ManualResetEvent {}
76+
unsafe impl Sync for ManualResetEvent {}
77+
78+
/// A synchronization primitive which can be either in the set or reset state.
79+
///
80+
/// Tasks can wait for the event to get set by obtaining a Future via poll_set.
81+
/// This Future will get fulfilled when the event had been set.
82+
#[derive(Debug)]
83+
pub struct LocalManualResetEvent {
84+
inner: RefCell<InnerEventState>,
85+
}
86+
87+
/// Internal state of the `ManualResetEvent` pair above
88+
#[derive(Debug)]
89+
struct InnerEventState {
90+
is_set: bool,
91+
waiters: *mut WaitHandleRegistration,
92+
}
93+
94+
impl InnerEventState {
95+
fn new(is_set: bool) -> InnerEventState {
96+
InnerEventState {
97+
is_set,
98+
waiters: null_mut(),
99+
}
100+
}
101+
102+
fn reset(&mut self) {
103+
self.is_set = false;
104+
}
105+
106+
fn set(&mut self) {
107+
if self.is_set != true {
108+
self.is_set = true;
109+
110+
// Wakeup all waiters
111+
// This happens inside the lock to make cancellation reliable
112+
// If we would access waiters outside of the lock, the pointers
113+
// may no longer be valid.
114+
// Typically this shouldn't be an issue, since waking a task should
115+
// only move it from the blocked into the ready state and not have
116+
// further side effects.
117+
118+
let mut waiter = self.waiters;
119+
self.waiters = null_mut();
120+
121+
unsafe {
122+
while waiter != null_mut() {
123+
let task = (*waiter).task.take();
124+
if let Some(ref handle) = task {
125+
handle.wake();
126+
}
127+
(*waiter).state = PollState::Done;
128+
waiter = (*waiter).next;
129+
}
130+
}
131+
}
132+
}
133+
134+
fn is_set(&self) -> bool {
135+
return self.is_set;
136+
}
137+
138+
/// Polls one WaitHandle for completion. If the event isn't set, the WaitHandle gets registered
139+
/// at the event, and will be signalled once ready.
140+
fn poll_waiter(
141+
&mut self,
142+
wait_handle: Pin<&mut WaitHandleRegistration>,
143+
lw: &LocalWaker,
144+
) -> Poll<()> {
145+
let wait_handle: &mut WaitHandleRegistration = unsafe { Pin::get_mut_unchecked(wait_handle) };
146+
let addr = wait_handle as *mut WaitHandleRegistration;
147+
148+
match wait_handle.state {
149+
PollState::New => {
150+
if self.is_set {
151+
// The event is already signaled
152+
wait_handle.state = PollState::Done;
153+
wait_handle.terminated = true;
154+
Poll::Ready(())
155+
}
156+
else {
157+
// Register the WaitHandle at the event
158+
wait_handle.task = Some(lw.clone().into_waker());
159+
wait_handle.state = PollState::Waiting;
160+
wait_handle.next = self.waiters;
161+
self.waiters = addr;
162+
Poll::Pending
163+
}
164+
},
165+
PollState::Waiting => {
166+
// The WaitHandle is already registered.
167+
// The event can't have been set, since this would change the
168+
// waitstate inside the mutex.
169+
Poll::Pending
170+
},
171+
PollState::Done => {
172+
// We had been woken up by the event.
173+
// This does not guarantee that the event is still set. It could
174+
// have been reset it in the meantime.
175+
wait_handle.terminated = true;
176+
Poll::Ready(())
177+
},
178+
}
179+
}
180+
181+
fn remove_waiter(&mut self, wait_handle: &mut WaitHandleRegistration) {
182+
let addr = wait_handle as *mut WaitHandleRegistration;
183+
184+
match wait_handle.state {
185+
PollState::Waiting => {
186+
// Remove the WaitHandle from the linked list
187+
if self.waiters == addr {
188+
self.waiters = wait_handle.next;
189+
} else {
190+
// Find the WaitHandle before us and link it to the one
191+
// behind us
192+
let mut iter = self.waiters;
193+
let mut found_addr = false;
194+
195+
unsafe {
196+
while iter != null_mut() {
197+
if (*iter).next == addr {
198+
(*iter).next = wait_handle.next;
199+
found_addr = true;
200+
break;
201+
} else {
202+
iter = (*iter).next;
203+
}
204+
}
205+
}
206+
207+
// Panic if the address isn't found. This can only happen if the contract was
208+
// violated, e.g. the WaitHandle got moved after the initial poll.
209+
assert!(found_addr, "Future could not be unregistered");
210+
}
211+
wait_handle.next = null_mut();
212+
wait_handle.state = PollState::Done;
213+
},
214+
_ => {},
215+
}
216+
}
217+
}
218+
219+
/// Creates a new LocalManualResetEvent in the given state
220+
pub fn local_manual_reset_event(is_set: bool) -> LocalManualResetEvent {
221+
LocalManualResetEvent {
222+
inner: RefCell::new(InnerEventState::new(is_set)),
223+
}
224+
}
225+
226+
impl<'a> LocalManualResetEvent {
227+
/// Sets the event.
228+
///
229+
/// Setting the event will notify all pending waiters.
230+
pub fn set(&self) {
231+
self.inner.borrow_mut().set()
232+
}
233+
234+
/// Resets the event.
235+
pub fn reset(&self) {
236+
self.inner.borrow_mut().reset()
237+
}
238+
239+
/// Returns whether the event is set
240+
pub fn is_set(&self) -> bool {
241+
self.inner.borrow().is_set()
242+
}
243+
244+
/// Returns a future that gets fulfilled when the event is set.
245+
pub fn poll_set(&'a self) -> impl Future<Output = ()> + FusedFuture + 'a {
246+
LocalWaitHandle {
247+
event: &self.inner,
248+
reg: WaitHandleRegistration::new(),
249+
}
250+
}
251+
}
252+
253+
254+
/// Creates a new ManualResetEvent in the given state
255+
pub fn manual_reset_event(is_set: bool) -> ManualResetEvent {
256+
let inner = Arc::new(Mutex::new(InnerEventState::new(is_set)));
257+
ManualResetEvent {
258+
inner,
259+
}
260+
}
261+
262+
impl ManualResetEvent {
263+
/// Sets the event.
264+
///
265+
/// Setting the event will notify all pending waiters.
266+
pub fn set(&self) {
267+
let mut ev_state = self.inner.lock().unwrap();
268+
ev_state.set()
269+
}
270+
271+
/// Resets the event.
272+
pub fn reset(&self) {
273+
let mut ev_state = self.inner.lock().unwrap();
274+
ev_state.reset()
275+
}
276+
277+
/// Returns whether the event is set
278+
pub fn is_set(&self) -> bool {
279+
let ev_state = self.inner.lock().unwrap();
280+
ev_state.is_set()
281+
}
282+
283+
/// Returns a future that gets fulfilled when the event is set.
284+
pub fn poll_set(&self) -> impl Future<Output = ()> + FusedFuture {
285+
WaitHandle {
286+
event: self.inner.clone(),
287+
reg: WaitHandleRegistration::new(),
288+
}
289+
}
290+
}
291+
292+
impl<'a> Future for LocalWaitHandle<'a> {
293+
type Output = ();
294+
295+
fn poll(
296+
self: Pin<&mut Self>,
297+
lw: &LocalWaker,
298+
) -> Poll<()> {
299+
// It might be possible to use Pin::map_unchecked here instead of the two unsafe APIs.
300+
// However this didn't seem to work for some borrow checker reasons
301+
let mut_self: &mut LocalWaitHandle = unsafe { Pin::get_mut_unchecked(self) };
302+
mut_self.event.borrow_mut().poll_waiter(unsafe { Pin::new_unchecked(&mut mut_self.reg) }, lw)
303+
}
304+
}
305+
306+
impl Future for WaitHandle {
307+
type Output = ();
308+
309+
fn poll(
310+
self: Pin<&mut Self>,
311+
lw: &LocalWaker,
312+
) -> Poll<()> {
313+
let mut_self: &mut WaitHandle = unsafe { Pin::get_mut_unchecked(self) };
314+
let mut ev_state = mut_self.event.lock().unwrap();
315+
ev_state.poll_waiter(unsafe { Pin::new_unchecked(&mut mut_self.reg) }, lw)
316+
}
317+
}
318+
319+
impl FusedFuture for WaitHandle {
320+
fn is_terminated(&self) -> bool {
321+
return self.reg.terminated;
322+
}
323+
}
324+
325+
impl<'a> FusedFuture for LocalWaitHandle<'a> {
326+
fn is_terminated(&self) -> bool {
327+
return self.reg.terminated;
328+
}
329+
}
330+
331+
impl Drop for WaitHandle {
332+
fn drop(&mut self) {
333+
// If this WaitHandle has been polled and it was registered at the
334+
// event, it must be unregistered before dropping. Otherwise the
335+
// event would access invalid memory.
336+
let mut ev_state = self.event.lock().unwrap();
337+
ev_state.remove_waiter(&mut self.reg);
338+
}
339+
}
340+
341+
impl<'a> Drop for LocalWaitHandle<'a> {
342+
fn drop(&mut self) {
343+
// If this WaitHandle has been polled and it was registered at the
344+
// event, it must be unregistered before dropping. Otherwise the
345+
// event would access invalid memory.
346+
self.event.borrow_mut().remove_waiter(&mut self.reg);
347+
}
348+
}

0 commit comments

Comments
 (0)