|
| 1 | +#![feature(async_await)] |
| 2 | + |
| 3 | +use std::error::Error; |
| 4 | +use std::io; |
| 5 | +use std::time::Duration; |
| 6 | +use std::task::Poll; |
| 7 | +use std::thread; |
| 8 | + |
| 9 | +use futures::future::poll_fn; |
| 10 | +use futures::channel::mpsc::*; |
| 11 | +use futures_timer::*; |
| 12 | +use futures::TryStreamExt as TryStreamExt03; |
| 13 | + |
| 14 | +type TestResult = io::Result<()>; |
| 15 | + |
| 16 | +#[runtime::test] |
| 17 | +async fn future_timeout() -> TestResult { |
| 18 | + // Never completes |
| 19 | + let long_future = poll_fn::<TestResult, _>(|_| { |
| 20 | + Poll::Pending |
| 21 | + }); |
| 22 | + |
| 23 | + let res = long_future.timeout(Duration::from_millis(100)).await; |
| 24 | + assert_eq!("future timed out", res.unwrap_err().description()); |
| 25 | + Ok(()) |
| 26 | +} |
| 27 | + |
| 28 | +#[runtime::test] |
| 29 | +async fn future_doesnt_timeout() -> TestResult { |
| 30 | + // Never completes |
| 31 | + let short_future = futures::future::ready::<TestResult>(Ok(())); |
| 32 | + short_future.timeout(Duration::from_millis(100)).await?; |
| 33 | + Ok(()) |
| 34 | +} |
| 35 | + |
| 36 | +#[runtime::test] |
| 37 | +async fn stream() -> TestResult { |
| 38 | + |
| 39 | + let dur = Duration::from_millis(10); |
| 40 | + Delay::new(dur).await?; |
| 41 | + Delay::new(dur).await?; |
| 42 | + Ok(()) |
| 43 | +} |
| 44 | + |
| 45 | +#[runtime::test] |
| 46 | +async fn stream_timeout() -> TestResult { |
| 47 | + let (mut tx, rx) = unbounded::<io::Result<u8>>(); |
| 48 | + |
| 49 | + thread::spawn(move || { |
| 50 | + for i in 0..10_u8 { |
| 51 | + tx.start_send(Ok(i)).unwrap(); |
| 52 | + thread::sleep(Duration::from_millis(100)); |
| 53 | + } |
| 54 | + |
| 55 | + drop(tx) |
| 56 | + }); |
| 57 | + |
| 58 | + let mut f = rx.timeout(Duration::from_millis(10)); |
| 59 | + let mut ok = 0; |
| 60 | + let mut err = 0; |
| 61 | + loop { |
| 62 | + let next = f.try_next().await; |
| 63 | + match next { |
| 64 | + Ok(None) => { break; } |
| 65 | + Ok(_) => { ok += 1; } |
| 66 | + Err(_) => { err += 1; } |
| 67 | + } |
| 68 | + } |
| 69 | + |
| 70 | + // Exactly 10 successes |
| 71 | + assert_eq!(ok, 10); |
| 72 | + // We should have way more errors than success (non-deterministic) |
| 73 | + assert!(err > ok * 5); |
| 74 | + |
| 75 | + Ok(()) |
| 76 | +} |
| 77 | + |
| 78 | +#[runtime::test] |
| 79 | +async fn stream_doesnt_timeout() -> TestResult { |
| 80 | + let (mut tx, rx) = unbounded::<io::Result<u8>>(); |
| 81 | + |
| 82 | + // Produce a list of numbers that arrive safely within the timeout period |
| 83 | + thread::spawn(move || { |
| 84 | + for i in 0..10_u8 { |
| 85 | + tx.start_send(Ok(i)).unwrap(); |
| 86 | + thread::sleep(Duration::from_millis(100)); |
| 87 | + } |
| 88 | + |
| 89 | + drop(tx) |
| 90 | + }); |
| 91 | + |
| 92 | + let mut f = rx.timeout(Duration::from_millis(200)); |
| 93 | + let mut count = 0; |
| 94 | + loop { |
| 95 | + let next = f.try_next().await; |
| 96 | + if let Ok(None) = next { |
| 97 | + break; |
| 98 | + } |
| 99 | + // All of these items should be non-error |
| 100 | + next.unwrap(); |
| 101 | + count += 1; |
| 102 | + } |
| 103 | + |
| 104 | + assert_eq!(count, 10); |
| 105 | + |
| 106 | + Ok(()) |
| 107 | +} |
0 commit comments