Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement stream combinators using async_stream_block and generators #7

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ maintenance = { status = "experimental" }

[dependencies]
pin-utils = "=0.1.0-alpha.4"
futures-async-stream = "0.1.0-alpha.1"

[dependencies.futures-core]
version = "=0.3.0-alpha.19"
Expand Down
34 changes: 11 additions & 23 deletions src/future.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use futures_core::future::Future;
use futures_core::stream::Stream;
use futures_async_stream::async_stream_block;

use core::pin::Pin;
use core::task::{Context, Poll};
Expand Down Expand Up @@ -430,22 +431,13 @@ where
Fut: Future<Output = St>,
St: Stream<Item = T>,
{
use crate::stream::next;
crate::stream::unfold((Some(future), None), async move |(future, stream)| {
match (future, stream) {
(Some(future), None) => {
let stream = future.await;
let mut stream = Box::pin(stream);
let item = next(&mut stream).await;
item.map(|item| (item, (None, Some(stream))))
}
(None, Some(mut stream)) => {
let item = next(&mut stream).await;
item.map(|item| (item, (None, Some(stream))))
}
_ => unreachable!(),
async_stream_block! {
let stream = future.await;
#[for_await]
for item in stream {
yield item
}
})
}
}

/// Convert this future into a single element stream.
Expand All @@ -470,14 +462,10 @@ pub fn into_stream<Fut>(future: Fut) -> impl Stream<Item = Fut::Output>
where
Fut: Future,
{
crate::stream::unfold(Some(future), async move |future| {
if let Some(future) = future {
let item = future.await;
Some((item, (None)))
} else {
None
}
})
async_stream_block! {
let item = future.await;
yield item
}
}

/// Creates a new future wrapping around a function returning [`Poll`](core::task::Poll).
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#![feature(async_closure, gen_future, generators)]
#![feature(gen_future, generators, proc_macro_hygiene)]

pub mod future;
pub mod stream;
222 changes: 94 additions & 128 deletions src/stream.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use core::future::Future;
use futures_core::future::Future;
pub use futures_core::stream::Stream;
use futures_async_stream::async_stream_block;

use core::iter::IntoIterator;
use core::pin::Pin;
Expand Down Expand Up @@ -97,11 +98,13 @@ where
St: Stream,
F: FnMut(St::Item) -> U,
{
let stream = Box::pin(stream);
unfold((stream, f), async move |(mut stream, mut f)| {
let item = next(&mut stream).await;
item.map(|item| (f(item), (stream, f)))
})
let mut f = f;
async_stream_block! {
#[for_await]
for item in stream {
yield f(item)
}
}
}

/// Filters the values produced by this stream according to the provided
Expand Down Expand Up @@ -136,18 +139,15 @@ where
F: FnMut(&St::Item) -> Fut,
Fut: Future<Output = bool>,
{
let stream = Box::pin(stream);
unfold((stream, f), async move |(mut stream, mut f)| {
while let Some(item) = next(&mut stream).await {
let matched = f(&item).await;
if matched {
return Some((item, (stream, f)));
} else {
continue;
let mut f = f;
async_stream_block! {
#[for_await]
for item in stream {
if f(&item).await {
yield item
}
}
None
})
}
}

/// Filters the values produced by this stream while simultaneously mapping
Expand Down Expand Up @@ -183,17 +183,15 @@ where
F: FnMut(St::Item) -> Fut,
Fut: Future<Output = Option<U>>,
{
let stream = Box::pin(stream);
unfold((stream, f), async move |(mut stream, mut f)| {
while let Some(item) = next(&mut stream).await {
let mut f = f;
async_stream_block! {
#[for_await]
for item in stream {
if let Some(item) = f(item).await {
return Some((item, (stream, f)));
} else {
continue;
yield item
}
}
None
})
}
}

/// Converts this stream into a future of `(next_item, tail_of_stream)`.
Expand Down Expand Up @@ -366,18 +364,18 @@ pub fn take<St>(stream: St, n: u64) -> impl Stream<Item = St::Item>
where
St: Stream,
{
let stream = Box::pin(stream);
unfold((stream, n), async move |(mut stream, n)| {
if n == 0 {
None
} else {
if let Some(item) = next(&mut stream).await {
Some((item, (stream, n - 1)))
let mut n = n;
async_stream_block! {
#[for_await]
for item in stream {
if n == 0 {
break;
} else {
None
n = n - 1;
yield item
}
}
})
}
}

/// Create a stream which produces the same item repeatedly.
Expand Down Expand Up @@ -428,28 +426,15 @@ where
SubSt: Stream<Item = T>,
St: Stream<Item = SubSt>,
{
let stream = Box::pin(stream);
unfold(
(Some(stream), None),
async move |(mut state_stream, mut state_substream)| loop {
if let Some(mut substream) = state_substream.take() {
if let Some(item) = next(&mut substream).await {
return Some((item, (state_stream, Some(substream))));
} else {
continue;
}
}
if let Some(mut stream) = state_stream.take() {
if let Some(substream) = next(&mut stream).await {
let substream = Box::pin(substream);
state_stream = Some(stream);
state_substream = Some(substream);
continue;
}
async_stream_block! {
#[for_await]
for substream in stream {
#[for_await]
for item in substream {
yield item
}
return None;
},
)
}
}
}

/// Computes from this stream's items new items of a different type using
Expand Down Expand Up @@ -481,16 +466,14 @@ where
F: FnMut(St::Item) -> Fut,
Fut: Future<Output = St::Item>,
{
let stream = Box::pin(stream);
unfold((stream, f), async move |(mut stream, mut f)| {
let item = next(&mut stream).await;
if let Some(item) = item {
let mut f = f;
async_stream_block! {
#[for_await]
for item in stream {
let new_item = f(item).await;
Some((new_item, (stream, f)))
} else {
None
yield new_item
}
})
}
}

/// Creates a new stream which skips `n` items of the underlying stream.
Expand All @@ -515,22 +498,18 @@ pub fn skip<St>(stream: St, n: u64) -> impl Stream<Item = St::Item>
where
St: Stream,
{
let stream = Box::pin(stream);
unfold((stream, n), async move |(mut stream, mut n)| {
while n != 0 {
if let Some(_) = next(&mut stream).await {
let mut n = n;
async_stream_block! {
#[for_await]
for item in stream {
if n == 0 {
yield item
} else {
n = n - 1;
continue;
} else {
return None;
}
}
if let Some(item) = next(&mut stream).await {
Some((item, (stream, 0)))
} else {
None
}
})
}
}

/// An adapter for zipping two streams together.
Expand Down Expand Up @@ -559,16 +538,18 @@ where
St1: Stream,
St2: Stream,
{
let stream = Box::pin(stream);
let other = Box::pin(other);
unfold((stream, other), async move |(mut stream, mut other)| {
let left = next(&mut stream).await;
let right = next(&mut other).await;
match (left, right) {
(Some(left), Some(right)) => Some(((left, right), (stream, other))),
_ => None,
let mut stream = Box::pin(stream);
let mut other = Box::pin(other);
async_stream_block! {
loop {
let left = next(&mut stream).await;
let right = next(&mut other).await;
match (left, right) {
(Some(left), Some(right)) => yield (left, right),
_ => break,
}
}
})
}
}

/// Adapter for chaining two stream.
Expand Down Expand Up @@ -598,24 +579,16 @@ pub fn chain<St>(stream: St, other: St) -> impl Stream<Item = St::Item>
where
St: Stream,
{
let stream = Box::pin(stream);
let other = Box::pin(other);
let start_with_first = true;
unfold(
(stream, other, start_with_first),
async move |(mut stream, mut other, start_with_first)| {
if start_with_first {
if let Some(item) = next(&mut stream).await {
return Some((item, (stream, other, start_with_first)));
}
}
if let Some(item) = next(&mut other).await {
Some((item, (stream, other, /* start_with_first */ false)))
} else {
None
}
},
)
async_stream_block! {
#[for_await]
for item in stream {
yield item
}
#[for_await]
for item in other {
yield item
}
}
}

/// Take elements from this stream while the provided asynchronous predicate
Expand Down Expand Up @@ -644,18 +617,17 @@ where
F: FnMut(&St::Item) -> Fut,
Fut: Future<Output = bool>,
{
let stream = Box::pin(stream);
unfold((stream, f), async move |(mut stream, mut f)| {
if let Some(item) = next(&mut stream).await {
let mut f = f;
async_stream_block! {
#[for_await]
for item in stream {
if f(&item).await {
Some((item, (stream, f)))
yield item
} else {
None
break;
}
} else {
None
}
})
}
}

/// Skip elements on this stream while the provided asynchronous predicate
Expand Down Expand Up @@ -685,29 +657,23 @@ where
F: FnMut(&St::Item) -> Fut,
Fut: Future<Output = bool>,
{
let stream = Box::pin(stream);
let should_skip = true;
unfold(
(stream, f, should_skip),
async move |(mut stream, mut f, should_skip)| {
while should_skip {
if let Some(item) = next(&mut stream).await {
if f(&item).await {
continue;
} else {
return Some((item, (stream, f, /* should_skip */ false)));
}
let mut f = f;
let mut should_skip = true;
async_stream_block! {
#[for_await]
for item in stream {
if should_skip {
if f(&item).await {
continue;
} else {
return None;
should_skip = false;
yield item
}
}
if let Some(item) = next(&mut stream).await {
Some((item, (stream, f, /* should_skip */ false)))
} else {
None
yield item
}
},
)
}
}
}

/// Execute an accumulating asynchronous computation over a stream,
Expand Down