Skip to content

Commit dbd1ddd

Browse files
committed
Allow for a same-thread doc compressor.
In addition, it isolates the doc compressor logic, better reports io::Result. In the case of the same-thread doc compressor, the blocks are also not copied.
1 parent 4d634d6 commit dbd1ddd

File tree

7 files changed

+359
-160
lines changed

7 files changed

+359
-160
lines changed

src/core/index_meta.rs

+52-1
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,14 @@ impl InnerSegmentMeta {
235235
}
236236
}
237237

238+
fn return_true() -> bool {
239+
true
240+
}
241+
242+
fn is_true(val: &bool) -> bool {
243+
*val
244+
}
245+
238246
/// Search Index Settings.
239247
///
240248
/// Contains settings which are applied on the whole
@@ -248,6 +256,12 @@ pub struct IndexSettings {
248256
/// The `Compressor` used to compress the doc store.
249257
#[serde(default)]
250258
pub docstore_compression: Compressor,
259+
/// If set to true, docstore compression will happen on a dedicated thread.
260+
/// (defaults: true)
261+
#[doc(hidden)]
262+
#[serde(default = "return_true")]
263+
#[serde(skip_serializing_if = "is_true")]
264+
pub docstore_compress_dedicated_thread: bool,
251265
#[serde(default = "default_docstore_blocksize")]
252266
/// The size of each block that will be compressed and written to disk
253267
pub docstore_blocksize: usize,
@@ -264,6 +278,7 @@ impl Default for IndexSettings {
264278
sort_by_field: None,
265279
docstore_compression: Compressor::default(),
266280
docstore_blocksize: default_docstore_blocksize(),
281+
docstore_compress_dedicated_thread: true,
267282
}
268283
}
269284
}
@@ -395,7 +410,7 @@ mod tests {
395410
use super::IndexMeta;
396411
use crate::core::index_meta::UntrackedIndexMeta;
397412
use crate::schema::{Schema, TEXT};
398-
use crate::store::ZstdCompressor;
413+
use crate::store::{ZstdCompressor, Compressor};
399414
use crate::{IndexSettings, IndexSortByField, Order};
400415

401416
#[test]
@@ -447,6 +462,7 @@ mod tests {
447462
compression_level: Some(4),
448463
}),
449464
docstore_blocksize: 1_000_000,
465+
docstore_compress_dedicated_thread: true,
450466
},
451467
segments: Vec::new(),
452468
schema,
@@ -485,4 +501,39 @@ mod tests {
485501
"unknown zstd option \"bla\" at line 1 column 103".to_string()
486502
);
487503
}
504+
505+
#[test]
506+
#[cfg(feature="lz4-compression")]
507+
fn test_index_settings_default() {
508+
let mut index_settings = IndexSettings::default();
509+
assert_eq!(
510+
index_settings,
511+
IndexSettings {
512+
sort_by_field: None,
513+
docstore_compression: Compressor::default(),
514+
docstore_compress_dedicated_thread: true,
515+
docstore_blocksize: 16_384
516+
}
517+
);
518+
{
519+
let index_settings_json = serde_json::to_value(&index_settings).unwrap();
520+
assert_eq!(index_settings_json, serde_json::json!({
521+
"docstore_compression": "lz4",
522+
"docstore_blocksize": 16384
523+
}));
524+
let index_settings_deser: IndexSettings = serde_json::from_value(index_settings_json).unwrap();
525+
assert_eq!(index_settings_deser, index_settings);
526+
}
527+
{
528+
index_settings.docstore_compress_dedicated_thread = false;
529+
let index_settings_json = serde_json::to_value(&index_settings).unwrap();
530+
assert_eq!(index_settings_json, serde_json::json!({
531+
"docstore_compression": "lz4",
532+
"docstore_blocksize": 16384,
533+
"docstore_compress_dedicated_thread": false,
534+
}));
535+
let index_settings_deser: IndexSettings = serde_json::from_value(index_settings_json).unwrap();
536+
assert_eq!(index_settings_deser, index_settings);
537+
}
538+
}
488539
}

src/indexer/segment_serializer.rs

+8-3
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,16 @@ impl SegmentSerializer {
3838
let fieldnorms_serializer = FieldNormsSerializer::from_write(fieldnorms_write)?;
3939

4040
let postings_serializer = InvertedIndexSerializer::open(&mut segment)?;
41-
let compressor = segment.index().settings().docstore_compression;
42-
let blocksize = segment.index().settings().docstore_blocksize;
41+
let settings = segment.index().settings();
42+
let store_writer = StoreWriter::new(
43+
store_write,
44+
settings.docstore_compression,
45+
settings.docstore_blocksize,
46+
settings.docstore_compress_dedicated_thread,
47+
)?;
4348
Ok(SegmentSerializer {
4449
segment,
45-
store_writer: StoreWriter::new(store_write, compressor, blocksize)?,
50+
store_writer,
4651
fast_field_serializer,
4752
fieldnorms_serializer: Some(fieldnorms_serializer),
4853
postings_serializer,

src/indexer/segment_writer.rs

+8-6
Original file line numberDiff line numberDiff line change
@@ -380,12 +380,14 @@ fn remap_and_write(
380380
let store_write = serializer
381381
.segment_mut()
382382
.open_write(SegmentComponent::Store)?;
383-
let compressor = serializer.segment().index().settings().docstore_compression;
384-
let block_size = serializer.segment().index().settings().docstore_blocksize;
385-
let old_store_writer = std::mem::replace(
386-
&mut serializer.store_writer,
387-
StoreWriter::new(store_write, compressor, block_size)?,
388-
);
383+
let settings = serializer.segment().index().settings();
384+
let store_writer = StoreWriter::new(
385+
store_write,
386+
settings.docstore_compression,
387+
settings.docstore_blocksize,
388+
settings.docstore_compress_dedicated_thread,
389+
)?;
390+
let old_store_writer = std::mem::replace(&mut serializer.store_writer, store_writer);
389391
old_store_writer.close()?;
390392
let store_read = StoreReader::open(
391393
serializer

src/store/mod.rs

+15-7
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ pub use self::decompressors::Decompressor;
4343
pub(crate) use self::reader::DOCSTORE_CACHE_CAPACITY;
4444
pub use self::reader::{CacheStats, StoreReader};
4545
pub use self::writer::StoreWriter;
46+
mod store_compressor;
4647

4748
#[cfg(feature = "lz4-compression")]
4849
mod compression_lz4_block;
@@ -82,14 +83,15 @@ pub mod tests {
8283
num_docs: usize,
8384
compressor: Compressor,
8485
blocksize: usize,
86+
separate_thread: bool
8587
) -> Schema {
8688
let mut schema_builder = Schema::builder();
8789
let field_body = schema_builder.add_text_field("body", TextOptions::default().set_stored());
8890
let field_title =
8991
schema_builder.add_text_field("title", TextOptions::default().set_stored());
9092
let schema = schema_builder.build();
9193
{
92-
let mut store_writer = StoreWriter::new(writer, compressor, blocksize).unwrap();
94+
let mut store_writer = StoreWriter::new(writer, compressor, blocksize, separate_thread).unwrap();
9395
for i in 0..num_docs {
9496
let mut doc = Document::default();
9597
doc.add_field_value(field_body, LOREM.to_string());
@@ -112,7 +114,7 @@ pub mod tests {
112114
let path = Path::new("store");
113115
let directory = RamDirectory::create();
114116
let store_wrt = directory.open_write(path)?;
115-
let schema = write_lorem_ipsum_store(store_wrt, NUM_DOCS, Compressor::Lz4, BLOCK_SIZE);
117+
let schema = write_lorem_ipsum_store(store_wrt, NUM_DOCS, Compressor::Lz4, BLOCK_SIZE, true);
116118
let field_title = schema.get_field("title").unwrap();
117119
let store_file = directory.open_read(path)?;
118120
let store = StoreReader::open(store_file, 10)?;
@@ -148,11 +150,11 @@ pub mod tests {
148150
Ok(())
149151
}
150152

151-
fn test_store(compressor: Compressor, blocksize: usize) -> crate::Result<()> {
153+
fn test_store(compressor: Compressor, blocksize: usize, separate_thread: bool) -> crate::Result<()> {
152154
let path = Path::new("store");
153155
let directory = RamDirectory::create();
154156
let store_wrt = directory.open_write(path)?;
155-
let schema = write_lorem_ipsum_store(store_wrt, NUM_DOCS, compressor, blocksize);
157+
let schema = write_lorem_ipsum_store(store_wrt, NUM_DOCS, compressor, blocksize, separate_thread);
156158
let field_title = schema.get_field("title").unwrap();
157159
let store_file = directory.open_read(path)?;
158160
let store = StoreReader::open(store_file, 10)?;
@@ -177,13 +179,19 @@ pub mod tests {
177179
}
178180

179181
#[test]
180-
fn test_store_noop() -> crate::Result<()> {
181-
test_store(Compressor::None, BLOCK_SIZE)
182+
fn test_store_no_compression_same_thread() -> crate::Result<()> {
183+
test_store(Compressor::None, BLOCK_SIZE, false)
182184
}
185+
186+
#[test]
187+
fn test_store_no_compression() -> crate::Result<()> {
188+
test_store(Compressor::None, BLOCK_SIZE, true)
189+
}
190+
183191
#[cfg(feature = "lz4-compression")]
184192
#[test]
185193
fn test_store_lz4_block() -> crate::Result<()> {
186-
test_store(Compressor::Lz4, BLOCK_SIZE)
194+
test_store(Compressor::Lz4, BLOCK_SIZE, true)
187195
}
188196
#[cfg(feature = "snappy-compression")]
189197
#[test]

src/store/reader.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -393,7 +393,7 @@ mod tests {
393393
let directory = RamDirectory::create();
394394
let path = Path::new("store");
395395
let writer = directory.open_write(path)?;
396-
let schema = write_lorem_ipsum_store(writer, 500, Compressor::default(), BLOCK_SIZE);
396+
let schema = write_lorem_ipsum_store(writer, 500, Compressor::default(), BLOCK_SIZE, true);
397397
let title = schema.get_field("title").unwrap();
398398
let store_file = directory.open_read(path)?;
399399
let store = StoreReader::open(store_file, DOCSTORE_CACHE_CAPACITY)?;

0 commit comments

Comments
 (0)