Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add tests for ext traits + rename traits #18

Merged
merged 3 commits into from
Jun 6, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions src/ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::Delay;

/// An extension trait for futures which provides convenient accessors for
/// timing out execution and such.
pub trait FutureExt: TryFuture + Sized {
pub trait TryFutureExt: TryFuture + Sized {
/// Creates a new future which will take at most `dur` time to resolve from
/// the point at which this method is called.
///
Expand All @@ -30,7 +30,7 @@ pub trait FutureExt: TryFuture + Sized {
/// # #![feature(async_await)]
/// use std::time::Duration;
/// use futures::prelude::*;
/// use futures_timer::FutureExt;
/// use futures_timer::TryFutureExt;
///
/// # fn long_future() -> impl TryFuture<Ok = (), Error = std::io::Error> {
/// # futures::future::ok(())
Expand Down Expand Up @@ -74,7 +74,7 @@ pub trait FutureExt: TryFuture + Sized {
}
}

impl<F: TryFuture> FutureExt for F {}
impl<F: TryFuture> TryFutureExt for F {}

/// Future returned by the `FutureExt::timeout` method.
pub struct Timeout<F>
Expand Down Expand Up @@ -119,7 +119,7 @@ where

/// An extension trait for streams which provides convenient accessors for
/// timing out execution and such.
pub trait StreamExt: TryStream + Sized {
pub trait TryStreamExt: TryStream + Sized {
/// Creates a new stream which will take at most `dur` time to yield each
/// item of the stream.
///
Expand All @@ -143,7 +143,7 @@ pub trait StreamExt: TryStream + Sized {
}
}

impl<S: TryStream> StreamExt for S {}
impl<S: TryStream> TryStreamExt for S {}

/// Stream returned by the `StreamExt::timeout` method.
pub struct TimeoutStream<S>
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ mod global;
mod heap;

pub mod ext;
pub use ext::FutureExt;
pub use ext::{TryFutureExt, TryStreamExt};

/// A "timer heap" used to power separately owned instances of `Delay` and
/// `Interval`.
Expand Down
107 changes: 107 additions & 0 deletions tests/ext.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
#![feature(async_await)]

use std::error::Error;
use std::io;
use std::time::Duration;
use std::task::Poll;
use std::thread;

use futures::future::poll_fn;
use futures::channel::mpsc::*;
use futures_timer::*;
use futures::TryStreamExt as TryStreamExt03;

type TestResult = io::Result<()>;

#[runtime::test]
async fn future_timeout() -> TestResult {
// Never completes
let long_future = poll_fn::<TestResult, _>(|_| {
Poll::Pending
});

let res = long_future.timeout(Duration::from_millis(100)).await;
assert_eq!("future timed out", res.unwrap_err().description());
Ok(())
}

#[runtime::test]
async fn future_doesnt_timeout() -> TestResult {
// Never completes
let short_future = futures::future::ready::<TestResult>(Ok(()));
short_future.timeout(Duration::from_millis(100)).await?;
Ok(())
}

#[runtime::test]
async fn stream() -> TestResult {

let dur = Duration::from_millis(10);
Delay::new(dur).await?;
Delay::new(dur).await?;
Ok(())
}

#[runtime::test]
async fn stream_timeout() -> TestResult {
let (mut tx, rx) = unbounded::<io::Result<u8>>();

thread::spawn(move || {
for i in 0..10_u8 {
tx.start_send(Ok(i)).unwrap();
thread::sleep(Duration::from_millis(100));
}

drop(tx)
});

let mut f = rx.timeout(Duration::from_millis(10));
let mut ok = 0;
let mut err = 0;
loop {
let next = f.try_next().await;
match next {
Ok(None) => { break; }
Ok(_) => { ok += 1; }
Err(_) => { err += 1; }
}
}

// Exactly 10 successes
assert_eq!(ok, 10);
// We should have way more errors than success (non-deterministic)
assert!(err > ok * 5);

Ok(())
}

#[runtime::test]
async fn stream_doesnt_timeout() -> TestResult {
let (mut tx, rx) = unbounded::<io::Result<u8>>();

// Produce a list of numbers that arrive safely within the timeout period
thread::spawn(move || {
for i in 0..10_u8 {
tx.start_send(Ok(i)).unwrap();
thread::sleep(Duration::from_millis(100));
}

drop(tx)
});

let mut f = rx.timeout(Duration::from_millis(200));
let mut count = 0;
loop {
let next = f.try_next().await;
if let Ok(None) = next {
break;
}
// All of these items should be non-error
next.unwrap();
count += 1;
}

assert_eq!(count, 10);

Ok(())
}