Skip to content

Commit e3e59a7

Browse files
committed
Allow for a same-thread doc compressor.
In addition, it isolates the doc compressor logic and improves `io::Result` error reporting. In the case of the same-thread doc compressor, the blocks are also not copied.
1 parent 4d634d6 commit e3e59a7

File tree

3 files changed

+269
-142
lines changed

3 files changed

+269
-142
lines changed

src/store/mod.rs

+1
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ pub use self::decompressors::Decompressor;
4343
pub(crate) use self::reader::DOCSTORE_CACHE_CAPACITY;
4444
pub use self::reader::{CacheStats, StoreReader};
4545
pub use self::writer::StoreWriter;
46+
mod store_compressor;
4647

4748
#[cfg(feature = "lz4-compression")]
4849
mod compression_lz4_block;

src/store/store_compressor.rs

+257
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,257 @@
1+
// Copyright (C) 2022 Quickwit, Inc.
2+
//
3+
// Quickwit is offered under the AGPL v3.0 and as commercial software.
4+
// For commercial licensing, contact us at [email protected].
5+
//
6+
// AGPL:
7+
// This program is free software: you can redistribute it and/or modify
8+
// it under the terms of the GNU Affero General Public License as
9+
// published by the Free Software Foundation, either version 3 of the
10+
// License, or (at your option) any later version.
11+
//
12+
// This program is distributed in the hope that it will be useful,
13+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
14+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15+
// GNU Affero General Public License for more details.
16+
//
17+
// You should have received a copy of the GNU Affero General Public License
18+
// along with this program. If not, see <http://www.gnu.org/licenses/>.
19+
20+
use std::io::Write;
21+
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
22+
use std::thread::JoinHandle;
23+
use std::{io, thread};
24+
25+
use common::{BinarySerializable, CountingWriter, TerminatingWrite};
26+
27+
use crate::directory::WritePtr;
28+
use crate::store::footer::DocStoreFooter;
29+
use crate::store::index::{Checkpoint, SkipIndexBuilder};
30+
use crate::store::{Compressor, Decompressor, StoreReader};
31+
use crate::DocId;
32+
33+
/// Compresses blocks of documents and writes them to the doc store,
/// either on the calling thread or on a dedicated compressor thread.
pub struct BlockCompressor(Impls);

// The struct wrapping an enum is just here to keep the
// impls private.
enum Impls {
    // Compression happens synchronously, on the caller's thread.
    SameThread(BlockCompressorImpl),
    // Compression work is offloaded to a dedicated worker thread.
    DedicatedThread(DedicatedThreadBlockCompressorImpl),
}
41+
42+
impl BlockCompressor {
43+
pub fn new(compressor: Compressor, wrt: WritePtr, separate_thread: bool) -> io::Result<Self> {
44+
let block_compressor_impl = BlockCompressorImpl::new(compressor, wrt);
45+
if separate_thread {
46+
let dedicated_thread_compressor =
47+
DedicatedThreadBlockCompressorImpl::new(block_compressor_impl)?;
48+
Ok(BlockCompressor(Impls::DedicatedThread(
49+
dedicated_thread_compressor,
50+
)))
51+
} else {
52+
Ok(BlockCompressor(Impls::SameThread(block_compressor_impl)))
53+
}
54+
}
55+
56+
pub fn compress_block_and_write(
57+
&mut self,
58+
bytes: &[u8],
59+
num_docs_in_block: u32,
60+
) -> io::Result<()> {
61+
match &mut self.0 {
62+
Impls::SameThread(block_compressor) => {
63+
block_compressor.compress_block_and_write(bytes, num_docs_in_block)?;
64+
}
65+
Impls::DedicatedThread(different_thread_block_compressor) => {
66+
different_thread_block_compressor
67+
.compress_block_and_write(bytes, num_docs_in_block)?;
68+
}
69+
}
70+
Ok(())
71+
}
72+
73+
pub fn stack_reader(&mut self, store_reader: StoreReader) -> io::Result<()> {
74+
match &mut self.0 {
75+
Impls::SameThread(block_compressor) => {
76+
block_compressor.stack(store_reader)?;
77+
}
78+
Impls::DedicatedThread(different_thread_block_compressor) => {
79+
different_thread_block_compressor.stack_reader(store_reader)?;
80+
}
81+
}
82+
Ok(())
83+
}
84+
85+
pub fn close(self) -> io::Result<()> {
86+
let imp = self.0;
87+
match imp {
88+
Impls::SameThread(block_compressor) => block_compressor.close(),
89+
Impls::DedicatedThread(different_thread_block_compressor) => {
90+
different_thread_block_compressor.close()
91+
}
92+
}
93+
}
94+
}
95+
96+
/// Synchronous block compressor: compresses blocks and maintains the
/// doc store's skip index as blocks are appended.
struct BlockCompressorImpl {
    // Codec used to compress every block of this store.
    compressor: Compressor,
    // Doc id of the first document of the next block to be written.
    first_doc_in_block: DocId,
    // Builds the skip index mapping doc ranges to byte ranges.
    offset_index_writer: SkipIndexBuilder,
    // Reusable scratch buffer receiving the compressed block bytes.
    intermediary_buffer: Vec<u8>,
    // Underlying writer; counts written bytes to compute block offsets.
    writer: CountingWriter<WritePtr>,
}
103+
104+
impl BlockCompressorImpl {
105+
fn new(compressor: Compressor, writer: WritePtr) -> Self {
106+
Self {
107+
compressor,
108+
first_doc_in_block: 0,
109+
offset_index_writer: SkipIndexBuilder::new(),
110+
intermediary_buffer: Vec::new(),
111+
writer: CountingWriter::wrap(writer),
112+
}
113+
}
114+
115+
fn compress_block_and_write(&mut self, data: &[u8], num_docs_in_block: u32) -> io::Result<()> {
116+
assert!(num_docs_in_block > 0);
117+
self.intermediary_buffer.clear();
118+
self.compressor
119+
.compress_into(data, &mut self.intermediary_buffer)?;
120+
121+
let start_offset = self.writer.written_bytes() as usize;
122+
self.writer.write_all(&self.intermediary_buffer)?;
123+
let end_offset = self.writer.written_bytes() as usize;
124+
125+
self.register_checkpoint(Checkpoint {
126+
doc_range: self.first_doc_in_block..self.first_doc_in_block + num_docs_in_block,
127+
byte_range: start_offset..end_offset,
128+
});
129+
Ok(())
130+
}
131+
132+
fn register_checkpoint(&mut self, checkpoint: Checkpoint) {
133+
self.offset_index_writer.insert(checkpoint.clone());
134+
self.first_doc_in_block = checkpoint.doc_range.end;
135+
}
136+
137+
/// Stacks a store reader on top of the documents written so far.
138+
/// This method is an optimization compared to iterating over the documents
139+
/// in the store and adding them one by one, as the store's data will
140+
/// not be decompressed and then recompressed.
141+
fn stack(&mut self, store_reader: StoreReader) -> io::Result<()> {
142+
let doc_shift = self.first_doc_in_block;
143+
let start_shift = self.writer.written_bytes() as usize;
144+
145+
// just bulk write all of the block of the given reader.
146+
self.writer
147+
.write_all(store_reader.block_data()?.as_slice())?;
148+
149+
// concatenate the index of the `store_reader`, after translating
150+
// its start doc id and its start file offset.
151+
for mut checkpoint in store_reader.block_checkpoints() {
152+
checkpoint.doc_range.start += doc_shift;
153+
checkpoint.doc_range.end += doc_shift;
154+
checkpoint.byte_range.start += start_shift;
155+
checkpoint.byte_range.end += start_shift;
156+
self.register_checkpoint(checkpoint);
157+
}
158+
Ok(())
159+
}
160+
161+
fn close(mut self) -> io::Result<()> {
162+
let header_offset: u64 = self.writer.written_bytes() as u64;
163+
let docstore_footer =
164+
DocStoreFooter::new(header_offset, Decompressor::from(self.compressor));
165+
self.offset_index_writer.serialize_into(&mut self.writer)?;
166+
docstore_footer.serialize(&mut self.writer)?;
167+
self.writer.terminate()
168+
}
169+
}
170+
171+
// ---------------------------------
172+
173+
/// Messages sent from the writer to the dedicated compressor thread.
enum BlockCompressorMessage {
    // Compress the given (uncompressed) block and append it to the store.
    CompressBlockAndWrite {
        block_data: Vec<u8>,
        num_docs_in_block: u32,
    },
    // Stack the reader's blocks verbatim (no decompression/recompression).
    Stack(StoreReader),
}
180+
181+
/// Handle to the dedicated compressor thread.
struct DedicatedThreadBlockCompressorImpl {
    // Handle used to join the worker thread and collect its io::Result.
    // `None` once the thread has been joined.
    join_handle: Option<JoinHandle<io::Result<()>>>,
    // Bounded channel feeding work to the worker thread.
    tx: SyncSender<BlockCompressorMessage>,
}
185+
186+
impl DedicatedThreadBlockCompressorImpl {
187+
fn new(mut block_compressor: BlockCompressorImpl) -> io::Result<Self> {
188+
let (tx, rx): (
189+
SyncSender<BlockCompressorMessage>,
190+
Receiver<BlockCompressorMessage>,
191+
) = sync_channel(3);
192+
let join_handle = thread::Builder::new()
193+
.name("docstore-compressor-thread".to_string())
194+
.spawn(move || {
195+
while let Ok(packet) = rx.recv() {
196+
match packet {
197+
BlockCompressorMessage::CompressBlockAndWrite {
198+
block_data,
199+
num_docs_in_block,
200+
} => {
201+
block_compressor
202+
.compress_block_and_write(&block_data[..], num_docs_in_block)?;
203+
}
204+
BlockCompressorMessage::Stack(store_reader) => {
205+
block_compressor.stack(store_reader)?;
206+
}
207+
}
208+
}
209+
block_compressor.close()?;
210+
Ok(())
211+
})?;
212+
Ok(DedicatedThreadBlockCompressorImpl {
213+
join_handle: Some(join_handle),
214+
tx,
215+
})
216+
}
217+
218+
fn compress_block_and_write(&mut self, bytes: &[u8], num_docs_in_block: u32) -> io::Result<()> {
219+
self.send(BlockCompressorMessage::CompressBlockAndWrite {
220+
block_data: bytes.to_vec(),
221+
num_docs_in_block,
222+
})
223+
}
224+
225+
fn stack_reader(&mut self, store_reader: StoreReader) -> io::Result<()> {
226+
self.send(BlockCompressorMessage::Stack(store_reader))
227+
}
228+
229+
fn send(&mut self, msg: BlockCompressorMessage) -> io::Result<()> {
230+
if self.tx.send(msg).is_err() {
231+
self.harvest_result()?;
232+
return Err(io::Error::new(io::ErrorKind::Other, "Unidentified error."));
233+
}
234+
Ok(())
235+
}
236+
237+
fn harvest_result(&mut self) -> io::Result<()> {
238+
let join_handle = self
239+
.join_handle
240+
.take()
241+
.ok_or_else(|| io::Error::new(io::ErrorKind::Other, "Thread already joined."))?;
242+
join_handle
243+
.join()
244+
.map_err(|_err| io::Error::new(io::ErrorKind::Other, "Compressing thread panicked."))?
245+
}
246+
247+
fn close(mut self) -> io::Result<()> {
248+
drop(self.tx);
249+
let join_handle = self
250+
.join_handle
251+
.take()
252+
.ok_or_else(|| io::Error::new(io::ErrorKind::Other, "Thread already joined."))?;
253+
join_handle
254+
.join()
255+
.map_err(|_err| io::Error::new(io::ErrorKind::Other, "Compressing thread panicked."))?
256+
}
257+
}

0 commit comments

Comments
 (0)