Skip to content

Commit 927b443

Browse files
authored
Perf: use term hashmap in fastfield (#2243)
* add shared arena hashmap * bench fastfield indexing * use shared arena hashmap in columnar lower minimum resize in hashtable * clippy * add comments
1 parent 7a0064d commit 927b443

File tree

8 files changed

+543
-294
lines changed

8 files changed

+543
-294
lines changed

Cargo.toml

+2-2
Original file line numberDiff line numberDiff line change
@@ -82,8 +82,8 @@ more-asserts = "0.3.1"
8282
rand_distr = "0.4.3"
8383

8484
[target.'cfg(not(windows))'.dev-dependencies]
85-
criterion = "0.5"
86-
pprof = { git = "https://github.com/PSeitz/pprof-rs/", rev = "53af24b", features = ["flamegraph", "criterion"] } # temp fork that works with criterion 0.5
85+
criterion = { version = "0.5" }
86+
pprof = { version= "0.13", features = ["flamegraph", "criterion"] }
8787

8888
[dev-dependencies.fail]
8989
version = "0.5.0"

benches/index-bench.rs

+64-3
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use criterion::{criterion_group, criterion_main, Criterion, Throughput};
22
use pprof::criterion::{Output, PProfProfiler};
33
use tantivy::schema::{TantivyDocument, FAST, INDEXED, STORED, STRING, TEXT};
4-
use tantivy::{Index, IndexWriter};
4+
use tantivy::{tokenizer, Index, IndexWriter};
55

66
const HDFS_LOGS: &str = include_str!("hdfs.json");
77
const GH_LOGS: &str = include_str!("gh.json");
@@ -19,6 +19,13 @@ pub fn hdfs_index_benchmark(c: &mut Criterion) {
1919
schema_builder.add_text_field("severity", STRING);
2020
schema_builder.build()
2121
};
22+
let schema_only_fast = {
23+
let mut schema_builder = tantivy::schema::SchemaBuilder::new();
24+
schema_builder.add_u64_field("timestamp", FAST);
25+
schema_builder.add_text_field("body", FAST);
26+
schema_builder.add_text_field("severity", FAST);
27+
schema_builder.build()
28+
};
2229
let schema_with_store = {
2330
let mut schema_builder = tantivy::schema::SchemaBuilder::new();
2431
schema_builder.add_u64_field("timestamp", INDEXED | STORED);
@@ -83,6 +90,30 @@ pub fn hdfs_index_benchmark(c: &mut Criterion) {
8390
index_writer.commit().unwrap();
8491
})
8592
});
93+
group.bench_function("index-hdfs-no-commit-fastfield", |b| {
94+
let lines = get_lines(HDFS_LOGS);
95+
b.iter(|| {
96+
let index = Index::create_in_ram(schema_only_fast.clone());
97+
let index_writer: IndexWriter = index.writer_with_num_threads(1, 100_000_000).unwrap();
98+
for doc_json in &lines {
99+
let doc = TantivyDocument::parse_json(&schema, doc_json).unwrap();
100+
index_writer.add_document(doc).unwrap();
101+
}
102+
})
103+
});
104+
group.bench_function("index-hdfs-with-commit-fastfield", |b| {
105+
let lines = get_lines(HDFS_LOGS);
106+
b.iter(|| {
107+
let index = Index::create_in_ram(schema_only_fast.clone());
108+
let mut index_writer: IndexWriter =
109+
index.writer_with_num_threads(1, 100_000_000).unwrap();
110+
for doc_json in &lines {
111+
let doc = TantivyDocument::parse_json(&schema, doc_json).unwrap();
112+
index_writer.add_document(doc).unwrap();
113+
}
114+
index_writer.commit().unwrap();
115+
})
116+
});
86117
group.bench_function("index-hdfs-no-commit-json-without-docstore", |b| {
87118
let lines = get_lines(HDFS_LOGS);
88119
b.iter(|| {
@@ -107,6 +138,18 @@ pub fn gh_index_benchmark(c: &mut Criterion) {
107138
schema_builder.add_json_field("json", TEXT | FAST);
108139
schema_builder.build()
109140
};
141+
let dynamic_schema_fast = {
142+
let mut schema_builder = tantivy::schema::SchemaBuilder::new();
143+
schema_builder.add_json_field("json", FAST);
144+
schema_builder.build()
145+
};
146+
let ff_tokenizer_manager = tokenizer::TokenizerManager::default();
147+
ff_tokenizer_manager.register(
148+
"raw",
149+
tokenizer::TextAnalyzer::builder(tokenizer::RawTokenizer::default())
150+
.filter(tokenizer::RemoveLongFilter::limit(255))
151+
.build(),
152+
);
110153

111154
let mut group = c.benchmark_group("index-gh");
112155
group.throughput(Throughput::Bytes(GH_LOGS.len() as u64));
@@ -115,7 +158,23 @@ pub fn gh_index_benchmark(c: &mut Criterion) {
115158
let lines = get_lines(GH_LOGS);
116159
b.iter(|| {
117160
let json_field = dynamic_schema.get_field("json").unwrap();
118-
let index = Index::create_in_ram(dynamic_schema.clone());
161+
let mut index = Index::create_in_ram(dynamic_schema.clone());
162+
index.set_fast_field_tokenizers(ff_tokenizer_manager.clone());
163+
let index_writer: IndexWriter = index.writer_with_num_threads(1, 100_000_000).unwrap();
164+
for doc_json in &lines {
165+
let json_val: serde_json::Map<String, serde_json::Value> =
166+
serde_json::from_str(doc_json).unwrap();
167+
let doc = tantivy::doc!(json_field=>json_val);
168+
index_writer.add_document(doc).unwrap();
169+
}
170+
})
171+
});
172+
group.bench_function("index-gh-fast", |b| {
173+
let lines = get_lines(GH_LOGS);
174+
b.iter(|| {
175+
let json_field = dynamic_schema_fast.get_field("json").unwrap();
176+
let mut index = Index::create_in_ram(dynamic_schema_fast.clone());
177+
index.set_fast_field_tokenizers(ff_tokenizer_manager.clone());
119178
let index_writer: IndexWriter = index.writer_with_num_threads(1, 100_000_000).unwrap();
120179
for doc_json in &lines {
121180
let json_val: serde_json::Map<String, serde_json::Value> =
@@ -125,11 +184,13 @@ pub fn gh_index_benchmark(c: &mut Criterion) {
125184
}
126185
})
127186
});
187+
128188
group.bench_function("index-gh-with-commit", |b| {
129189
let lines = get_lines(GH_LOGS);
130190
b.iter(|| {
131191
let json_field = dynamic_schema.get_field("json").unwrap();
132-
let index = Index::create_in_ram(dynamic_schema.clone());
192+
let mut index = Index::create_in_ram(dynamic_schema.clone());
193+
index.set_fast_field_tokenizers(ff_tokenizer_manager.clone());
133194
let mut index_writer: IndexWriter =
134195
index.writer_with_num_threads(1, 100_000_000).unwrap();
135196
for doc_json in &lines {

columnar/src/columnar/writer/column_writers.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -269,7 +269,8 @@ impl StrOrBytesColumnWriter {
269269
dictionaries: &mut [DictionaryBuilder],
270270
arena: &mut MemoryArena,
271271
) {
272-
let unordered_id = dictionaries[self.dictionary_id as usize].get_or_allocate_id(bytes);
272+
let unordered_id =
273+
dictionaries[self.dictionary_id as usize].get_or_allocate_id(bytes, arena);
273274
self.column_writer.record(doc, unordered_id, arena);
274275
}
275276

columnar/src/columnar/writer/mod.rs

+5-1
Original file line numberDiff line numberDiff line change
@@ -437,6 +437,7 @@ impl ColumnarWriter {
437437
&mut symbol_byte_buffer,
438438
),
439439
buffers,
440+
&self.arena,
440441
&mut column_serializer,
441442
)?;
442443
column_serializer.finalize()?;
@@ -490,13 +491,15 @@ impl ColumnarWriter {
490491

491492
// Serialize [Dictionary, Column, dictionary num bytes U32::LE]
492493
// Column: [Column Index, Column Values, column index num bytes U32::LE]
494+
#[allow(clippy::too_many_arguments)]
493495
fn serialize_bytes_or_str_column(
494496
cardinality: Cardinality,
495497
num_docs: RowId,
496498
sort_values_within_row: bool,
497499
dictionary_builder: &DictionaryBuilder,
498500
operation_it: impl Iterator<Item = ColumnOperation<UnorderedId>>,
499501
buffers: &mut SpareBuffers,
502+
arena: &MemoryArena,
500503
wrt: impl io::Write,
501504
) -> io::Result<()> {
502505
let SpareBuffers {
@@ -505,7 +508,8 @@ fn serialize_bytes_or_str_column(
505508
..
506509
} = buffers;
507510
let mut counting_writer = CountingWriter::wrap(wrt);
508-
let term_id_mapping: TermIdMapping = dictionary_builder.serialize(&mut counting_writer)?;
511+
let term_id_mapping: TermIdMapping =
512+
dictionary_builder.serialize(arena, &mut counting_writer)?;
509513
let dictionary_num_bytes: u32 = counting_writer.written_bytes() as u32;
510514
let mut wrt = counting_writer.finish();
511515
let operation_iterator = operation_it.map(|symbol: ColumnOperation<UnorderedId>| {

columnar/src/dictionary.rs

+30-20
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use std::io;
22

3-
use fnv::FnvHashMap;
43
use sstable::SSTable;
4+
use stacker::{MemoryArena, SharedArenaHashMap};
55

66
pub(crate) struct TermIdMapping {
77
unordered_to_ord: Vec<OrderedId>,
@@ -31,29 +31,38 @@ pub struct OrderedId(pub u32);
3131
/// mapping.
3232
#[derive(Default)]
3333
pub(crate) struct DictionaryBuilder {
34-
dict: FnvHashMap<Vec<u8>, UnorderedId>,
35-
memory_consumption: usize,
34+
dict: SharedArenaHashMap,
3635
}
3736

3837
impl DictionaryBuilder {
3938
/// Get or allocate an unordered id.
4039
/// (This ID is simply an auto-incremented id.)
41-
pub fn get_or_allocate_id(&mut self, term: &[u8]) -> UnorderedId {
42-
if let Some(term_id) = self.dict.get(term) {
43-
return *term_id;
44-
}
45-
let new_id = UnorderedId(self.dict.len() as u32);
46-
self.dict.insert(term.to_vec(), new_id);
47-
self.memory_consumption += term.len();
48-
self.memory_consumption += 40; // Term Metadata + HashMap overhead
49-
new_id
40+
pub fn get_or_allocate_id(&mut self, term: &[u8], arena: &mut MemoryArena) -> UnorderedId {
41+
let next_id = self.dict.len() as u32;
42+
let unordered_id = self
43+
.dict
44+
.mutate_or_create(term, arena, |unordered_id: Option<u32>| {
45+
if let Some(unordered_id) = unordered_id {
46+
unordered_id
47+
} else {
48+
next_id
49+
}
50+
});
51+
UnorderedId(unordered_id)
5052
}
5153

5254
/// Serialize the dictionary into an fst, and returns the
5355
/// `UnorderedId -> TermOrdinal` map.
54-
pub fn serialize<'a, W: io::Write + 'a>(&self, wrt: &mut W) -> io::Result<TermIdMapping> {
55-
let mut terms: Vec<(&[u8], UnorderedId)> =
56-
self.dict.iter().map(|(k, v)| (k.as_slice(), *v)).collect();
56+
pub fn serialize<'a, W: io::Write + 'a>(
57+
&self,
58+
arena: &MemoryArena,
59+
wrt: &mut W,
60+
) -> io::Result<TermIdMapping> {
61+
let mut terms: Vec<(&[u8], UnorderedId)> = self
62+
.dict
63+
.iter(arena)
64+
.map(|(k, v)| (k, arena.read(v)))
65+
.collect();
5766
terms.sort_unstable_by_key(|(key, _)| *key);
5867
// TODO Remove the allocation.
5968
let mut unordered_to_ord: Vec<OrderedId> = vec![OrderedId(0u32); terms.len()];
@@ -68,7 +77,7 @@ impl DictionaryBuilder {
6877
}
6978

7079
pub(crate) fn mem_usage(&self) -> usize {
71-
self.memory_consumption
80+
self.dict.mem_usage()
7281
}
7382
}
7483

@@ -78,12 +87,13 @@ mod tests {
7887

7988
#[test]
8089
fn test_dictionary_builder() {
90+
let mut arena = MemoryArena::default();
8191
let mut dictionary_builder = DictionaryBuilder::default();
82-
let hello_uid = dictionary_builder.get_or_allocate_id(b"hello");
83-
let happy_uid = dictionary_builder.get_or_allocate_id(b"happy");
84-
let tax_uid = dictionary_builder.get_or_allocate_id(b"tax");
92+
let hello_uid = dictionary_builder.get_or_allocate_id(b"hello", &mut arena);
93+
let happy_uid = dictionary_builder.get_or_allocate_id(b"happy", &mut arena);
94+
let tax_uid = dictionary_builder.get_or_allocate_id(b"tax", &mut arena);
8595
let mut buffer = Vec::new();
86-
let id_mapping = dictionary_builder.serialize(&mut buffer).unwrap();
96+
let id_mapping = dictionary_builder.serialize(&arena, &mut buffer).unwrap();
8797
assert_eq!(id_mapping.to_ord(hello_uid), OrderedId(1));
8898
assert_eq!(id_mapping.to_ord(happy_uid), OrderedId(0));
8999
assert_eq!(id_mapping.to_ord(tax_uid), OrderedId(2));

0 commit comments

Comments
 (0)