From 29501d70b47ba53c912b8bebafa108668b5d331b Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Sat, 18 May 2019 17:19:29 +0200 Subject: [PATCH 01/34] init time Signed-off-by: Yoshua Wuyts --- Cargo.toml | 4 ++++ runtime-native/Cargo.toml | 1 + runtime-raw/src/lib.rs | 27 ++++++++++++++++++--- runtime-raw/src/time.rs | 11 +++++++++ src/time.rs | 27 +++++++++++++++++++++ src/time/delay.rs | 37 +++++++++++++++++++++++++++++ src/time/ext.rs | 50 +++++++++++++++++++++++++++++++++++++++ src/time/interval.rs | 30 +++++++++++++++++++++++ 8 files changed, 184 insertions(+), 3 deletions(-) create mode 100644 runtime-raw/src/time.rs create mode 100644 src/time.rs create mode 100644 src/time/delay.rs create mode 100644 src/time/ext.rs create mode 100644 src/time/interval.rs diff --git a/Cargo.toml b/Cargo.toml index d6f7ad5e..74e1920f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,6 +30,7 @@ juliex = "0.3.0-alpha.6" mio = "0.6.16" rand = "0.6.5" runtime-tokio = { path = "runtime-tokio", version = "0.3.0-alpha.3" } +futures-timer = "0.1.1" tokio = "0.1.19" [profile.bench] @@ -44,3 +45,6 @@ members = [ "runtime-raw", "runtime-tokio", ] + +[patch.crates-io] +futures-timer = { path = "../../alexcrichton/futures-timer" } diff --git a/runtime-native/Cargo.toml b/runtime-native/Cargo.toml index 1214c358..4ad6e564 100644 --- a/runtime-native/Cargo.toml +++ b/runtime-native/Cargo.toml @@ -21,6 +21,7 @@ async-datagram = "2.2.0" juliex = "0.3.0-alpha.6" lazy_static = "1.3.0" romio = "0.3.0-alpha.7" +futures-timer = "0.1.1" [target.'cfg(target_arch = "wasm32")'.dependencies] futures01 = { package = "futures", version = "0.1" } diff --git a/runtime-raw/src/lib.rs b/runtime-raw/src/lib.rs index f5eddb85..99e3b747 100644 --- a/runtime-raw/src/lib.rs +++ b/runtime-raw/src/lib.rs @@ -19,19 +19,22 @@ use futures::future::BoxFuture; use futures::prelude::*; use futures::task::SpawnError; +use std::time::Duration; use std::cell::Cell; use std::io; use std::net::SocketAddr; use std::pin::Pin; -mod tcp; mod udp; +mod tcp; +mod time; -pub use tcp::*; pub use udp::*; +pub use tcp::*; +pub use time::*; thread_local! { - static RUNTIME: Cell> = Cell::new(None); + static RUNTIME: Cell> = Cell::new(None); } /// Get the current runtime. @@ -95,4 +98,22 @@ pub trait Runtime: Send + Sync + 'static { /// This method is defined on the `Runtime` trait because defining it on /// `UdpSocket` would prevent it from being a trait object. fn bind_udp_socket(&self, addr: &SocketAddr) -> io::Result>>; + + /// Create a new Future that sleeps for the given duration. + /// + /// This method is defined on the `Runtime` trait because defining it on + /// `Delay` would prevent it from being a trait object. + fn new_delay(&self, dur: Duration) -> Pin>; + + /// Create a new Future that sleeps until the given time. + /// + /// This method is defined on the `Runtime` trait because defining it on + /// `Delay` would prevent it from being a trait object. + fn new_delay_at(&self, dur: Duration) -> Pin>; + + /// A stream representing notifications at a fixed interval. + /// + /// This method is defined on the `Runtime` trait because defining it on + /// `Interval` would prevent it from being a trait object. + fn new_interval(&self, dur: Duration) -> Pin>; } diff --git a/runtime-raw/src/time.rs b/runtime-raw/src/time.rs new file mode 100644 index 00000000..f675138b --- /dev/null +++ b/runtime-raw/src/time.rs @@ -0,0 +1,11 @@ +use std::fmt::Debug; +use std::time::{Duration, Instant}; +use std::future::Future; + +use futures::Stream; + +/// A future representing the notification that an elapsed duration has occurred. +pub trait Delay: Future + Debug + Send {} + +/// A stream representing notifications at a fixed interval. +pub trait Interval: Stream + Debug + Send {} diff --git a/src/time.rs b/src/time.rs new file mode 100644 index 00000000..72b2bc25 --- /dev/null +++ b/src/time.rs @@ -0,0 +1,27 @@ +mod delay; +mod interval; + +pub mod ext; + +pub use delay::Delay; +pub use interval::Interval; + +use std::time::{Duration, Instant}; + +/// Sleep the current future for the given duration. +#[inline] +pub fn wait_for(dur: Duration) -> Delay { + Delay::new(dur) +} + +/// Sleep the current future until the given time. +#[inline] +pub fn wait_until(at: Instant) -> Delay { + Delay::new_at(at) +} + +/// Create a stream that fires events at a set interval. +#[inline] +pub fn repeat(dur: Duration) -> Interval { + Interval::new(dur) +} diff --git a/src/time/delay.rs b/src/time/delay.rs new file mode 100644 index 00000000..3fb6fde1 --- /dev/null +++ b/src/time/delay.rs @@ -0,0 +1,37 @@ +use futures::prelude::*; + +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::time::{Duration, Instant}; + +/// A future representing the notification that an elapsed duration has occurred. +#[must_use = "futures do nothing unless awaited"] +#[derive(Debug)] +pub struct Delay { + inner: Pin>, +} + +impl Delay { + /// Sleep the current future for the given duration. + #[inline] + pub fn new(dur: Duration) -> Self { + let inner = runtime_raw::new_delay(dur); + Self { inner } + } + + /// Sleep the current future until the given time. + #[inline] + pub fn new_at(at: Instant) -> Self { + let inner = runtime_raw::new_delay_at(dur); + Self { inner } + } +} + +impl Future for Delay { + type Output = Instant; + + #[inline] + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.inner.as_mut().poll(cx) + } +} diff --git a/src/time/ext.rs b/src/time/ext.rs new file mode 100644 index 00000000..74e53a84 --- /dev/null +++ b/src/time/ext.rs @@ -0,0 +1,50 @@ +//! Extensions for Futures types. + +use std::error::Error; +use std::fmt::{self, Debug, Display}; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::time::{Duration, Instant}; + +/// The Future returned from [`FutureExt.timeout`]. +#[derive(Debug)] +pub struct Timeout { + _phantom: std::marker::PhantomData, +} + +impl Future for Timeout { + type Output = Result<::Output, TimeoutError>; + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + unimplemented!(); + } +} + +/// The Error returned in the Future returned from [`FutureExt.timeout`]. +pub struct TimeoutError(pub Instant); +impl Error for TimeoutError {} + +impl Debug for TimeoutError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { + unimplemented!(); + } +} + +impl Display for TimeoutError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { + unimplemented!(); + } +} + +/// Extend `Future` with methods to time out execution. +pub trait FutureExt: Future + Sized { + /// Timeout the future if it isn't completed after `dur` duration. + fn timeout(self, dur: Duration) -> Timeout { + unimplemented!(); + } + + /// Timeout the future if it isn't completed by `at`. + fn deadline(self, at: Instant) -> Timeout { + unimplemented!(); + } +} diff --git a/src/time/interval.rs b/src/time/interval.rs new file mode 100644 index 00000000..b68f0add --- /dev/null +++ b/src/time/interval.rs @@ -0,0 +1,30 @@ +use futures::prelude::*; + +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::time::{Duration, Instant}; + +/// A stream representing notifications at a fixed interval. +#[must_use = "streams do nothing unless polled"] +#[derive(Debug)] +pub struct Interval { + inner: Pin>, +} + +impl Interval { + /// Create a stream that fires events at a set interval. + #[inline] + pub fn new(dur: Duration) -> Self { + let inner = runtime_raw::new_interval(dur); + Self { inner } + } +} + +impl Stream for Interval { + type Item = Instant; + + #[inline] + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.inner.as_mut().poll_next(cx) + } +} From ea9c950c90e9300bb653beab5e306723aaebb992 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Sat, 18 May 2019 17:36:30 +0200 Subject: [PATCH 02/34] unimplemented Signed-off-by: Yoshua Wuyts --- runtime-native/src/not_wasm32.rs | 13 +++++++++++++ runtime-native/src/wasm32.rs | 13 +++++++++++++ runtime-raw/src/time.rs | 2 +- runtime-tokio/src/lib.rs | 25 +++++++++++++++++++++++++ 4 files changed, 52 insertions(+), 1 deletion(-) diff --git a/runtime-native/src/not_wasm32.rs b/runtime-native/src/not_wasm32.rs index 482f4219..26d5da78 100644 --- a/runtime-native/src/not_wasm32.rs +++ b/runtime-native/src/not_wasm32.rs @@ -5,6 +5,7 @@ use lazy_static::lazy_static; use std::io; use std::net::SocketAddr; use std::pin::Pin; +use std::time::Duration; mod tcp; mod udp; @@ -58,4 +59,16 @@ impl runtime_raw::Runtime for Native { let romio_socket = romio::UdpSocket::bind(&addr)?; Ok(Box::pin(UdpSocket { romio_socket })) } + + fn new_delay(&self, _dur: Duration) -> Pin> { + unimplemented!(); + } + + fn new_delay_at(&self, _dur: Duration) -> Pin> { + unimplemented!(); + } + + fn new_interval(&self, _dur: Duration) -> Pin> { + unimplemented!(); + } } diff --git a/runtime-native/src/wasm32.rs b/runtime-native/src/wasm32.rs index 907dba95..625d74c3 100644 --- a/runtime-native/src/wasm32.rs +++ b/runtime-native/src/wasm32.rs @@ -5,6 +5,7 @@ use futures::{future::BoxFuture, task::SpawnError}; use std::io; use std::net::SocketAddr; use std::pin::Pin; +use std::time::Duration; use wasm_bindgen::prelude::*; use wasm_bindgen_futures::future_to_promise; @@ -41,4 +42,16 @@ impl runtime_raw::Runtime for Native { ) -> io::Result>> { panic!("Binding UDP sockets is currently not supported in wasm"); } + + fn new_delay(&self, _dur: Duration) -> Pin> { + unimplemented!(); + } + + fn new_delay_at(&self, _dur: Duration) -> Pin> { + unimplemented!(); + } + + fn new_interval(&self, _dur: Duration) -> Pin> { + unimplemented!(); + } } diff --git a/runtime-raw/src/time.rs b/runtime-raw/src/time.rs index f675138b..de130808 100644 --- a/runtime-raw/src/time.rs +++ b/runtime-raw/src/time.rs @@ -1,5 +1,5 @@ use std::fmt::Debug; -use std::time::{Duration, Instant}; +use std::time::Instant; use std::future::Future; use futures::Stream; diff --git a/runtime-tokio/src/lib.rs b/runtime-tokio/src/lib.rs index 03563793..990d0511 100644 --- a/runtime-tokio/src/lib.rs +++ b/runtime-tokio/src/lib.rs @@ -22,6 +22,7 @@ use std::net::SocketAddr; use std::pin::Pin; use std::sync::{mpsc, Mutex}; use std::thread; +use std::time::Duration; mod tcp; mod udp; @@ -78,6 +79,18 @@ impl runtime_raw::Runtime for Tokio { let tokio_socket = tokio::net::UdpSocket::bind(&addr)?; Ok(Box::pin(UdpSocket { tokio_socket })) } + + fn new_delay(&self, _dur: Duration) -> Pin> { + unimplemented!(); + } + + fn new_delay_at(&self, _dur: Duration) -> Pin> { + unimplemented!(); + } + + fn new_interval(&self, _dur: Duration) -> Pin> { + unimplemented!(); + } } /// The single-threaded Tokio runtime based on `tokio-current-thread`. @@ -143,4 +156,16 @@ impl runtime_raw::Runtime for TokioCurrentThread { let tokio_socket = tokio::net::UdpSocket::bind(&addr)?; Ok(Box::pin(UdpSocket { tokio_socket })) } + + fn new_delay(&self, _dur: Duration) -> Pin> { + unimplemented!(); + } + + fn new_delay_at(&self, _dur: Duration) -> Pin> { + unimplemented!(); + } + + fn new_interval(&self, _dur: Duration) -> Pin> { + unimplemented!(); + } } From 38b95264cd7ef58790c8066f4a88a882d77f74d8 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Sat, 18 May 2019 18:15:17 +0200 Subject: [PATCH 03/34] impl native delay Signed-off-by: Yoshua Wuyts --- runtime-native/src/not_wasm32.rs | 8 ++++++-- runtime-native/src/not_wasm32/time.rs | 23 +++++++++++++++++++++++ runtime-raw/src/time.rs | 2 +- 3 files changed, 30 insertions(+), 3 deletions(-) create mode 100644 runtime-native/src/not_wasm32/time.rs diff --git a/runtime-native/src/not_wasm32.rs b/runtime-native/src/not_wasm32.rs index 26d5da78..7f4b5b7b 100644 --- a/runtime-native/src/not_wasm32.rs +++ b/runtime-native/src/not_wasm32.rs @@ -1,6 +1,7 @@ use futures::prelude::*; use futures::{future::BoxFuture, task::SpawnError}; use lazy_static::lazy_static; +use futures_timer::Delay as AsyncDelay; use std::io; use std::net::SocketAddr; @@ -9,9 +10,11 @@ use std::time::Duration; mod tcp; mod udp; +mod time; use tcp::{TcpListener, TcpStream}; use udp::UdpSocket; +use time::Delay; lazy_static! { static ref JULIEX_THREADPOOL: juliex::ThreadPool = { @@ -60,8 +63,9 @@ impl runtime_raw::Runtime for Native { Ok(Box::pin(UdpSocket { romio_socket })) } - fn new_delay(&self, _dur: Duration) -> Pin> { - unimplemented!(); + fn new_delay(&self, dur: Duration) -> Pin> { + let async_delay = AsyncDelay::new(dur); + Box::pin(Delay { async_delay }) } fn new_delay_at(&self, _dur: Duration) -> Pin> { diff --git a/runtime-native/src/not_wasm32/time.rs b/runtime-native/src/not_wasm32/time.rs new file mode 100644 index 00000000..e1ba0472 --- /dev/null +++ b/runtime-native/src/not_wasm32/time.rs @@ -0,0 +1,23 @@ +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::time::Instant; + +use futures::prelude::*; +use futures_timer::Delay as AsyncDelay; + +pub(crate) struct Delay { + pub(crate) async_delay: AsyncDelay, +} + +impl runtime_raw::Delay for Delay {} + +impl Future for Delay { + type Output = Instant; + + #[inline] + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + // TODO: this should probably not be fallible. + futures::ready!(Pin::new(&mut self.async_delay).poll(cx)).unwrap(); + Poll::Ready(Instant::now()) + } +} diff --git a/runtime-raw/src/time.rs b/runtime-raw/src/time.rs index de130808..49ca5475 100644 --- a/runtime-raw/src/time.rs +++ b/runtime-raw/src/time.rs @@ -5,7 +5,7 @@ use std::future::Future; use futures::Stream; /// A future representing the notification that an elapsed duration has occurred. -pub trait Delay: Future + Debug + Send {} +pub trait Delay: Future + Send {} /// A stream representing notifications at a fixed interval. pub trait Interval: Stream + Debug + Send {} From 49e06006e05afe028478935feb816cbf589ac278 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Sat, 18 May 2019 18:33:17 +0200 Subject: [PATCH 04/34] delay at Signed-off-by: Yoshua Wuyts --- runtime-native/src/not_wasm32.rs | 7 ++++--- runtime-native/src/wasm32.rs | 4 ++-- runtime-raw/src/lib.rs | 4 ++-- runtime-tokio/src/lib.rs | 6 +++--- 4 files changed, 11 insertions(+), 10 deletions(-) diff --git a/runtime-native/src/not_wasm32.rs b/runtime-native/src/not_wasm32.rs index 7f4b5b7b..aa8f7883 100644 --- a/runtime-native/src/not_wasm32.rs +++ b/runtime-native/src/not_wasm32.rs @@ -6,7 +6,7 @@ use futures_timer::Delay as AsyncDelay; use std::io; use std::net::SocketAddr; use std::pin::Pin; -use std::time::Duration; +use std::time::{Duration, Instant}; mod tcp; mod udp; @@ -68,8 +68,9 @@ impl runtime_raw::Runtime for Native { Box::pin(Delay { async_delay }) } - fn new_delay_at(&self, _dur: Duration) -> Pin> { - unimplemented!(); + fn new_delay_at(&self, at: Instant) -> Pin> { + let async_delay = AsyncDelay::new_at(at); + Box::pin(Delay { async_delay }) } fn new_interval(&self, _dur: Duration) -> Pin> { diff --git a/runtime-native/src/wasm32.rs b/runtime-native/src/wasm32.rs index 625d74c3..3b18a44c 100644 --- a/runtime-native/src/wasm32.rs +++ b/runtime-native/src/wasm32.rs @@ -5,7 +5,7 @@ use futures::{future::BoxFuture, task::SpawnError}; use std::io; use std::net::SocketAddr; use std::pin::Pin; -use std::time::Duration; +use std::time::{Duration, Instant}; use wasm_bindgen::prelude::*; use wasm_bindgen_futures::future_to_promise; @@ -47,7 +47,7 @@ impl runtime_raw::Runtime for Native { unimplemented!(); } - fn new_delay_at(&self, _dur: Duration) -> Pin> { + fn new_delay_at(&self, _at: Instant) -> Pin> { unimplemented!(); } diff --git a/runtime-raw/src/lib.rs b/runtime-raw/src/lib.rs index 99e3b747..d92ef97e 100644 --- a/runtime-raw/src/lib.rs +++ b/runtime-raw/src/lib.rs @@ -19,7 +19,7 @@ use futures::future::BoxFuture; use futures::prelude::*; use futures::task::SpawnError; -use std::time::Duration; +use std::time::{Duration, Instant}; use std::cell::Cell; use std::io; use std::net::SocketAddr; @@ -109,7 +109,7 @@ pub trait Runtime: Send + Sync + 'static { /// /// This method is defined on the `Runtime` trait because defining it on /// `Delay` would prevent it from being a trait object. - fn new_delay_at(&self, dur: Duration) -> Pin>; + fn new_delay_at(&self, at: Instant) -> Pin>; /// A stream representing notifications at a fixed interval. /// diff --git a/runtime-tokio/src/lib.rs b/runtime-tokio/src/lib.rs index 990d0511..e68381b0 100644 --- a/runtime-tokio/src/lib.rs +++ b/runtime-tokio/src/lib.rs @@ -22,7 +22,7 @@ use std::net::SocketAddr; use std::pin::Pin; use std::sync::{mpsc, Mutex}; use std::thread; -use std::time::Duration; +use std::time::{Duration, Instant}; mod tcp; mod udp; @@ -84,7 +84,7 @@ impl runtime_raw::Runtime for Tokio { unimplemented!(); } - fn new_delay_at(&self, _dur: Duration) -> Pin> { + fn new_delay_at(&self, _at: Instant) -> Pin> { unimplemented!(); } @@ -161,7 +161,7 @@ impl runtime_raw::Runtime for TokioCurrentThread { unimplemented!(); } - fn new_delay_at(&self, _dur: Duration) -> Pin> { + fn new_delay_at(&self, _at: Instant) -> Pin> { unimplemented!(); } From fc6e2e9f816183cb5e0201c9ac590686a4fb82bc Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Sat, 18 May 2019 21:02:39 +0200 Subject: [PATCH 05/34] better errs Signed-off-by: Yoshua Wuyts --- runtime-native/src/wasm32.rs | 6 +++--- runtime-tokio/src/lib.rs | 12 ++++++------ 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/runtime-native/src/wasm32.rs b/runtime-native/src/wasm32.rs index 3b18a44c..6f1aebf1 100644 --- a/runtime-native/src/wasm32.rs +++ b/runtime-native/src/wasm32.rs @@ -44,14 +44,14 @@ impl runtime_raw::Runtime for Native { } fn new_delay(&self, _dur: Duration) -> Pin> { - unimplemented!(); + panic!("Timers are currently not supported in wasm"); } fn new_delay_at(&self, _at: Instant) -> Pin> { - unimplemented!(); + panic!("Timers are currently not supported in wasm"); } fn new_interval(&self, _dur: Duration) -> Pin> { - unimplemented!(); + panic!("Timers are currently not supported in wasm"); } } diff --git a/runtime-tokio/src/lib.rs b/runtime-tokio/src/lib.rs index e68381b0..f9f412a0 100644 --- a/runtime-tokio/src/lib.rs +++ b/runtime-tokio/src/lib.rs @@ -81,15 +81,15 @@ impl runtime_raw::Runtime for Tokio { } fn new_delay(&self, _dur: Duration) -> Pin> { - unimplemented!(); + panic!("Timers are currently not supported in runtime-tokio"); } fn new_delay_at(&self, _at: Instant) -> Pin> { - unimplemented!(); + panic!("Timers are currently not supported in runtime-tokio"); } fn new_interval(&self, _dur: Duration) -> Pin> { - unimplemented!(); + panic!("Timers are currently not supported in runtime-tokio"); } } @@ -158,14 +158,14 @@ impl runtime_raw::Runtime for TokioCurrentThread { } fn new_delay(&self, _dur: Duration) -> Pin> { - unimplemented!(); + panic!("Timers are currently not supported in runtime-tokio"); } fn new_delay_at(&self, _at: Instant) -> Pin> { - unimplemented!(); + panic!("Timers are currently not supported in runtime-tokio"); } fn new_interval(&self, _dur: Duration) -> Pin> { - unimplemented!(); + panic!("Timers are currently not supported in runtime-tokio"); } } From 027f27527fb556d5903108c3b634baf32cd72bb4 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Sat, 18 May 2019 21:49:51 +0200 Subject: [PATCH 06/34] it works! Signed-off-by: Yoshua Wuyts --- runtime-native/src/not_wasm32.rs | 11 +++++---- runtime-native/src/not_wasm32/time.rs | 34 ++++++++++++++++++++++++++- runtime-raw/src/time.rs | 2 +- src/lib.rs | 1 + src/time/delay.rs | 16 +++++++++---- src/time/interval.rs | 6 ++--- 6 files changed, 55 insertions(+), 15 deletions(-) diff --git a/runtime-native/src/not_wasm32.rs b/runtime-native/src/not_wasm32.rs index aa8f7883..8fe41c17 100644 --- a/runtime-native/src/not_wasm32.rs +++ b/runtime-native/src/not_wasm32.rs @@ -1,7 +1,7 @@ use futures::prelude::*; use futures::{future::BoxFuture, task::SpawnError}; +use futures_timer::{Delay as AsyncDelay, Interval as AsyncInterval}; use lazy_static::lazy_static; -use futures_timer::Delay as AsyncDelay; use std::io; use std::net::SocketAddr; @@ -9,12 +9,12 @@ use std::pin::Pin; use std::time::{Duration, Instant}; mod tcp; -mod udp; mod time; +mod udp; use tcp::{TcpListener, TcpStream}; +use time::{Delay, Interval}; use udp::UdpSocket; -use time::Delay; lazy_static! { static ref JULIEX_THREADPOOL: juliex::ThreadPool = { @@ -73,7 +73,8 @@ impl runtime_raw::Runtime for Native { Box::pin(Delay { async_delay }) } - fn new_interval(&self, _dur: Duration) -> Pin> { - unimplemented!(); + fn new_interval(&self, dur: Duration) -> Pin> { + let async_interval = AsyncInterval::new(dur); + Box::pin(Interval { async_interval }) } } diff --git a/runtime-native/src/not_wasm32/time.rs b/runtime-native/src/not_wasm32/time.rs index e1ba0472..bc49b46d 100644 --- a/runtime-native/src/not_wasm32/time.rs +++ b/runtime-native/src/not_wasm32/time.rs @@ -1,9 +1,10 @@ use std::pin::Pin; use std::task::{Context, Poll}; use std::time::Instant; +use std::fmt; use futures::prelude::*; -use futures_timer::Delay as AsyncDelay; +use futures_timer::{Delay as AsyncDelay, Interval as AsyncInterval}; pub(crate) struct Delay { pub(crate) async_delay: AsyncDelay, @@ -21,3 +22,34 @@ impl Future for Delay { Poll::Ready(Instant::now()) } } + +// TODO: implement this +impl fmt::Debug for Delay { + fn fmt(&self, _f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { + unimplemented!(); + } +} + +pub(crate) struct Interval { + pub(crate) async_interval: AsyncInterval, +} + +impl runtime_raw::Interval for Interval {} + +impl Stream for Interval { + type Item = Instant; + + #[inline] + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + // TODO: this should probably not be fallible. + futures::ready!(Pin::new(&mut self.async_interval).poll_next(cx)).unwrap(); + Poll::Ready(Some(Instant::now())) + } +} + +// TODO: implement this +impl fmt::Debug for Interval { + fn fmt(&self, _f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { + unimplemented!(); + } +} diff --git a/runtime-raw/src/time.rs b/runtime-raw/src/time.rs index 49ca5475..de130808 100644 --- a/runtime-raw/src/time.rs +++ b/runtime-raw/src/time.rs @@ -5,7 +5,7 @@ use std::future::Future; use futures::Stream; /// A future representing the notification that an elapsed duration has occurred. -pub trait Delay: Future + Send {} +pub trait Delay: Future + Debug + Send {} /// A stream representing notifications at a fixed interval. pub trait Interval: Stream + Debug + Send {} diff --git a/src/lib.rs b/src/lib.rs index 2e4252e7..40f6d57f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -96,6 +96,7 @@ pub mod net; pub mod task; +pub mod time; #[doc(inline)] pub use task::spawn; diff --git a/src/time/delay.rs b/src/time/delay.rs index 3fb6fde1..a3d41829 100644 --- a/src/time/delay.rs +++ b/src/time/delay.rs @@ -3,10 +3,10 @@ use futures::prelude::*; use std::pin::Pin; use std::task::{Context, Poll}; use std::time::{Duration, Instant}; +use std::fmt; /// A future representing the notification that an elapsed duration has occurred. #[must_use = "futures do nothing unless awaited"] -#[derive(Debug)] pub struct Delay { inner: Pin>, } @@ -15,23 +15,29 @@ impl Delay { /// Sleep the current future for the given duration. #[inline] pub fn new(dur: Duration) -> Self { - let inner = runtime_raw::new_delay(dur); + let inner = runtime_raw::current_runtime().new_delay(dur); Self { inner } } /// Sleep the current future until the given time. #[inline] pub fn new_at(at: Instant) -> Self { - let inner = runtime_raw::new_delay_at(dur); + let inner = runtime_raw::current_runtime().new_delay_at(at); Self { inner } } } +impl fmt::Debug for Delay { + fn fmt(&self, _f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { + unimplemented!(); + } +} + impl Future for Delay { type Output = Instant; #[inline] - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.inner.as_mut().poll(cx) + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.inner.poll_unpin(cx) } } diff --git a/src/time/interval.rs b/src/time/interval.rs index b68f0add..f8e893ed 100644 --- a/src/time/interval.rs +++ b/src/time/interval.rs @@ -15,7 +15,7 @@ impl Interval { /// Create a stream that fires events at a set interval. #[inline] pub fn new(dur: Duration) -> Self { - let inner = runtime_raw::new_interval(dur); + let inner = runtime_raw::current_runtime().new_interval(dur); Self { inner } } } @@ -24,7 +24,7 @@ impl Stream for Interval { type Item = Instant; #[inline] - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.inner.as_mut().poll_next(cx) + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_next_unpin(cx) } } From 3060412fb7ffe026ae9063da70ccf9734f8929a9 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Sat, 18 May 2019 21:53:42 +0200 Subject: [PATCH 07/34] fmt Signed-off-by: Yoshua Wuyts --- runtime-native/src/not_wasm32/time.rs | 2 +- runtime-raw/src/lib.rs | 6 +++--- runtime-raw/src/time.rs | 2 +- src/time/delay.rs | 2 +- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/runtime-native/src/not_wasm32/time.rs b/runtime-native/src/not_wasm32/time.rs index bc49b46d..1b3bf4c1 100644 --- a/runtime-native/src/not_wasm32/time.rs +++ b/runtime-native/src/not_wasm32/time.rs @@ -1,7 +1,7 @@ +use std::fmt; use std::pin::Pin; use std::task::{Context, Poll}; use std::time::Instant; -use std::fmt; use futures::prelude::*; use futures_timer::{Delay as AsyncDelay, Interval as AsyncInterval}; diff --git a/runtime-raw/src/lib.rs b/runtime-raw/src/lib.rs index d92ef97e..07030897 100644 --- a/runtime-raw/src/lib.rs +++ b/runtime-raw/src/lib.rs @@ -19,19 +19,19 @@ use futures::future::BoxFuture; use futures::prelude::*; use futures::task::SpawnError; -use std::time::{Duration, Instant}; use std::cell::Cell; use std::io; use std::net::SocketAddr; use std::pin::Pin; +use std::time::{Duration, Instant}; -mod udp; mod tcp; mod time; +mod udp; -pub use udp::*; pub use tcp::*; pub use time::*; +pub use udp::*; thread_local! { static RUNTIME: Cell> = Cell::new(None); diff --git a/runtime-raw/src/time.rs b/runtime-raw/src/time.rs index de130808..1e2d53b9 100644 --- a/runtime-raw/src/time.rs +++ b/runtime-raw/src/time.rs @@ -1,6 +1,6 @@ use std::fmt::Debug; -use std::time::Instant; use std::future::Future; +use std::time::Instant; use futures::Stream; diff --git a/src/time/delay.rs b/src/time/delay.rs index a3d41829..46968b59 100644 --- a/src/time/delay.rs +++ b/src/time/delay.rs @@ -1,9 +1,9 @@ use futures::prelude::*; +use std::fmt; use std::pin::Pin; use std::task::{Context, Poll}; use std::time::{Duration, Instant}; -use std::fmt; /// A future representing the notification that an elapsed duration has occurred. #[must_use = "futures do nothing unless awaited"] From e99dee7e6be2148b6d128faad7cead206cac458c Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Sat, 18 May 2019 22:24:03 +0200 Subject: [PATCH 08/34] fix checks Signed-off-by: Yoshua Wuyts --- src/time.rs | 2 ++ src/time/ext.rs | 10 +++++----- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/src/time.rs b/src/time.rs index 72b2bc25..b7108080 100644 --- a/src/time.rs +++ b/src/time.rs @@ -1,3 +1,5 @@ +//! Temporal manipulation. + mod delay; mod interval; diff --git a/src/time/ext.rs b/src/time/ext.rs index 74e53a84..6df1b9b2 100644 --- a/src/time/ext.rs +++ b/src/time/ext.rs @@ -15,7 +15,7 @@ pub struct Timeout { impl Future for Timeout { type Output = Result<::Output, TimeoutError>; - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll { unimplemented!(); } } @@ -25,13 +25,13 @@ pub struct TimeoutError(pub Instant); impl Error for TimeoutError {} impl Debug for TimeoutError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { + fn fmt(&self, _f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { unimplemented!(); } } impl Display for TimeoutError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { + fn fmt(&self, _f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { unimplemented!(); } } @@ -39,12 +39,12 @@ impl Display for TimeoutError { /// Extend `Future` with methods to time out execution. pub trait FutureExt: Future + Sized { /// Timeout the future if it isn't completed after `dur` duration. - fn timeout(self, dur: Duration) -> Timeout { + fn timeout(self, _dur: Duration) -> Timeout { unimplemented!(); } /// Timeout the future if it isn't completed by `at`. - fn deadline(self, at: Instant) -> Timeout { + fn deadline(self, _at: Instant) -> Timeout { unimplemented!(); } } From 0bd95d50e08c4f5cbe722874dc80b04303e00194 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Sat, 18 May 2019 22:27:50 +0200 Subject: [PATCH 09/34] futureext Signed-off-by: Yoshua Wuyts --- src/time.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/time.rs b/src/time.rs index b7108080..cfbc21ea 100644 --- a/src/time.rs +++ b/src/time.rs @@ -6,6 +6,8 @@ mod interval; pub mod ext; pub use delay::Delay; +#[doc(inline)] +pub use ext::FutureExt; pub use interval::Interval; use std::time::{Duration, Instant}; From 137a64b855ff8ca914c1f621a84dbd21469aab0e Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Sun, 19 May 2019 11:33:20 +0200 Subject: [PATCH 10/34] time docs Signed-off-by: Yoshua Wuyts --- src/time.rs | 50 +++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 49 insertions(+), 1 deletion(-) diff --git a/src/time.rs b/src/time.rs index cfbc21ea..4a89dd27 100644 --- a/src/time.rs +++ b/src/time.rs @@ -1,4 +1,52 @@ -//! Temporal manipulation. +//! Types and Functions for temporal manipulation. +//! +//! This module provides primitives for setting asynchronous timeouts, intervals, and delays. +//! +//! # Organization +//! +//! * [`Delay`] and [`Interval`] provide functionality for setting delays and intervals. +//! * [`FutureExt`] extends Futures with the ability to time-out. +//! * Other types are return or parameter types for various methods in this module +//! +//! [`Delay`]: struct.Delay.html +//! [`Interval`]: struct.Interval.html +//! [`FutureExt`]: trait.FutureExt.html +//! +//! ## Examples +//! __Schedule a 3-second delay__ +//! ```no_run +//! # #![feature(async_await)] +//! # #[runtime::main] +//! # async fn main() { +//! use runtime::time::wait_for; +//! use std::time::{Duration, Instant}; +//! +//! let start = Instant::now(); +//! let now = wait_for(Duration::from_secs(3)).await; +//! +//! let elapsed = now - start; +//! println!("elapsed: {}s", elapsed.as_secs()); +//! # } +//! ``` +//! +//! __Schedule a 2-second interval__ +//! ```ignore +//! # #![feature(async_await)] +//! # #[runtime::main] +//! # async fn main() { +//! # use futures::for_await; +//! use runtime::time::repeat; +//! use std::time::{Duration, Instant}; +//! +//! let start = Instant::now(); +//! +//! #[for_await] +//! for now in repeat(Duration::from_secs(2)) { +//! let elapsed = now - start; +//! println!("elapsed: {}s", elapsed.as_secs()); +//! } +//! # } +//! ``` mod delay; mod interval; From 7237edf62b62585759e230d06d29b611ec3208ed Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Thu, 30 May 2019 12:00:00 +0200 Subject: [PATCH 11/34] add debug impls for native Signed-off-by: Yoshua Wuyts --- Cargo.toml | 4 ---- runtime-native/Cargo.toml | 2 +- runtime-native/src/not_wasm32/time.rs | 17 +++++++++++------ 3 files changed, 12 insertions(+), 11 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 74e1920f..d6f7ad5e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,7 +30,6 @@ juliex = "0.3.0-alpha.6" mio = "0.6.16" rand = "0.6.5" runtime-tokio = { path = "runtime-tokio", version = "0.3.0-alpha.3" } -futures-timer = "0.1.1" tokio = "0.1.19" [profile.bench] @@ -45,6 +44,3 @@ members = [ "runtime-raw", "runtime-tokio", ] - -[patch.crates-io] -futures-timer = { path = "../../alexcrichton/futures-timer" } diff --git a/runtime-native/Cargo.toml b/runtime-native/Cargo.toml index 4ad6e564..cac1c1c0 100644 --- a/runtime-native/Cargo.toml +++ b/runtime-native/Cargo.toml @@ -21,7 +21,7 @@ async-datagram = "2.2.0" juliex = "0.3.0-alpha.6" lazy_static = "1.3.0" romio = "0.3.0-alpha.7" -futures-timer = "0.1.1" +futures-timer = "0.2.0" [target.'cfg(target_arch = "wasm32")'.dependencies] futures01 = { package = "futures", version = "0.1" } diff --git a/runtime-native/src/not_wasm32/time.rs b/runtime-native/src/not_wasm32/time.rs index 1b3bf4c1..de47d702 100644 --- a/runtime-native/src/not_wasm32/time.rs +++ b/runtime-native/src/not_wasm32/time.rs @@ -17,7 +17,6 @@ impl Future for Delay { #[inline] fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - // TODO: this should probably not be fallible. futures::ready!(Pin::new(&mut self.async_delay).poll(cx)).unwrap(); Poll::Ready(Instant::now()) } @@ -25,8 +24,11 @@ impl Future for Delay { // TODO: implement this impl fmt::Debug for Delay { - fn fmt(&self, _f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { - unimplemented!(); + // fmt::Display::fmt(self.async_delay, f) + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { + f.debug_struct("Delay") + .field("when", &"...") + .finish() } } @@ -41,7 +43,6 @@ impl Stream for Interval { #[inline] fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - // TODO: this should probably not be fallible. futures::ready!(Pin::new(&mut self.async_interval).poll_next(cx)).unwrap(); Poll::Ready(Some(Instant::now())) } @@ -49,7 +50,11 @@ impl Stream for Interval { // TODO: implement this impl fmt::Debug for Interval { - fn fmt(&self, _f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { - unimplemented!(); + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { + // fmt::Display::fmt(self.async_interval, f) + f.debug_struct("Interval") + .field("delay", &"...") + .field("interval", &"...") + .finish() } } From 6b99a317599fbf05085b1c40d353b9819664024f Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Thu, 30 May 2019 12:03:53 +0200 Subject: [PATCH 12/34] update names Signed-off-by: Yoshua Wuyts --- src/time.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/time.rs b/src/time.rs index 4a89dd27..ee7b6d99 100644 --- a/src/time.rs +++ b/src/time.rs @@ -18,11 +18,11 @@ //! # #![feature(async_await)] //! # #[runtime::main] //! # async fn main() { -//! use runtime::time::wait_for; +//! use runtime::time::delay_for; //! use std::time::{Duration, Instant}; //! //! let start = Instant::now(); -//! let now = wait_for(Duration::from_secs(3)).await; +//! let now = delay_for(Duration::from_secs(3)).await; //! //! let elapsed = now - start; //! println!("elapsed: {}s", elapsed.as_secs()); @@ -35,13 +35,13 @@ //! # #[runtime::main] //! # async fn main() { //! # use futures::for_await; -//! use runtime::time::repeat; +//! use runtime::time::interval; //! use std::time::{Duration, Instant}; //! //! let start = Instant::now(); //! //! #[for_await] -//! for now in repeat(Duration::from_secs(2)) { +//! for now in interval(Duration::from_secs(2)) { //! let elapsed = now - start; //! println!("elapsed: {}s", elapsed.as_secs()); //! } @@ -62,18 +62,18 @@ use std::time::{Duration, Instant}; /// Sleep the current future for the given duration. #[inline] -pub fn wait_for(dur: Duration) -> Delay { +pub fn delay_for(dur: Duration) -> Delay { Delay::new(dur) } /// Sleep the current future until the given time. #[inline] -pub fn wait_until(at: Instant) -> Delay { +pub fn delay_until(at: Instant) -> Delay { Delay::new_at(at) } /// Create a stream that fires events at a set interval. #[inline] -pub fn repeat(dur: Duration) -> Interval { +pub fn interval(dur: Duration) -> Interval { Interval::new(dur) } From 3bc16fdf90c63bbdd6e22c359761fc2a63e843b9 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Thu, 30 May 2019 12:53:24 +0200 Subject: [PATCH 13/34] add tokio impl Signed-off-by: Yoshua Wuyts --- runtime-tokio/src/lib.rs | 18 ++++++++----- runtime-tokio/src/time.rs | 55 +++++++++++++++++++++++++++++++++++++++ src/time.rs | 4 +-- 3 files changed, 69 insertions(+), 8 deletions(-) create mode 100644 runtime-tokio/src/time.rs diff --git a/runtime-tokio/src/lib.rs b/runtime-tokio/src/lib.rs index f9f412a0..b5604af3 100644 --- a/runtime-tokio/src/lib.rs +++ b/runtime-tokio/src/lib.rs @@ -16,6 +16,7 @@ use futures::{ task::SpawnError, }; use lazy_static::lazy_static; +use tokio::timer::{Delay as TokioDelay, Interval as TokioInterval}; use std::io; use std::net::SocketAddr; @@ -26,9 +27,11 @@ use std::time::{Duration, Instant}; mod tcp; mod udp; +mod time; use tcp::{TcpListener, TcpStream}; use udp::UdpSocket; +use time::{Delay, Interval}; /// The default Tokio runtime. #[derive(Debug)] @@ -157,15 +160,18 @@ impl runtime_raw::Runtime for TokioCurrentThread { Ok(Box::pin(UdpSocket { tokio_socket })) } - fn new_delay(&self, _dur: Duration) -> Pin> { - panic!("Timers are currently not supported in runtime-tokio"); + fn new_delay(&self, dur: Duration) -> Pin> { + let tokio_delay = TokioDelay::new(Instant::now() + dur); + Box::pin(Delay { tokio_delay }) } - fn new_delay_at(&self, _at: Instant) -> Pin> { - panic!("Timers are currently not supported in runtime-tokio"); + fn new_delay_at(&self, at: Instant) -> Pin> { + let tokio_delay = TokioDelay::new(at); + Box::pin(Delay { tokio_delay }) } - fn new_interval(&self, _dur: Duration) -> Pin> { - panic!("Timers are currently not supported in runtime-tokio"); + fn new_interval(&self, dur: Duration) -> Pin> { + let tokio_interval = TokioInterval::new(Instant::now(), dur); + Box::pin(Interval { tokio_interval }) } } diff --git a/runtime-tokio/src/time.rs b/runtime-tokio/src/time.rs new file mode 100644 index 00000000..066d2f9f --- /dev/null +++ b/runtime-tokio/src/time.rs @@ -0,0 +1,55 @@ +use std::fmt; +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::time::Instant; + +use futures::compat::Compat01As03; +use futures::prelude::*; +use tokio::timer::{Delay as TokioDelay, Interval as TokioInterval}; + +pub(crate) struct Delay { + pub(crate) tokio_delay: TokioDelay, +} + +impl runtime_raw::Delay for Delay {} + +impl Future for Delay { + type Output = Instant; + + #[inline] + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut fut = Compat01As03::new(&mut self.tokio_delay); + futures::ready!(Pin::new(&mut fut).poll(cx)).unwrap(); + Poll::Ready(Instant::now()) + } +} + +impl fmt::Debug for Delay { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { + fmt::Debug::fmt(&self.tokio_delay, f) + } +} + +pub(crate) struct Interval { + pub(crate) tokio_interval: TokioInterval, +} + +impl runtime_raw::Interval for Interval {} + +impl Stream for Interval { + type Item = Instant; + + #[inline] + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut stream = Compat01As03::new(&mut self.tokio_interval); + // https://docs.rs/tokio/0.1.20/tokio/timer/struct.Error.html + futures::ready!(Pin::new(&mut stream).poll_next(cx)).unwrap().unwrap(); + Poll::Ready(Some(Instant::now())) + } +} + +impl fmt::Debug for Interval { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { + fmt::Debug::fmt(&self.tokio_interval, f) + } +} diff --git a/src/time.rs b/src/time.rs index ee7b6d99..9baf710c 100644 --- a/src/time.rs +++ b/src/time.rs @@ -13,7 +13,7 @@ //! [`FutureExt`]: trait.FutureExt.html //! //! ## Examples -//! __Schedule a 3-second delay__ +//! __Schedule a three-second delay__ //! ```no_run //! # #![feature(async_await)] //! # #[runtime::main] @@ -29,7 +29,7 @@ //! # } //! ``` //! -//! __Schedule a 2-second interval__ +//! __Schedule a two-second interval__ //! ```ignore //! # #![feature(async_await)] //! # #[runtime::main] From 45070bf6ea390e10d3195ea5a32a8afe3139d40a Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Fri, 31 May 2019 11:51:52 +0200 Subject: [PATCH 14/34] update desc Signed-off-by: Yoshua Wuyts --- runtime-raw/src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/runtime-raw/src/lib.rs b/runtime-raw/src/lib.rs index 07030897..47a4ceca 100644 --- a/runtime-raw/src/lib.rs +++ b/runtime-raw/src/lib.rs @@ -99,13 +99,13 @@ pub trait Runtime: Send + Sync + 'static { /// `UdpSocket` would prevent it from being a trait object. fn bind_udp_socket(&self, addr: &SocketAddr) -> io::Result>>; - /// Create a new Future that sleeps for the given duration. + /// Create a new Future that wakes up after the given duration /// /// This method is defined on the `Runtime` trait because defining it on /// `Delay` would prevent it from being a trait object. fn new_delay(&self, dur: Duration) -> Pin>; - /// Create a new Future that sleeps until the given time. + /// Create a new Future that wakes up at the given time. /// /// This method is defined on the `Runtime` trait because defining it on /// `Delay` would prevent it from being a trait object. From 25b7002bdd9a9d20b97d8c917bf3c2b10adf9e3b Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Fri, 31 May 2019 14:26:50 +0200 Subject: [PATCH 15/34] rustfmt + simpler Debugs Signed-off-by: Yoshua Wuyts --- runtime-native/Cargo.toml | 2 +- runtime-native/src/not_wasm32/time.rs | 24 ++---------------------- runtime-tokio/src/lib.rs | 4 ++-- runtime-tokio/src/time.rs | 19 +++++-------------- 4 files changed, 10 insertions(+), 39 deletions(-) diff --git a/runtime-native/Cargo.toml b/runtime-native/Cargo.toml index cac1c1c0..796c3b1a 100644 --- a/runtime-native/Cargo.toml +++ b/runtime-native/Cargo.toml @@ -21,7 +21,7 @@ async-datagram = "2.2.0" juliex = "0.3.0-alpha.6" lazy_static = "1.3.0" romio = "0.3.0-alpha.7" -futures-timer = "0.2.0" +futures-timer = "0.2.1" [target.'cfg(target_arch = "wasm32")'.dependencies] futures01 = { package = "futures", version = "0.1" } diff --git a/runtime-native/src/not_wasm32/time.rs b/runtime-native/src/not_wasm32/time.rs index de47d702..c0daec0f 100644 --- a/runtime-native/src/not_wasm32/time.rs +++ b/runtime-native/src/not_wasm32/time.rs @@ -1,4 +1,3 @@ -use std::fmt; use std::pin::Pin; use std::task::{Context, Poll}; use std::time::Instant; @@ -6,6 +5,7 @@ use std::time::Instant; use futures::prelude::*; use futures_timer::{Delay as AsyncDelay, Interval as AsyncInterval}; +#[derive(Debug)] pub(crate) struct Delay { pub(crate) async_delay: AsyncDelay, } @@ -22,16 +22,7 @@ impl Future for Delay { } } -// TODO: implement this -impl fmt::Debug for Delay { - // fmt::Display::fmt(self.async_delay, f) - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { - f.debug_struct("Delay") - .field("when", &"...") - .finish() - } -} - +#[derive(Debug)] pub(crate) struct Interval { pub(crate) async_interval: AsyncInterval, } @@ -47,14 +38,3 @@ impl Stream for Interval { Poll::Ready(Some(Instant::now())) } } - -// TODO: implement this -impl fmt::Debug for Interval { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { - // fmt::Display::fmt(self.async_interval, f) - f.debug_struct("Interval") - .field("delay", &"...") - .field("interval", &"...") - .finish() - } -} diff --git a/runtime-tokio/src/lib.rs b/runtime-tokio/src/lib.rs index b5604af3..93d09536 100644 --- a/runtime-tokio/src/lib.rs +++ b/runtime-tokio/src/lib.rs @@ -26,12 +26,12 @@ use std::thread; use std::time::{Duration, Instant}; mod tcp; -mod udp; mod time; +mod udp; use tcp::{TcpListener, TcpStream}; -use udp::UdpSocket; use time::{Delay, Interval}; +use udp::UdpSocket; /// The default Tokio runtime. #[derive(Debug)] diff --git a/runtime-tokio/src/time.rs b/runtime-tokio/src/time.rs index 066d2f9f..0931a5d4 100644 --- a/runtime-tokio/src/time.rs +++ b/runtime-tokio/src/time.rs @@ -1,4 +1,3 @@ -use std::fmt; use std::pin::Pin; use std::task::{Context, Poll}; use std::time::Instant; @@ -7,6 +6,7 @@ use futures::compat::Compat01As03; use futures::prelude::*; use tokio::timer::{Delay as TokioDelay, Interval as TokioInterval}; +#[derive(Debug)] pub(crate) struct Delay { pub(crate) tokio_delay: TokioDelay, } @@ -24,12 +24,7 @@ impl Future for Delay { } } -impl fmt::Debug for Delay { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { - fmt::Debug::fmt(&self.tokio_delay, f) - } -} - +#[derive(Debug)] pub(crate) struct Interval { pub(crate) tokio_interval: TokioInterval, } @@ -43,13 +38,9 @@ impl Stream for Interval { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut stream = Compat01As03::new(&mut self.tokio_interval); // https://docs.rs/tokio/0.1.20/tokio/timer/struct.Error.html - futures::ready!(Pin::new(&mut stream).poll_next(cx)).unwrap().unwrap(); + futures::ready!(Pin::new(&mut stream).poll_next(cx)) + .unwrap() + .unwrap(); Poll::Ready(Some(Instant::now())) } } - -impl fmt::Debug for Interval { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { - fmt::Debug::fmt(&self.tokio_interval, f) - } -} From 7167e9d2c2d86a2a8476b53df935a974e7c8b760 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Wed, 5 Jun 2019 03:38:11 +0200 Subject: [PATCH 16/34] initial docs Signed-off-by: Yoshua Wuyts --- src/time.rs | 49 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/src/time.rs b/src/time.rs index 9baf710c..55e17488 100644 --- a/src/time.rs +++ b/src/time.rs @@ -61,18 +61,67 @@ pub use interval::Interval; use std::time::{Duration, Instant}; /// Sleep the current future for the given duration. +/// +/// ## Examples +/// ``` +/// # #![feature(async_await)] +/// use runtime::time::delay_for; +/// use std::time::{Duration, Instant}; +/// +/// # #[runtime::main] +/// # async fn main () -> Result<(), Box> { +/// let start = Instant::now(); +/// let now = delay_for(Duration::from_millis(20)).await; +/// +/// assert!(now - start >= Duration::from_millis(20)); +/// # Ok(())} +/// ``` #[inline] pub fn delay_for(dur: Duration) -> Delay { Delay::new(dur) } /// Sleep the current future until the given time. +/// +/// ## Examples +/// ``` +/// # #![feature(async_await)] +/// use runtime::time::delay_until; +/// use std::time::{Duration, Instant}; +/// +/// # #[runtime::main] +/// # async fn main () -> Result<(), Box> { +/// let start = Instant::now(); +/// let now = delay_until(start + Duration::from_millis(40)).await; +/// +/// assert!(now - start >= Duration::from_millis(40)); +/// # Ok(())} +/// ``` #[inline] pub fn delay_until(at: Instant) -> Delay { Delay::new_at(at) } /// Create a stream that fires events at a set interval. +/// +/// ## Examples +/// ``` +/// # #![feature(async_await)] +/// # use futures::prelude::*; +/// use runtime::time::interval; +/// use std::time::{Duration, Instant}; +/// +/// # #[runtime::main] +/// # async fn main () -> Result<(), Box> { +/// let start = Instant::now(); +/// let mut interval = interval(Duration::from_millis(10)).take(3); +/// while let Some(now) = interval.next().await { +/// println!("{}ms have elapsed", (now - start).as_millis()); +/// } +/// +/// assert!(Instant::now() - start >= Duration::from_millis(30)); +/// # Ok(())} +/// ``` #[inline] pub fn interval(dur: Duration) -> Interval { Interval::new(dur) From 710d305bedcc733d779f6a237b93d5d7d2422c7f Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Wed, 5 Jun 2019 12:50:50 +0200 Subject: [PATCH 17/34] polish interface Signed-off-by: Yoshua Wuyts --- src/time.rs | 93 ++++++-------------------------------------- src/time/delay.rs | 34 +++++++++++++++- src/time/ext.rs | 8 +++- src/time/interval.rs | 19 +++++++++ 4 files changed, 68 insertions(+), 86 deletions(-) diff --git a/src/time.rs b/src/time.rs index 55e17488..9560d0b8 100644 --- a/src/time.rs +++ b/src/time.rs @@ -18,11 +18,11 @@ //! # #![feature(async_await)] //! # #[runtime::main] //! # async fn main() { -//! use runtime::time::delay_for; +//! use runtime::time::Delay; //! use std::time::{Duration, Instant}; //! //! let start = Instant::now(); -//! let now = delay_for(Duration::from_secs(3)).await; +//! let now = Delay::new(Duration::from_secs(3)).await; //! //! let elapsed = now - start; //! println!("elapsed: {}s", elapsed.as_secs()); @@ -30,18 +30,18 @@ //! ``` //! //! __Schedule a two-second interval__ -//! ```ignore +//! ```no_run //! # #![feature(async_await)] //! # #[runtime::main] //! # async fn main() { -//! # use futures::for_await; -//! use runtime::time::interval; +//! # use futures::prelude::*; +//! use runtime::time::Interval; //! use std::time::{Duration, Instant}; //! //! let start = Instant::now(); //! -//! #[for_await] -//! for now in interval(Duration::from_secs(2)) { +//! let mut interval = Interval::new(Duration::from_secs(2)); +//! while let Some(now) = interval.next().await { //! let elapsed = now - start; //! println!("elapsed: {}s", elapsed.as_secs()); //! } @@ -50,79 +50,8 @@ mod delay; mod interval; +mod ext; -pub mod ext; - -pub use delay::Delay; -#[doc(inline)] -pub use ext::FutureExt; -pub use interval::Interval; - -use std::time::{Duration, Instant}; - -/// Sleep the current future for the given duration. -/// -/// ## Examples -/// ``` -/// # #![feature(async_await)] -/// use runtime::time::delay_for; -/// use std::time::{Duration, Instant}; -/// -/// # #[runtime::main] -/// # async fn main () -> Result<(), Box> { -/// let start = Instant::now(); -/// let now = delay_for(Duration::from_millis(20)).await; -/// -/// assert!(now - start >= Duration::from_millis(20)); -/// # Ok(())} -/// ``` -#[inline] -pub fn delay_for(dur: Duration) -> Delay { - Delay::new(dur) -} - -/// Sleep the current future until the given time. -/// -/// ## Examples -/// ``` -/// # #![feature(async_await)] -/// use runtime::time::delay_until; -/// use std::time::{Duration, Instant}; -/// -/// # #[runtime::main] -/// # async fn main () -> Result<(), Box> { -/// let start = Instant::now(); -/// let now = delay_until(start + Duration::from_millis(40)).await; -/// -/// assert!(now - start >= Duration::from_millis(40)); -/// # Ok(())} -/// ``` -#[inline] -pub fn delay_until(at: Instant) -> Delay { - Delay::new_at(at) -} - -/// Create a stream that fires events at a set interval. -/// -/// ## Examples -/// ``` -/// # #![feature(async_await)] -/// # use futures::prelude::*; -/// use runtime::time::interval; -/// use std::time::{Duration, Instant}; -/// -/// # #[runtime::main] -/// # async fn main () -> Result<(), Box> { -/// let start = Instant::now(); -/// let mut interval = interval(Duration::from_millis(10)).take(3); -/// while let Some(now) = interval.next().await { -/// println!("{}ms have elapsed", (now - start).as_millis()); -/// } -/// -/// assert!(Instant::now() - start >= Duration::from_millis(30)); -/// # Ok(())} -/// ``` -#[inline] -pub fn interval(dur: Duration) -> Interval { - Interval::new(dur) -} +pub use delay::*; +pub use interval::*; +pub use ext::*; diff --git a/src/time/delay.rs b/src/time/delay.rs index 46968b59..11d8b750 100644 --- a/src/time/delay.rs +++ b/src/time/delay.rs @@ -12,14 +12,44 @@ pub struct Delay { } impl Delay { - /// Sleep the current future for the given duration. + /// Continue execution after the duration has passed. + /// + /// ## Examples + /// ``` + /// # #![feature(async_await)] + /// use runtime::time::Delay; + /// use std::time::{Duration, Instant}; + /// + /// # #[runtime::main] + /// # async fn main () -> Result<(), Box> { + /// let start = Instant::now(); + /// let now = Delay::new(Duration::from_millis(40)).await; + /// + /// assert!(now - start >= Duration::from_millis(40)); + /// # Ok(())} + /// ``` #[inline] pub fn new(dur: Duration) -> Self { let inner = runtime_raw::current_runtime().new_delay(dur); Self { inner } } - /// Sleep the current future until the given time. + /// Continue execution after the given instant. + /// + /// ## Examples + /// ``` + /// # #![feature(async_await)] + /// use runtime::time::Delay; + /// use std::time::{Duration, Instant}; + /// + /// # #[runtime::main] + /// # async fn main () -> Result<(), Box> { + /// let start = Instant::now(); + /// let now = Delay::new_at(start + Duration::from_millis(40)).await; + /// + /// assert!(now - start >= Duration::from_millis(40)); + /// # Ok(())} + /// ``` #[inline] pub fn new_at(at: Instant) -> Self { let inner = runtime_raw::current_runtime().new_delay_at(at); diff --git a/src/time/ext.rs b/src/time/ext.rs index 6df1b9b2..b981f16a 100644 --- a/src/time/ext.rs +++ b/src/time/ext.rs @@ -7,7 +7,9 @@ use std::pin::Pin; use std::task::{Context, Poll}; use std::time::{Duration, Instant}; -/// The Future returned from [`FutureExt.timeout`]. +/// The Future returned from [`FutureExt`]. +/// +/// [`FutureExt.timeout`]: trait.FutureExt.html #[derive(Debug)] pub struct Timeout { _phantom: std::marker::PhantomData, @@ -20,7 +22,9 @@ impl Future for Timeout { } } -/// The Error returned in the Future returned from [`FutureExt.timeout`]. +/// The Error returned from [`Timeout`]. +/// +/// [`Timeout`]: struct.Timeout.html pub struct TimeoutError(pub Instant); impl Error for TimeoutError {} diff --git a/src/time/interval.rs b/src/time/interval.rs index f8e893ed..004cba32 100644 --- a/src/time/interval.rs +++ b/src/time/interval.rs @@ -13,6 +13,25 @@ pub struct Interval { impl Interval { /// Create a stream that fires events at a set interval. + /// + /// ## Examples + /// ``` + /// # #![feature(async_await)] + /// # use futures::prelude::*; + /// use runtime::time::Interval; + /// use std::time::{Duration, Instant}; + /// + /// # #[runtime::main] + /// # async fn main () -> Result<(), Box> { + /// let start = Instant::now(); + /// let mut interval = Interval::new(Duration::from_millis(10)).take(3); + /// while let Some(now) = interval.next().await { + /// println!("{}ms have elapsed", (now - start).as_millis()); + /// } + /// + /// assert!(Instant::now() - start >= Duration::from_millis(30)); + /// # Ok(())} + /// ``` #[inline] pub fn new(dur: Duration) -> Self { let inner = runtime_raw::current_runtime().new_interval(dur); From 5c529a9a5303e9ebfe1b21519e3cf90b7b330a43 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Wed, 5 Jun 2019 12:52:17 +0200 Subject: [PATCH 18/34] add debug Signed-off-by: Yoshua Wuyts --- src/time/delay.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/time/delay.rs b/src/time/delay.rs index 11d8b750..9954f478 100644 --- a/src/time/delay.rs +++ b/src/time/delay.rs @@ -58,8 +58,8 @@ impl Delay { } impl fmt::Debug for Delay { - fn fmt(&self, _f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { - unimplemented!(); + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { + fmt::Debug::fmt(&self.inner, f) } } From 1642d979df38fd86b8b132773b704f0ae777798f Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Wed, 5 Jun 2019 12:53:12 +0200 Subject: [PATCH 19/34] better debug Signed-off-by: Yoshua Wuyts --- src/time/interval.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/time/interval.rs b/src/time/interval.rs index 004cba32..5db84f45 100644 --- a/src/time/interval.rs +++ b/src/time/interval.rs @@ -3,10 +3,10 @@ use futures::prelude::*; use std::pin::Pin; use std::task::{Context, Poll}; use std::time::{Duration, Instant}; +use std::fmt; /// A stream representing notifications at a fixed interval. #[must_use = "streams do nothing unless polled"] -#[derive(Debug)] pub struct Interval { inner: Pin>, } @@ -47,3 +47,9 @@ impl Stream for Interval { self.inner.poll_next_unpin(cx) } } + +impl fmt::Debug for Interval { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { + fmt::Debug::fmt(&self.inner, f) + } +} From a2bf368f92c4a5a898360b624510dcdc6807a8b8 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Wed, 5 Jun 2019 13:04:36 +0200 Subject: [PATCH 20/34] prelude Signed-off-by: Yoshua Wuyts --- src/lib.rs | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/src/lib.rs b/src/lib.rs index 40f6d57f..203fdedc 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -98,6 +98,24 @@ pub mod net; pub mod task; pub mod time; +/// The Runtime Prelude. +/// +/// Rust comes with a variety of things in its standard library. However, Runtime and Futures +/// provide new functionality outside of it. We want to make Runtime feel as close to standard Rust +/// as possible. We care deeply about usability. +/// +/// The _prelude_ is the list of things we recommend importing into Runtime programs. It's kept as +/// small as possible, and is focused on things, particularly traits. +/// +/// To use the `prelude` do: +/// ``` +/// use runtime::prelude::*; +/// ``` +pub mod prelude { + #[doc(inline)] + pub use super::time::FutureExt; +} + #[doc(inline)] pub use task::spawn; From 8ae5b06fe612fa1aa78f441b3b6126965d30a709 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Wed, 5 Jun 2019 13:39:13 +0200 Subject: [PATCH 21/34] impl ext Signed-off-by: Yoshua Wuyts --- Cargo.toml | 1 + src/time/ext.rs | 64 ++++++++++++++++++++++++++++++++----------------- 2 files changed, 43 insertions(+), 22 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index d6f7ad5e..c1b1268d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,6 +21,7 @@ futures-preview = "0.3.0-alpha.16" runtime-attributes = { path = "runtime-attributes", version = "0.3.0-alpha.4" } runtime-raw = { path = "runtime-raw", version = "0.3.0-alpha.3" } runtime-native = { path = "runtime-native", version = "0.3.0-alpha.3", optional = true } +pin-utils = "0.1.0-alpha.4" [dev-dependencies] failure = "0.1.5" diff --git a/src/time/ext.rs b/src/time/ext.rs index b981f16a..01a678e9 100644 --- a/src/time/ext.rs +++ b/src/time/ext.rs @@ -1,54 +1,74 @@ //! Extensions for Futures types. use std::error::Error; -use std::fmt::{self, Debug, Display}; +use std::fmt; use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; use std::time::{Duration, Instant}; +use super::Delay; + /// The Future returned from [`FutureExt`]. /// /// [`FutureExt.timeout`]: trait.FutureExt.html #[derive(Debug)] -pub struct Timeout { - _phantom: std::marker::PhantomData, +pub struct Timeout { + future: F, + delay: Delay, +} + +impl Timeout { + pin_utils::unsafe_pinned!(future: F); + pin_utils::unsafe_pinned!(delay: Delay); } -impl Future for Timeout { - type Output = Result<::Output, TimeoutError>; - fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll { - unimplemented!(); +impl Future for Timeout { + type Output = Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match self.as_mut().future().poll(cx) { + Poll::Pending => {} + Poll::Ready(t) => return Poll::Ready(Ok(t)), + } + + if self.as_mut().poll(cx).is_ready() { + let err = Err(TimeoutError(Instant::now())); + Poll::Ready(err) + } else { + Poll::Pending + } } } /// The Error returned from [`Timeout`]. /// /// [`Timeout`]: struct.Timeout.html +#[derive(Debug)] pub struct TimeoutError(pub Instant); impl Error for TimeoutError {} -impl Debug for TimeoutError { - fn fmt(&self, _f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { - unimplemented!(); - } -} - -impl Display for TimeoutError { - fn fmt(&self, _f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { - unimplemented!(); +impl fmt::Display for TimeoutError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { + fmt::Debug::fmt(self, f) } } /// Extend `Future` with methods to time out execution. pub trait FutureExt: Future + Sized { - /// Timeout the future if it isn't completed after `dur` duration. - fn timeout(self, _dur: Duration) -> Timeout { - unimplemented!(); + /// Time out the future if it isn't completed after `dur` duration. + fn timeout(self, dur: Duration) -> Timeout { + Timeout { + delay: Delay::new(dur), + future: self, + } } - /// Timeout the future if it isn't completed by `at`. - fn deadline(self, _at: Instant) -> Timeout { - unimplemented!(); + /// Time out the future if it isn't completed before `at`. + fn timeout_at(self, at: Instant) -> Timeout { + Timeout { + delay: Delay::new_at(at), + future: self, + } } } From a45fd174d9ef3c7b005d43c0b9853ca69cd25a48 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Wed, 5 Jun 2019 14:11:37 +0200 Subject: [PATCH 22/34] finish ext Signed-off-by: Yoshua Wuyts --- src/time.rs | 2 +- src/time/ext.rs | 69 +++++++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 68 insertions(+), 3 deletions(-) diff --git a/src/time.rs b/src/time.rs index 9560d0b8..102bb1e4 100644 --- a/src/time.rs +++ b/src/time.rs @@ -1,4 +1,4 @@ -//! Types and Functions for temporal manipulation. +//! Types and Functions for time-related operations. //! //! This module provides primitives for setting asynchronous timeouts, intervals, and delays. //! diff --git a/src/time/ext.rs b/src/time/ext.rs index 01a678e9..d821c420 100644 --- a/src/time/ext.rs +++ b/src/time/ext.rs @@ -56,7 +56,39 @@ impl fmt::Display for TimeoutError { /// Extend `Future` with methods to time out execution. pub trait FutureExt: Future + Sized { - /// Time out the future if it isn't completed after `dur` duration. + /// Creates a new future which will take at most `dur` time to resolve from + /// the point at which this method is called. + /// + /// This combinator creates a new future which wraps the receiving future + /// in a timeout. The future returned will resolve in at most `dur` time + /// specified (relative to when this function is called). + /// + /// If the future completes before `dur` elapses then the future will + /// resolve with that item. Otherwise the future will resolve to an error + /// once `dur` has elapsed. + /// + /// # Examples + /// ``` + /// # #![feature(async_await)] + /// # use futures::prelude::*; + /// use std::time::Duration; + /// use runtime::time::FutureExt; + /// + /// # fn long_future() -> impl Future> { + /// # futures::future::ok(()) + /// # } + /// # + /// #[runtime::main] + /// async fn main() { + /// let future = long_future(); + /// let timed_out = future.timeout(Duration::from_millis(100)); + /// + /// match timed_out.await { + /// Ok(item) => println!("got {:?} within enough time!", item), + /// Err(_) => println!("took too long to produce the item"), + /// } + /// } + /// ``` fn timeout(self, dur: Duration) -> Timeout { Timeout { delay: Delay::new(dur), @@ -64,7 +96,38 @@ pub trait FutureExt: Future + Sized { } } - /// Time out the future if it isn't completed before `at`. + /// Creates a new future which will resolve no later than `at` specified. + /// + /// This method is otherwise equivalent to the [`timeout`] method except that + /// it tweaks the moment at when the timeout elapsed to being specified with + /// an absolute value rather than a relative one. For more documentation see + /// the [`timeout`] method. + /// + /// [`timeout`]: trait.FutureExt.html#method.timeout + /// + /// # Examples + /// ``` + /// # #![feature(async_await)] + /// # use futures::prelude::*; + /// use std::time::{Duration, Instant}; + /// use runtime::time::FutureExt; + /// + /// # fn long_future() -> impl Future> { + /// # futures::future::ok(()) + /// # } + /// # + /// #[runtime::main] + /// async fn main() { + /// let future = long_future(); + /// let at = Instant::now() + Duration::from_millis(100); + /// let timed_out = future.timeout_at(at); + /// + /// match timed_out.await { + /// Ok(item) => println!("got {:?} within enough time!", item), + /// Err(_) => println!("took too long to produce the item"), + /// } + /// } + /// ``` fn timeout_at(self, at: Instant) -> Timeout { Timeout { delay: Delay::new_at(at), @@ -72,3 +135,5 @@ pub trait FutureExt: Future + Sized { } } } + +impl FutureExt for T {} From 785760720f193987e61376d392b0f679d5e23a54 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Wed, 5 Jun 2019 14:44:07 +0200 Subject: [PATCH 23/34] streamext Signed-off-by: Yoshua Wuyts --- src/lib.rs | 2 ++ src/time/ext.rs | 92 +++++++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 92 insertions(+), 2 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 203fdedc..bf8d11d6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -114,6 +114,8 @@ pub mod time; pub mod prelude { #[doc(inline)] pub use super::time::FutureExt; + #[doc(inline)] + pub use super::time::StreamExt; } #[doc(inline)] diff --git a/src/time/ext.rs b/src/time/ext.rs index d821c420..fc6cfe67 100644 --- a/src/time/ext.rs +++ b/src/time/ext.rs @@ -7,9 +7,11 @@ use std::pin::Pin; use std::task::{Context, Poll}; use std::time::{Duration, Instant}; +use futures::Stream; + use super::Delay; -/// The Future returned from [`FutureExt`]. +/// A future returned by methods in the [`FutureExt`] trait. /// /// [`FutureExt.timeout`]: trait.FutureExt.html #[derive(Debug)] @@ -41,9 +43,10 @@ impl Future for Timeout { } } -/// The Error returned from [`Timeout`]. +/// An error returned from [`Timeout`] and [`TimeoutStream`]. /// /// [`Timeout`]: struct.Timeout.html +/// [`TimeoutStream`]: struct.TimeoutStream.html #[derive(Debug)] pub struct TimeoutError(pub Instant); impl Error for TimeoutError {} @@ -137,3 +140,88 @@ pub trait FutureExt: Future + Sized { } impl FutureExt for T {} + +/// Extend `Stream` with methods to time out execution. +pub trait StreamExt: Stream + Sized { + /// Creates a new stream which will take at most `dur` time to yield each + /// item of the stream. + /// + /// This combinator creates a new stream which wraps the receiving stream + /// in a timeout-per-item. The stream returned will resolve in at most + /// `dur` time for each item yielded from the stream. The first item's timer + /// starts when this method is called. + /// + /// If a stream's item completes before `dur` elapses then the timer will be + /// reset for the next item. If the timeout elapses, however, then an error + /// will be yielded on the stream and the timer will be reset. + /// + /// ## Examples + /// + /// ``` + /// # #![feature(async_await)] + /// # #[runtime::main] + /// # async fn main() { + /// # use futures::prelude::*; + /// use runtime::time::Interval; + /// use std::time::{Duration, Instant}; + /// + /// let start = Instant::now(); + /// + /// let mut interval = Interval::new(Duration::from_millis(100)).take(2); + /// while let Some(now) = interval.next().await { + /// let elapsed = now - start; + /// println!("elapsed: {}s", elapsed.as_secs()); + /// } + /// # } + /// ``` + fn timeout(self, dur: Duration) -> TimeoutStream { + TimeoutStream { + timeout: Delay::new(dur), + dur, + stream: self, + } + } +} + +impl StreamExt for S {} + +/// A stream returned by methods in the [`StreamExt`] trait. +/// +/// [`StreamExt`]: trait.StreamExt.html +#[derive(Debug)] +pub struct TimeoutStream { + timeout: Delay, + dur: Duration, + stream: S, +} + +impl TimeoutStream { + pin_utils::unsafe_pinned!(timeout: Delay); + pin_utils::unsafe_pinned!(stream: S); +} + +impl Stream for TimeoutStream { + type Item = Result; + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + match self.as_mut().stream().poll_next(cx) { + Poll::Pending => {} + Poll::Ready(s) => { + *self.as_mut().timeout() = Delay::new(self.dur); + let res = Ok(s).transpose(); + return Poll::Ready(res); + } + } + + if self.as_mut().timeout().poll(cx).is_ready() { + *self.as_mut().timeout() = Delay::new(self.dur); + let err = Some(Err(TimeoutError(Instant::now()))); + Poll::Ready(err) + } else { + Poll::Pending + } + } +} From 9d00769b236362cc2a1e9f565f1cae8ce7876901 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Wed, 5 Jun 2019 14:46:30 +0200 Subject: [PATCH 24/34] cargo fmt Signed-off-by: Yoshua Wuyts --- src/time.rs | 4 ++-- src/time/ext.rs | 5 +---- src/time/interval.rs | 2 +- 3 files changed, 4 insertions(+), 7 deletions(-) diff --git a/src/time.rs b/src/time.rs index 102bb1e4..0cd903e1 100644 --- a/src/time.rs +++ b/src/time.rs @@ -49,9 +49,9 @@ //! ``` mod delay; -mod interval; mod ext; +mod interval; pub use delay::*; -pub use interval::*; pub use ext::*; +pub use interval::*; diff --git a/src/time/ext.rs b/src/time/ext.rs index fc6cfe67..a3b0c8fb 100644 --- a/src/time/ext.rs +++ b/src/time/ext.rs @@ -203,10 +203,7 @@ impl TimeoutStream { impl Stream for TimeoutStream { type Item = Result; - fn poll_next( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { match self.as_mut().stream().poll_next(cx) { Poll::Pending => {} Poll::Ready(s) => { diff --git a/src/time/interval.rs b/src/time/interval.rs index 5db84f45..e6ad8475 100644 --- a/src/time/interval.rs +++ b/src/time/interval.rs @@ -1,9 +1,9 @@ use futures::prelude::*; +use std::fmt; use std::pin::Pin; use std::task::{Context, Poll}; use std::time::{Duration, Instant}; -use std::fmt; /// A stream representing notifications at a fixed interval. #[must_use = "streams do nothing unless polled"] From 70036d4d4b06045870727f0fd111a501109843ac Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Wed, 5 Jun 2019 20:33:03 +0200 Subject: [PATCH 25/34] AsyncReadExt Signed-off-by: Yoshua Wuyts --- src/lib.rs | 2 + src/time/ext.rs | 121 ++++++++++++++++++++++++++++++++++++++---------- 2 files changed, 99 insertions(+), 24 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index bf8d11d6..b2c33079 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -116,6 +116,8 @@ pub mod prelude { pub use super::time::FutureExt; #[doc(inline)] pub use super::time::StreamExt; + #[doc(inline)] + pub use super::time::AsyncReadExt; } #[doc(inline)] diff --git a/src/time/ext.rs b/src/time/ext.rs index a3b0c8fb..9ee7a56c 100644 --- a/src/time/ext.rs +++ b/src/time/ext.rs @@ -1,13 +1,12 @@ //! Extensions for Futures types. -use std::error::Error; -use std::fmt; use std::future::Future; +use std::io; use std::pin::Pin; use std::task::{Context, Poll}; use std::time::{Duration, Instant}; -use futures::Stream; +use futures::{AsyncRead, Stream}; use super::Delay; @@ -26,7 +25,7 @@ impl Timeout { } impl Future for Timeout { - type Output = Result; + type Output = Result; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { match self.as_mut().future().poll(cx) { @@ -35,7 +34,7 @@ impl Future for Timeout { } if self.as_mut().poll(cx).is_ready() { - let err = Err(TimeoutError(Instant::now())); + let err = Err(io::Error::new(io::ErrorKind::TimedOut, "future timed out").into()); Poll::Ready(err) } else { Poll::Pending @@ -43,20 +42,6 @@ impl Future for Timeout { } } -/// An error returned from [`Timeout`] and [`TimeoutStream`]. -/// -/// [`Timeout`]: struct.Timeout.html -/// [`TimeoutStream`]: struct.TimeoutStream.html -#[derive(Debug)] -pub struct TimeoutError(pub Instant); -impl Error for TimeoutError {} - -impl fmt::Display for TimeoutError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { - fmt::Debug::fmt(self, f) - } -} - /// Extend `Future` with methods to time out execution. pub trait FutureExt: Future + Sized { /// Creates a new future which will take at most `dur` time to resolve from @@ -166,9 +151,10 @@ pub trait StreamExt: Stream + Sized { /// use std::time::{Duration, Instant}; /// /// let start = Instant::now(); + /// let timeout = Duration::from_millis(150); /// /// let mut interval = Interval::new(Duration::from_millis(100)).take(2); - /// while let Some(now) = interval.next().await { + /// while let Some(now) = interval.next().timeout(timeout).await { /// let elapsed = now - start; /// println!("elapsed: {}s", elapsed.as_secs()); /// } @@ -183,8 +169,6 @@ pub trait StreamExt: Stream + Sized { } } -impl StreamExt for S {} - /// A stream returned by methods in the [`StreamExt`] trait. /// /// [`StreamExt`]: trait.StreamExt.html @@ -201,7 +185,7 @@ impl TimeoutStream { } impl Stream for TimeoutStream { - type Item = Result; + type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { match self.as_mut().stream().poll_next(cx) { @@ -215,10 +199,99 @@ impl Stream for TimeoutStream { if self.as_mut().timeout().poll(cx).is_ready() { *self.as_mut().timeout() = Delay::new(self.dur); - let err = Some(Err(TimeoutError(Instant::now()))); + let err = Err(io::Error::new(io::ErrorKind::TimedOut, "future timed out").into()); + Poll::Ready(Some(err)) + } else { + Poll::Pending + } + } +} + +impl StreamExt for S {} + +/// Extend `AsyncRead` with methods to time out execution. +pub trait AsyncReadExt: AsyncRead + Sized { + /// Creates a new stream which will take at most `dur` time to yield each + /// item of the stream. + /// + /// This combinator creates a new stream which wraps the receiving stream + /// in a timeout-per-item. The stream returned will resolve in at most + /// `dur` time for each item yielded from the stream. The first item's timer + /// starts when this method is called. + /// + /// If a stream's item completes before `dur` elapses then the timer will be + /// reset for the next item. If the timeout elapses, however, then an error + /// will be yielded on the stream and the timer will be reset. + /// + /// ## Examples + /// + /// ```no_run + /// # #![feature(async_await)] + /// # #[runtime::main] + /// # async fn work () -> Result<(), Box> { + /// # use futures::prelude::*; + /// use runtime::time::Interval; + /// use runtime::net::TcpListener; + /// use std::time::{Duration, Instant}; + /// + /// let start = Instant::now(); + /// + /// let mut listener = TcpListener::bind("127.0.0.1:0")?; + /// let mut incoming = listener.incoming(); + /// while let Some(stream) = incoming.next().await { + /// match stream { + /// Ok(stream) => println!("new client!"), + /// Err(e) => { /* connection failed */ } + /// } + /// } + /// # Ok(())} + /// ``` + fn timeout(self, dur: Duration) -> TimeoutAsyncRead { + TimeoutAsyncRead { + timeout: Delay::new(dur), + dur, + stream: self, + } + } +} + +/// A stream returned by methods in the [`StreamExt`] trait. +/// +/// [`StreamExt`]: trait.StreamExt.html +#[derive(Debug)] +pub struct TimeoutAsyncRead { + timeout: Delay, + dur: Duration, + stream: S, +} + +impl TimeoutAsyncRead { + pin_utils::unsafe_pinned!(timeout: Delay); + pin_utils::unsafe_pinned!(stream: S); +} + +impl AsyncRead for TimeoutAsyncRead { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + match self.as_mut().stream().poll_read(cx, buf) { + Poll::Pending => {} + Poll::Ready(s) => { + *self.as_mut().timeout() = Delay::new(self.dur); + return Poll::Ready(s); + } + } + + if self.as_mut().timeout().poll(cx).is_ready() { + *self.as_mut().timeout() = Delay::new(self.dur); + let err = Err(io::Error::new(io::ErrorKind::TimedOut, "future timed out").into()); Poll::Ready(err) } else { Poll::Pending } } } + +impl AsyncReadExt for S {} From 7a664760b3e2a996908a5d52fa0aeda66b79db9d Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Wed, 5 Jun 2019 21:02:43 +0200 Subject: [PATCH 26/34] finish tokio + better docs Signed-off-by: Yoshua Wuyts --- runtime-tokio/src/lib.rs | 15 +++++++++------ src/time.rs | 4 ++-- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/runtime-tokio/src/lib.rs b/runtime-tokio/src/lib.rs index 93d09536..d8ff1d14 100644 --- a/runtime-tokio/src/lib.rs +++ b/runtime-tokio/src/lib.rs @@ -83,16 +83,19 @@ impl runtime_raw::Runtime for Tokio { Ok(Box::pin(UdpSocket { tokio_socket })) } - fn new_delay(&self, _dur: Duration) -> Pin> { - panic!("Timers are currently not supported in runtime-tokio"); + fn new_delay(&self, dur: Duration) -> Pin> { + let tokio_delay = TokioDelay::new(Instant::now() + dur); + Box::pin(Delay { tokio_delay }) } - fn new_delay_at(&self, _at: Instant) -> Pin> { - panic!("Timers are currently not supported in runtime-tokio"); + fn new_delay_at(&self, at: Instant) -> Pin> { + let tokio_delay = TokioDelay::new(at); + Box::pin(Delay { tokio_delay }) } - fn new_interval(&self, _dur: Duration) -> Pin> { - panic!("Timers are currently not supported in runtime-tokio"); + fn new_interval(&self, dur: Duration) -> Pin> { + let tokio_interval = TokioInterval::new(Instant::now(), dur); + Box::pin(Interval { tokio_interval }) } } diff --git a/src/time.rs b/src/time.rs index 0cd903e1..b4e2a9b8 100644 --- a/src/time.rs +++ b/src/time.rs @@ -13,7 +13,7 @@ //! [`FutureExt`]: trait.FutureExt.html //! //! ## Examples -//! __Schedule a three-second delay__ +//! __Delay execution for three seconds__ //! ```no_run //! # #![feature(async_await)] //! # #[runtime::main] @@ -29,7 +29,7 @@ //! # } //! ``` //! -//! __Schedule a two-second interval__ +//! __Emit an event every two seconds__ //! ```no_run //! # #![feature(async_await)] //! # #[runtime::main] From d56866af7460c0ecfdc16065431116751dec1dcc Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Wed, 5 Jun 2019 21:57:19 +0200 Subject: [PATCH 27/34] fix trait name conflicts Signed-off-by: Yoshua Wuyts --- src/lib.rs | 6 +- src/time/ext.rs | 155 ++++++++++++++++++++++++------------------------ 2 files changed, 81 insertions(+), 80 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index b2c33079..6c7abe84 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -113,11 +113,11 @@ pub mod time; /// ``` pub mod prelude { #[doc(inline)] - pub use super::time::FutureExt; + pub use super::time::AsyncReadTimeExt; #[doc(inline)] - pub use super::time::StreamExt; + pub use super::time::FutureTimeExt; #[doc(inline)] - pub use super::time::AsyncReadExt; + pub use super::time::StreamTimeExt; } #[doc(inline)] diff --git a/src/time/ext.rs b/src/time/ext.rs index 9ee7a56c..5e9b8bfd 100644 --- a/src/time/ext.rs +++ b/src/time/ext.rs @@ -10,9 +10,9 @@ use futures::{AsyncRead, Stream}; use super::Delay; -/// A future returned by methods in the [`FutureExt`] trait. +/// A future returned by methods in the [`FutureTimeExt`] trait. /// -/// [`FutureExt.timeout`]: trait.FutureExt.html +/// [`FutureTimeExt.timeout`]: trait.FutureTimeExt.html #[derive(Debug)] pub struct Timeout { future: F, @@ -43,7 +43,7 @@ impl Future for Timeout { } /// Extend `Future` with methods to time out execution. -pub trait FutureExt: Future + Sized { +pub trait FutureTimeExt: Future + Sized { /// Creates a new future which will take at most `dur` time to resolve from /// the point at which this method is called. /// @@ -58,9 +58,9 @@ pub trait FutureExt: Future + Sized { /// # Examples /// ``` /// # #![feature(async_await)] - /// # use futures::prelude::*; + /// use futures::prelude::*; + /// use runtime::prelude::*; /// use std::time::Duration; - /// use runtime::time::FutureExt; /// /// # fn long_future() -> impl Future> { /// # futures::future::ok(()) @@ -91,14 +91,14 @@ pub trait FutureExt: Future + Sized { /// an absolute value rather than a relative one. For more documentation see /// the [`timeout`] method. /// - /// [`timeout`]: trait.FutureExt.html#method.timeout + /// [`timeout`]: trait.FutureTimeExt.html#method.timeout /// /// # Examples /// ``` /// # #![feature(async_await)] - /// # use futures::prelude::*; + /// use futures::prelude::*; + /// use runtime::prelude::*; /// use std::time::{Duration, Instant}; - /// use runtime::time::FutureExt; /// /// # fn long_future() -> impl Future> { /// # futures::future::ok(()) @@ -124,54 +124,11 @@ pub trait FutureExt: Future + Sized { } } -impl FutureExt for T {} +impl FutureTimeExt for T {} -/// Extend `Stream` with methods to time out execution. -pub trait StreamExt: Stream + Sized { - /// Creates a new stream which will take at most `dur` time to yield each - /// item of the stream. - /// - /// This combinator creates a new stream which wraps the receiving stream - /// in a timeout-per-item. The stream returned will resolve in at most - /// `dur` time for each item yielded from the stream. The first item's timer - /// starts when this method is called. - /// - /// If a stream's item completes before `dur` elapses then the timer will be - /// reset for the next item. If the timeout elapses, however, then an error - /// will be yielded on the stream and the timer will be reset. - /// - /// ## Examples - /// - /// ``` - /// # #![feature(async_await)] - /// # #[runtime::main] - /// # async fn main() { - /// # use futures::prelude::*; - /// use runtime::time::Interval; - /// use std::time::{Duration, Instant}; - /// - /// let start = Instant::now(); - /// let timeout = Duration::from_millis(150); - /// - /// let mut interval = Interval::new(Duration::from_millis(100)).take(2); - /// while let Some(now) = interval.next().timeout(timeout).await { - /// let elapsed = now - start; - /// println!("elapsed: {}s", elapsed.as_secs()); - /// } - /// # } - /// ``` - fn timeout(self, dur: Duration) -> TimeoutStream { - TimeoutStream { - timeout: Delay::new(dur), - dur, - stream: self, - } - } -} - -/// A stream returned by methods in the [`StreamExt`] trait. +/// A stream returned by methods in the [`StreamTimeExt`] trait. /// -/// [`StreamExt`]: trait.StreamExt.html +/// [`StreamTimeExt`]: trait.StreamTimeExt.html #[derive(Debug)] pub struct TimeoutStream { timeout: Delay, @@ -207,10 +164,8 @@ impl Stream for TimeoutStream { } } -impl StreamExt for S {} - -/// Extend `AsyncRead` with methods to time out execution. -pub trait AsyncReadExt: AsyncRead + Sized { +/// Extend `Stream` with methods to time out execution. +pub trait StreamTimeExt: Stream + Sized { /// Creates a new stream which will take at most `dur` time to yield each /// item of the stream. /// @@ -224,30 +179,28 @@ pub trait AsyncReadExt: AsyncRead + Sized { /// will be yielded on the stream and the timer will be reset. /// /// ## Examples - /// - /// ```no_run + /// ``` /// # #![feature(async_await)] - /// # #[runtime::main] - /// # async fn work () -> Result<(), Box> { /// # use futures::prelude::*; - /// use runtime::time::Interval; - /// use runtime::net::TcpListener; + /// use runtime::time::{Interval, StreamTimeExt as _}; /// use std::time::{Duration, Instant}; /// + /// # #[runtime::main] + /// # async fn main () -> Result<(), Box> { /// let start = Instant::now(); - /// - /// let mut listener = TcpListener::bind("127.0.0.1:0")?; - /// let mut incoming = listener.incoming(); - /// while let Some(stream) = incoming.next().await { - /// match stream { - /// Ok(stream) => println!("new client!"), - /// Err(e) => { /* connection failed */ } - /// } + /// let timeout = Duration::from_millis(15); + /// let mut interval = Interval::new(Duration::from_millis(10)) + /// .take(3) + /// .timeout(timeout); + /// while let Some(now) = interval.next().await { + /// println!("{}ms have elapsed", (now? - start).as_millis()); /// } + /// + /// assert!(Instant::now() - start >= Duration::from_millis(30)); /// # Ok(())} /// ``` - fn timeout(self, dur: Duration) -> TimeoutAsyncRead { - TimeoutAsyncRead { + fn timeout(self, dur: Duration) -> TimeoutStream { + TimeoutStream { timeout: Delay::new(dur), dur, stream: self, @@ -255,9 +208,11 @@ pub trait AsyncReadExt: AsyncRead + Sized { } } -/// A stream returned by methods in the [`StreamExt`] trait. +impl StreamTimeExt for S {} + +/// A stream returned by methods in the [`StreamTimeExt`] trait. /// -/// [`StreamExt`]: trait.StreamExt.html +/// [`StreamTimeExt`]: trait.StreamTimeExt.html #[derive(Debug)] pub struct TimeoutAsyncRead { timeout: Delay, @@ -294,4 +249,50 @@ impl AsyncRead for TimeoutAsyncRead { } } -impl AsyncReadExt for S {} +/// Extend `AsyncRead` with methods to time out execution. +pub trait AsyncReadTimeExt: AsyncRead + Sized { + /// Creates a new stream which will take at most `dur` time to yield each + /// item of the stream. + /// + /// This combinator creates a new stream which wraps the receiving stream + /// in a timeout-per-item. The stream returned will resolve in at most + /// `dur` time for each item yielded from the stream. The first item's timer + /// starts when this method is called. + /// + /// If a stream's item completes before `dur` elapses then the timer will be + /// reset for the next item. If the timeout elapses, however, then an error + /// will be yielded on the stream and the timer will be reset. + /// + /// ## Examples + /// + /// ```no_run + /// # #![feature(async_await)] + /// # #[runtime::main] + /// # async fn main () -> Result<(), Box> { + /// use futures::prelude::*; + /// use runtime::prelude::*; + /// use runtime::net::TcpListener; + /// use std::time::{Duration, Instant}; + /// + /// let start = Instant::now(); + /// + /// let mut listener = TcpListener::bind("127.0.0.1:0")?; + /// let mut incoming = listener.incoming(); + /// while let Some(stream) = incoming.next().await { + /// match stream { + /// Ok(stream) => println!("new client!"), + /// Err(e) => { /* connection failed */ } + /// } + /// } + /// # Ok(())} + /// ``` + fn timeout(self, dur: Duration) -> TimeoutAsyncRead { + TimeoutAsyncRead { + timeout: Delay::new(dur), + dur, + stream: self, + } + } +} + +impl AsyncReadTimeExt for S {} From 55a4f650c3bd1754a570227049fdc4ba7d816fbc Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Wed, 5 Jun 2019 21:58:26 +0200 Subject: [PATCH 28/34] fix all docs Signed-off-by: Yoshua Wuyts --- src/time/ext.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/time/ext.rs b/src/time/ext.rs index 5e9b8bfd..ce8a65ac 100644 --- a/src/time/ext.rs +++ b/src/time/ext.rs @@ -188,10 +188,9 @@ pub trait StreamTimeExt: Stream + Sized { /// # #[runtime::main] /// # async fn main () -> Result<(), Box> { /// let start = Instant::now(); - /// let timeout = Duration::from_millis(15); /// let mut interval = Interval::new(Duration::from_millis(10)) /// .take(3) - /// .timeout(timeout); + /// .timeout(Duration::from_millis(15)); /// while let Some(now) = interval.next().await { /// println!("{}ms have elapsed", (now? - start).as_millis()); /// } @@ -277,7 +276,8 @@ pub trait AsyncReadTimeExt: AsyncRead + Sized { /// let start = Instant::now(); /// /// let mut listener = TcpListener::bind("127.0.0.1:0")?; - /// let mut incoming = listener.incoming(); + /// let mut incoming = listener.incoming() + /// .timeout(Duration::from_millis(100)); /// while let Some(stream) = incoming.next().await { /// match stream { /// Ok(stream) => println!("new client!"), From 396feae6ac5cb88f597c30598de4ff375b5865e6 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Wed, 5 Jun 2019 22:22:02 +0200 Subject: [PATCH 29/34] fix async-read example Signed-off-by: Yoshua Wuyts --- src/time/ext.rs | 13 +++---------- 1 file changed, 3 insertions(+), 10 deletions(-) diff --git a/src/time/ext.rs b/src/time/ext.rs index ce8a65ac..834ebb40 100644 --- a/src/time/ext.rs +++ b/src/time/ext.rs @@ -270,20 +270,13 @@ pub trait AsyncReadTimeExt: AsyncRead + Sized { /// # async fn main () -> Result<(), Box> { /// use futures::prelude::*; /// use runtime::prelude::*; - /// use runtime::net::TcpListener; + /// use runtime::net::TcpStream; /// use std::time::{Duration, Instant}; /// /// let start = Instant::now(); /// - /// let mut listener = TcpListener::bind("127.0.0.1:0")?; - /// let mut incoming = listener.incoming() - /// .timeout(Duration::from_millis(100)); - /// while let Some(stream) = incoming.next().await { - /// match stream { - /// Ok(stream) => println!("new client!"), - /// Err(e) => { /* connection failed */ } - /// } - /// } + /// let stream = TcpStream::connect("127.0.0.1:8080").await?; + /// let _stream = stream.timeout(Duration::from_millis(100)); /// # Ok(())} /// ``` fn timeout(self, dur: Duration) -> TimeoutAsyncRead { From ff212ff7060c456dbdc9ed3b80d0ccf38c40815b Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Wed, 5 Jun 2019 22:23:36 +0200 Subject: [PATCH 30/34] fix prelude Signed-off-by: Yoshua Wuyts --- src/lib.rs | 6 +++--- src/time/ext.rs | 28 ++++++++++++++-------------- 2 files changed, 17 insertions(+), 17 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 6c7abe84..02ea90db 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -113,11 +113,11 @@ pub mod time; /// ``` pub mod prelude { #[doc(inline)] - pub use super::time::AsyncReadTimeExt; + pub use super::time::AsyncReadExt as _; #[doc(inline)] - pub use super::time::FutureTimeExt; + pub use super::time::FutureExt as _; #[doc(inline)] - pub use super::time::StreamTimeExt; + pub use super::time::StreamExt as _; } #[doc(inline)] diff --git a/src/time/ext.rs b/src/time/ext.rs index 834ebb40..d92c6b45 100644 --- a/src/time/ext.rs +++ b/src/time/ext.rs @@ -10,9 +10,9 @@ use futures::{AsyncRead, Stream}; use super::Delay; -/// A future returned by methods in the [`FutureTimeExt`] trait. +/// A future returned by methods in the [`FutureExt`] trait. /// -/// [`FutureTimeExt.timeout`]: trait.FutureTimeExt.html +/// [`FutureExt.timeout`]: trait.FutureExt.html #[derive(Debug)] pub struct Timeout { future: F, @@ -43,7 +43,7 @@ impl Future for Timeout { } /// Extend `Future` with methods to time out execution. -pub trait FutureTimeExt: Future + Sized { +pub trait FutureExt: Future + Sized { /// Creates a new future which will take at most `dur` time to resolve from /// the point at which this method is called. /// @@ -91,7 +91,7 @@ pub trait FutureTimeExt: Future + Sized { /// an absolute value rather than a relative one. For more documentation see /// the [`timeout`] method. /// - /// [`timeout`]: trait.FutureTimeExt.html#method.timeout + /// [`timeout`]: trait.FutureExt.html#method.timeout /// /// # Examples /// ``` @@ -124,11 +124,11 @@ pub trait FutureTimeExt: Future + Sized { } } -impl FutureTimeExt for T {} +impl FutureExt for T {} -/// A stream returned by methods in the [`StreamTimeExt`] trait. +/// A stream returned by methods in the [`StreamExt`] trait. /// -/// [`StreamTimeExt`]: trait.StreamTimeExt.html +/// [`StreamExt`]: trait.StreamExt.html #[derive(Debug)] pub struct TimeoutStream { timeout: Delay, @@ -165,7 +165,7 @@ impl Stream for TimeoutStream { } /// Extend `Stream` with methods to time out execution. -pub trait StreamTimeExt: Stream + Sized { +pub trait StreamExt: Stream + Sized { /// Creates a new stream which will take at most `dur` time to yield each /// item of the stream. /// @@ -182,7 +182,7 @@ pub trait StreamTimeExt: Stream + Sized { /// ``` /// # #![feature(async_await)] /// # use futures::prelude::*; - /// use runtime::time::{Interval, StreamTimeExt as _}; + /// use runtime::time::{Interval, StreamExt as _}; /// use std::time::{Duration, Instant}; /// /// # #[runtime::main] @@ -207,11 +207,11 @@ pub trait StreamTimeExt: Stream + Sized { } } -impl StreamTimeExt for S {} +impl StreamExt for S {} -/// A stream returned by methods in the [`StreamTimeExt`] trait. +/// A stream returned by methods in the [`StreamExt`] trait. /// -/// [`StreamTimeExt`]: trait.StreamTimeExt.html +/// [`StreamExt`]: trait.StreamExt.html #[derive(Debug)] pub struct TimeoutAsyncRead { timeout: Delay, @@ -249,7 +249,7 @@ impl AsyncRead for TimeoutAsyncRead { } /// Extend `AsyncRead` with methods to time out execution. -pub trait AsyncReadTimeExt: AsyncRead + Sized { +pub trait AsyncReadExt: AsyncRead + Sized { /// Creates a new stream which will take at most `dur` time to yield each /// item of the stream. /// @@ -288,4 +288,4 @@ pub trait AsyncReadTimeExt: AsyncRead + Sized { } } -impl AsyncReadTimeExt for S {} +impl AsyncReadExt for S {} From 5f32ab525ffe0ddd51a5d182fa845c572f3bcaf5 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Wed, 5 Jun 2019 22:50:21 +0200 Subject: [PATCH 31/34] remove pin-utils Signed-off-by: Yoshua Wuyts --- src/time/ext.rs | 92 +++++++++++++++++-------------------------------- 1 file changed, 32 insertions(+), 60 deletions(-) diff --git a/src/time/ext.rs b/src/time/ext.rs index d92c6b45..a370ad73 100644 --- a/src/time/ext.rs +++ b/src/time/ext.rs @@ -2,6 +2,7 @@ use std::future::Future; use std::io; +use std::marker::Unpin; use std::pin::Pin; use std::task::{Context, Poll}; use std::time::{Duration, Instant}; @@ -19,26 +20,17 @@ pub struct Timeout { delay: Delay, } -impl Timeout { - pin_utils::unsafe_pinned!(future: F); - pin_utils::unsafe_pinned!(delay: Delay); -} - -impl Future for Timeout { +impl Future for Timeout { type Output = Result; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match self.as_mut().future().poll(cx) { - Poll::Pending => {} - Poll::Ready(t) => return Poll::Ready(Ok(t)), + if let Poll::Ready(t) = Pin::new(&mut self.future).poll(cx) { + return Poll::Ready(Ok(t)); } - if self.as_mut().poll(cx).is_ready() { - let err = Err(io::Error::new(io::ErrorKind::TimedOut, "future timed out").into()); - Poll::Ready(err) - } else { - Poll::Pending - } + self.as_mut() + .poll(cx) + .map(|_| Err(io::Error::new(io::ErrorKind::TimedOut, "future timed out"))) } } @@ -130,42 +122,33 @@ impl FutureExt for T {} /// /// [`StreamExt`]: trait.StreamExt.html #[derive(Debug)] -pub struct TimeoutStream { +pub struct TimeoutStream { timeout: Delay, dur: Duration, stream: S, } -impl TimeoutStream { - pin_utils::unsafe_pinned!(timeout: Delay); - pin_utils::unsafe_pinned!(stream: S); -} - -impl Stream for TimeoutStream { +impl Stream for TimeoutStream { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match self.as_mut().stream().poll_next(cx) { - Poll::Pending => {} - Poll::Ready(s) => { - *self.as_mut().timeout() = Delay::new(self.dur); - let res = Ok(s).transpose(); - return Poll::Ready(res); - } + if let Poll::Ready(s) = Pin::new(&mut self.stream).poll_next(cx) { + self.timeout = Delay::new(self.dur); + return Poll::Ready(Ok(s).transpose()); } - if self.as_mut().timeout().poll(cx).is_ready() { - *self.as_mut().timeout() = Delay::new(self.dur); - let err = Err(io::Error::new(io::ErrorKind::TimedOut, "future timed out").into()); - Poll::Ready(Some(err)) - } else { - Poll::Pending - } + Pin::new(&mut self.timeout).poll(cx).map(|_| { + self.timeout = Delay::new(self.dur); + Some(Err(io::Error::new( + io::ErrorKind::TimedOut, + "future timed out", + ))) + }) } } /// Extend `Stream` with methods to time out execution. -pub trait StreamExt: Stream + Sized { +pub trait StreamExt: Stream + Sized + Unpin { /// Creates a new stream which will take at most `dur` time to yield each /// item of the stream. /// @@ -207,49 +190,38 @@ pub trait StreamExt: Stream + Sized { } } -impl StreamExt for S {} +impl StreamExt for S {} /// A stream returned by methods in the [`StreamExt`] trait. /// /// [`StreamExt`]: trait.StreamExt.html #[derive(Debug)] -pub struct TimeoutAsyncRead { +pub struct TimeoutAsyncRead { timeout: Delay, dur: Duration, stream: S, } -impl TimeoutAsyncRead { - pin_utils::unsafe_pinned!(timeout: Delay); - pin_utils::unsafe_pinned!(stream: S); -} - -impl AsyncRead for TimeoutAsyncRead { +impl AsyncRead for TimeoutAsyncRead { fn poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll> { - match self.as_mut().stream().poll_read(cx, buf) { - Poll::Pending => {} - Poll::Ready(s) => { - *self.as_mut().timeout() = Delay::new(self.dur); - return Poll::Ready(s); - } + if let Poll::Ready(s) = Pin::new(&mut self.stream).poll_read(cx, buf) { + self.timeout = Delay::new(self.dur); + return Poll::Ready(s); } - if self.as_mut().timeout().poll(cx).is_ready() { - *self.as_mut().timeout() = Delay::new(self.dur); - let err = Err(io::Error::new(io::ErrorKind::TimedOut, "future timed out").into()); - Poll::Ready(err) - } else { - Poll::Pending - } + Pin::new(&mut self.timeout).poll(cx).map(|_| { + self.timeout = Delay::new(self.dur); + Err(io::Error::new(io::ErrorKind::TimedOut, "future timed out")) + }) } } /// Extend `AsyncRead` with methods to time out execution. -pub trait AsyncReadExt: AsyncRead + Sized { +pub trait AsyncReadExt: AsyncRead + Sized + Unpin { /// Creates a new stream which will take at most `dur` time to yield each /// item of the stream. /// @@ -288,4 +260,4 @@ pub trait AsyncReadExt: AsyncRead + Sized { } } -impl AsyncReadExt for S {} +impl AsyncReadExt for S {} From 320b46f0b57b3c9104188ded20935c9f1d064847 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Wed, 5 Jun 2019 22:51:25 +0200 Subject: [PATCH 32/34] wrap up loose ends Signed-off-by: Yoshua Wuyts --- Cargo.toml | 1 - src/time/ext.rs | 4 ++-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index c1b1268d..d6f7ad5e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,7 +21,6 @@ futures-preview = "0.3.0-alpha.16" runtime-attributes = { path = "runtime-attributes", version = "0.3.0-alpha.4" } runtime-raw = { path = "runtime-raw", version = "0.3.0-alpha.3" } runtime-native = { path = "runtime-native", version = "0.3.0-alpha.3", optional = true } -pin-utils = "0.1.0-alpha.4" [dev-dependencies] failure = "0.1.5" diff --git a/src/time/ext.rs b/src/time/ext.rs index a370ad73..83c602d1 100644 --- a/src/time/ext.rs +++ b/src/time/ext.rs @@ -15,7 +15,7 @@ use super::Delay; /// /// [`FutureExt.timeout`]: trait.FutureExt.html #[derive(Debug)] -pub struct Timeout { +pub struct Timeout { future: F, delay: Delay, } @@ -35,7 +35,7 @@ impl Future for Timeout { } /// Extend `Future` with methods to time out execution. -pub trait FutureExt: Future + Sized { +pub trait FutureExt: Future + Sized + Unpin { /// Creates a new future which will take at most `dur` time to resolve from /// the point at which this method is called. /// From 7af4fc65e23877f7673479c4161e8bdb61523b6d Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Wed, 5 Jun 2019 22:52:51 +0200 Subject: [PATCH 33/34] fix tests Signed-off-by: Yoshua Wuyts --- src/time/ext.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/time/ext.rs b/src/time/ext.rs index 83c602d1..cf973f99 100644 --- a/src/time/ext.rs +++ b/src/time/ext.rs @@ -116,7 +116,7 @@ pub trait FutureExt: Future + Sized + Unpin { } } -impl FutureExt for T {} +impl FutureExt for T {} /// A stream returned by methods in the [`StreamExt`] trait. /// From f38369913806873ba76167556e9fd0bfae7db5f5 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Mon, 17 Jun 2019 12:51:56 +0200 Subject: [PATCH 34/34] update linux install --- ci/install-rust.yml | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/ci/install-rust.yml b/ci/install-rust.yml index 654db473..17ea2d91 100644 --- a/ci/install-rust.yml +++ b/ci/install-rust.yml @@ -2,7 +2,10 @@ steps: # Linux and macOS. - script: | set -e - curl https://sh.rustup.rs -sSf | sh -s -- -y --default-toolchain $RUSTUP_TOOLCHAIN + curl https://sh.rustup.rs -sSf | sh -s -- -y --default-toolchain none + export PATH=$PATH:$HOME/.cargo/bin + rustup toolchain install $RUSTUP_TOOLCHAIN + rustup default $RUSTUP_TOOLCHAIN echo "##vso[task.setvariable variable=PATH;]$PATH:$HOME/.cargo/bin" env: RUSTUP_TOOLCHAIN: ${{parameters.rust_version}}