Skip to content

Commit f02ff44

Browse files
committed
Allow for a same-thread doc compressor.
In addition, this commit isolates the doc compressor logic and reports io::Result errors better. In the case of the same-thread doc compressor, the blocks are also not copied.
1 parent 4d634d6 commit f02ff44

File tree

7 files changed

+383
-163
lines changed

7 files changed

+383
-163
lines changed

src/core/index_meta.rs

+60-1
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,14 @@ impl InnerSegmentMeta {
235235
}
236236
}
237237

238+
fn return_true() -> bool {
239+
true
240+
}
241+
242+
fn is_true(val: &bool) -> bool {
243+
*val
244+
}
245+
238246
/// Search Index Settings.
239247
///
240248
/// Contains settings which are applied on the whole
@@ -248,6 +256,12 @@ pub struct IndexSettings {
248256
/// The `Compressor` used to compress the doc store.
249257
#[serde(default)]
250258
pub docstore_compression: Compressor,
259+
/// If set to true, docstore compression will happen on a dedicated thread.
260+
/// (default: true)
261+
#[doc(hidden)]
262+
#[serde(default = "return_true")]
263+
#[serde(skip_serializing_if = "is_true")]
264+
pub docstore_compress_dedicated_thread: bool,
251265
#[serde(default = "default_docstore_blocksize")]
252266
/// The size of each block that will be compressed and written to disk
253267
pub docstore_blocksize: usize,
@@ -264,6 +278,7 @@ impl Default for IndexSettings {
264278
sort_by_field: None,
265279
docstore_compression: Compressor::default(),
266280
docstore_blocksize: default_docstore_blocksize(),
281+
docstore_compress_dedicated_thread: true,
267282
}
268283
}
269284
}
@@ -395,7 +410,7 @@ mod tests {
395410
use super::IndexMeta;
396411
use crate::core::index_meta::UntrackedIndexMeta;
397412
use crate::schema::{Schema, TEXT};
398-
use crate::store::ZstdCompressor;
413+
use crate::store::{Compressor, ZstdCompressor};
399414
use crate::{IndexSettings, IndexSortByField, Order};
400415

401416
#[test]
@@ -447,6 +462,7 @@ mod tests {
447462
compression_level: Some(4),
448463
}),
449464
docstore_blocksize: 1_000_000,
465+
docstore_compress_dedicated_thread: true,
450466
},
451467
segments: Vec::new(),
452468
schema,
@@ -485,4 +501,47 @@ mod tests {
485501
"unknown zstd option \"bla\" at line 1 column 103".to_string()
486502
);
487503
}
504+
505+
#[test]
506+
#[cfg(feature = "lz4-compression")]
507+
fn test_index_settings_default() {
508+
let mut index_settings = IndexSettings::default();
509+
assert_eq!(
510+
index_settings,
511+
IndexSettings {
512+
sort_by_field: None,
513+
docstore_compression: Compressor::default(),
514+
docstore_compress_dedicated_thread: true,
515+
docstore_blocksize: 16_384
516+
}
517+
);
518+
{
519+
let index_settings_json = serde_json::to_value(&index_settings).unwrap();
520+
assert_eq!(
521+
index_settings_json,
522+
serde_json::json!({
523+
"docstore_compression": "lz4",
524+
"docstore_blocksize": 16384
525+
})
526+
);
527+
let index_settings_deser: IndexSettings =
528+
serde_json::from_value(index_settings_json).unwrap();
529+
assert_eq!(index_settings_deser, index_settings);
530+
}
531+
{
532+
index_settings.docstore_compress_dedicated_thread = false;
533+
let index_settings_json = serde_json::to_value(&index_settings).unwrap();
534+
assert_eq!(
535+
index_settings_json,
536+
serde_json::json!({
537+
"docstore_compression": "lz4",
538+
"docstore_blocksize": 16384,
539+
"docstore_compress_dedicated_thread": false,
540+
})
541+
);
542+
let index_settings_deser: IndexSettings =
543+
serde_json::from_value(index_settings_json).unwrap();
544+
assert_eq!(index_settings_deser, index_settings);
545+
}
546+
}
488547
}

src/indexer/segment_serializer.rs

+8-3
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,16 @@ impl SegmentSerializer {
3838
let fieldnorms_serializer = FieldNormsSerializer::from_write(fieldnorms_write)?;
3939

4040
let postings_serializer = InvertedIndexSerializer::open(&mut segment)?;
41-
let compressor = segment.index().settings().docstore_compression;
42-
let blocksize = segment.index().settings().docstore_blocksize;
41+
let settings = segment.index().settings();
42+
let store_writer = StoreWriter::new(
43+
store_write,
44+
settings.docstore_compression,
45+
settings.docstore_blocksize,
46+
settings.docstore_compress_dedicated_thread,
47+
)?;
4348
Ok(SegmentSerializer {
4449
segment,
45-
store_writer: StoreWriter::new(store_write, compressor, blocksize)?,
50+
store_writer,
4651
fast_field_serializer,
4752
fieldnorms_serializer: Some(fieldnorms_serializer),
4853
postings_serializer,

src/indexer/segment_writer.rs

+8-6
Original file line numberDiff line numberDiff line change
@@ -380,12 +380,14 @@ fn remap_and_write(
380380
let store_write = serializer
381381
.segment_mut()
382382
.open_write(SegmentComponent::Store)?;
383-
let compressor = serializer.segment().index().settings().docstore_compression;
384-
let block_size = serializer.segment().index().settings().docstore_blocksize;
385-
let old_store_writer = std::mem::replace(
386-
&mut serializer.store_writer,
387-
StoreWriter::new(store_write, compressor, block_size)?,
388-
);
383+
let settings = serializer.segment().index().settings();
384+
let store_writer = StoreWriter::new(
385+
store_write,
386+
settings.docstore_compression,
387+
settings.docstore_blocksize,
388+
settings.docstore_compress_dedicated_thread,
389+
)?;
390+
let old_store_writer = std::mem::replace(&mut serializer.store_writer, store_writer);
389391
old_store_writer.close()?;
390392
let store_read = StoreReader::open(
391393
serializer

src/store/mod.rs

+27-10
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ pub use self::decompressors::Decompressor;
4343
pub(crate) use self::reader::DOCSTORE_CACHE_CAPACITY;
4444
pub use self::reader::{CacheStats, StoreReader};
4545
pub use self::writer::StoreWriter;
46+
mod store_compressor;
4647

4748
#[cfg(feature = "lz4-compression")]
4849
mod compression_lz4_block;
@@ -82,14 +83,16 @@ pub mod tests {
8283
num_docs: usize,
8384
compressor: Compressor,
8485
blocksize: usize,
86+
separate_thread: bool,
8587
) -> Schema {
8688
let mut schema_builder = Schema::builder();
8789
let field_body = schema_builder.add_text_field("body", TextOptions::default().set_stored());
8890
let field_title =
8991
schema_builder.add_text_field("title", TextOptions::default().set_stored());
9092
let schema = schema_builder.build();
9193
{
92-
let mut store_writer = StoreWriter::new(writer, compressor, blocksize).unwrap();
94+
let mut store_writer =
95+
StoreWriter::new(writer, compressor, blocksize, separate_thread).unwrap();
9396
for i in 0..num_docs {
9497
let mut doc = Document::default();
9598
doc.add_field_value(field_body, LOREM.to_string());
@@ -112,7 +115,8 @@ pub mod tests {
112115
let path = Path::new("store");
113116
let directory = RamDirectory::create();
114117
let store_wrt = directory.open_write(path)?;
115-
let schema = write_lorem_ipsum_store(store_wrt, NUM_DOCS, Compressor::Lz4, BLOCK_SIZE);
118+
let schema =
119+
write_lorem_ipsum_store(store_wrt, NUM_DOCS, Compressor::Lz4, BLOCK_SIZE, true);
116120
let field_title = schema.get_field("title").unwrap();
117121
let store_file = directory.open_read(path)?;
118122
let store = StoreReader::open(store_file, 10)?;
@@ -148,11 +152,16 @@ pub mod tests {
148152
Ok(())
149153
}
150154

151-
fn test_store(compressor: Compressor, blocksize: usize) -> crate::Result<()> {
155+
fn test_store(
156+
compressor: Compressor,
157+
blocksize: usize,
158+
separate_thread: bool,
159+
) -> crate::Result<()> {
152160
let path = Path::new("store");
153161
let directory = RamDirectory::create();
154162
let store_wrt = directory.open_write(path)?;
155-
let schema = write_lorem_ipsum_store(store_wrt, NUM_DOCS, compressor, blocksize);
163+
let schema =
164+
write_lorem_ipsum_store(store_wrt, NUM_DOCS, compressor, blocksize, separate_thread);
156165
let field_title = schema.get_field("title").unwrap();
157166
let store_file = directory.open_read(path)?;
158167
let store = StoreReader::open(store_file, 10)?;
@@ -177,29 +186,35 @@ pub mod tests {
177186
}
178187

179188
#[test]
180-
fn test_store_noop() -> crate::Result<()> {
181-
test_store(Compressor::None, BLOCK_SIZE)
189+
fn test_store_no_compression_same_thread() -> crate::Result<()> {
190+
test_store(Compressor::None, BLOCK_SIZE, false)
191+
}
192+
193+
#[test]
194+
fn test_store_no_compression() -> crate::Result<()> {
195+
test_store(Compressor::None, BLOCK_SIZE, true)
182196
}
197+
183198
#[cfg(feature = "lz4-compression")]
184199
#[test]
185200
fn test_store_lz4_block() -> crate::Result<()> {
186-
test_store(Compressor::Lz4, BLOCK_SIZE)
201+
test_store(Compressor::Lz4, BLOCK_SIZE, true)
187202
}
188203
#[cfg(feature = "snappy-compression")]
189204
#[test]
190205
fn test_store_snap() -> crate::Result<()> {
191-
test_store(Compressor::Snappy, BLOCK_SIZE)
206+
test_store(Compressor::Snappy, BLOCK_SIZE, true)
192207
}
193208
#[cfg(feature = "brotli-compression")]
194209
#[test]
195210
fn test_store_brotli() -> crate::Result<()> {
196-
test_store(Compressor::Brotli, BLOCK_SIZE)
211+
test_store(Compressor::Brotli, BLOCK_SIZE, true)
197212
}
198213

199214
#[cfg(feature = "zstd-compression")]
200215
#[test]
201216
fn test_store_zstd() -> crate::Result<()> {
202-
test_store(Compressor::Zstd(ZstdCompressor::default()), BLOCK_SIZE)
217+
test_store(Compressor::Zstd(ZstdCompressor::default()), BLOCK_SIZE, true)
203218
}
204219

205220
#[test]
@@ -364,6 +379,7 @@ mod bench {
364379
1_000,
365380
Compressor::default(),
366381
16_384,
382+
true
367383
);
368384
directory.delete(path).unwrap();
369385
});
@@ -378,6 +394,7 @@ mod bench {
378394
1_000,
379395
Compressor::default(),
380396
16_384,
397+
true
381398
);
382399
let store_file = directory.open_read(path).unwrap();
383400
let store = StoreReader::open(store_file, 10).unwrap();

src/store/reader.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -393,7 +393,7 @@ mod tests {
393393
let directory = RamDirectory::create();
394394
let path = Path::new("store");
395395
let writer = directory.open_write(path)?;
396-
let schema = write_lorem_ipsum_store(writer, 500, Compressor::default(), BLOCK_SIZE);
396+
let schema = write_lorem_ipsum_store(writer, 500, Compressor::default(), BLOCK_SIZE, true);
397397
let title = schema.get_field("title").unwrap();
398398
let store_file = directory.open_read(path)?;
399399
let store = StoreReader::open(store_file, DOCSTORE_CACHE_CAPACITY)?;

0 commit comments

Comments
 (0)