Implements the waker changes for task API stabilization. #1445

Merged
merged 8 commits into from
Feb 19, 2019
Merged
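This PR tracks the upstream task API stabilization work in which `LocalWaker` was removed: every polling method now takes a plain `&Waker` instead of a `&LocalWaker`, and the `lw.clone().into_waker()` conversion step disappears. The diff is mechanical but wide, so as a reader's aid, here is a minimal sketch — not taken from this PR; the `Countdown` type is hypothetical — of what a `Stream` implementation looks like against the post-change API:

```rust
use futures_core::stream::Stream;
use futures_core::task::{Poll, Waker};
use std::pin::Pin;

/// Hypothetical stream that counts down to zero, for illustration only.
struct Countdown(u32);

impl Stream for Countdown {
    type Item = u32;

    // After this PR: the second parameter is `&Waker`, not `&LocalWaker`.
    // This stream is always ready, so the waker is never stored.
    fn poll_next(mut self: Pin<&mut Self>, _waker: &Waker) -> Poll<Option<u32>> {
        if self.0 == 0 {
            Poll::Ready(None) // exhausted
        } else {
            self.0 -= 1;
            Poll::Ready(Some(self.0))
        }
    }
}
```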
2 changes: 2 additions & 0 deletions .gitignore

@@ -3,3 +3,5 @@ target/
 Cargo.lock
 _site
 .sass-cache
+.idea
+.DS_Store
2 changes: 1 addition & 1 deletion .travis.yml

@@ -25,7 +25,7 @@ matrix:

     # When updating this, the reminder to update the minimum required version in README.md.
     - name: cargo test (minimum required version)
-      rust: nightly-2019-01-11
+      rust: nightly-2019-02-15

     - name: cargo clippy
       rust: nightly
2 changes: 1 addition & 1 deletion README.md

@@ -39,7 +39,7 @@ Now, you can use futures-rs:
 use futures::future::Future; // Note: It's not `futures_preview`
 ```

-The current version of futures-rs requires Rust nightly 2019-01-11 or later.
+The current version of futures-rs requires Rust nightly 2019-02-15 or later.

 ### Feature `std`
42 changes: 21 additions & 21 deletions futures-channel/benches/sync_mpsc.rs

@@ -9,16 +9,16 @@ use {
         ready,
         stream::{Stream, StreamExt},
         sink::Sink,
-        task::{LocalWaker, Poll},
+        task::{Waker, Poll},
     },
-    futures_test::task::noop_local_waker_ref,
+    futures_test::task::noop_waker_ref,
     std::pin::Pin,
 };

 /// Single producer, single consumer
 #[bench]
 fn unbounded_1_tx(b: &mut Bencher) {
-    let lw = noop_local_waker_ref();
+    let waker = noop_waker_ref();
     b.iter(|| {
         let (tx, mut rx) = mpsc::unbounded();

@@ -27,20 +27,20 @@ fn unbounded_1_tx(b: &mut Bencher) {
         for i in 0..1000 {

             // Poll, not ready, park
-            assert_eq!(Poll::Pending, rx.poll_next_unpin(lw));
+            assert_eq!(Poll::Pending, rx.poll_next_unpin(waker));

[Review comment from a project member] nit: here and elsewhere, I think it'd be fine to abbreviate waker as e.g. wk since it's used so often. I can open this as a separate followup change, though, since it seems potentially controversial.

             UnboundedSender::unbounded_send(&tx, i).unwrap();

             // Now poll ready
-            assert_eq!(Poll::Ready(Some(i)), rx.poll_next_unpin(lw));
+            assert_eq!(Poll::Ready(Some(i)), rx.poll_next_unpin(waker));
         }
     })
 }

 /// 100 producers, single consumer
 #[bench]
 fn unbounded_100_tx(b: &mut Bencher) {
-    let lw = noop_local_waker_ref();
+    let waker = noop_waker_ref();
     b.iter(|| {
         let (tx, mut rx) = mpsc::unbounded();

@@ -49,26 +49,26 @@ fn unbounded_100_tx(b: &mut Bencher) {
         // 1000 send/recv operations total, result should be divided by 1000
         for _ in 0..10 {
             for i in 0..tx.len() {
-                assert_eq!(Poll::Pending, rx.poll_next_unpin(lw));
+                assert_eq!(Poll::Pending, rx.poll_next_unpin(waker));

                 UnboundedSender::unbounded_send(&tx[i], i).unwrap();

-                assert_eq!(Poll::Ready(Some(i)), rx.poll_next_unpin(lw));
+                assert_eq!(Poll::Ready(Some(i)), rx.poll_next_unpin(waker));
             }
         }
     })
 }

 #[bench]
 fn unbounded_uncontended(b: &mut Bencher) {
-    let lw = noop_local_waker_ref();
+    let waker = noop_waker_ref();
     b.iter(|| {
         let (tx, mut rx) = mpsc::unbounded();

         for i in 0..1000 {
             UnboundedSender::unbounded_send(&tx, i).expect("send");
             // No need to create a task, because poll is not going to park.
-            assert_eq!(Poll::Ready(Some(i)), rx.poll_next_unpin(lw));
+            assert_eq!(Poll::Ready(Some(i)), rx.poll_next_unpin(waker));
         }
     })
 }

@@ -84,41 +84,41 @@ struct TestSender {
 impl Stream for TestSender {
     type Item = u32;

-    fn poll_next(mut self: Pin<&mut Self>, lw: &LocalWaker)
+    fn poll_next(mut self: Pin<&mut Self>, waker: &Waker)
         -> Poll<Option<Self::Item>>
     {
         let this = &mut *self;
         let mut tx = Pin::new(&mut this.tx);

-        ready!(tx.as_mut().poll_ready(lw)).unwrap();
+        ready!(tx.as_mut().poll_ready(waker)).unwrap();
         tx.as_mut().start_send(this.last + 1).unwrap();
         this.last += 1;
-        assert_eq!(Poll::Ready(Ok(())), tx.as_mut().poll_flush(lw));
+        assert_eq!(Poll::Ready(Ok(())), tx.as_mut().poll_flush(waker));
         Poll::Ready(Some(this.last))
     }
 }

 /// Single producers, single consumer
 #[bench]
 fn bounded_1_tx(b: &mut Bencher) {
-    let lw = noop_local_waker_ref();
+    let waker = noop_waker_ref();
     b.iter(|| {
         let (tx, mut rx) = mpsc::channel(0);

         let mut tx = TestSender { tx, last: 0 };

         for i in 0..1000 {
-            assert_eq!(Poll::Ready(Some(i + 1)), tx.poll_next_unpin(lw));
-            assert_eq!(Poll::Pending, tx.poll_next_unpin(lw));
-            assert_eq!(Poll::Ready(Some(i + 1)), rx.poll_next_unpin(lw));
+            assert_eq!(Poll::Ready(Some(i + 1)), tx.poll_next_unpin(waker));
+            assert_eq!(Poll::Pending, tx.poll_next_unpin(waker));
+            assert_eq!(Poll::Ready(Some(i + 1)), rx.poll_next_unpin(waker));
         }
     })
 }

 /// 100 producers, single consumer
 #[bench]
 fn bounded_100_tx(b: &mut Bencher) {
-    let lw = noop_local_waker_ref();
+    let waker = noop_waker_ref();
     b.iter(|| {
         // Each sender can send one item after specified capacity
         let (tx, mut rx) = mpsc::channel(0);

@@ -133,11 +133,11 @@ fn bounded_100_tx(b: &mut Bencher) {
         for i in 0..10 {
             for j in 0..tx.len() {
                 // Send an item
-                assert_eq!(Poll::Ready(Some(i + 1)), tx[j].poll_next_unpin(lw));
+                assert_eq!(Poll::Ready(Some(i + 1)), tx[j].poll_next_unpin(waker));
                 // Then block
-                assert_eq!(Poll::Pending, tx[j].poll_next_unpin(lw));
+                assert_eq!(Poll::Pending, tx[j].poll_next_unpin(waker));
                 // Recv the item
-                assert_eq!(Poll::Ready(Some(i + 1)), rx.poll_next_unpin(lw));
+                assert_eq!(Poll::Ready(Some(i + 1)), rx.poll_next_unpin(waker));
             }
         }
     })
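The pattern these benchmarks rely on is manual polling with a waker that does nothing on wake; that is all a benchmark needs, since it drives the channel itself rather than waiting to be woken. A condensed sketch of the same pattern, assuming the post-change alpha API as in this diff:

```rust
use futures::channel::mpsc;
use futures::stream::StreamExt;
use futures::task::Poll;
use futures_test::task::noop_waker_ref;

fn poll_by_hand() {
    let waker = noop_waker_ref();
    let (tx, mut rx) = mpsc::unbounded();

    // Nothing sent yet, so the receiver reports Pending (and parks,
    // registering our no-op waker).
    assert_eq!(Poll::Pending, rx.poll_next_unpin(waker));

    tx.unbounded_send(7u32).unwrap();

    // Now the item is available without any wakeup round trip.
    assert_eq!(Poll::Ready(Some(7)), rx.poll_next_unpin(waker));
}
```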
24 changes: 12 additions & 12 deletions futures-channel/src/mpsc/mod.rs

@@ -79,7 +79,7 @@
 // by the queue structure.

 use futures_core::stream::{FusedStream, Stream};
-use futures_core::task::{LocalWaker, Waker, Poll};
+use futures_core::task::{Waker, Poll};
 use futures_core::task::__internal::AtomicWaker;
 use std::any::Any;
 use std::error::Error;

@@ -555,7 +555,7 @@ impl<T> SenderInner<T> {
     /// - `Err(SendError)` if the receiver has been dropped.
     fn poll_ready(
         &mut self,
-        lw: &LocalWaker
+        waker: &Waker
     ) -> Poll<Result<(), SendError>> {
         let state = decode_state(self.inner.state.load(SeqCst));
         if !state.is_open {

@@ -564,7 +564,7 @@ impl<T> SenderInner<T> {
             }));
         }

-        self.poll_unparked(Some(lw)).map(Ok)
+        self.poll_unparked(Some(waker)).map(Ok)
     }

     /// Returns whether this channel is closed without needing a context.

@@ -582,7 +582,7 @@ impl<T> SenderInner<T> {
         self.inner.recv_task.wake();
     }

-    fn poll_unparked(&mut self, lw: Option<&LocalWaker>) -> Poll<()> {
+    fn poll_unparked(&mut self, waker: Option<&Waker>) -> Poll<()> {
         // First check the `maybe_parked` variable. This avoids acquiring the
         // lock in most cases
         if self.maybe_parked {

@@ -600,7 +600,7 @@ impl<T> SenderInner<T> {
             //
             // Update the task in case the `Sender` has been moved to another
             // task
-            task.task = lw.map(|lw| lw.clone().into_waker());
+            task.task = waker.map(|waker| waker.clone());

             Poll::Pending
         } else {

@@ -649,12 +649,12 @@ impl<T> Sender<T> {
     /// - `Err(SendError)` if the receiver has been dropped.
     pub fn poll_ready(
         &mut self,
-        lw: &LocalWaker
+        waker: &Waker,
     ) -> Poll<Result<(), SendError>> {
         let inner = self.0.as_mut().ok_or(SendError {
             kind: SendErrorKind::Disconnected,
         })?;
-        inner.poll_ready(lw)
+        inner.poll_ready(waker)
     }

     /// Returns whether this channel is closed without needing a context.

@@ -679,7 +679,7 @@ impl<T> UnboundedSender<T> {
     /// Check if the channel is ready to receive a message.
     pub fn poll_ready(
         &self,
-        _: &LocalWaker,
+        _: &Waker,
     ) -> Poll<Result<(), SendError>> {
         let inner = self.0.as_ref().ok_or(SendError {
             kind: SendErrorKind::Disconnected,

@@ -904,7 +904,7 @@ impl<T> Stream for Receiver<T> {

     fn poll_next(
         mut self: Pin<&mut Self>,
-        lw: &LocalWaker,
+        waker: &Waker,
     ) -> Poll<Option<T>> {
         // Try to read a message off of the message queue.
         match self.next_message() {

@@ -916,7 +916,7 @@ impl<T> Stream for Receiver<T> {
             },
             Poll::Pending => {
                 // There are no messages to read, in this case, park.
-                self.inner.as_ref().unwrap().recv_task.register(lw);
+                self.inner.as_ref().unwrap().recv_task.register(waker);
                 // Check queue again after parking to prevent race condition:
                 // a message could be added to the queue after previous `next_message`
                 // before `register` call.

@@ -971,9 +971,9 @@ impl<T> Stream for UnboundedReceiver<T> {

     fn poll_next(
         mut self: Pin<&mut Self>,
-        lw: &LocalWaker,
+        waker: &Waker,
     ) -> Poll<Option<T>> {
-        Pin::new(&mut self.0).poll_next(lw)
+        Pin::new(&mut self.0).poll_next(waker)
     }
 }
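`poll_ready` is the channel's backpressure hook: a bounded sender checks (and, on `Pending`, parks against) the receiver before sending. A hedged usage sketch of the new `&Waker`-taking signature on a zero-capacity channel, assuming the alpha-era API shown in this diff:

```rust
use futures::channel::mpsc;
use futures::task::Poll;
use futures_test::task::noop_waker_ref;

fn readiness_check() {
    let (mut tx, rx) = mpsc::channel::<u32>(0);
    let waker = noop_waker_ref();

    // Even with capacity 0, each sender is guaranteed one slot,
    // so a fresh sender is immediately ready.
    assert!(tx.poll_ready(waker).is_ready());

    // Once the receiver is gone, poll_ready reports the disconnect.
    drop(rx);
    match tx.poll_ready(waker) {
        Poll::Ready(Err(_)) => {} // SendError: receiver dropped
        other => panic!("expected a send error, got {:?}", other),
    }
}
```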
18 changes: 9 additions & 9 deletions futures-channel/src/oneshot.rs

@@ -1,7 +1,7 @@
 //! A channel for sending a single message between asynchronous tasks.

 use futures_core::future::Future;
-use futures_core::task::{LocalWaker, Poll, Waker};
+use futures_core::task::{Waker, Poll};
 use std::pin::Pin;
 use std::sync::Arc;
 use std::sync::atomic::AtomicBool;

@@ -154,7 +154,7 @@ impl<T> Inner<T> {
         }
     }

-    fn poll_cancel(&self, lw: &LocalWaker) -> Poll<()> {
+    fn poll_cancel(&self, waker: &Waker) -> Poll<()> {
         // Fast path up first, just read the flag and see if our other half is
         // gone. This flag is set both in our destructor and the oneshot
         // destructor, but our destructor hasn't run yet so if it's set then the

@@ -176,7 +176,7 @@ impl<T> Inner<T> {
         // `Receiver` may have been dropped. The first thing it does is set the
         // flag, and if it fails to acquire the lock it assumes that we'll see
         // the flag later on. So... we then try to see the flag later on!
-        let handle = lw.clone().into_waker();
+        let handle = waker.clone();
         match self.tx_task.try_lock() {
             Some(mut p) => *p = Some(handle),
             None => return Poll::Ready(()),

@@ -249,7 +249,7 @@ impl<T> Inner<T> {
         }
     }

-    fn recv(&self, lw: &LocalWaker) -> Poll<Result<T, Canceled>> {
+    fn recv(&self, waker: &Waker) -> Poll<Result<T, Canceled>> {
         // Check to see if some data has arrived. If it hasn't then we need to
         // block our task.
         //

@@ -260,7 +260,7 @@ impl<T> Inner<T> {
         let done = if self.complete.load(SeqCst) {
             true
         } else {
-            let task = lw.clone().into_waker();
+            let task = waker.clone();
             match self.rx_task.try_lock() {
                 Some(mut slot) => { *slot = Some(task); false },
                 None => true,

@@ -348,8 +348,8 @@ impl<T> Sender<T> {
     /// alive and may be able to receive a message if sent. The current task,
     /// however, is scheduled to receive a notification if the corresponding
     /// `Receiver` goes away.
-    pub fn poll_cancel(&mut self, lw: &LocalWaker) -> Poll<()> {
-        self.inner.poll_cancel(lw)
+    pub fn poll_cancel(&mut self, waker: &Waker) -> Poll<()> {
+        self.inner.poll_cancel(waker)
    }

     /// Tests to see whether this `Sender`'s corresponding `Receiver`

@@ -416,9 +416,9 @@ impl<T> Future for Receiver<T> {

     fn poll(
         self: Pin<&mut Self>,
-        lw: &LocalWaker,
+        waker: &Waker,
     ) -> Poll<Result<T, Canceled>> {
-        self.inner.recv(lw)
+        self.inner.recv(waker)
     }
 }
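`poll_cancel` is the sender-side half of this handshake: it resolves once the `Receiver` is dropped, storing the (now plain) `Waker` in `tx_task` so the receiver's drop can trigger a wakeup. A small sketch under the post-change API:

```rust
use futures::channel::oneshot;
use futures::task::Poll;
use futures_test::task::noop_waker_ref;

fn observe_cancellation() {
    let (mut tx, rx) = oneshot::channel::<u32>();
    let waker = noop_waker_ref();

    // Receiver still alive: the sender's waker is parked.
    assert_eq!(Poll::Pending, tx.poll_cancel(waker));

    // Dropping the receiver completes the cancellation future.
    drop(rx);
    assert_eq!(Poll::Ready(()), tx.poll_cancel(waker));
}
```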
4 changes: 2 additions & 2 deletions futures-channel/tests/mpsc.rs

@@ -6,7 +6,7 @@ use futures::future::{FutureExt, poll_fn};
 use futures::stream::{Stream, StreamExt};
 use futures::sink::{Sink, SinkExt};
 use futures::task::Poll;
-use futures_test::task::noop_local_waker_ref;
+use futures_test::task::noop_waker_ref;
 use pin_utils::pin_mut;
 use std::sync::{Arc, Mutex};
 use std::sync::atomic::{AtomicUsize, Ordering};

@@ -304,7 +304,7 @@ fn stress_receiver_multi_task_bounded_hard() {
             } else {
                 // Just poll
                 let n = n.clone();
-                match rx.poll_next_unpin(noop_local_waker_ref()) {
+                match rx.poll_next_unpin(noop_waker_ref()) {
                     Poll::Ready(Some(_)) => {
                         n.fetch_add(1, Ordering::Relaxed);
                     }