Skip to content

Commit 89e19f1

Browse files
authored
Merge pull request #1374 from kryesh/main
Add Zstd compression support; make block size configurable via IndexSettings
2 parents 1a6a139 + c95013b commit 89e19f1

10 files changed

+135
-19
lines changed

.github/workflows/test.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ jobs:
3333
components: rustfmt, clippy
3434

3535
- name: Run tests
36-
run: cargo +stable test --features mmap,brotli-compression,lz4-compression,snappy-compression,failpoints --verbose --workspace
36+
run: cargo +stable test --features mmap,brotli-compression,lz4-compression,snappy-compression,zstd-compression,failpoints --verbose --workspace
3737

3838
- name: Run tests quickwit feature
3939
run: cargo +stable test --features mmap,quickwit,failpoints --verbose --workspace

Cargo.toml

+2
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ tantivy-fst = "0.3.0"
2323
memmap2 = { version = "0.5.3", optional = true }
2424
lz4_flex = { version = "0.9.2", default-features = false, features = ["checked-decode"], optional = true }
2525
brotli = { version = "3.3.4", optional = true }
26+
zstd = { version = "0.11", optional = true }
2627
snap = { version = "1.0.5", optional = true }
2728
tempfile = { version = "3.3.0", optional = true }
2829
log = "0.4.16"
@@ -93,6 +94,7 @@ mmap = ["fs2", "tempfile", "memmap2"]
9394
brotli-compression = ["brotli"]
9495
lz4-compression = ["lz4_flex"]
9596
snappy-compression = ["snap"]
97+
zstd-compression = ["zstd"]
9698

9799
failpoints = ["fail/failpoints"]
98100
unstable = [] # useful for benches.

src/core/index_meta.rs

+21-2
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,7 @@ impl InnerSegmentMeta {
239239
///
240240
/// Contains settings which are applied on the whole
241241
/// index, like presort documents.
242-
#[derive(Clone, Debug, Default, Serialize, Deserialize, Eq, PartialEq)]
242+
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
243243
pub struct IndexSettings {
244244
/// Sorts the documents by information
245245
/// provided in `IndexSortByField`
@@ -248,7 +248,26 @@ pub struct IndexSettings {
248248
/// The `Compressor` used to compress the doc store.
249249
#[serde(default)]
250250
pub docstore_compression: Compressor,
251+
#[serde(default = "default_docstore_blocksize")]
252+
/// The size of each block that will be compressed and written to disk
253+
pub docstore_blocksize: usize,
254+
}
255+
256+
/// Must be a function to be compatible with serde defaults
257+
fn default_docstore_blocksize() -> usize {
258+
16_384
259+
}
260+
261+
impl Default for IndexSettings {
262+
fn default() -> Self {
263+
Self {
264+
sort_by_field: None,
265+
docstore_compression: Compressor::default(),
266+
docstore_blocksize: default_docstore_blocksize(),
267+
}
268+
}
251269
}
270+
252271
/// Settings to presort the documents in an index
253272
///
254273
/// Presorting documents can greatly performance
@@ -401,7 +420,7 @@ mod tests {
401420
let json = serde_json::ser::to_string(&index_metas).expect("serialization failed");
402421
assert_eq!(
403422
json,
404-
r#"{"index_settings":{"sort_by_field":{"field":"text","order":"Asc"},"docstore_compression":"lz4"},"segments":[],"schema":[{"name":"text","type":"text","options":{"indexing":{"record":"position","fieldnorms":true,"tokenizer":"default"},"stored":false,"fast":false}}],"opstamp":0}"#
423+
r#"{"index_settings":{"sort_by_field":{"field":"text","order":"Asc"},"docstore_compression":"lz4","docstore_blocksize":16384},"segments":[],"schema":[{"name":"text","type":"text","options":{"indexing":{"record":"position","fieldnorms":true,"tokenizer":"default"},"stored":false,"fast":false}}],"opstamp":0}"#
405424
);
406425

407426
let deser_meta: UntrackedIndexMeta = serde_json::from_str(&json).unwrap();

src/indexer/segment_serializer.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,10 @@ impl SegmentSerializer {
3939

4040
let postings_serializer = InvertedIndexSerializer::open(&mut segment)?;
4141
let compressor = segment.index().settings().docstore_compression;
42+
let blocksize = segment.index().settings().docstore_blocksize;
4243
Ok(SegmentSerializer {
4344
segment,
44-
store_writer: StoreWriter::new(store_write, compressor),
45+
store_writer: StoreWriter::new(store_write, compressor, blocksize),
4546
fast_field_serializer,
4647
fieldnorms_serializer: Some(fieldnorms_serializer),
4748
postings_serializer,

src/indexer/segment_writer.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -372,9 +372,10 @@ fn remap_and_write(
372372
.segment_mut()
373373
.open_write(SegmentComponent::Store)?;
374374
let compressor = serializer.segment().index().settings().docstore_compression;
375+
let block_size = serializer.segment().index().settings().docstore_blocksize;
375376
let old_store_writer = std::mem::replace(
376377
&mut serializer.store_writer,
377-
StoreWriter::new(store_write, compressor),
378+
StoreWriter::new(store_write, compressor, block_size),
378379
);
379380
old_store_writer.close()?;
380381
let store_read = StoreReader::open(

src/store/compression_zstd_block.rs

+50
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
use std::io;
2+
3+
use zstd::bulk::{compress_to_buffer, decompress_to_buffer};
4+
use zstd::DEFAULT_COMPRESSION_LEVEL;
5+
6+
#[inline]
7+
pub fn compress(uncompressed: &[u8], compressed: &mut Vec<u8>) -> io::Result<()> {
8+
let count_size = std::mem::size_of::<u32>();
9+
let max_size = zstd::zstd_safe::compress_bound(uncompressed.len()) + count_size;
10+
11+
compressed.clear();
12+
compressed.resize(max_size, 0);
13+
14+
let compressed_size = compress_to_buffer(
15+
uncompressed,
16+
&mut compressed[count_size..],
17+
DEFAULT_COMPRESSION_LEVEL,
18+
)?;
19+
20+
compressed[0..count_size].copy_from_slice(&(uncompressed.len() as u32).to_le_bytes());
21+
compressed.resize(compressed_size + count_size, 0);
22+
23+
Ok(())
24+
}
25+
26+
#[inline]
27+
pub fn decompress(compressed: &[u8], decompressed: &mut Vec<u8>) -> io::Result<()> {
28+
let count_size = std::mem::size_of::<u32>();
29+
let uncompressed_size = u32::from_le_bytes(
30+
compressed
31+
.get(..count_size)
32+
.ok_or(io::ErrorKind::InvalidData)?
33+
.try_into()
34+
.unwrap(),
35+
) as usize;
36+
37+
decompressed.clear();
38+
decompressed.resize(uncompressed_size, 0);
39+
40+
let decompressed_size = decompress_to_buffer(&compressed[count_size..], decompressed)?;
41+
42+
if decompressed_size != uncompressed_size {
43+
return Err(io::Error::new(
44+
io::ErrorKind::InvalidData,
45+
"doc store block not completely decompressed, data corruption".to_string(),
46+
));
47+
}
48+
49+
Ok(())
50+
}

src/store/compressors.rs

+27
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@ pub enum Compressor {
2626
#[serde(rename = "snappy")]
2727
/// Use the snap compressor
2828
Snappy,
29+
#[serde(rename = "zstd")]
30+
/// Use the zstd compressor
31+
Zstd,
2932
}
3033

3134
impl Default for Compressor {
@@ -36,6 +39,8 @@ impl Default for Compressor {
3639
Compressor::Brotli
3740
} else if cfg!(feature = "snappy-compression") {
3841
Compressor::Snappy
42+
} else if cfg!(feature = "zstd-compression") {
43+
Compressor::Zstd
3944
} else {
4045
Compressor::None
4146
}
@@ -49,6 +54,7 @@ impl Compressor {
4954
1 => Compressor::Lz4,
5055
2 => Compressor::Brotli,
5156
3 => Compressor::Snappy,
57+
4 => Compressor::Zstd,
5258
_ => panic!("unknown compressor id {:?}", id),
5359
}
5460
}
@@ -58,6 +64,7 @@ impl Compressor {
5864
Self::Lz4 => 1,
5965
Self::Brotli => 2,
6066
Self::Snappy => 3,
67+
Self::Zstd => 4,
6168
}
6269
}
6370
#[inline]
@@ -98,6 +105,16 @@ impl Compressor {
98105
panic!("snappy-compression feature flag not activated");
99106
}
100107
}
108+
Self::Zstd => {
109+
#[cfg(feature = "zstd-compression")]
110+
{
111+
super::compression_zstd_block::compress(uncompressed, compressed)
112+
}
113+
#[cfg(not(feature = "zstd-compression"))]
114+
{
115+
panic!("zstd-compression feature flag not activated");
116+
}
117+
}
101118
}
102119
}
103120

@@ -143,6 +160,16 @@ impl Compressor {
143160
panic!("snappy-compression feature flag not activated");
144161
}
145162
}
163+
Self::Zstd => {
164+
#[cfg(feature = "zstd-compression")]
165+
{
166+
super::compression_zstd_block::decompress(compressed, decompressed)
167+
}
168+
#[cfg(not(feature = "zstd-compression"))]
169+
{
170+
panic!("zstd-compression feature flag not activated");
171+
}
172+
}
146173
}
147174
}
148175
}

src/store/mod.rs

+22-8
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,9 @@ mod compression_brotli;
5050
#[cfg(feature = "snappy-compression")]
5151
mod compression_snap;
5252

53+
#[cfg(feature = "zstd-compression")]
54+
mod compression_zstd_block;
55+
5356
#[cfg(test)]
5457
pub mod tests {
5558

@@ -69,18 +72,21 @@ pub mod tests {
6972
sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt \
7073
mollit anim id est laborum.";
7174

75+
const BLOCK_SIZE: usize = 16_384;
76+
7277
pub fn write_lorem_ipsum_store(
7378
writer: WritePtr,
7479
num_docs: usize,
7580
compressor: Compressor,
81+
blocksize: usize,
7682
) -> Schema {
7783
let mut schema_builder = Schema::builder();
7884
let field_body = schema_builder.add_text_field("body", TextOptions::default().set_stored());
7985
let field_title =
8086
schema_builder.add_text_field("title", TextOptions::default().set_stored());
8187
let schema = schema_builder.build();
8288
{
83-
let mut store_writer = StoreWriter::new(writer, compressor);
89+
let mut store_writer = StoreWriter::new(writer, compressor, blocksize);
8490
for i in 0..num_docs {
8591
let mut doc = Document::default();
8692
doc.add_field_value(field_body, LOREM.to_string());
@@ -103,7 +109,7 @@ pub mod tests {
103109
let path = Path::new("store");
104110
let directory = RamDirectory::create();
105111
let store_wrt = directory.open_write(path)?;
106-
let schema = write_lorem_ipsum_store(store_wrt, NUM_DOCS, Compressor::Lz4);
112+
let schema = write_lorem_ipsum_store(store_wrt, NUM_DOCS, Compressor::Lz4, BLOCK_SIZE);
107113
let field_title = schema.get_field("title").unwrap();
108114
let store_file = directory.open_read(path)?;
109115
let store = StoreReader::open(store_file)?;
@@ -139,11 +145,11 @@ pub mod tests {
139145
Ok(())
140146
}
141147

142-
fn test_store(compressor: Compressor) -> crate::Result<()> {
148+
fn test_store(compressor: Compressor, blocksize: usize) -> crate::Result<()> {
143149
let path = Path::new("store");
144150
let directory = RamDirectory::create();
145151
let store_wrt = directory.open_write(path)?;
146-
let schema = write_lorem_ipsum_store(store_wrt, NUM_DOCS, compressor);
152+
let schema = write_lorem_ipsum_store(store_wrt, NUM_DOCS, compressor, blocksize);
147153
let field_title = schema.get_field("title").unwrap();
148154
let store_file = directory.open_read(path)?;
149155
let store = StoreReader::open(store_file)?;
@@ -169,22 +175,28 @@ pub mod tests {
169175

170176
#[test]
171177
fn test_store_noop() -> crate::Result<()> {
172-
test_store(Compressor::None)
178+
test_store(Compressor::None, BLOCK_SIZE)
173179
}
174180
#[cfg(feature = "lz4-compression")]
175181
#[test]
176182
fn test_store_lz4_block() -> crate::Result<()> {
177-
test_store(Compressor::Lz4)
183+
test_store(Compressor::Lz4, BLOCK_SIZE)
178184
}
179185
#[cfg(feature = "snappy-compression")]
180186
#[test]
181187
fn test_store_snap() -> crate::Result<()> {
182-
test_store(Compressor::Snappy)
188+
test_store(Compressor::Snappy, BLOCK_SIZE)
183189
}
184190
#[cfg(feature = "brotli-compression")]
185191
#[test]
186192
fn test_store_brotli() -> crate::Result<()> {
187-
test_store(Compressor::Brotli)
193+
test_store(Compressor::Brotli, BLOCK_SIZE)
194+
}
195+
196+
#[cfg(feature = "zstd-compression")]
197+
#[test]
198+
fn test_store_zstd() -> crate::Result<()> {
199+
test_store(Compressor::Zstd, BLOCK_SIZE)
188200
}
189201

190202
#[test]
@@ -348,6 +360,7 @@ mod bench {
348360
directory.open_write(path).unwrap(),
349361
1_000,
350362
Compressor::default(),
363+
16_384,
351364
);
352365
directory.delete(path).unwrap();
353366
});
@@ -361,6 +374,7 @@ mod bench {
361374
directory.open_write(path).unwrap(),
362375
1_000,
363376
Compressor::default(),
377+
16_384,
364378
);
365379
let store_file = directory.open_read(path).unwrap();
366380
let store = StoreReader::open(store_file).unwrap();

src/store/reader.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -304,6 +304,8 @@ mod tests {
304304
use crate::store::tests::write_lorem_ipsum_store;
305305
use crate::Directory;
306306

307+
const BLOCK_SIZE: usize = 16_384;
308+
307309
fn get_text_field<'a>(doc: &'a Document, field: &'a Field) -> Option<&'a str> {
308310
doc.get_first(*field).and_then(|f| f.as_text())
309311
}
@@ -313,7 +315,7 @@ mod tests {
313315
let directory = RamDirectory::create();
314316
let path = Path::new("store");
315317
let writer = directory.open_write(path)?;
316-
let schema = write_lorem_ipsum_store(writer, 500, Compressor::default());
318+
let schema = write_lorem_ipsum_store(writer, 500, Compressor::default(), BLOCK_SIZE);
317319
let title = schema.get_field("title").unwrap();
318320
let store_file = directory.open_read(path)?;
319321
let store = StoreReader::open(store_file)?;

src/store/writer.rs

+5-5
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,6 @@ use crate::schema::Document;
1111
use crate::store::index::Checkpoint;
1212
use crate::DocId;
1313

14-
const BLOCK_SIZE: usize = 16_384;
15-
1614
/// Write tantivy's [`Store`](./index.html)
1715
///
1816
/// Contrary to the other components of `tantivy`,
@@ -22,6 +20,7 @@ const BLOCK_SIZE: usize = 16_384;
2220
/// The skip list index on the other hand, is built in memory.
2321
pub struct StoreWriter {
2422
compressor: Compressor,
23+
block_size: usize,
2524
doc: DocId,
2625
first_doc_in_block: DocId,
2726
offset_index_writer: SkipIndexBuilder,
@@ -35,9 +34,10 @@ impl StoreWriter {
3534
///
3635
/// The store writer will write blocks on disk as
3736
/// documents are added.
38-
pub fn new(writer: WritePtr, compressor: Compressor) -> StoreWriter {
37+
pub fn new(writer: WritePtr, compressor: Compressor, block_size: usize) -> StoreWriter {
3938
StoreWriter {
4039
compressor,
40+
block_size,
4141
doc: 0,
4242
first_doc_in_block: 0,
4343
offset_index_writer: SkipIndexBuilder::new(),
@@ -65,7 +65,7 @@ impl StoreWriter {
6565
VInt(doc_num_bytes as u64).serialize(&mut self.current_block)?;
6666
self.current_block.write_all(serialized_document)?;
6767
self.doc += 1;
68-
if self.current_block.len() > BLOCK_SIZE {
68+
if self.current_block.len() > self.block_size {
6969
self.write_and_compress_block()?;
7070
}
7171
Ok(())
@@ -86,7 +86,7 @@ impl StoreWriter {
8686
self.current_block
8787
.write_all(&self.intermediary_buffer[..])?;
8888
self.doc += 1;
89-
if self.current_block.len() > BLOCK_SIZE {
89+
if self.current_block.len() > self.block_size {
9090
self.write_and_compress_block()?;
9191
}
9292
Ok(())

0 commit comments

Comments
 (0)