Skip to content

Commit 66bffeb

Browse files
committed
EncoderWriter: no longer adhere to ‘at most one write’
The wording around the `Write::write` method is changing: the requirement that a call maps to ‘at most one write’ to the wrapped writer is being removed¹. With that, change `EncoderWriter::write` so that it flushes the entire output buffer at the beginning and then proceeds to process new input. This eliminates returning `Ok(0)`, which is effectively an error. Also, change the accounting for the occupied portion of the output buffer: rather than tracking only the occupied length, track the occupied range, which means moving data to the front of the buffer is no longer necessary. ¹ rust-lang/rust#107200
1 parent 92e94d2 commit 66bffeb

File tree

2 files changed

+129
-139
lines changed

2 files changed

+129
-139
lines changed

src/write/encoder.rs

+106-115
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
use crate::engine::Engine;
2-
use std::{
3-
cmp, fmt, io,
4-
io::{ErrorKind, Result},
5-
};
2+
use std::io::ErrorKind;
3+
use std::{cmp, fmt, io};
64

75
pub(crate) const BUF_SIZE: usize = 1024;
86
/// The most bytes whose encoding will fit in `BUF_SIZE`
@@ -53,13 +51,6 @@ const MIN_ENCODE_CHUNK_SIZE: usize = 3;
5351
///
5452
/// It has some minor performance loss compared to encoding slices (a couple percent).
5553
/// It does not do any heap allocation.
56-
///
57-
/// # Limitations
58-
///
59-
/// Owing to the specification of the `write` and `flush` methods on the `Write` trait and their
60-
/// implications for a buffering implementation, these methods may not behave as expected. In
61-
/// particular, calling `write_all` on this interface may fail with `io::ErrorKind::WriteZero`.
62-
/// See the documentation of the `Write` trait implementation for further details.
6354
pub struct EncoderWriter<'e, E: Engine, W: io::Write> {
6455
engine: &'e E,
6556
/// Where encoded data is written to. It's an Option as it's None immediately before Drop is
@@ -74,21 +65,27 @@ pub struct EncoderWriter<'e, E: Engine, W: io::Write> {
7465
/// Buffer to encode into. May hold leftover encoded bytes from a previous write call that the underlying writer
7566
/// did not write last time.
7667
output: [u8; BUF_SIZE],
77-
/// How much of `output` is occupied with encoded data that couldn't be written last time
78-
output_occupied_len: usize,
68+
/// Occupied portion of output.
69+
///
70+
/// Invariant for the range is that it’s either 0..0 or 0 ≤ start < end ≤
71+
/// BUF_SIZE. This means that if the range is empty, it’s 0..0.
72+
output_range: std::ops::Range<usize>,
7973
/// panic safety: don't write again in destructor if writer panicked while we were writing to it
8074
panicked: bool,
8175
}
8276

8377
impl<'e, E: Engine, W: io::Write> fmt::Debug for EncoderWriter<'e, E, W> {
8478
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
79+
let range = self.output_range.clone();
80+
let truncated_len = range.len().min(5);
81+
let truncated_range = range.start..range.start + truncated_len;
8582
write!(
8683
f,
87-
"extra_input: {:?} extra_input_occupied_len:{:?} output[..5]: {:?} output_occupied_len: {:?}",
88-
self.extra_input,
89-
self.extra_input_occupied_len,
90-
&self.output[0..5],
91-
self.output_occupied_len
84+
"extra_input: {:?} occupied output[..{}]: {:?} output_range: {:?}",
85+
&self.extra_input[..self.extra_input_occupied_len],
86+
truncated_len,
87+
&self.output[truncated_range],
88+
range,
9289
)
9390
}
9491
}
@@ -102,7 +99,7 @@ impl<'e, E: Engine, W: io::Write> EncoderWriter<'e, E, W> {
10299
extra_input: [0u8; MIN_ENCODE_CHUNK_SIZE],
103100
extra_input_occupied_len: 0,
104101
output: [0u8; BUF_SIZE],
105-
output_occupied_len: 0,
102+
output_range: 0..0,
106103
panicked: false,
107104
}
108105
}
@@ -123,7 +120,7 @@ impl<'e, E: Engine, W: io::Write> EncoderWriter<'e, E, W> {
123120
/// # Errors
124121
///
125122
/// The first error that is not of `ErrorKind::Interrupted` will be returned.
126-
pub fn finish(&mut self) -> Result<W> {
123+
pub fn finish(&mut self) -> io::Result<W> {
127124
// If we could consume self in finish(), we wouldn't have to worry about this case, but
128125
// finish() is retryable in the face of I/O errors, so we can't consume here.
129126
if self.delegate.is_none() {
@@ -138,91 +135,96 @@ impl<'e, E: Engine, W: io::Write> EncoderWriter<'e, E, W> {
138135
}
139136

140137
/// Write any remaining buffered data to the delegate writer.
141-
fn write_final_leftovers(&mut self) -> Result<()> {
138+
fn write_final_leftovers(&mut self) -> io::Result<()> {
142139
if self.delegate.is_none() {
143140
// finish() has already successfully called this, and we are now in drop() with a None
144141
// writer, so just no-op
145142
return Ok(());
146143
}
147144

148-
self.write_all_encoded_output()?;
149-
150145
if self.extra_input_occupied_len > 0 {
146+
// Make sure output isn’t full so we can append to it.
147+
if self.output_range.end == self.output.len() {
148+
self.flush_all_output()?;
149+
}
150+
151151
let encoded_len = self
152152
.engine
153153
.encode_slice(
154154
&self.extra_input[..self.extra_input_occupied_len],
155-
&mut self.output[..],
155+
&mut self.output[self.output_range.end..],
156156
)
157157
.expect("buffer is large enough");
158158

159-
self.output_occupied_len = encoded_len;
160-
161-
self.write_all_encoded_output()?;
162-
163-
// write succeeded, do not write the encoding of extra again if finish() is retried
159+
self.output_range.end += encoded_len;
164160
self.extra_input_occupied_len = 0;
165161
}
166162

167-
Ok(())
163+
self.flush_all_output()
168164
}
169165

170-
/// Write as much of the encoded output to the delegate writer as it will accept, and store the
171-
/// leftovers to be attempted at the next write() call. Updates `self.output_occupied_len`.
166+
/// Flushes output buffer to the delegate.
172167
///
173-
/// # Errors
168+
/// Loops writing data to the delegate until output buffer is empty or
169+
/// delegate returns an error. An `Ok(0)` return from the delegate is
170+
/// treated as an error.
174171
///
175-
/// Errors from the delegate writer are returned. In the case of an error,
176-
/// `self.output_occupied_len` will not be updated, as errors from `write` are specified to mean
177-
/// that no write took place.
178-
fn write_to_delegate(&mut self, current_output_len: usize) -> Result<()> {
179-
self.panicked = true;
180-
let res = self
181-
.delegate
182-
.as_mut()
183-
.expect("Writer must be present")
184-
.write(&self.output[..current_output_len]);
185-
self.panicked = false;
186-
187-
res.map(|consumed| {
188-
debug_assert!(consumed <= current_output_len);
189-
190-
if consumed < current_output_len {
191-
self.output_occupied_len = current_output_len.checked_sub(consumed).unwrap();
192-
// If we're blocking on I/O, the minor inefficiency of copying bytes to the
193-
// start of the buffer is the least of our concerns...
194-
// TODO Rotate moves more than we need to; copy_within now stable.
195-
self.output.rotate_left(consumed);
196-
} else {
197-
self.output_occupied_len = 0;
172+
/// Updates `output_range` accordingly.
173+
fn flush_output(&mut self) -> Option<io::Result<usize>> {
174+
if self.output_range.end == 0 {
175+
return None;
176+
}
177+
loop {
178+
match self.write_to_delegate(self.output_range.clone()) {
179+
Ok(0) => break Some(Ok(0)),
180+
Ok(n) if n >= self.output_range.len() => {
181+
self.output_range = 0..0;
182+
break None;
183+
}
184+
Ok(n) => self.output_range.start += n,
185+
Err(err) => break Some(Err(err)),
198186
}
199-
})
187+
}
200188
}
201189

202-
/// Write all buffered encoded output. If this returns `Ok`, `self.output_occupied_len` is `0`.
203-
///
204-
/// This is basically write_all for the remaining buffered data but without the undesirable
205-
/// abort-on-`Ok(0)` behavior.
206-
///
207-
/// # Errors
190+
/// Flushes output buffer to the delegate ignoring interruptions.
208191
///
209-
/// Any error emitted by the delegate writer abort the write loop and is returned, unless it's
210-
/// `Interrupted`, in which case the error is ignored and writes will continue.
211-
fn write_all_encoded_output(&mut self) -> Result<()> {
212-
while self.output_occupied_len > 0 {
213-
let remaining_len = self.output_occupied_len;
214-
match self.write_to_delegate(remaining_len) {
215-
// try again on interrupts ala write_all
216-
Err(ref e) if e.kind() == ErrorKind::Interrupted => {}
217-
// other errors return
218-
Err(e) => return Err(e),
219-
// success no-ops because remaining length is already updated
220-
Ok(_) => {}
221-
};
192+
/// Like [`Self::flush_output`] but ignores [`ErrorKind::Interrupted`]
193+
/// errors and converts `Ok(0)` to [`ErrorKind::WriteZero`].
194+
fn flush_all_output(&mut self) -> io::Result<()> {
195+
if self.output_range.end == 0 {
196+
return Ok(());
222197
}
198+
loop {
199+
match self.write_to_delegate(self.output_range.clone()) {
200+
Ok(0) => {
201+
break Err(io::Error::new(
202+
io::ErrorKind::WriteZero,
203+
"failed to write whole buffer",
204+
))
205+
}
206+
Ok(n) if n >= self.output_range.len() => {
207+
self.output_range = 0..0;
208+
break Ok(());
209+
}
210+
Ok(n) => self.output_range.start += n,
211+
Err(err) if err.kind() == ErrorKind::Interrupted => (),
212+
Err(err) => break Err(err),
213+
}
214+
}
215+
}
223216

224-
debug_assert_eq!(0, self.output_occupied_len);
225-
Ok(())
217+
/// Writes given range of output buffer to the delegate. Performs exactly
218+
/// one write. Sets `panicked` to `true` if delegate panics.
219+
fn write_to_delegate(&mut self, range: std::ops::Range<usize>) -> io::Result<usize> {
220+
self.panicked = true;
221+
let res = self
222+
.delegate
223+
.as_mut()
224+
.expect("Encoder has already had finish() called")
225+
.write(&self.output[range]);
226+
self.panicked = false;
227+
res
226228
}
227229

228230
/// Unwraps this `EncoderWriter`, returning the base writer it writes base64 encoded output
@@ -262,38 +264,24 @@ impl<'e, E: Engine, W: io::Write> io::Write for EncoderWriter<'e, E, W> {
262264
/// # Errors
263265
///
264266
/// Any errors emitted by the delegate writer are returned.
265-
fn write(&mut self, input: &[u8]) -> Result<usize> {
267+
fn write(&mut self, input: &[u8]) -> io::Result<usize> {
266268
if self.delegate.is_none() {
267269
panic!("Cannot write more after calling finish()");
268270
}
269271

270-
if input.is_empty() {
271-
return Ok(0);
272+
if let Some(res) = self.flush_output() {
273+
return res;
272274
}
275+
debug_assert_eq!(0, self.output_range.len());
273276

274-
// The contract of `Write::write` places some constraints on this implementation:
275-
// - a call to `write()` represents at most one call to a wrapped `Write`, so we can't
276-
// iterate over the input and encode multiple chunks.
277-
// - Errors mean that "no bytes were written to this writer", so we need to reset the
278-
// internal state to what it was before the error occurred
279-
280-
// before reading any input, write any leftover encoded output from last time
281-
if self.output_occupied_len > 0 {
282-
let current_len = self.output_occupied_len;
283-
return self
284-
.write_to_delegate(current_len)
285-
// did not read any input
286-
.map(|_| 0);
277+
if input.is_empty() {
278+
return Ok(0);
287279
}
288280

289-
debug_assert_eq!(0, self.output_occupied_len);
290-
291281
// how many bytes, if any, were read into `extra` to create a triple to encode
292282
let mut extra_input_read_len = 0;
293283
let mut input = input;
294284

295-
let orig_extra_len = self.extra_input_occupied_len;
296-
297285
let mut encoded_size = 0;
298286
// always a multiple of MIN_ENCODE_CHUNK_SIZE
299287
let mut max_input_len = MAX_INPUT_LEN;
@@ -322,8 +310,10 @@ impl<'e, E: Engine, W: io::Write> io::Write for EncoderWriter<'e, E, W> {
322310

323311
input = &input[extra_input_read_len..];
324312

325-
// consider extra to be used up, since we encoded it
326-
self.extra_input_occupied_len = 0;
313+
// Note: Not updating self.extra_input_occupied_len yet. It’s
314+
// going to be zeroed at the end of the function if we
315+
// successfully write some data to delegate.
316+
327317
// don't clobber where we just encoded to
328318
encoded_size = 4;
329319
// and don't read more than can be encoded
@@ -367,29 +357,30 @@ impl<'e, E: Engine, W: io::Write> io::Write for EncoderWriter<'e, E, W> {
367357
&mut self.output[encoded_size..],
368358
);
369359

370-
// not updating `self.output_occupied_len` here because if the below write fails, it should
371-
// "never take place" -- the buffer contents we encoded are ignored and perhaps retried
372-
// later, if the consumer chooses.
373-
374-
self.write_to_delegate(encoded_size)
375-
// no matter whether we wrote the full encoded buffer or not, we consumed the same
376-
// input
377-
.map(|_| extra_input_read_len + input_chunks_to_encode_len)
378-
.map_err(|e| {
379-
// in case we filled and encoded `extra`, reset extra_len
380-
self.extra_input_occupied_len = orig_extra_len;
360+
// Not updating `self.output_range` here because if the write fails, it
361+
// should "never take place" -- the buffer contents we encoded are
362+
// ignored and perhaps retried later, if the consumer chooses.
381363

382-
e
383-
})
364+
self.write_to_delegate(0..encoded_size).map(|written| {
365+
if written < encoded_size {
366+
// Update output range to portion which is yet to be written.
367+
self.output_range = written..encoded_size;
368+
} else {
369+
// Everything was written, leave output range empty.
370+
debug_assert_eq!(0..0, self.output_range);
371+
}
372+
self.extra_input_occupied_len = 0;
373+
extra_input_read_len + input_chunks_to_encode_len
374+
})
384375
}
385376

386377
/// Because this is usually treated as OK to call multiple times, it will *not* flush any
387378
/// incomplete chunks of input or write padding.
388379
/// # Errors
389380
///
390381
/// The first error that is not of [`ErrorKind::Interrupted`] will be returned.
391-
fn flush(&mut self) -> Result<()> {
392-
self.write_all_encoded_output()?;
382+
fn flush(&mut self) -> io::Result<()> {
383+
self.flush_all_output()?;
393384
self.delegate
394385
.as_mut()
395386
.expect("Writer must be present")

0 commit comments

Comments
 (0)