Skip to content

Commit c338b8b

Browse files
committed
Implement stream combinators using async_stream_block and generators
1 parent 5ed3443 commit c338b8b

File tree

4 files changed

+95
-133
lines changed

4 files changed

+95
-133
lines changed

Cargo.toml

+4-1
Original file line numberDiff line numberDiff line change
@@ -14,5 +14,8 @@ edition = "2018"
1414
pin-utils = "=0.1.0-alpha.4"
1515

1616
[dependencies.futures]
17-
version = "=0.3.0-alpha.16"
17+
git = "https://github.com/taiki-e/futures-rs.git"
18+
branch = "async-stream"
19+
features = ["async-stream", "nightly"]
20+
#version = "=0.3.0-alpha.16"
1821
package = "futures-preview"

src/future.rs

+11-23
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use futures::future::Future;
22
use futures::stream::Stream;
3+
use futures::async_stream_block;
34

45
use core::task::{Poll, Context};
56

@@ -100,35 +101,22 @@ pub fn flatten_stream<Fut, St, T>(future: Fut) -> impl Stream<Item = T>
100101
where Fut: Future<Output = St>,
101102
St: Stream<Item = T>,
102103
{
103-
use crate::stream::next;
104-
futures::stream::unfold((Some(future), None), async move | (future, stream)| {
105-
match (future, stream) {
106-
(Some(future), None) => {
107-
let stream = future.await;
108-
let mut stream = Box::pin(stream);
109-
let item = next(&mut stream).await;
110-
item.map(|item| (item, (None, Some(stream))))
111-
},
112-
(None, Some(mut stream)) => {
113-
let item = next(&mut stream).await;
114-
item.map(|item| (item, (None, Some(stream))))
115-
},
116-
_ => unreachable!()
104+
async_stream_block! {
105+
let stream = future.await;
106+
#[for_await]
107+
for item in stream {
108+
yield item
117109
}
118-
})
110+
}
119111
}
120112

121113
pub fn into_stream<Fut>(future: Fut) -> impl Stream<Item = Fut::Output>
122114
where Fut: Future,
123115
{
124-
futures::stream::unfold(Some(future), async move |future| {
125-
if let Some(future) = future {
126-
let item = future.await;
127-
Some((item, (None)))
128-
} else {
129-
None
130-
}
131-
})
116+
async_stream_block! {
117+
let item = future.await;
118+
yield item
119+
}
132120
}
133121

134122
pub fn poll_fn<F, T>(f: F) -> impl Future<Output = T>

src/lib.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
#![feature(async_await, gen_future, generators)]
1+
#![feature(async_await, gen_future, generators, proc_macro_hygiene)]
22

33
pub mod future;
44
pub mod stream;

src/stream.rs

+79-108
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use futures::stream::Stream;
22
use futures::future::Future;
3+
use futures::async_stream_block;
34

45
use core::pin::Pin;
56
use core::iter::IntoIterator;
@@ -30,48 +31,45 @@ pub fn map<St, U, F>(stream: St, f: F) -> impl Stream<Item = U>
3031
where St: Stream,
3132
F: FnMut(St::Item) -> U,
3233
{
33-
let stream = Box::pin(stream);
34-
unfold((stream, f), async move | (mut stream, mut f)| {
35-
let item = next(&mut stream).await;
36-
item.map(|item| (f(item), (stream, f)))
37-
})
34+
let mut f = f;
35+
async_stream_block! {
36+
#[for_await]
37+
for item in stream {
38+
yield f(item)
39+
}
40+
}
3841
}
3942

4043
pub fn filter<St, Fut, F>(stream: St, f: F) -> impl Stream<Item = St::Item>
4144
where St: Stream,
4245
F: FnMut(&St::Item) -> Fut,
4346
Fut: Future<Output = bool>
4447
{
45-
let stream = Box::pin(stream);
46-
unfold((stream, f), async move | (mut stream, mut f)| {
47-
while let Some(item) = next(&mut stream).await {
48-
let matched = f(&item).await;
49-
if matched {
50-
return Some((item, (stream, f)))
51-
} else {
52-
continue;
48+
let mut f = f;
49+
async_stream_block! {
50+
#[for_await]
51+
for item in stream {
52+
if f(&item).await {
53+
yield item
5354
}
54-
};
55-
None
56-
})
55+
}
56+
}
5757
}
5858

5959
pub fn filter_map<St, Fut, F, U>(stream: St, f: F) -> impl Stream<Item = U>
6060
where St: Stream,
6161
F: FnMut(St::Item) -> Fut,
6262
Fut: Future<Output = Option<U>>
6363
{
64-
let stream = Box::pin(stream);
65-
unfold((stream, f), async move | (mut stream, mut f)| {
66-
while let Some(item) = next(&mut stream).await {
64+
let mut f = f;
65+
async_stream_block! {
66+
#[for_await]
67+
for item in stream {
6768
if let Some(item) = f(item).await {
68-
return Some((item, (stream, f)))
69-
} else {
70-
continue;
69+
yield item
7170
}
72-
};
73-
None
74-
})
71+
}
72+
}
7573
}
7674

7775
pub async fn into_future<St>(stream: St) -> (Option<St::Item>, impl Stream<Item = St::Item>)
@@ -121,18 +119,18 @@ pub async fn for_each<St, Fut, F>(stream: St, f: F) -> ()
121119
pub fn take<St>(stream: St, n: u64) -> impl Stream<Item = St::Item>
122120
where St: Stream,
123121
{
124-
let stream = Box::pin(stream);
125-
unfold((stream, n), async move | (mut stream, n)| {
126-
if n == 0 {
127-
None
128-
} else {
129-
if let Some(item) = next(&mut stream).await {
130-
Some((item, (stream, n - 1)))
122+
let mut n = n;
123+
async_stream_block! {
124+
#[for_await]
125+
for item in stream {
126+
if n == 0 {
127+
break;
131128
} else {
132-
None
129+
n = n - 1;
130+
yield item
133131
}
134132
}
135-
})
133+
}
136134
}
137135

138136
pub fn repeat<T>(item: T) -> impl Stream<Item = T>
@@ -148,65 +146,47 @@ pub fn flatten<St, SubSt, T>(stream: St) -> impl Stream<Item = T>
148146
where SubSt: Stream<Item = T>,
149147
St: Stream<Item = SubSt>,
150148
{
151-
let stream = Box::pin(stream);
152-
unfold((Some(stream), None), async move | (mut state_stream, mut state_substream)| {
153-
loop {
154-
if let Some(mut substream) = state_substream.take() {
155-
if let Some(item) = next(&mut substream).await {
156-
return Some((item, (state_stream, Some(substream))))
157-
} else {
158-
continue;
159-
}
149+
async_stream_block! {
150+
#[for_await]
151+
for substream in stream {
152+
#[for_await]
153+
for item in substream {
154+
yield item
160155
}
161-
if let Some(mut stream) = state_stream.take() {
162-
if let Some(substream) = next(&mut stream).await {
163-
let substream = Box::pin(substream);
164-
state_stream = Some(stream);
165-
state_substream = Some(substream);
166-
continue;
167-
}
168-
}
169-
return None;
170156
}
171-
})
157+
}
172158
}
173159

174160
pub fn then<St, F, Fut>(stream: St, f: F) -> impl Stream<Item = St::Item>
175161
where St: Stream,
176162
F: FnMut(St::Item) -> Fut,
177163
Fut: Future<Output = St::Item>
178164
{
179-
let stream = Box::pin(stream);
180-
unfold((stream, f), async move | (mut stream, mut f)| {
181-
let item = next(&mut stream).await;
182-
if let Some(item) = item {
165+
let mut f = f;
166+
async_stream_block! {
167+
#[for_await]
168+
for item in stream {
183169
let new_item = f(item).await;
184-
Some((new_item, (stream, f)))
185-
} else {
186-
None
170+
yield new_item
187171
}
188-
})
172+
}
189173
}
190174

191175
pub fn skip<St>(stream: St, n: u64) -> impl Stream<Item = St::Item>
192176
where St: Stream,
193177
{
194-
let stream = Box::pin(stream);
195-
unfold((stream, n), async move | (mut stream, mut n)| {
196-
while n != 0 {
197-
if let Some(_) = next(&mut stream).await {
198-
n = n - 1;
199-
continue
178+
let mut n = n;
179+
async_stream_block! {
180+
#[for_await]
181+
for item in stream {
182+
if n == 0 {
183+
yield item
200184
} else {
201-
return None
185+
n = n - 1;
186+
continue;
202187
}
203188
}
204-
if let Some(item) = next(&mut stream).await {
205-
Some((item, (stream, 0)))
206-
} else {
207-
None
208-
}
209-
})
189+
}
210190
}
211191

212192
pub fn zip<St1, St2>(stream: St1, other: St2) -> impl Stream<Item = (St1::Item, St2::Item)>
@@ -228,67 +208,58 @@ pub fn zip<St1, St2>(stream: St1, other: St2) -> impl Stream<Item = (St1::Item,
228208
pub fn chain<St>(stream: St, other: St) -> impl Stream<Item = St::Item>
229209
where St: Stream,
230210
{
231-
let stream = Box::pin(stream);
232-
let other = Box::pin(other);
233-
let start_with_first = true;
234-
unfold((stream, other, start_with_first), async move | (mut stream, mut other, start_with_first)| {
235-
if start_with_first {
236-
if let Some(item) = next(&mut stream).await {
237-
return Some((item, (stream, other, start_with_first)))
238-
}
211+
async_stream_block! {
212+
#[for_await]
213+
for item in stream {
214+
yield item
239215
}
240-
if let Some(item) = next(&mut other).await {
241-
Some((item, (stream, other, /* start_with_first */ false)))
242-
} else {
243-
None
216+
#[for_await]
217+
for item in other {
218+
yield item
244219
}
245-
})
220+
}
246221
}
247222

248223
pub fn take_while<St, F, Fut>(stream: St, f: F) -> impl Stream<Item = St::Item>
249224
where St: Stream,
250225
F: FnMut(&St::Item) -> Fut,
251226
Fut: Future<Output = bool>,
252227
{
253-
let stream = Box::pin(stream);
254-
unfold((stream, f), async move | (mut stream, mut f)| {
255-
if let Some(item) = next(&mut stream).await {
228+
let mut f = f;
229+
async_stream_block! {
230+
#[for_await]
231+
for item in stream {
256232
if f(&item).await {
257-
Some((item, (stream, f)))
233+
yield item
258234
} else {
259-
None
235+
break;
260236
}
261-
} else {
262-
None
263237
}
264-
})
238+
}
265239
}
266240

267241
pub fn skip_while<St, F, Fut>(stream: St, f: F) -> impl Stream<Item = St::Item>
268242
where St: Stream,
269243
F: FnMut(&St::Item) -> Fut,
270244
Fut: Future<Output = bool>,
271245
{
272-
let stream = Box::pin(stream);
273-
let should_skip = true;
274-
unfold((stream, f, should_skip), async move | (mut stream, mut f, should_skip)| {
275-
while should_skip {
276-
if let Some(item) = next(&mut stream).await {
246+
let mut f = f;
247+
let mut should_skip = true;
248+
async_stream_block! {
249+
#[for_await]
250+
for item in stream {
251+
if should_skip {
277252
if f(&item).await {
278253
continue;
279254
} else {
280-
return Some((item, (stream, f, /* should_skip */ false)))
255+
should_skip = false;
256+
yield item
281257
}
282258
} else {
283-
return None
259+
yield item
284260
}
285261
}
286-
if let Some(item) = next(&mut stream).await {
287-
Some((item, (stream, f, /* should_skip */ false)))
288-
} else {
289-
None
290-
}
291-
})
262+
}
292263
}
293264

294265
pub async fn fold<St, T, F, Fut>(stream: St, init: T, f: F) -> T

0 commit comments

Comments
 (0)