Skip to content

Commit 2287a7a

Browse files
committedSep 23, 2018
Auto merge of #54339 - cramertj:no-cx, r=aturon
Remove spawning from task::Context r? @aturon cc rustasync/team#56
2 parents 317ae05 + 1b00f0b commit 2287a7a

File tree

13 files changed

+93
-568
lines changed

13 files changed

+93
-568
lines changed
 

‎src/liballoc/boxed.rs

+4-67
Original file line numberDiff line numberDiff line change
@@ -60,15 +60,15 @@ use core::borrow;
6060
use core::cmp::Ordering;
6161
use core::convert::From;
6262
use core::fmt;
63-
use core::future::{Future, FutureObj, LocalFutureObj, UnsafeFutureObj};
63+
use core::future::Future;
6464
use core::hash::{Hash, Hasher};
6565
use core::iter::FusedIterator;
6666
use core::marker::{Unpin, Unsize};
6767
use core::mem;
6868
use core::pin::Pin;
6969
use core::ops::{CoerceUnsized, Deref, DerefMut, Generator, GeneratorState};
7070
use core::ptr::{self, NonNull, Unique};
71-
use core::task::{Context, Poll, Spawn, SpawnErrorKind, SpawnObjError};
71+
use core::task::{LocalWaker, Poll};
7272

7373
use raw_vec::RawVec;
7474
use str::from_boxed_utf8_unchecked;
@@ -804,70 +804,7 @@ impl<T> Generator for Box<T>
804804
impl<F: ?Sized + Future + Unpin> Future for Box<F> {
805805
type Output = F::Output;
806806

807-
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
808-
F::poll(Pin::new(&mut *self), cx)
809-
}
810-
}
811-
812-
#[unstable(feature = "futures_api", issue = "50547")]
813-
unsafe impl<'a, T, F> UnsafeFutureObj<'a, T> for Box<F>
814-
where F: Future<Output = T> + 'a
815-
{
816-
fn into_raw(self) -> *mut () {
817-
Box::into_raw(self) as *mut ()
818-
}
819-
820-
unsafe fn poll(ptr: *mut (), cx: &mut Context) -> Poll<T> {
821-
let ptr = ptr as *mut F;
822-
let pin: Pin<&mut F> = Pin::new_unchecked(&mut *ptr);
823-
F::poll(pin, cx)
824-
}
825-
826-
unsafe fn drop(ptr: *mut ()) {
827-
drop(Box::from_raw(ptr as *mut F))
828-
}
829-
}
830-
831-
#[unstable(feature = "futures_api", issue = "50547")]
832-
impl<Sp> Spawn for Box<Sp>
833-
where Sp: Spawn + ?Sized
834-
{
835-
fn spawn_obj(
836-
&mut self,
837-
future: FutureObj<'static, ()>,
838-
) -> Result<(), SpawnObjError> {
839-
(**self).spawn_obj(future)
840-
}
841-
842-
fn status(&self) -> Result<(), SpawnErrorKind> {
843-
(**self).status()
844-
}
845-
}
846-
847-
#[unstable(feature = "futures_api", issue = "50547")]
848-
impl<'a, F: Future<Output = ()> + Send + 'a> From<Box<F>> for FutureObj<'a, ()> {
849-
fn from(boxed: Box<F>) -> Self {
850-
FutureObj::new(boxed)
851-
}
852-
}
853-
854-
#[unstable(feature = "futures_api", issue = "50547")]
855-
impl<'a, F: Future<Output = ()> + 'a> From<Box<F>> for LocalFutureObj<'a, ()> {
856-
fn from(boxed: Box<F>) -> Self {
857-
LocalFutureObj::new(boxed)
858-
}
859-
}
860-
861-
#[unstable(feature = "futures_api", issue = "50547")]
862-
impl<'a, F: Future<Output = ()> + Send + 'a> From<Pin<Box<F>>> for FutureObj<'a, ()> {
863-
fn from(boxed: Pin<Box<F>>) -> Self {
864-
FutureObj::new(boxed)
865-
}
866-
}
867-
868-
#[unstable(feature = "futures_api", issue = "50547")]
869-
impl<'a, F: Future<Output = ()> + 'a> From<Pin<Box<F>>> for LocalFutureObj<'a, ()> {
870-
fn from(boxed: Pin<Box<F>>) -> Self {
871-
LocalFutureObj::new(boxed)
807+
fn poll(mut self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Self::Output> {
808+
F::poll(Pin::new(&mut *self), lw)
872809
}
873810
}

‎src/libcore/future/future.rs

+34-21
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
use marker::Unpin;
1616
use ops;
1717
use pin::Pin;
18-
use task::{self, Poll};
18+
use task::{Poll, LocalWaker};
1919

2020
/// A future represents an asychronous computation.
2121
///
@@ -50,28 +50,28 @@ pub trait Future {
5050
///
5151
/// Once a future has finished, clients should not `poll` it again.
5252
///
53-
/// When a future is not ready yet, `poll` returns
54-
/// `Poll::Pending`. The future will *also* register the
55-
/// interest of the current task in the value being produced. For example,
56-
/// if the future represents the availability of data on a socket, then the
57-
/// task is recorded so that when data arrives, it is woken up (via
58-
/// [`cx.waker()`]). Once a task has been woken up,
59-
/// it should attempt to `poll` the future again, which may or may not
60-
/// produce a final value.
53+
/// When a future is not ready yet, `poll` returns `Poll::Pending` and
54+
/// stores a clone of the [`LocalWaker`] to be woken once the future can
55+
/// make progress. For example, a future waiting for a socket to become
56+
/// readable would call `.clone()` on the [`LocalWaker`] and store it.
57+
/// When a signal arrives elsewhere indicating that the socket is readable,
58+
/// `[LocalWaker::wake]` is called and the socket future's task is awoken.
59+
/// Once a task has been woken up, it should attempt to `poll` the future
60+
/// again, which may or may not produce a final value.
6161
///
62-
/// Note that if `Pending` is returned it only means that the *current* task
63-
/// (represented by the argument `cx`) will receive a notification. Tasks
64-
/// from previous calls to `poll` will *not* receive notifications.
62+
/// Note that on multiple calls to `poll`, only the most recent
63+
/// [`LocalWaker`] passed to `poll` should be scheduled to receive a
64+
/// wakeup.
6565
///
6666
/// # Runtime characteristics
6767
///
6868
/// Futures alone are *inert*; they must be *actively* `poll`ed to make
6969
/// progress, meaning that each time the current task is woken up, it should
7070
/// actively re-`poll` pending futures that it still has an interest in.
7171
///
72-
/// The `poll` function is not called repeatedly in a tight loop for
73-
/// futures, but only whenever the future itself is ready, as signaled via
74-
/// the `Waker` inside `task::Context`. If you're familiar with the
72+
/// The `poll` function is not called repeatedly in a tight loop-- instead,
73+
/// it should only be called when the future indicates that it is ready to
74+
/// make progress (by calling `wake()`). If you're familiar with the
7575
/// `poll(2)` or `select(2)` syscalls on Unix it's worth noting that futures
7676
/// typically do *not* suffer the same problems of "all wakeups must poll
7777
/// all events"; they are more like `epoll(4)`.
@@ -83,6 +83,16 @@ pub trait Future {
8383
/// thread pool (or something similar) to ensure that `poll` can return
8484
/// quickly.
8585
///
86+
/// # [`LocalWaker`], [`Waker`] and thread-safety
87+
///
88+
/// The `poll` function takes a [`LocalWaker`], an object which knows how to
89+
/// awaken the current task. [`LocalWaker`] is not `Send` nor `Sync`, so in
90+
/// order to make thread-safe futures the [`LocalWaker::into_waker`] method
91+
/// should be used to convert the [`LocalWaker`] into a thread-safe version.
92+
/// [`LocalWaker::wake`] implementations have the ability to be more
93+
/// efficient, however, so when thread safety is not necessary,
94+
/// [`LocalWaker`] should be preferred.
95+
///
8696
/// # Panics
8797
///
8898
/// Once a future has completed (returned `Ready` from `poll`),
@@ -92,15 +102,18 @@ pub trait Future {
92102
///
93103
/// [`Poll::Pending`]: ../task/enum.Poll.html#variant.Pending
94104
/// [`Poll::Ready(val)`]: ../task/enum.Poll.html#variant.Ready
95-
/// [`cx.waker()`]: ../task/struct.Context.html#method.waker
96-
fn poll(self: Pin<&mut Self>, cx: &mut task::Context) -> Poll<Self::Output>;
105+
/// [`LocalWaker`]: ../task/struct.LocalWaker.html
106+
/// [`LocalWaker::into_waker`]: ../task/struct.LocalWaker.html#method.into_waker
107+
/// [`LocalWaker::wake`]: ../task/struct.LocalWaker.html#method.wake
108+
/// [`Waker`]: ../task/struct.Waker.html
109+
fn poll(self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Self::Output>;
97110
}
98111

99112
impl<'a, F: ?Sized + Future + Unpin> Future for &'a mut F {
100113
type Output = F::Output;
101114

102-
fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context) -> Poll<Self::Output> {
103-
F::poll(Pin::new(&mut **self), cx)
115+
fn poll(mut self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Self::Output> {
116+
F::poll(Pin::new(&mut **self), lw)
104117
}
105118
}
106119

@@ -111,7 +124,7 @@ where
111124
{
112125
type Output = <<P as ops::Deref>::Target as Future>::Output;
113126

114-
fn poll(self: Pin<&mut Self>, cx: &mut task::Context) -> Poll<Self::Output> {
115-
Pin::get_mut(self).as_mut().poll(cx)
127+
fn poll(self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Self::Output> {
128+
Pin::get_mut(self).as_mut().poll(lw)
116129
}
117130
}

‎src/libcore/future/future_obj.rs

-203
This file was deleted.

‎src/libcore/future/mod.rs

-3
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,3 @@
1616
1717
mod future;
1818
pub use self::future::Future;
19-
20-
mod future_obj;
21-
pub use self::future_obj::{FutureObj, LocalFutureObj, UnsafeFutureObj};

‎src/libcore/task/context.rs

-98
This file was deleted.

‎src/libcore/task/mod.rs

-6
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,6 @@
1414

1515
//! Types and Traits for working with asynchronous tasks.
1616
17-
mod context;
18-
pub use self::context::Context;
19-
20-
mod spawn;
21-
pub use self::spawn::{Spawn, SpawnErrorKind, SpawnObjError, SpawnLocalObjError};
22-
2317
mod poll;
2418
pub use self::poll::Poll;
2519

‎src/libcore/task/spawn.rs

-93
This file was deleted.

‎src/libcore/task/wake.rs

+9
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,15 @@ impl LocalWaker {
123123
LocalWaker { inner }
124124
}
125125

126+
/// Converts this `LocalWaker` into a `Waker`.
127+
///
128+
/// `Waker` is nearly identical to `LocalWaker`, but is threadsafe
129+
/// (implements `Send` and `Sync`).
130+
#[inline]
131+
pub fn into_waker(self) -> Waker {
132+
self.into()
133+
}
134+
126135
/// Wake up the task associated with this `LocalWaker`.
127136
#[inline]
128137
pub fn wake(&self) {

‎src/libstd/future.rs

+27-32
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use core::marker::Unpin;
1515
use core::pin::Pin;
1616
use core::option::Option;
1717
use core::ptr::NonNull;
18-
use core::task::{self, Poll};
18+
use core::task::{LocalWaker, Poll};
1919
use core::ops::{Drop, Generator, GeneratorState};
2020

2121
#[doc(inline)]
@@ -42,75 +42,70 @@ impl<T: Generator<Yield = ()>> !Unpin for GenFuture<T> {}
4242
#[unstable(feature = "gen_future", issue = "50547")]
4343
impl<T: Generator<Yield = ()>> Future for GenFuture<T> {
4444
type Output = T::Return;
45-
fn poll(self: Pin<&mut Self>, cx: &mut task::Context) -> Poll<Self::Output> {
46-
set_task_cx(cx, || match unsafe { Pin::get_mut_unchecked(self).0.resume() } {
45+
fn poll(self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Self::Output> {
46+
set_task_waker(lw, || match unsafe { Pin::get_mut_unchecked(self).0.resume() } {
4747
GeneratorState::Yielded(()) => Poll::Pending,
4848
GeneratorState::Complete(x) => Poll::Ready(x),
4949
})
5050
}
5151
}
5252

5353
thread_local! {
54-
static TLS_CX: Cell<Option<NonNull<task::Context<'static>>>> = Cell::new(None);
54+
static TLS_WAKER: Cell<Option<NonNull<LocalWaker>>> = Cell::new(None);
5555
}
5656

57-
struct SetOnDrop(Option<NonNull<task::Context<'static>>>);
57+
struct SetOnDrop(Option<NonNull<LocalWaker>>);
5858

5959
impl Drop for SetOnDrop {
6060
fn drop(&mut self) {
61-
TLS_CX.with(|tls_cx| {
62-
tls_cx.set(self.0.take());
61+
TLS_WAKER.with(|tls_waker| {
62+
tls_waker.set(self.0.take());
6363
});
6464
}
6565
}
6666

6767
#[unstable(feature = "gen_future", issue = "50547")]
6868
/// Sets the thread-local task context used by async/await futures.
69-
pub fn set_task_cx<F, R>(cx: &mut task::Context, f: F) -> R
69+
pub fn set_task_waker<F, R>(lw: &LocalWaker, f: F) -> R
7070
where
7171
F: FnOnce() -> R
7272
{
73-
let old_cx = TLS_CX.with(|tls_cx| {
74-
tls_cx.replace(NonNull::new(
75-
cx
76-
as *mut task::Context
77-
as *mut ()
78-
as *mut task::Context<'static>
79-
))
73+
let old_waker = TLS_WAKER.with(|tls_waker| {
74+
tls_waker.replace(Some(NonNull::from(lw)))
8075
});
81-
let _reset_cx = SetOnDrop(old_cx);
76+
let _reset_waker = SetOnDrop(old_waker);
8277
f()
8378
}
8479

8580
#[unstable(feature = "gen_future", issue = "50547")]
86-
/// Retrieves the thread-local task context used by async/await futures.
81+
/// Retrieves the thread-local task waker used by async/await futures.
8782
///
88-
/// This function acquires exclusive access to the task context.
83+
/// This function acquires exclusive access to the task waker.
8984
///
90-
/// Panics if no task has been set or if the task context has already been
91-
/// retrieved by a surrounding call to get_task_cx.
92-
pub fn get_task_cx<F, R>(f: F) -> R
85+
/// Panics if no waker has been set or if the waker has already been
86+
/// retrieved by a surrounding call to get_task_waker.
87+
pub fn get_task_waker<F, R>(f: F) -> R
9388
where
94-
F: FnOnce(&mut task::Context) -> R
89+
F: FnOnce(&LocalWaker) -> R
9590
{
96-
let cx_ptr = TLS_CX.with(|tls_cx| {
97-
// Clear the entry so that nested `with_get_cx` calls
91+
let waker_ptr = TLS_WAKER.with(|tls_waker| {
92+
// Clear the entry so that nested `get_task_waker` calls
9893
// will fail or set their own value.
99-
tls_cx.replace(None)
94+
tls_waker.replace(None)
10095
});
101-
let _reset_cx = SetOnDrop(cx_ptr);
96+
let _reset_waker = SetOnDrop(waker_ptr);
10297

103-
let mut cx_ptr = cx_ptr.expect(
104-
"TLS task::Context not set. This is a rustc bug. \
98+
let mut waker_ptr = waker_ptr.expect(
99+
"TLS LocalWaker not set. This is a rustc bug. \
105100
Please file an issue on https://github.com/rust-lang/rust.");
106-
unsafe { f(cx_ptr.as_mut()) }
101+
unsafe { f(waker_ptr.as_mut()) }
107102
}
108103

109104
#[unstable(feature = "gen_future", issue = "50547")]
110-
/// Polls a future in the current thread-local task context.
111-
pub fn poll_in_task_cx<F>(f: Pin<&mut F>) -> Poll<F::Output>
105+
/// Polls a future in the current thread-local task waker.
106+
pub fn poll_with_tls_waker<F>(f: Pin<&mut F>) -> Poll<F::Output>
112107
where
113108
F: Future
114109
{
115-
get_task_cx(|cx| F::poll(f, cx))
110+
get_task_waker(|lw| F::poll(f, lw))
116111
}

‎src/libstd/macros.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,7 @@ macro_rules! await {
229229
let mut pinned = $e;
230230
loop {
231231
if let $crate::task::Poll::Ready(x) =
232-
$crate::future::poll_in_task_cx(unsafe {
232+
$crate::future::poll_with_tls_waker(unsafe {
233233
$crate::pin::Pin::new_unchecked(&mut pinned)
234234
})
235235
{

‎src/libstd/panic.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use panicking;
2222
use ptr::{Unique, NonNull};
2323
use rc::Rc;
2424
use sync::{Arc, Mutex, RwLock, atomic};
25-
use task::{self, Poll};
25+
use task::{LocalWaker, Poll};
2626
use thread::Result;
2727

2828
#[stable(feature = "panic_hooks", since = "1.10.0")]
@@ -327,9 +327,9 @@ impl<T: fmt::Debug> fmt::Debug for AssertUnwindSafe<T> {
327327
impl<'a, F: Future> Future for AssertUnwindSafe<F> {
328328
type Output = F::Output;
329329

330-
fn poll(self: Pin<&mut Self>, cx: &mut task::Context) -> Poll<Self::Output> {
330+
fn poll(self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Self::Output> {
331331
let pinned_field = unsafe { Pin::map_unchecked_mut(self, |x| &mut x.0) };
332-
F::poll(pinned_field, cx)
332+
F::poll(pinned_field, lw)
333333
}
334334
}
335335

‎src/test/run-pass/async-await.rs

+5-17
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,8 @@ use std::sync::{
1818
Arc,
1919
atomic::{self, AtomicUsize},
2020
};
21-
use std::future::FutureObj;
2221
use std::task::{
23-
Context, Poll, Wake,
24-
Spawn, SpawnObjError,
22+
LocalWaker, Poll, Wake,
2523
local_waker_from_nonlocal,
2624
};
2725

@@ -35,24 +33,17 @@ impl Wake for Counter {
3533
}
3634
}
3735

38-
struct NoopSpawner;
39-
impl Spawn for NoopSpawner {
40-
fn spawn_obj(&mut self, _: FutureObj<'static, ()>) -> Result<(), SpawnObjError> {
41-
Ok(())
42-
}
43-
}
44-
4536
struct WakeOnceThenComplete(bool);
4637

4738
fn wake_and_yield_once() -> WakeOnceThenComplete { WakeOnceThenComplete(false) }
4839

4940
impl Future for WakeOnceThenComplete {
5041
type Output = ();
51-
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> {
42+
fn poll(mut self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<()> {
5243
if self.0 {
5344
Poll::Ready(())
5445
} else {
55-
cx.waker().wake();
46+
lw.wake();
5647
self.0 = true;
5748
Poll::Pending
5849
}
@@ -150,13 +141,10 @@ where
150141
let mut fut = Box::pinned(f(9));
151142
let counter = Arc::new(Counter { wakes: AtomicUsize::new(0) });
152143
let waker = local_waker_from_nonlocal(counter.clone());
153-
let spawner = &mut NoopSpawner;
154-
let cx = &mut Context::new(&waker, spawner);
155-
156144
assert_eq!(0, counter.wakes.load(atomic::Ordering::SeqCst));
157-
assert_eq!(Poll::Pending, fut.as_mut().poll(cx));
145+
assert_eq!(Poll::Pending, fut.as_mut().poll(&waker));
158146
assert_eq!(1, counter.wakes.load(atomic::Ordering::SeqCst));
159-
assert_eq!(Poll::Ready(9), fut.as_mut().poll(cx));
147+
assert_eq!(Poll::Ready(9), fut.as_mut().poll(&waker));
160148
}
161149

162150
fn main() {

‎src/test/run-pass/futures-api.rs

+10-24
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,8 @@ use std::sync::{
1818
Arc,
1919
atomic::{self, AtomicUsize},
2020
};
21-
use std::future::FutureObj;
2221
use std::task::{
23-
Context, Poll,
24-
Wake, Waker, LocalWaker,
25-
Spawn, SpawnObjError,
22+
Poll, Wake, Waker, LocalWaker,
2623
local_waker, local_waker_from_nonlocal,
2724
};
2825

@@ -41,24 +38,17 @@ impl Wake for Counter {
4138
}
4239
}
4340

44-
struct NoopSpawner;
45-
46-
impl Spawn for NoopSpawner {
47-
fn spawn_obj(&mut self, _: FutureObj<'static, ()>) -> Result<(), SpawnObjError> {
48-
Ok(())
49-
}
50-
}
51-
5241
struct MyFuture;
5342

5443
impl Future for MyFuture {
5544
type Output = ();
56-
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
57-
// Ensure all the methods work appropriately
58-
cx.waker().wake();
59-
cx.waker().wake();
60-
cx.local_waker().wake();
61-
cx.spawner().spawn_obj(Box::pinned(MyFuture).into()).unwrap();
45+
fn poll(self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Self::Output> {
46+
// Wake once locally
47+
lw.wake();
48+
// Wake twice non-locally
49+
let waker = lw.clone().into_waker();
50+
waker.wake();
51+
waker.wake();
6252
Poll::Ready(())
6353
}
6454
}
@@ -69,9 +59,7 @@ fn test_local_waker() {
6959
nonlocal_wakes: AtomicUsize::new(0),
7060
});
7161
let waker = unsafe { local_waker(counter.clone()) };
72-
let spawner = &mut NoopSpawner;
73-
let cx = &mut Context::new(&waker, spawner);
74-
assert_eq!(Poll::Ready(()), Pin::new(&mut MyFuture).poll(cx));
62+
assert_eq!(Poll::Ready(()), Pin::new(&mut MyFuture).poll(&waker));
7563
assert_eq!(1, counter.local_wakes.load(atomic::Ordering::SeqCst));
7664
assert_eq!(2, counter.nonlocal_wakes.load(atomic::Ordering::SeqCst));
7765
}
@@ -82,9 +70,7 @@ fn test_local_as_nonlocal_waker() {
8270
nonlocal_wakes: AtomicUsize::new(0),
8371
});
8472
let waker: LocalWaker = local_waker_from_nonlocal(counter.clone());
85-
let spawner = &mut NoopSpawner;
86-
let cx = &mut Context::new(&waker, spawner);
87-
assert_eq!(Poll::Ready(()), Pin::new(&mut MyFuture).poll(cx));
73+
assert_eq!(Poll::Ready(()), Pin::new(&mut MyFuture).poll(&waker));
8874
assert_eq!(0, counter.local_wakes.load(atomic::Ordering::SeqCst));
8975
assert_eq!(3, counter.nonlocal_wakes.load(atomic::Ordering::SeqCst));
9076
}

0 commit comments

Comments
 (0)
Please sign in to comment.