-
-
Notifications
You must be signed in to change notification settings - Fork 2.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
(WIP): Adding Stream mock to tokio-test #4463
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let me know if there's any other help you need.
tokio-test/src/io.rs
Outdated
#[derive(Debug, Clone, Default)] | ||
pub struct StreamBuilder { | ||
// Sequence of actions for the Mock to take | ||
actions: VecDeque<Action>, | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this should go in its own file.
tokio-test/src/io.rs
Outdated
/// call next value ? | ||
pub fn poll_next(&mut self, buf: &[u8]) { | ||
/// TODO | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You should not be able to call poll_next
or next
on the builder. You have to actually build it first. The io builder also doesn't implement AsyncRead
or AsyncWrite
.
Sure! I'm trying to get to know Tokio code base and its patterns. |
tokio-test/src/io_stream.rs
Outdated
fn read(&mut self, dst: &mut ReadBuf<'_>) -> io::Result<()> { | ||
match self.action() { | ||
Some(&mut Action::Read(ref mut data)) => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are implementing a Stream
. Streams do not have a read
or write
method. Instead, you should make an impl Stream for Mock
block so that Mock
becomes a stream.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
now I see where I need to go. In impl Steam for Mock i should use :
type Item = String; // like this because my mock would only use Strings as their items ?
then i would do something like pop from an array or like a loop from async read or async write within poll_next ? like this:
fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// TODO
// Pop item from array who could i iterate self? or do a while loop like in AsyncRead /AsyncWrite ?
Poll::Pending
}
tokio-test/src/io_stream.rs
Outdated
#[derive(Debug, Clone, Default)] | ||
pub struct Builder { | ||
// Sequence of actions for the Mock to take | ||
actions: VecDeque<Action>, | ||
} | ||
#[derive(Debug, Clone, Default)] | ||
pub struct StreamBuilder { | ||
// Sequence of actions for the Mock to take | ||
actions: VecDeque<Action>, | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why are there two builders?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry :/ I was sleppy but I did remove these two now
You would test it by calling |
tokio-test/src/io_stream.rs
Outdated
impl Stream for Mock { | ||
type Item = String; | ||
|
||
fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { | ||
// TODO | ||
// Pop item from array who could i iterate self? or do a while loop like in AsyncRead /AsyncWrite ? | ||
Poll::Pending | ||
} | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Basically you need to return the next action from the list of actions, but if that action is a sleep, then you need to sleep instead. And if you are polled during a sleep, you need to keep sleeping until the sleep completes.
The other mock type implements this by having a loop, where the loop first checks if its currently sleeping, then if the ready!
macro lets it get past the sleep (i.e. if the sleep has completed), then it pops the next item. If the next item is a sleep, then it sets up that new sleep and goes around the loop. Otherwise it returns the item.
tokio-test/src/io_stream.rs
Outdated
match ready!(self.inner.poll_action(_cx)) { | ||
None => { | ||
return Poll::Pending; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If poll_action
returns None
, then isn't that because we are at the end and the stream is complete? Then you should return Poll::Ready(None)
instead of Poll::Pending
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay now I see what I was doing wrong!!
I don't have any ideia why my mock.next.await is aways returning None for me. perhaps because of the cx ?
Maybe I should use a mock Context too for using pool_next(cx) ? or keep with next() but change the way that I call Action it at io_steram ?
Sorry, this PR appears to have gotten lost. Do you want to finish it? There are some CI failures. |
Yeah, I Would love to finish. It has been some time since I haven't touched Rust but I can give it another try again guess this last note can help me :
|
tokio-test/src/io_stream.rs
Outdated
impl Stream for Mock { | ||
type Item = String; | ||
|
||
fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { | ||
|
||
loop { | ||
if let Some(ref mut sleep) = self.inner.sleep { | ||
ready!(Pin::new(sleep).poll(_cx)); | ||
} | ||
|
||
|
||
// If a sleep is set, it has already fired | ||
self.inner.sleep = None; | ||
match ready!(self.inner.poll_action(_cx)) { | ||
Some(action) => { | ||
self.inner.actions.push_back(action); | ||
continue; | ||
} | ||
None => { | ||
return Poll::Ready(None); | ||
} | ||
} | ||
} | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The poll_next
function never returns Poll::Ready(Some(...))
anywhere, so the stream never produces any items.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OMG!! I think I got it(copilot also helped me understand)
Guess the only thing that is missing is
let now = Instant::now();
if now < until {
break;
}
} else {
self.waiting = Some(Instant::now() + *dur);
break;
}
for this method:
self.inner.sleep = Some(delay_for(dur));
}
copilot could not help in those.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The fact that you're trying to use delay_for
shows that you're looking at old documentation. It was renamed to sleep
in Tokio 1.x.y.
Please check out the documentation for Sleep. You need to use the reset
method to update the deadline. There are examples in the docs.
Co-authored-by: devensiv <[email protected]>
Hey, I'm really sorry that this got lost. Do you still want to work on it? If not, that's ok. |
This has been superseded by #5915. |
Motivation
This issue relates to #4106
Solution
Test the Stream with the
mock
as sugested in issue threadThis is a draft. I got stuck creating the solution, created this PR in hope that I could get some help