Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix compressing large chunks #11

Merged
merged 2 commits into from
May 3, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,6 @@ bytes = "0.4.12"
flate2 = "1.0.7"
futures-preview = "0.3.0-alpha.15"
pin-project = "0.3.2"

[dev-dependencies]
rand = "0.6.5"
14 changes: 7 additions & 7 deletions src/stream/brotli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@ use std::io::Result;

use brotli2::raw::{CoStatus, CompressOp};
pub use brotli2::{raw::Compress, CompressParams};
use bytes::{BufMut, Bytes, BytesMut};
use bytes::{Bytes, BytesMut};
use futures::{ready, stream::Stream};
use pin_project::unsafe_project;

#[unsafe_project(Unpin)]
pub struct BrotliStream<S: Stream<Item = Result<Bytes>>> {
#[pin]
inner: S,
flushing: bool,
flush: bool,
compress: Compress,
}

Expand All @@ -26,14 +26,14 @@ impl<S: Stream<Item = Result<Bytes>>> Stream for BrotliStream<S> {

let this = self.project();

if *this.flushing {
if *this.flush {
return Poll::Ready(None);
}

let input_buffer = if let Some(bytes) = ready!(this.inner.poll_next(cx)) {
bytes?
} else {
*this.flushing = true;
*this.flush = true;
Bytes::new()
};

Expand All @@ -42,7 +42,7 @@ impl<S: Stream<Item = Result<Bytes>>> Stream for BrotliStream<S> {
let output_ref = &mut &mut [][..];
loop {
let status = this.compress.compress(
if *this.flushing {
if *this.flush {
CompressOp::Finish
} else {
CompressOp::Process
Expand All @@ -51,7 +51,7 @@ impl<S: Stream<Item = Result<Bytes>>> Stream for BrotliStream<S> {
output_ref,
)?;
while let Some(buf) = this.compress.take_output(None) {
compressed_output.put(buf);
compressed_output.extend_from_slice(buf);
}
match status {
CoStatus::Finished => break,
Expand All @@ -67,7 +67,7 @@ impl<S: Stream<Item = Result<Bytes>>> BrotliStream<S> {
pub fn new(stream: S, compress: Compress) -> BrotliStream<S> {
BrotliStream {
inner: stream,
flushing: false,
flush: false,
compress,
}
}
Expand Down
134 changes: 94 additions & 40 deletions src/stream/flate.rs
Original file line number Diff line number Diff line change
@@ -1,72 +1,126 @@
use core::{
use std::{
io::Result,
mem,
pin::Pin,
task::{Context, Poll},
};
use std::io::Result;

use bytes::{Bytes, BytesMut};
use flate2::FlushCompress;
pub use flate2::{Compress, Compression};
pub(crate) use flate2::Compress;
use flate2::{FlushCompress, Status};
use futures::{ready, stream::Stream};
use pin_project::unsafe_project;

#[derive(Debug)]
enum State {
Reading,
Writing(Bytes),
Flushing,
Done,
Invalid,
}

#[unsafe_project(Unpin)]
pub struct CompressedStream<S: Stream<Item = Result<Bytes>>> {
pub(crate) struct CompressedStream<S: Stream<Item = Result<Bytes>>> {
#[pin]
inner: S,
flushing: bool,
input_buffer: Bytes,
output_buffer: BytesMut,
state: State,
output: BytesMut,
compress: Compress,
}

impl<S: Stream<Item = Result<Bytes>>> Stream for CompressedStream<S> {
type Item = Result<Bytes>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> {
const OUTPUT_BUFFER_SIZE: usize = 8_000;
let mut this = self.project();

let this = self.project();
fn compress(
compress: &mut Compress,
input: &mut Bytes,
output: &mut BytesMut,
flush: FlushCompress,
) -> Result<(Status, Bytes)> {
const OUTPUT_BUFFER_SIZE: usize = 8_000;

if this.input_buffer.is_empty() {
if *this.flushing {
return Poll::Ready(None);
} else if let Some(bytes) = ready!(this.inner.poll_next(cx)) {
*this.input_buffer = bytes?;
} else {
*this.flushing = true;
if output.len() < OUTPUT_BUFFER_SIZE {
output.resize(OUTPUT_BUFFER_SIZE, 0);
}

let (prior_in, prior_out) = (compress.total_in(), compress.total_out());
let status = compress.compress(input, output, flush)?;
let input_len = compress.total_in() - prior_in;
let output_len = compress.total_out() - prior_out;

input.advance(input_len as usize);
Ok((status, output.split_to(output_len as usize).freeze()))
}

this.output_buffer.resize(OUTPUT_BUFFER_SIZE, 0);

let flush = if *this.flushing {
FlushCompress::Finish
} else {
FlushCompress::None
};

let (prior_in, prior_out) = (this.compress.total_in(), this.compress.total_out());
this.compress
.compress(this.input_buffer, this.output_buffer, flush)?;
let input = this.compress.total_in() - prior_in;
let output = this.compress.total_out() - prior_out;

this.input_buffer.advance(input as usize);
Poll::Ready(Some(Ok(this
.output_buffer
.split_to(output as usize)
.freeze())))
#[allow(clippy::never_loop)] // https://github.com/rust-lang/rust-clippy/issues/4058
loop {
break match mem::replace(this.state, State::Invalid) {
State::Reading => {
*this.state = State::Reading;
*this.state = match ready!(this.inner.as_mut().poll_next(cx)) {
Some(chunk) => State::Writing(chunk?),
None => State::Flushing,
};
continue;
}

State::Writing(mut input) => {
if input.is_empty() {
*this.state = State::Reading;
continue;
}

let (status, chunk) = compress(
&mut this.compress,
&mut input,
&mut this.output,
FlushCompress::None,
)?;

*this.state = match status {
Status::Ok => State::Writing(input),
Status::StreamEnd => unreachable!(),
Status::BufError => panic!("unexpected BufError"),
};

Poll::Ready(Some(Ok(chunk)))
}

State::Flushing => {
let (status, chunk) = compress(
&mut this.compress,
&mut Bytes::new(),
&mut this.output,
FlushCompress::Finish,
)?;

*this.state = match status {
Status::Ok => State::Flushing,
Status::StreamEnd => State::Done,
Status::BufError => panic!("unexpected BufError"),
};

Poll::Ready(Some(Ok(chunk)))
}

State::Done => Poll::Ready(None),

State::Invalid => panic!("CompressedStream reached invalid state"),
};
}
}
}

impl<S: Stream<Item = Result<Bytes>>> CompressedStream<S> {
pub fn new(stream: S, compress: Compress) -> CompressedStream<S> {
pub(crate) fn new(stream: S, compress: Compress) -> CompressedStream<S> {
CompressedStream {
inner: stream,
flushing: false,
input_buffer: Bytes::new(),
output_buffer: BytesMut::new(),
state: State::Reading,
output: BytesMut::new(),
compress,
}
}
Expand Down
Loading