Skip to content
This repository was archived by the owner on Oct 30, 2019. It is now read-only.

Time #36

Merged
merged 34 commits into from
Jun 17, 2019
Merged

Time #36

Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
29501d7
init time
yoshuawuyts May 18, 2019
ea9c950
unimplemented
yoshuawuyts May 18, 2019
38b9526
impl native delay
yoshuawuyts May 18, 2019
49e0600
delay at
yoshuawuyts May 18, 2019
fc6e2e9
better errs
yoshuawuyts May 18, 2019
027f275
it works!
yoshuawuyts May 18, 2019
3060412
fmt
yoshuawuyts May 18, 2019
e99dee7
fix checks
yoshuawuyts May 18, 2019
0bd95d5
futureext
yoshuawuyts May 18, 2019
137a64b
time docs
yoshuawuyts May 19, 2019
7237edf
add debug impls for native
yoshuawuyts May 30, 2019
6b99a31
update names
yoshuawuyts May 30, 2019
3bc16fd
add tokio impl
yoshuawuyts May 30, 2019
45070bf
update desc
yoshuawuyts May 31, 2019
25b7002
rustfmt + simpler Debugs
yoshuawuyts May 31, 2019
7167e9d
initial docs
yoshuawuyts Jun 5, 2019
710d305
polish interface
yoshuawuyts Jun 5, 2019
5c529a9
add debug
yoshuawuyts Jun 5, 2019
1642d97
better debug
yoshuawuyts Jun 5, 2019
a2bf368
prelude
yoshuawuyts Jun 5, 2019
8ae5b06
impl ext
yoshuawuyts Jun 5, 2019
a45fd17
finish ext
yoshuawuyts Jun 5, 2019
7857607
streamext
yoshuawuyts Jun 5, 2019
9d00769
cargo fmt
yoshuawuyts Jun 5, 2019
70036d4
AsyncReadExt
yoshuawuyts Jun 5, 2019
7a66476
finish tokio + better docs
yoshuawuyts Jun 5, 2019
d56866a
fix trait name conflicts
yoshuawuyts Jun 5, 2019
55a4f65
fix all docs
yoshuawuyts Jun 5, 2019
396feae
fix async-read example
yoshuawuyts Jun 5, 2019
ff212ff
fix prelude
yoshuawuyts Jun 5, 2019
5f32ab5
remove pin-utils
yoshuawuyts Jun 5, 2019
320b46f
wrap up loose ends
yoshuawuyts Jun 5, 2019
7af4fc6
fix tests
yoshuawuyts Jun 5, 2019
f383699
update linux install
yoshuawuyts Jun 17, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion ci/install-rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ steps:
# Linux and macOS.
- script: |
set -e
curl https://sh.rustup.rs -sSf | sh -s -- -y --default-toolchain $RUSTUP_TOOLCHAIN
curl https://sh.rustup.rs -sSf | sh -s -- -y --default-toolchain none
export PATH=$PATH:$HOME/.cargo/bin
rustup toolchain install $RUSTUP_TOOLCHAIN
rustup default $RUSTUP_TOOLCHAIN
echo "##vso[task.setvariable variable=PATH;]$PATH:$HOME/.cargo/bin"
env:
RUSTUP_TOOLCHAIN: ${{parameters.rust_version}}
Expand Down
1 change: 1 addition & 0 deletions runtime-native/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ async-datagram = "2.2.0"
juliex = "0.3.0-alpha.6"
lazy_static = "1.3.0"
romio = "0.3.0-alpha.7"
futures-timer = "0.2.1"

[target.'cfg(target_arch = "wasm32")'.dependencies]
futures01 = { package = "futures", version = "0.1" }
Expand Down
19 changes: 19 additions & 0 deletions runtime-native/src/not_wasm32.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
use futures::prelude::*;
use futures::{future::BoxFuture, task::SpawnError};
use futures_timer::{Delay as AsyncDelay, Interval as AsyncInterval};
use lazy_static::lazy_static;

use std::io;
use std::net::SocketAddr;
use std::pin::Pin;
use std::time::{Duration, Instant};

mod tcp;
mod time;
mod udp;

use tcp::{TcpListener, TcpStream};
use time::{Delay, Interval};
use udp::UdpSocket;

lazy_static! {
Expand Down Expand Up @@ -58,4 +62,19 @@ impl runtime_raw::Runtime for Native {
let romio_socket = romio::UdpSocket::bind(&addr)?;
Ok(Box::pin(UdpSocket { romio_socket }))
}

fn new_delay(&self, dur: Duration) -> Pin<Box<dyn runtime_raw::Delay>> {
let async_delay = AsyncDelay::new(dur);
Box::pin(Delay { async_delay })
}

fn new_delay_at(&self, at: Instant) -> Pin<Box<dyn runtime_raw::Delay>> {
let async_delay = AsyncDelay::new_at(at);
Box::pin(Delay { async_delay })
}

fn new_interval(&self, dur: Duration) -> Pin<Box<dyn runtime_raw::Interval>> {
let async_interval = AsyncInterval::new(dur);
Box::pin(Interval { async_interval })
}
}
40 changes: 40 additions & 0 deletions runtime-native/src/not_wasm32/time.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Instant;

use futures::prelude::*;
use futures_timer::{Delay as AsyncDelay, Interval as AsyncInterval};

#[derive(Debug)]
pub(crate) struct Delay {
pub(crate) async_delay: AsyncDelay,
}

impl runtime_raw::Delay for Delay {}

impl Future for Delay {
type Output = Instant;

#[inline]
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
futures::ready!(Pin::new(&mut self.async_delay).poll(cx)).unwrap();
Poll::Ready(Instant::now())
}
}

#[derive(Debug)]
pub(crate) struct Interval {
pub(crate) async_interval: AsyncInterval,
}

impl runtime_raw::Interval for Interval {}

impl Stream for Interval {
type Item = Instant;

#[inline]
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
futures::ready!(Pin::new(&mut self.async_interval).poll_next(cx)).unwrap();
Poll::Ready(Some(Instant::now()))
}
}
13 changes: 13 additions & 0 deletions runtime-native/src/wasm32.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use futures::{future::BoxFuture, task::SpawnError};
use std::io;
use std::net::SocketAddr;
use std::pin::Pin;
use std::time::{Duration, Instant};

use wasm_bindgen::prelude::*;
use wasm_bindgen_futures::future_to_promise;
Expand Down Expand Up @@ -41,4 +42,16 @@ impl runtime_raw::Runtime for Native {
) -> io::Result<Pin<Box<dyn runtime_raw::UdpSocket>>> {
panic!("Binding UDP sockets is currently not supported in wasm");
}

fn new_delay(&self, _dur: Duration) -> Pin<Box<dyn runtime_raw::Delay>> {
panic!("Timers are currently not supported in wasm");
}

fn new_delay_at(&self, _at: Instant) -> Pin<Box<dyn runtime_raw::Delay>> {
panic!("Timers are currently not supported in wasm");
}

fn new_interval(&self, _dur: Duration) -> Pin<Box<dyn runtime_raw::Interval>> {
panic!("Timers are currently not supported in wasm");
}
}
23 changes: 22 additions & 1 deletion runtime-raw/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,18 @@ use std::cell::Cell;
use std::io;
use std::net::SocketAddr;
use std::pin::Pin;
use std::time::{Duration, Instant};

mod tcp;
mod time;
mod udp;

pub use tcp::*;
pub use time::*;
pub use udp::*;

thread_local! {
static RUNTIME: Cell<Option<&'static dyn Runtime>> = Cell::new(None);
static RUNTIME: Cell<Option<&'static dyn Runtime>> = Cell::new(None);
}

/// Get the current runtime.
Expand Down Expand Up @@ -95,4 +98,22 @@ pub trait Runtime: Send + Sync + 'static {
/// This method is defined on the `Runtime` trait because defining it on
/// `UdpSocket` would prevent it from being a trait object.
fn bind_udp_socket(&self, addr: &SocketAddr) -> io::Result<Pin<Box<dyn UdpSocket>>>;

/// Create a new Future that wakes up after the given duration
///
/// This method is defined on the `Runtime` trait because defining it on
/// `Delay` would prevent it from being a trait object.
fn new_delay(&self, dur: Duration) -> Pin<Box<dyn Delay>>;

/// Create a new Future that wakes up at the given time.
///
/// This method is defined on the `Runtime` trait because defining it on
/// `Delay` would prevent it from being a trait object.
fn new_delay_at(&self, at: Instant) -> Pin<Box<dyn Delay>>;

/// A stream representing notifications at a fixed interval.
///
/// This method is defined on the `Runtime` trait because defining it on
/// `Interval` would prevent it from being a trait object.
fn new_interval(&self, dur: Duration) -> Pin<Box<dyn Interval>>;
}
11 changes: 11 additions & 0 deletions runtime-raw/src/time.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
use std::fmt::Debug;
use std::future::Future;
use std::time::Instant;

use futures::Stream;

/// A future representing the notification that an elapsed duration has occurred.
pub trait Delay: Future<Output = Instant> + Debug + Send {}

/// A stream representing notifications at a fixed interval.
pub trait Interval: Stream<Item = Instant> + Debug + Send {}
34 changes: 34 additions & 0 deletions runtime-tokio/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,21 @@ use futures::{
task::SpawnError,
};
use lazy_static::lazy_static;
use tokio::timer::{Delay as TokioDelay, Interval as TokioInterval};

use std::io;
use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::{mpsc, Mutex};
use std::thread;
use std::time::{Duration, Instant};

mod tcp;
mod time;
mod udp;

use tcp::{TcpListener, TcpStream};
use time::{Delay, Interval};
use udp::UdpSocket;

/// The default Tokio runtime.
Expand Down Expand Up @@ -78,6 +82,21 @@ impl runtime_raw::Runtime for Tokio {
let tokio_socket = tokio::net::UdpSocket::bind(&addr)?;
Ok(Box::pin(UdpSocket { tokio_socket }))
}

fn new_delay(&self, dur: Duration) -> Pin<Box<dyn runtime_raw::Delay>> {
let tokio_delay = TokioDelay::new(Instant::now() + dur);
Box::pin(Delay { tokio_delay })
}

fn new_delay_at(&self, at: Instant) -> Pin<Box<dyn runtime_raw::Delay>> {
let tokio_delay = TokioDelay::new(at);
Box::pin(Delay { tokio_delay })
}

fn new_interval(&self, dur: Duration) -> Pin<Box<dyn runtime_raw::Interval>> {
let tokio_interval = TokioInterval::new(Instant::now(), dur);
Box::pin(Interval { tokio_interval })
}
}

/// The single-threaded Tokio runtime based on `tokio-current-thread`.
Expand Down Expand Up @@ -143,4 +162,19 @@ impl runtime_raw::Runtime for TokioCurrentThread {
let tokio_socket = tokio::net::UdpSocket::bind(&addr)?;
Ok(Box::pin(UdpSocket { tokio_socket }))
}

fn new_delay(&self, dur: Duration) -> Pin<Box<dyn runtime_raw::Delay>> {
let tokio_delay = TokioDelay::new(Instant::now() + dur);
Box::pin(Delay { tokio_delay })
}

fn new_delay_at(&self, at: Instant) -> Pin<Box<dyn runtime_raw::Delay>> {
let tokio_delay = TokioDelay::new(at);
Box::pin(Delay { tokio_delay })
}

fn new_interval(&self, dur: Duration) -> Pin<Box<dyn runtime_raw::Interval>> {
let tokio_interval = TokioInterval::new(Instant::now(), dur);
Box::pin(Interval { tokio_interval })
}
}
46 changes: 46 additions & 0 deletions runtime-tokio/src/time.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Instant;

use futures::compat::Compat01As03;
use futures::prelude::*;
use tokio::timer::{Delay as TokioDelay, Interval as TokioInterval};

#[derive(Debug)]
pub(crate) struct Delay {
pub(crate) tokio_delay: TokioDelay,
}

impl runtime_raw::Delay for Delay {}

impl Future for Delay {
type Output = Instant;

#[inline]
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut fut = Compat01As03::new(&mut self.tokio_delay);
futures::ready!(Pin::new(&mut fut).poll(cx)).unwrap();
Poll::Ready(Instant::now())
}
}

#[derive(Debug)]
pub(crate) struct Interval {
pub(crate) tokio_interval: TokioInterval,
}

impl runtime_raw::Interval for Interval {}

impl Stream for Interval {
type Item = Instant;

#[inline]
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut stream = Compat01As03::new(&mut self.tokio_interval);
// https://docs.rs/tokio/0.1.20/tokio/timer/struct.Error.html
futures::ready!(Pin::new(&mut stream).poll_next(cx))
.unwrap()
.unwrap();
Poll::Ready(Some(Instant::now()))
}
}
23 changes: 23 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,29 @@

pub mod net;
pub mod task;
pub mod time;

/// The Runtime Prelude.
///
/// Rust comes with a variety of things in its standard library. However, Runtime and Futures
/// provide new functionality outside of it. We want to make Runtime feel as close to standard Rust
/// as possible. We care deeply about usability.
///
/// The _prelude_ is the list of things we recommend importing into Runtime programs. It's kept as
/// small as possible, and is focused on things, particularly traits.
///
/// To use the `prelude` do:
/// ```
/// use runtime::prelude::*;
/// ```
pub mod prelude {
#[doc(inline)]
pub use super::time::AsyncReadExt as _;
#[doc(inline)]
pub use super::time::FutureExt as _;
#[doc(inline)]
pub use super::time::StreamExt as _;
}

#[doc(inline)]
pub use task::spawn;
Expand Down
57 changes: 57 additions & 0 deletions src/time.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
//! Types and Functions for time-related operations.
//!
//! This module provides primitives for setting asynchronous timeouts, intervals, and delays.
//!
//! # Organization
//!
//! * [`Delay`] and [`Interval`] provide functionality for setting delays and intervals.
//! * [`FutureExt`] extends Futures with the ability to time-out.
//! * Other types are return or parameter types for various methods in this module
//!
//! [`Delay`]: struct.Delay.html
//! [`Interval`]: struct.Interval.html
//! [`FutureExt`]: trait.FutureExt.html
//!
//! ## Examples
//! __Delay execution for three seconds__
//! ```no_run
//! # #![feature(async_await)]
//! # #[runtime::main]
//! # async fn main() {
//! use runtime::time::Delay;
//! use std::time::{Duration, Instant};
//!
//! let start = Instant::now();
//! let now = Delay::new(Duration::from_secs(3)).await;
//!
//! let elapsed = now - start;
//! println!("elapsed: {}s", elapsed.as_secs());
//! # }
//! ```
//!
//! __Emit an event every two seconds__
//! ```no_run
//! # #![feature(async_await)]
//! # #[runtime::main]
//! # async fn main() {
//! # use futures::prelude::*;
//! use runtime::time::Interval;
//! use std::time::{Duration, Instant};
//!
//! let start = Instant::now();
//!
//! let mut interval = Interval::new(Duration::from_secs(2));
//! while let Some(now) = interval.next().await {
//! let elapsed = now - start;
//! println!("elapsed: {}s", elapsed.as_secs());
//! }
//! # }
//! ```

mod delay;
mod ext;
mod interval;

pub use delay::*;
pub use ext::*;
pub use interval::*;
Loading