ZstdEncoder should not panic if tokio::io::AsyncWrite::flush is called after tokio::io::AsyncWrite::shutdown #246

Closed
aatifsyed opened this issue Sep 14, 2023 · 4 comments
Labels
bug Something isn't working

Comments

@aatifsyed

aatifsyed commented Sep 14, 2023

I'm hitting a Flush after shutdown panic, ultimately here:

State::Finishing | State::Done => panic!("Flush after shutdown"),

Repro

Here's a minimal example, which reliably panics on my machine.

use async_compression::tokio::write::ZstdEncoder; // 0.4.3
use bytes::Bytes; // 1.5.0
use futures::{stream, StreamExt as _}; // 0.3.28
use std::io; // 1.72.0
use tokio::fs::File; // 1.32.0
use tokio_util::codec::{BytesCodec, FramedWrite}; // 0.7.8

async fn _main() -> io::Result<()> {
    let file = File::create("/dev/null").await?;
    let zstd_encoder = ZstdEncoder::new(file);
    let bytes_sink = FramedWrite::new(zstd_encoder, BytesCodec::new());
    stream::empty::<io::Result<Bytes>>()
        .forward(bytes_sink)
        .await
}

#[tokio::main]
async fn main() -> io::Result<()> {
    _main().await
}

Explanation

Here's the sequence of events:

  ┌───────────────────────────────────┐ ┌──────────────────────────────────┬────────────────────────────────┐ 
  │ file::poll_flush -> Ready(Ok(())) │ │ file::poll_write -> Ready(Ok(9)) │ file::poll_shutdown -> Pending │ 
 ┌┴───────────────────────────────────┴┬┴──────────────────────────────────┴────────────────────────────────┴┐ ┌─────────────BOOM
 │  zstd::poll_flush -> Ready(Ok(()))  │                    zstd::poll_shutdown -> Pending                   │ │ zstd::poll_flush
┌┴─────────────────────────────────────┴─────────────────────────────────────────────────────────────────────┴┬┴─────────────BOOM
│                                      sink::poll_close -> Pending                                            │  sink::poll_close
└─────────────────────────────────────────────────────────────────────────────────────────────────────────────┴──────────────BOOM
  • The sink needs to close
  • It flushes the encoder, which flushes the file (writing the zstd header to disk)
  • It shuts down the encoder, which shuts down the file, but the file returns Pending.
  • The sink tries to close again, when the file is ready.
  • It flushes the encoder again, which panics
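The sequence above can be replayed with a small std-only model. This is only a sketch under stated assumptions: `SlowFile`, `StrictEncoder`, and the free function `poll_close` are hypothetical stand-ins, the `Context` argument is dropped, no compression happens, and the encoder returns an error where the real code panics so that the outcome can be observed.

```rust
use std::task::Poll;

type PollIo = Poll<Result<(), String>>;

/// Hypothetical stand-in for the file: the first shutdown poll is Pending,
/// every later one is Ready.
#[derive(Default)]
struct SlowFile {
    shutdown_polls: u32,
}

impl SlowFile {
    fn poll_flush(&mut self) -> PollIo {
        Poll::Ready(Ok(()))
    }

    fn poll_shutdown(&mut self) -> PollIo {
        self.shutdown_polls += 1;
        if self.shutdown_polls == 1 {
            Poll::Pending
        } else {
            Poll::Ready(Ok(()))
        }
    }
}

/// State names are modelled on the match arm quoted above; `Encoding` is an assumption.
#[derive(Debug, Clone, Copy, PartialEq)]
enum State {
    Encoding,
    Finishing,
    Done,
}

/// Simplified model of an encoder that rejects a flush once shutdown has begun.
struct StrictEncoder {
    state: State,
    inner: SlowFile,
}

impl StrictEncoder {
    fn new(inner: SlowFile) -> Self {
        Self { state: State::Encoding, inner }
    }

    fn poll_flush(&mut self) -> PollIo {
        match self.state {
            State::Encoding => self.inner.poll_flush(),
            // The real code panics here; the model reports an error instead.
            State::Finishing | State::Done => {
                Poll::Ready(Err("Flush after shutdown".to_string()))
            }
        }
    }

    fn poll_shutdown(&mut self) -> PollIo {
        self.state = State::Finishing;
        let result = self.inner.poll_shutdown();
        if let Poll::Ready(Ok(())) = result {
            self.state = State::Done;
        }
        result
    }
}

/// Same shape as the sink's close: flush, then shut down, on every call.
fn poll_close(enc: &mut StrictEncoder) -> PollIo {
    match enc.poll_flush() {
        Poll::Ready(Ok(())) => enc.poll_shutdown(),
        other => other,
    }
}
```

Calling `poll_close` twice on a `StrictEncoder::new(SlowFile::default())` gives `Pending` on the first call and the "Flush after shutdown" result on the second, matching the last two bullets.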

Discussion

The bug is basically an interaction between the above code, and this code in tokio_util:

    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        ready!(self.as_mut().poll_flush(cx))?;
        ready!(self.project().inner.poll_shutdown(cx))?;

        Poll::Ready(Ok(()))
    }

on docs.rs / on github

A fix could be either:

  1. tokio_util::codec keeps track of whether it has flushed the inner writer in poll_close (not flushing it twice)
  2. Remove the panic in async_compression, or enhance its state machine to address the above.
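For option (1), a rough sketch of what "not flushing it twice" could look like, again as a std-only model rather than a patch to tokio_util. `ModelWrite`, `Closer`, and `Probe` are hypothetical names, and the `Context` argument is omitted.

```rust
use std::task::Poll;

type PollIo = Poll<Result<(), String>>;

/// Hypothetical, context-free stand-in for the flush/shutdown half of `AsyncWrite`.
trait ModelWrite {
    fn poll_flush(&mut self) -> PollIo;
    fn poll_shutdown(&mut self) -> PollIo;
}

/// Sink-like wrapper that remembers a completed flush across `poll_close` calls.
struct Closer<W> {
    inner: W,
    flushed: bool,
}

impl<W: ModelWrite> Closer<W> {
    fn new(inner: W) -> Self {
        Self { inner, flushed: false }
    }

    fn poll_close(&mut self) -> PollIo {
        // Flush only until it has succeeded once; later calls go straight to shutdown.
        if !self.flushed {
            match self.inner.poll_flush() {
                Poll::Ready(Ok(())) => self.flushed = true,
                other => return other,
            }
        }
        self.inner.poll_shutdown()
    }
}

/// Test double: counts calls and reports Pending from the first shutdown poll.
#[derive(Default)]
struct Probe {
    flushes: u32,
    shutdowns: u32,
}

impl ModelWrite for Probe {
    fn poll_flush(&mut self) -> PollIo {
        self.flushes += 1;
        Poll::Ready(Ok(()))
    }

    fn poll_shutdown(&mut self) -> PollIo {
        self.shutdowns += 1;
        if self.shutdowns == 1 {
            Poll::Pending
        } else {
            Poll::Ready(Ok(()))
        }
    }
}
```

With a `Probe` inside, two `poll_close` calls result in one flush and two shutdown polls. A real sink would also need to clear the flag whenever new data is buffered, which this model leaves out.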

I think the right fix is (2):

The documentation for AsyncWrite doesn't say that you're not allowed to flush after calling shutdown. In fact, I think implementors should be prepared to handle such a case, at least until shutdown returns Poll::Ready.
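To make "prepared to handle such a case" concrete, here is one possible policy as a std-only model. It is an assumption, not the crate's state machine: `TolerantEncoder` and `SlowFile` are hypothetical, `Context` is omitted, and a real encoder would also have compressed output to drain.

```rust
use std::task::Poll;

type PollIo = Poll<Result<(), String>>;

#[derive(Debug, Clone, Copy, PartialEq)]
enum State {
    Encoding,
    Finishing,
    Done,
}

/// Hypothetical inner writer: first shutdown poll is Pending, later ones are Ready.
#[derive(Default)]
struct SlowFile {
    shutdowns: u32,
}

impl SlowFile {
    fn poll_flush(&mut self) -> PollIo {
        Poll::Ready(Ok(()))
    }

    fn poll_shutdown(&mut self) -> PollIo {
        self.shutdowns += 1;
        if self.shutdowns == 1 {
            Poll::Pending
        } else {
            Poll::Ready(Ok(()))
        }
    }
}

/// Model encoder that accepts a flush in every state.
struct TolerantEncoder {
    state: State,
    inner: SlowFile,
}

impl TolerantEncoder {
    fn new(inner: SlowFile) -> Self {
        Self { state: State::Encoding, inner }
    }

    fn poll_flush(&mut self) -> PollIo {
        match self.state {
            // Shutdown already completed, so there is nothing left to flush.
            State::Done => Poll::Ready(Ok(())),
            // Before and during shutdown, pass the flush on to the inner writer.
            State::Encoding | State::Finishing => self.inner.poll_flush(),
        }
    }

    fn poll_shutdown(&mut self) -> PollIo {
        self.state = State::Finishing;
        let result = self.inner.poll_shutdown();
        if let Poll::Ready(Ok(())) = result {
            self.state = State::Done;
        }
        result
    }
}

/// Flush-then-shutdown caller, repeated on every poll.
fn poll_close(enc: &mut TolerantEncoder) -> PollIo {
    match enc.poll_flush() {
        Poll::Ready(Ok(())) => enc.poll_shutdown(),
        other => other,
    }
}
```

Under this policy a caller that flushes before every shutdown poll sees `Pending` and then `Ready(Ok(()))`, and a stray flush after completion is a no-op.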

Take the following from the docs:

Invocation of a shutdown implies an invocation of flush. Once this method returns Ready it implies that a flush successfully happened before the shutdown happened. That is, callers don’t need to call flush before calling shutdown. They can rely that by calling shutdown any pending buffered data will be written out.

So following the API, I could write a simple Transparent<T: AsyncWrite> wrapper:

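use pin_project_lite::pin_project; // assumed source of the `pin_project!` macro
use std::{
    io,
    pin::Pin,
    task::{ready, Context, Poll},
};
use tokio::io::AsyncWrite;

pin_project! {
    struct Transparent<T> {
        #[pin]
        inner: T,
    }
}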

impl<T> AsyncWrite for Transparent<T>
where
    T: AsyncWrite,
{
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        self.project().inner.poll_write(cx, buf)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        self.project().inner.poll_flush(cx)
    }

    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        let mut this = self.project();
        ready!(this.inner.as_mut().poll_flush(cx))?;
        this.inner.poll_shutdown(cx)
    }
}

impl<T> Transparent<T> {
    fn new(inner: T) -> Self {
        Self { inner }
    }
}

which, of course, panics if it contains a ZstdEncoder.

In fact, with the appropriate interleaving of Poll::Pending, ZstdEncoder::new(ZstdEncoder::new(...)) will panic.

I'm pretty sure this affects all tokio codecs in this crate.

@robjtede
Member

should be fixed by #255 pending release tomorrow

@aatifsyed
Author

I think this is still a logic bug - the encoder should not return an error in the given case.

Would you prefer I reword the error issue or open a new one?

@robjtede
Member

Okay. I'd just tested your repro of the panic and saw that it didn't panic on master.

New issue would be great please.

@aatifsyed
Author

New issue would be great please.

#308
