
Commit 5602efe

Allow for a same-thread doc compressor.
In addition, it isolates the doc compressor logic and improves error reporting through io::Result. In the same-thread case, the blocks are also not copied.
1 parent 4d634d6 commit 5602efe
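
The new store_compressor module added below lets the doc store writer choose between compressing blocks on the calling thread and handing them off to a dedicated compressor thread. A rough usage sketch follows, assuming a caller that lives inside the store module (so the private module is visible) and already has a Compressor, a WritePtr, and its serialized blocks in hand; the write_all_blocks helper is hypothetical and not part of this commit:

use std::io;

use crate::directory::WritePtr;
use crate::store::store_compressor::BlockCompressor;
use crate::store::Compressor;

// Hypothetical helper, for illustration only: drives the API introduced in this commit.
fn write_all_blocks(
    compressor: Compressor,
    wrt: WritePtr,
    blocks: Vec<(Vec<u8>, u32)>, // (uncompressed block bytes, number of docs in the block)
    separate_thread: bool,
) -> io::Result<()> {
    // `separate_thread == false` selects the new same-thread path, in which the
    // block bytes are compressed directly and never copied into a channel message.
    let mut block_compressor = BlockCompressor::new(compressor, wrt, separate_thread)?;
    for (block_bytes, num_docs_in_block) in blocks {
        block_compressor.compress_block_and_write(&block_bytes, num_docs_in_block)?;
    }
    // Writes the skip index and footer, and joins the compressor thread if one was spawned.
    block_compressor.close()
}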

File tree

3 files changed: +250 -142 lines


src/store/mod.rs

+1
@@ -43,6 +43,7 @@ pub use self::decompressors::Decompressor;
 pub(crate) use self::reader::DOCSTORE_CACHE_CAPACITY;
 pub use self::reader::{CacheStats, StoreReader};
 pub use self::writer::StoreWriter;
+mod store_compressor;
 
 #[cfg(feature = "lz4-compression")]
 mod compression_lz4_block;

src/store/store_compressor.rs

+238
@@ -0,0 +1,238 @@
use std::io::Write;
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::thread::JoinHandle;
use std::{io, thread};

use common::{BinarySerializable, CountingWriter, TerminatingWrite};

use crate::directory::WritePtr;
use crate::store::footer::DocStoreFooter;
use crate::store::index::{Checkpoint, SkipIndexBuilder};
use crate::store::{Compressor, Decompressor, StoreReader};
use crate::DocId;

pub struct BlockCompressor(Impls);

// The struct wrapping an enum is just here to keep the
// impls private.
enum Impls {
    SameThread(BlockCompressorImpl),
    DedicatedThread(DedicatedThreadBlockCompressorImpl),
}

impl BlockCompressor {
    pub fn new(compressor: Compressor, wrt: WritePtr, separate_thread: bool) -> io::Result<Self> {
        let block_compressor_impl = BlockCompressorImpl::new(compressor, wrt);
        if separate_thread {
            let dedicated_thread_compressor =
                DedicatedThreadBlockCompressorImpl::new(block_compressor_impl)?;
            Ok(BlockCompressor(Impls::DedicatedThread(
                dedicated_thread_compressor,
            )))
        } else {
            Ok(BlockCompressor(Impls::SameThread(block_compressor_impl)))
        }
    }

    pub fn compress_block_and_write(
        &mut self,
        bytes: &[u8],
        num_docs_in_block: u32,
    ) -> io::Result<()> {
        match &mut self.0 {
            Impls::SameThread(block_compressor) => {
                block_compressor.compress_block_and_write(bytes, num_docs_in_block)?;
            }
            Impls::DedicatedThread(different_thread_block_compressor) => {
                different_thread_block_compressor
                    .compress_block_and_write(bytes, num_docs_in_block)?;
            }
        }
        Ok(())
    }

    pub fn stack_reader(&mut self, store_reader: StoreReader) -> io::Result<()> {
        match &mut self.0 {
            Impls::SameThread(block_compressor) => {
                block_compressor.stack(store_reader)?;
            }
            Impls::DedicatedThread(different_thread_block_compressor) => {
                different_thread_block_compressor.stack_reader(store_reader)?;
            }
        }
        Ok(())
    }

    pub fn close(self) -> io::Result<()> {
        let imp = self.0;
        match imp {
            Impls::SameThread(block_compressor) => block_compressor.close(),
            Impls::DedicatedThread(different_thread_block_compressor) => {
                different_thread_block_compressor.close()
            }
        }
    }
}

struct BlockCompressorImpl {
    compressor: Compressor,
    first_doc_in_block: DocId,
    offset_index_writer: SkipIndexBuilder,
    intermediary_buffer: Vec<u8>,
    writer: CountingWriter<WritePtr>,
}

impl BlockCompressorImpl {
    fn new(compressor: Compressor, writer: WritePtr) -> Self {
        Self {
            compressor,
            first_doc_in_block: 0,
            offset_index_writer: SkipIndexBuilder::new(),
            intermediary_buffer: Vec::new(),
            writer: CountingWriter::wrap(writer),
        }
    }

    fn compress_block_and_write(&mut self, data: &[u8], num_docs_in_block: u32) -> io::Result<()> {
        assert!(num_docs_in_block > 0);
        self.intermediary_buffer.clear();
        self.compressor
            .compress_into(data, &mut self.intermediary_buffer)?;

        let start_offset = self.writer.written_bytes() as usize;
        self.writer.write_all(&self.intermediary_buffer)?;
        let end_offset = self.writer.written_bytes() as usize;

        self.register_checkpoint(Checkpoint {
            doc_range: self.first_doc_in_block..self.first_doc_in_block + num_docs_in_block,
            byte_range: start_offset..end_offset,
        });
        Ok(())
    }

    fn register_checkpoint(&mut self, checkpoint: Checkpoint) {
        self.offset_index_writer.insert(checkpoint.clone());
        self.first_doc_in_block = checkpoint.doc_range.end;
    }

    /// Stacks a store reader on top of the documents written so far.
    /// This method is an optimization compared to iterating over the documents
    /// in the store and adding them one by one, as the store's data will
    /// not be decompressed and then recompressed.
    fn stack(&mut self, store_reader: StoreReader) -> io::Result<()> {
        let doc_shift = self.first_doc_in_block;
        let start_shift = self.writer.written_bytes() as usize;

        // just bulk write all of the blocks of the given reader.
        self.writer
            .write_all(store_reader.block_data()?.as_slice())?;

        // concatenate the index of the `store_reader`, after translating
        // its start doc id and its start file offset.
        for mut checkpoint in store_reader.block_checkpoints() {
            checkpoint.doc_range.start += doc_shift;
            checkpoint.doc_range.end += doc_shift;
            checkpoint.byte_range.start += start_shift;
            checkpoint.byte_range.end += start_shift;
            self.register_checkpoint(checkpoint);
        }
        Ok(())
    }

    fn close(mut self) -> io::Result<()> {
        let header_offset: u64 = self.writer.written_bytes() as u64;
        let docstore_footer =
            DocStoreFooter::new(header_offset, Decompressor::from(self.compressor));
        self.offset_index_writer.serialize_into(&mut self.writer)?;
        docstore_footer.serialize(&mut self.writer)?;
        self.writer.terminate()
    }
}

// ---------------------------------

enum BlockCompressorMessage {
    CompressBlockAndWrite {
        block_data: Vec<u8>,
        num_docs_in_block: u32,
    },
    Stack(StoreReader),
}

struct DedicatedThreadBlockCompressorImpl {
    join_handle: Option<JoinHandle<io::Result<()>>>,
    tx: SyncSender<BlockCompressorMessage>,
}

impl DedicatedThreadBlockCompressorImpl {
    fn new(mut block_compressor: BlockCompressorImpl) -> io::Result<Self> {
        let (tx, rx): (
            SyncSender<BlockCompressorMessage>,
            Receiver<BlockCompressorMessage>,
        ) = sync_channel(3);
        let join_handle = thread::Builder::new()
            .name("docstore-compressor-thread".to_string())
            .spawn(move || {
                while let Ok(packet) = rx.recv() {
                    match packet {
                        BlockCompressorMessage::CompressBlockAndWrite {
                            block_data,
                            num_docs_in_block,
                        } => {
                            block_compressor
                                .compress_block_and_write(&block_data[..], num_docs_in_block)?;
                        }
                        BlockCompressorMessage::Stack(store_reader) => {
                            block_compressor.stack(store_reader)?;
                        }
                    }
                }
                block_compressor.close()?;
                Ok(())
            })?;
        Ok(DedicatedThreadBlockCompressorImpl {
            join_handle: Some(join_handle),
            tx,
        })
    }

    fn compress_block_and_write(&mut self, bytes: &[u8], num_docs_in_block: u32) -> io::Result<()> {
        self.send(BlockCompressorMessage::CompressBlockAndWrite {
            block_data: bytes.to_vec(),
            num_docs_in_block,
        })
    }

    fn stack_reader(&mut self, store_reader: StoreReader) -> io::Result<()> {
        self.send(BlockCompressorMessage::Stack(store_reader))
    }

    fn send(&mut self, msg: BlockCompressorMessage) -> io::Result<()> {
        if self.tx.send(msg).is_err() {
            self.harvest_result()?;
            return Err(io::Error::new(io::ErrorKind::Other, "Unidentified error."));
        }
        Ok(())
    }

    fn harvest_result(&mut self) -> io::Result<()> {
        let join_handle = self
            .join_handle
            .take()
            .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "Thread already joined."))?;
        join_handle
            .join()
            .map_err(|_err| io::Error::new(io::ErrorKind::Other, "Compressing thread panicked."))?
    }

    fn close(mut self) -> io::Result<()> {
        drop(self.tx);
        let join_handle = self
            .join_handle
            .take()
            .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "Thread already joined."))?;
        join_handle
            .join()
            .map_err(|_err| io::Error::new(io::ErrorKind::Other, "Compressing thread panicked."))?
    }
}
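
Because stack_reader forwards whole compressed blocks and only re-bases their checkpoints, a merge-style caller can append an existing segment's doc store without a decompress/recompress round trip. A minimal sketch, assuming the caller sits inside the store module and already holds StoreReaders for the segments being stacked (how those readers are opened is outside this commit); stack_segments is a hypothetical helper:

use std::io;

use crate::store::store_compressor::BlockCompressor;
use crate::store::StoreReader;

// Hypothetical illustration: append previously written doc stores wholesale.
fn stack_segments(
    mut block_compressor: BlockCompressor,
    readers: Vec<StoreReader>,
) -> io::Result<()> {
    for reader in readers {
        // Copies the reader's compressed block data verbatim and shifts its
        // checkpoints, instead of decompressing and recompressing every document.
        block_compressor.stack_reader(reader)?;
    }
    block_compressor.close()
}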
