Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit d5c9f3c

Browse files
committedJul 2, 2019
Implement stream combinators using async_stream_block and generators
1 parent 5ed3443 commit d5c9f3c

File tree

4 files changed

+106
-142
lines changed

4 files changed

+106
-142
lines changed
 

‎Cargo.toml

+4-1
Original file line numberDiff line numberDiff line change
@@ -14,5 +14,8 @@ edition = "2018"
1414
pin-utils = "=0.1.0-alpha.4"
1515

1616
[dependencies.futures]
17-
version = "=0.3.0-alpha.16"
17+
git = "https://github.com/taiki-e/futures-rs.git"
18+
branch = "async-stream"
19+
features = ["async-stream", "nightly"]
20+
#version = "=0.3.0-alpha.16"
1821
package = "futures-preview"

‎src/future.rs

+11-23
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use futures::future::Future;
22
use futures::stream::Stream;
3+
use futures::async_stream_block;
34

45
use core::task::{Poll, Context};
56

@@ -100,35 +101,22 @@ pub fn flatten_stream<Fut, St, T>(future: Fut) -> impl Stream<Item = T>
100101
where Fut: Future<Output = St>,
101102
St: Stream<Item = T>,
102103
{
103-
use crate::stream::next;
104-
futures::stream::unfold((Some(future), None), async move | (future, stream)| {
105-
match (future, stream) {
106-
(Some(future), None) => {
107-
let stream = future.await;
108-
let mut stream = Box::pin(stream);
109-
let item = next(&mut stream).await;
110-
item.map(|item| (item, (None, Some(stream))))
111-
},
112-
(None, Some(mut stream)) => {
113-
let item = next(&mut stream).await;
114-
item.map(|item| (item, (None, Some(stream))))
115-
},
116-
_ => unreachable!()
104+
async_stream_block! {
105+
let stream = future.await;
106+
#[for_await]
107+
for item in stream {
108+
yield item
117109
}
118-
})
110+
}
119111
}
120112

121113
pub fn into_stream<Fut>(future: Fut) -> impl Stream<Item = Fut::Output>
122114
where Fut: Future,
123115
{
124-
futures::stream::unfold(Some(future), async move |future| {
125-
if let Some(future) = future {
126-
let item = future.await;
127-
Some((item, (None)))
128-
} else {
129-
None
130-
}
131-
})
116+
async_stream_block! {
117+
let item = future.await;
118+
yield item
119+
}
132120
}
133121

134122
pub fn poll_fn<F, T>(f: F) -> impl Future<Output = T>

‎src/lib.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
#![feature(async_await, gen_future, generators)]
1+
#![feature(async_await, gen_future, generators, proc_macro_hygiene)]
22

33
pub mod future;
44
pub mod stream;

‎src/stream.rs

+90-117
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use futures::stream::Stream;
22
use futures::future::Future;
3+
use futures::async_stream_block;
34

45
use core::pin::Pin;
56
use core::iter::IntoIterator;
@@ -30,48 +31,45 @@ pub fn map<St, U, F>(stream: St, f: F) -> impl Stream<Item = U>
3031
where St: Stream,
3132
F: FnMut(St::Item) -> U,
3233
{
33-
let stream = Box::pin(stream);
34-
unfold((stream, f), async move | (mut stream, mut f)| {
35-
let item = next(&mut stream).await;
36-
item.map(|item| (f(item), (stream, f)))
37-
})
34+
let mut f = f;
35+
async_stream_block! {
36+
#[for_await]
37+
for item in stream {
38+
yield f(item)
39+
}
40+
}
3841
}
3942

4043
pub fn filter<St, Fut, F>(stream: St, f: F) -> impl Stream<Item = St::Item>
4144
where St: Stream,
4245
F: FnMut(&St::Item) -> Fut,
4346
Fut: Future<Output = bool>
4447
{
45-
let stream = Box::pin(stream);
46-
unfold((stream, f), async move | (mut stream, mut f)| {
47-
while let Some(item) = next(&mut stream).await {
48-
let matched = f(&item).await;
49-
if matched {
50-
return Some((item, (stream, f)))
51-
} else {
52-
continue;
48+
let mut f = f;
49+
async_stream_block! {
50+
#[for_await]
51+
for item in stream {
52+
if f(&item).await {
53+
yield item
5354
}
54-
};
55-
None
56-
})
55+
}
56+
}
5757
}
5858

5959
pub fn filter_map<St, Fut, F, U>(stream: St, f: F) -> impl Stream<Item = U>
6060
where St: Stream,
6161
F: FnMut(St::Item) -> Fut,
6262
Fut: Future<Output = Option<U>>
6363
{
64-
let stream = Box::pin(stream);
65-
unfold((stream, f), async move | (mut stream, mut f)| {
66-
while let Some(item) = next(&mut stream).await {
64+
let mut f = f;
65+
async_stream_block! {
66+
#[for_await]
67+
for item in stream {
6768
if let Some(item) = f(item).await {
68-
return Some((item, (stream, f)))
69-
} else {
70-
continue;
69+
yield item
7170
}
72-
};
73-
None
74-
})
71+
}
72+
}
7573
}
7674

7775
pub async fn into_future<St>(stream: St) -> (Option<St::Item>, impl Stream<Item = St::Item>)
@@ -121,18 +119,18 @@ pub async fn for_each<St, Fut, F>(stream: St, f: F) -> ()
121119
pub fn take<St>(stream: St, n: u64) -> impl Stream<Item = St::Item>
122120
where St: Stream,
123121
{
124-
let stream = Box::pin(stream);
125-
unfold((stream, n), async move | (mut stream, n)| {
126-
if n == 0 {
127-
None
128-
} else {
129-
if let Some(item) = next(&mut stream).await {
130-
Some((item, (stream, n - 1)))
122+
let mut n = n;
123+
async_stream_block! {
124+
#[for_await]
125+
for item in stream {
126+
if n == 0 {
127+
break;
131128
} else {
132-
None
129+
n = n - 1;
130+
yield item
133131
}
134132
}
135-
})
133+
}
136134
}
137135

138136
pub fn repeat<T>(item: T) -> impl Stream<Item = T>
@@ -148,147 +146,122 @@ pub fn flatten<St, SubSt, T>(stream: St) -> impl Stream<Item = T>
148146
where SubSt: Stream<Item = T>,
149147
St: Stream<Item = SubSt>,
150148
{
151-
let stream = Box::pin(stream);
152-
unfold((Some(stream), None), async move | (mut state_stream, mut state_substream)| {
153-
loop {
154-
if let Some(mut substream) = state_substream.take() {
155-
if let Some(item) = next(&mut substream).await {
156-
return Some((item, (state_stream, Some(substream))))
157-
} else {
158-
continue;
159-
}
160-
}
161-
if let Some(mut stream) = state_stream.take() {
162-
if let Some(substream) = next(&mut stream).await {
163-
let substream = Box::pin(substream);
164-
state_stream = Some(stream);
165-
state_substream = Some(substream);
166-
continue;
167-
}
149+
async_stream_block! {
150+
#[for_await]
151+
for substream in stream {
152+
#[for_await]
153+
for item in substream {
154+
yield item
168155
}
169-
return None;
170156
}
171-
})
157+
}
172158
}
173159

174160
pub fn then<St, F, Fut>(stream: St, f: F) -> impl Stream<Item = St::Item>
175161
where St: Stream,
176162
F: FnMut(St::Item) -> Fut,
177163
Fut: Future<Output = St::Item>
178164
{
179-
let stream = Box::pin(stream);
180-
unfold((stream, f), async move | (mut stream, mut f)| {
181-
let item = next(&mut stream).await;
182-
if let Some(item) = item {
165+
let mut f = f;
166+
async_stream_block! {
167+
#[for_await]
168+
for item in stream {
183169
let new_item = f(item).await;
184-
Some((new_item, (stream, f)))
185-
} else {
186-
None
170+
yield new_item
187171
}
188-
})
172+
}
189173
}
190174

191175
pub fn skip<St>(stream: St, n: u64) -> impl Stream<Item = St::Item>
192176
where St: Stream,
193177
{
194-
let stream = Box::pin(stream);
195-
unfold((stream, n), async move | (mut stream, mut n)| {
196-
while n != 0 {
197-
if let Some(_) = next(&mut stream).await {
198-
n = n - 1;
199-
continue
178+
let mut n = n;
179+
async_stream_block! {
180+
#[for_await]
181+
for item in stream {
182+
if n == 0 {
183+
yield item
200184
} else {
201-
return None
185+
n = n - 1;
186+
continue;
202187
}
203188
}
204-
if let Some(item) = next(&mut stream).await {
205-
Some((item, (stream, 0)))
206-
} else {
207-
None
208-
}
209-
})
189+
}
210190
}
211191

212192
pub fn zip<St1, St2>(stream: St1, other: St2) -> impl Stream<Item = (St1::Item, St2::Item)>
213193
where St1: Stream,
214194
St2: Stream,
215195
{
216-
let stream = Box::pin(stream);
217-
let other = Box::pin(other);
218-
unfold((stream, other), async move | (mut stream, mut other)| {
219-
let left = next(&mut stream).await;
220-
let right = next(&mut other).await;
221-
match (left, right) {
222-
(Some(left), Some(right)) => Some(((left, right), (stream, other))),
223-
_ => None
196+
let mut stream = Box::pin(stream);
197+
let mut other = Box::pin(other);
198+
async_stream_block! {
199+
loop {
200+
let left = next(&mut stream).await;
201+
let right = next(&mut other).await;
202+
match (left, right) {
203+
(Some(left), Some(right)) => yield (left, right),
204+
_ => break,
205+
}
224206
}
225-
})
207+
}
226208
}
227209

228210
pub fn chain<St>(stream: St, other: St) -> impl Stream<Item = St::Item>
229211
where St: Stream,
230212
{
231-
let stream = Box::pin(stream);
232-
let other = Box::pin(other);
233-
let start_with_first = true;
234-
unfold((stream, other, start_with_first), async move | (mut stream, mut other, start_with_first)| {
235-
if start_with_first {
236-
if let Some(item) = next(&mut stream).await {
237-
return Some((item, (stream, other, start_with_first)))
238-
}
213+
async_stream_block! {
214+
#[for_await]
215+
for item in stream {
216+
yield item
239217
}
240-
if let Some(item) = next(&mut other).await {
241-
Some((item, (stream, other, /* start_with_first */ false)))
242-
} else {
243-
None
218+
#[for_await]
219+
for item in other {
220+
yield item
244221
}
245-
})
222+
}
246223
}
247224

248225
pub fn take_while<St, F, Fut>(stream: St, f: F) -> impl Stream<Item = St::Item>
249226
where St: Stream,
250227
F: FnMut(&St::Item) -> Fut,
251228
Fut: Future<Output = bool>,
252229
{
253-
let stream = Box::pin(stream);
254-
unfold((stream, f), async move | (mut stream, mut f)| {
255-
if let Some(item) = next(&mut stream).await {
230+
let mut f = f;
231+
async_stream_block! {
232+
#[for_await]
233+
for item in stream {
256234
if f(&item).await {
257-
Some((item, (stream, f)))
235+
yield item
258236
} else {
259-
None
237+
break;
260238
}
261-
} else {
262-
None
263239
}
264-
})
240+
}
265241
}
266242

267243
pub fn skip_while<St, F, Fut>(stream: St, f: F) -> impl Stream<Item = St::Item>
268244
where St: Stream,
269245
F: FnMut(&St::Item) -> Fut,
270246
Fut: Future<Output = bool>,
271247
{
272-
let stream = Box::pin(stream);
273-
let should_skip = true;
274-
unfold((stream, f, should_skip), async move | (mut stream, mut f, should_skip)| {
275-
while should_skip {
276-
if let Some(item) = next(&mut stream).await {
248+
let mut f = f;
249+
let mut should_skip = true;
250+
async_stream_block! {
251+
#[for_await]
252+
for item in stream {
253+
if should_skip {
277254
if f(&item).await {
278255
continue;
279256
} else {
280-
return Some((item, (stream, f, /* should_skip */ false)))
257+
should_skip = false;
258+
yield item
281259
}
282260
} else {
283-
return None
261+
yield item
284262
}
285263
}
286-
if let Some(item) = next(&mut stream).await {
287-
Some((item, (stream, f, /* should_skip */ false)))
288-
} else {
289-
None
290-
}
291-
})
264+
}
292265
}
293266

294267
pub async fn fold<St, T, F, Fut>(stream: St, init: T, f: F) -> T

0 commit comments

Comments
 (0)
Please sign in to comment.