Skip to content

Commit b60d862

Browse files
authored
docid deltas while indexing (#2249)
* docid deltas while indexing: storing deltas is especially helpful for repetitive data like logs. In those cases, recording a doc on a term previously cost 4 bytes; it now costs 1 byte. HDFS indexing (1.1 GB), total memory consumption: before 760 MB, now 590 MB. * use scan for delta decoding
1 parent 4837c78 commit b60d862

File tree

1 file changed

+39
-38
lines changed

1 file changed

+39
-38
lines changed

src/postings/recorder.rs

+39-38
Original file line numberDiff line numberDiff line change
@@ -82,21 +82,12 @@ pub(crate) trait Recorder: Copy + Default + Send + Sync + 'static {
8282
}
8383

8484
/// Only records the doc ids
85-
#[derive(Clone, Copy)]
85+
#[derive(Clone, Copy, Default)]
8686
pub struct DocIdRecorder {
8787
stack: ExpUnrolledLinkedList,
8888
current_doc: DocId,
8989
}
9090

91-
impl Default for DocIdRecorder {
92-
fn default() -> Self {
93-
DocIdRecorder {
94-
stack: ExpUnrolledLinkedList::default(),
95-
current_doc: u32::MAX,
96-
}
97-
}
98-
}
99-
10091
impl Recorder for DocIdRecorder {
10192
#[inline]
10293
fn current_doc(&self) -> DocId {
@@ -105,8 +96,9 @@ impl Recorder for DocIdRecorder {
10596

10697
#[inline]
10798
fn new_doc(&mut self, doc: DocId, arena: &mut MemoryArena) {
99+
let delta = doc - self.current_doc;
108100
self.current_doc = doc;
109-
self.stack.writer(arena).write_u32_vint(doc);
101+
self.stack.writer(arena).write_u32_vint(delta);
110102
}
111103

112104
#[inline]
@@ -123,21 +115,20 @@ impl Recorder for DocIdRecorder {
123115
buffer_lender: &mut BufferLender,
124116
) {
125117
let (buffer, doc_ids) = buffer_lender.lend_all();
126-
self.stack.read_to_end(arena, buffer);
127118
// TODO avoid reading twice.
119+
self.stack.read_to_end(arena, buffer);
128120
if let Some(doc_id_map) = doc_id_map {
129-
doc_ids.extend(
130-
VInt32Reader::new(&buffer[..])
131-
.map(|old_doc_id| doc_id_map.get_new_doc_id(old_doc_id)),
132-
);
121+
let iter = get_sum_reader(VInt32Reader::new(&buffer[..]));
122+
doc_ids.extend(iter.map(|old_doc_id| doc_id_map.get_new_doc_id(old_doc_id)));
133123
doc_ids.sort_unstable();
134124

135125
for doc in doc_ids {
136126
serializer.write_doc(*doc, 0u32, &[][..]);
137127
}
138128
} else {
139-
for doc in VInt32Reader::new(&buffer[..]) {
140-
serializer.write_doc(doc, 0u32, &[][..]);
129+
let iter = get_sum_reader(VInt32Reader::new(&buffer[..]));
130+
for doc_id in iter {
131+
serializer.write_doc(doc_id, 0u32, &[][..]);
141132
}
142133
}
143134
}
@@ -147,6 +138,15 @@ impl Recorder for DocIdRecorder {
147138
}
148139
}
149140

141+
/// Takes an iterator of delta-encoded elements and returns an iterator
/// that yields the running sum of those elements.
///
/// The accumulator starts at 0, so the first yielded value is the first
/// delta itself; every later value is the previous result plus the next
/// delta. An empty input yields an empty output.
///
/// Note: the accumulation is a plain `u32` addition, so a running sum that
/// exceeds `u32::MAX` is an arithmetic overflow (a panic when overflow checks
/// are enabled).
fn get_sum_reader(iter: impl Iterator<Item = u32>) -> impl Iterator<Item = u32> {
    // `scan` threads the running total through the iteration; returning
    // `Some` on every step means the output ends only when the input does.
    iter.scan(0u32, |state, delta| {
        *state += delta;
        Some(*state)
    })
}
149+
150150
/// Recorder encoding document ids, and term frequencies
151151
#[derive(Clone, Copy, Default)]
152152
pub struct TermFrequencyRecorder {
@@ -164,9 +164,10 @@ impl Recorder for TermFrequencyRecorder {
164164

165165
#[inline]
166166
fn new_doc(&mut self, doc: DocId, arena: &mut MemoryArena) {
167+
let delta = doc - self.current_doc;
167168
self.term_doc_freq += 1;
168169
self.current_doc = doc;
169-
self.stack.writer(arena).write_u32_vint(doc);
170+
self.stack.writer(arena).write_u32_vint(delta);
170171
}
171172

172173
#[inline]
@@ -193,19 +194,25 @@ impl Recorder for TermFrequencyRecorder {
193194
let mut u32_it = VInt32Reader::new(&buffer[..]);
194195
if let Some(doc_id_map) = doc_id_map {
195196
let mut doc_id_and_tf = vec![];
196-
while let Some(old_doc_id) = u32_it.next() {
197+
let mut prev_doc = 0;
198+
while let Some(delta_doc_id) = u32_it.next() {
199+
let doc_id = prev_doc + delta_doc_id;
200+
prev_doc = doc_id;
197201
let term_freq = u32_it.next().unwrap_or(self.current_tf);
198-
doc_id_and_tf.push((doc_id_map.get_new_doc_id(old_doc_id), term_freq));
202+
doc_id_and_tf.push((doc_id_map.get_new_doc_id(doc_id), term_freq));
199203
}
200204
doc_id_and_tf.sort_unstable_by_key(|&(doc_id, _)| doc_id);
201205

202206
for (doc_id, tf) in doc_id_and_tf {
203207
serializer.write_doc(doc_id, tf, &[][..]);
204208
}
205209
} else {
206-
while let Some(doc) = u32_it.next() {
210+
let mut prev_doc = 0;
211+
while let Some(delta_doc_id) = u32_it.next() {
212+
let doc_id = prev_doc + delta_doc_id;
213+
prev_doc = doc_id;
207214
let term_freq = u32_it.next().unwrap_or(self.current_tf);
208-
serializer.write_doc(doc, term_freq, &[][..]);
215+
serializer.write_doc(doc_id, term_freq, &[][..]);
209216
}
210217
}
211218
}
@@ -216,23 +223,13 @@ impl Recorder for TermFrequencyRecorder {
216223
}
217224

218225
/// Recorder encoding term frequencies as well as positions.
219-
#[derive(Clone, Copy)]
226+
#[derive(Clone, Copy, Default)]
220227
pub struct TfAndPositionRecorder {
221228
stack: ExpUnrolledLinkedList,
222229
current_doc: DocId,
223230
term_doc_freq: u32,
224231
}
225232

226-
impl Default for TfAndPositionRecorder {
227-
fn default() -> Self {
228-
TfAndPositionRecorder {
229-
stack: ExpUnrolledLinkedList::default(),
230-
current_doc: u32::MAX,
231-
term_doc_freq: 0u32,
232-
}
233-
}
234-
}
235-
236233
impl Recorder for TfAndPositionRecorder {
237234
#[inline]
238235
fn current_doc(&self) -> DocId {
@@ -241,9 +238,10 @@ impl Recorder for TfAndPositionRecorder {
241238

242239
#[inline]
243240
fn new_doc(&mut self, doc: DocId, arena: &mut MemoryArena) {
241+
let delta = doc - self.current_doc;
244242
self.current_doc = doc;
245243
self.term_doc_freq += 1u32;
246-
self.stack.writer(arena).write_u32_vint(doc);
244+
self.stack.writer(arena).write_u32_vint(delta);
247245
}
248246

249247
#[inline]
@@ -269,7 +267,10 @@ impl Recorder for TfAndPositionRecorder {
269267
self.stack.read_to_end(arena, buffer_u8);
270268
let mut u32_it = VInt32Reader::new(&buffer_u8[..]);
271269
let mut doc_id_and_positions = vec![];
272-
while let Some(doc) = u32_it.next() {
270+
let mut prev_doc = 0;
271+
while let Some(delta_doc_id) = u32_it.next() {
272+
let doc_id = prev_doc + delta_doc_id;
273+
prev_doc = doc_id;
273274
let mut prev_position_plus_one = 1u32;
274275
buffer_positions.clear();
275276
loop {
@@ -287,9 +288,9 @@ impl Recorder for TfAndPositionRecorder {
287288
if let Some(doc_id_map) = doc_id_map {
288289
// this simple variant to remap may consume too much memory
289290
doc_id_and_positions
290-
.push((doc_id_map.get_new_doc_id(doc), buffer_positions.to_vec()));
291+
.push((doc_id_map.get_new_doc_id(doc_id), buffer_positions.to_vec()));
291292
} else {
292-
serializer.write_doc(doc, buffer_positions.len() as u32, buffer_positions);
293+
serializer.write_doc(doc_id, buffer_positions.len() as u32, buffer_positions);
293294
}
294295
}
295296
if doc_id_map.is_some() {

0 commit comments

Comments
 (0)