Skip to content

Commit 6a7a110

Browse files
authored
work in batches of docs (#1937)
* work in batches of docs
* add fill_buffer test
1 parent 9e2faec commit 6a7a110

File tree

15 files changed

+151
-106
lines changed

15 files changed

+151
-106
lines changed

columnar/src/column_values/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ pub trait ColumnValues<T: PartialOrd = u64>: Send + Sync {
7272
let cutoff = indexes.len() - indexes.len() % step_size;
7373

7474
for idx in cutoff..indexes.len() {
75-
output[idx] = self.get_val(indexes[idx] as u32);
75+
output[idx] = self.get_val(indexes[idx]);
7676
}
7777
}
7878

src/aggregation/bucket/term_agg.rs

+9-56
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ use crate::TantivyError;
5353
/// into segment_size.
5454
///
5555
/// Result type is [`BucketResult`](crate::aggregation::agg_result::BucketResult) with
56-
/// [`TermBucketEntry`](crate::aggregation::agg_result::BucketEntry) on the
56+
/// [`BucketEntry`](crate::aggregation::agg_result::BucketEntry) on the
5757
/// `AggregationCollector`.
5858
///
5959
/// Result type is
@@ -209,45 +209,6 @@ struct TermBuckets {
209209
pub(crate) sub_aggs: FxHashMap<u64, Box<dyn SegmentAggregationCollector>>,
210210
}
211211

212-
#[derive(Clone, Default)]
213-
struct TermBucketEntry {
214-
doc_count: u64,
215-
sub_aggregations: Option<Box<dyn SegmentAggregationCollector>>,
216-
}
217-
218-
impl Debug for TermBucketEntry {
219-
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
220-
f.debug_struct("TermBucketEntry")
221-
.field("doc_count", &self.doc_count)
222-
.finish()
223-
}
224-
}
225-
226-
impl TermBucketEntry {
227-
fn from_blueprint(blueprint: &Option<Box<dyn SegmentAggregationCollector>>) -> Self {
228-
Self {
229-
doc_count: 0,
230-
sub_aggregations: blueprint.clone(),
231-
}
232-
}
233-
234-
pub(crate) fn into_intermediate_bucket_entry(
235-
self,
236-
agg_with_accessor: &AggregationsWithAccessor,
237-
) -> crate::Result<IntermediateTermBucketEntry> {
238-
let sub_aggregation = if let Some(sub_aggregation) = self.sub_aggregations {
239-
sub_aggregation.into_intermediate_aggregations_result(agg_with_accessor)?
240-
} else {
241-
Default::default()
242-
};
243-
244-
Ok(IntermediateTermBucketEntry {
245-
doc_count: self.doc_count,
246-
sub_aggregation,
247-
})
248-
}
249-
}
250-
251212
impl TermBuckets {
252213
fn force_flush(&mut self, agg_with_accessor: &AggregationsWithAccessor) -> crate::Result<()> {
253214
for sub_aggregations in &mut self.sub_aggs.values_mut() {
@@ -314,7 +275,7 @@ impl SegmentAggregationCollector for SegmentTermCollector {
314275
if accessor.get_cardinality() == Cardinality::Full {
315276
self.val_cache.resize(docs.len(), 0);
316277
accessor.values.get_vals(docs, &mut self.val_cache);
317-
for (doc, term_id) in docs.iter().zip(self.val_cache.iter().cloned()) {
278+
for term_id in self.val_cache.iter().cloned() {
318279
let entry = self.term_buckets.entries.entry(term_id).or_default();
319280
*entry += 1;
320281
}
@@ -445,17 +406,19 @@ impl SegmentTermCollector {
445406

446407
let mut into_intermediate_bucket_entry =
447408
|id, doc_count| -> crate::Result<IntermediateTermBucketEntry> {
448-
let intermediate_entry = if let Some(blueprint) = self.blueprint.as_ref() {
409+
let intermediate_entry = if self.blueprint.as_ref().is_some() {
449410
IntermediateTermBucketEntry {
450411
doc_count,
451412
sub_aggregation: self
452413
.term_buckets
453414
.sub_aggs
454415
.remove(&id)
455-
.expect(&format!(
456-
"Internal Error: could not find subaggregation for id {}",
457-
id
458-
))
416+
.unwrap_or_else(|| {
417+
panic!(
418+
"Internal Error: could not find subaggregation for id {}",
419+
id
420+
)
421+
})
459422
.into_intermediate_aggregations_result(
460423
&agg_with_accessor.sub_aggregation,
461424
)?,
@@ -525,21 +488,11 @@ impl SegmentTermCollector {
525488
pub(crate) trait GetDocCount {
526489
fn doc_count(&self) -> u64;
527490
}
528-
impl GetDocCount for (u32, TermBucketEntry) {
529-
fn doc_count(&self) -> u64 {
530-
self.1.doc_count
531-
}
532-
}
533491
impl GetDocCount for (u64, u64) {
534492
fn doc_count(&self) -> u64 {
535493
self.1
536494
}
537495
}
538-
impl GetDocCount for (u64, TermBucketEntry) {
539-
fn doc_count(&self) -> u64 {
540-
self.1.doc_count
541-
}
542-
}
543496
impl GetDocCount for (String, IntermediateTermBucketEntry) {
544497
fn doc_count(&self) -> u64 {
545498
self.1.doc_count

src/aggregation/buf_collector.rs

+2-3
Original file line numberDiff line numberDiff line change
@@ -64,9 +64,8 @@ impl SegmentAggregationCollector for BufAggregationCollector {
6464
docs: &[crate::DocId],
6565
agg_with_accessor: &AggregationsWithAccessor,
6666
) -> crate::Result<()> {
67-
for doc in docs {
68-
self.collect(*doc, agg_with_accessor)?;
69-
}
67+
self.collector.collect_block(docs, agg_with_accessor)?;
68+
7069
Ok(())
7170
}
7271

src/aggregation/collector.rs

+22-7
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use super::segment_agg_result::{
88
};
99
use crate::aggregation::agg_req_with_accessor::get_aggs_with_accessor_and_validate;
1010
use crate::collector::{Collector, SegmentCollector};
11-
use crate::{SegmentReader, TantivyError};
11+
use crate::{DocId, SegmentReader, TantivyError};
1212

1313
/// The default max bucket count, before the aggregation fails.
1414
pub const DEFAULT_BUCKET_LIMIT: u32 = 65000;
@@ -125,7 +125,7 @@ fn merge_fruits(
125125
/// `AggregationSegmentCollector` does the aggregation collection on a segment.
126126
pub struct AggregationSegmentCollector {
127127
aggs_with_accessor: AggregationsWithAccessor,
128-
result: BufAggregationCollector,
128+
agg_collector: BufAggregationCollector,
129129
error: Option<TantivyError>,
130130
}
131131

@@ -142,7 +142,7 @@ impl AggregationSegmentCollector {
142142
BufAggregationCollector::new(build_segment_agg_collector(&aggs_with_accessor)?);
143143
Ok(AggregationSegmentCollector {
144144
aggs_with_accessor,
145-
result,
145+
agg_collector: result,
146146
error: None,
147147
})
148148
}
@@ -152,11 +152,26 @@ impl SegmentCollector for AggregationSegmentCollector {
152152
type Fruit = crate::Result<IntermediateAggregationResults>;
153153

154154
#[inline]
155-
fn collect(&mut self, doc: crate::DocId, _score: crate::Score) {
155+
fn collect(&mut self, doc: DocId, _score: crate::Score) {
156156
if self.error.is_some() {
157157
return;
158158
}
159-
if let Err(err) = self.result.collect(doc, &self.aggs_with_accessor) {
159+
if let Err(err) = self.agg_collector.collect(doc, &self.aggs_with_accessor) {
160+
self.error = Some(err);
161+
}
162+
}
163+
164+
/// The query pushes the documents to the collector via this method.
165+
///
166+
/// Only valid for Collectors that ignore docs
167+
fn collect_block(&mut self, docs: &[DocId]) {
168+
if self.error.is_some() {
169+
return;
170+
}
171+
if let Err(err) = self
172+
.agg_collector
173+
.collect_block(docs, &self.aggs_with_accessor)
174+
{
160175
self.error = Some(err);
161176
}
162177
}
@@ -165,7 +180,7 @@ impl SegmentCollector for AggregationSegmentCollector {
165180
if let Some(err) = self.error {
166181
return Err(err);
167182
}
168-
self.result.flush(&self.aggs_with_accessor)?;
169-
Box::new(self.result).into_intermediate_aggregations_result(&self.aggs_with_accessor)
183+
self.agg_collector.flush(&self.aggs_with_accessor)?;
184+
Box::new(self.agg_collector).into_intermediate_aggregations_result(&self.aggs_with_accessor)
170185
}
171186
}

src/collector/mod.rs

+14-5
Original file line numberDiff line numberDiff line change
@@ -180,9 +180,11 @@ pub trait Collector: Sync + Send {
180180
})?;
181181
}
182182
(Some(alive_bitset), false) => {
183-
weight.for_each_no_score(reader, &mut |doc| {
184-
if alive_bitset.is_alive(doc) {
185-
segment_collector.collect(doc, 0.0);
183+
weight.for_each_no_score(reader, &mut |docs| {
184+
for doc in docs.iter().cloned() {
185+
if alive_bitset.is_alive(doc) {
186+
segment_collector.collect(doc, 0.0);
187+
}
186188
}
187189
})?;
188190
}
@@ -192,8 +194,8 @@ pub trait Collector: Sync + Send {
192194
})?;
193195
}
194196
(None, false) => {
195-
weight.for_each_no_score(reader, &mut |doc| {
196-
segment_collector.collect(doc, 0.0);
197+
weight.for_each_no_score(reader, &mut |docs| {
198+
segment_collector.collect_block(docs);
197199
})?;
198200
}
199201
}
@@ -270,6 +272,13 @@ pub trait SegmentCollector: 'static {
270272
/// The query pushes the scored document to the collector via this method.
271273
fn collect(&mut self, doc: DocId, score: Score);
272274

275+
/// The query pushes the scored document to the collector via this method.
276+
fn collect_block(&mut self, docs: &[DocId]) {
277+
for doc in docs {
278+
self.collect(*doc, 0.0);
279+
}
280+
}
281+
273282
/// Extract the fruit of the collection from the `SegmentCollector`.
274283
fn harvest(self) -> Self::Fruit;
275284
}

src/docset.rs

+8-1
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ use crate::DocId;
99
/// to compare `[u32; 4]`.
1010
pub const TERMINATED: DocId = i32::MAX as u32;
1111

12+
pub const BUFFER_LEN: usize = 64;
13+
1214
/// Represents an iterable set of sorted doc ids.
1315
pub trait DocSet: Send {
1416
/// Goes to the next element.
@@ -59,7 +61,7 @@ pub trait DocSet: Send {
5961
/// This method is only here for specific high-performance
6062
/// use case where batching. The normal way to
6163
/// go through the `DocId`'s is to call `.advance()`.
62-
fn fill_buffer(&mut self, buffer: &mut [DocId]) -> usize {
64+
fn fill_buffer(&mut self, buffer: &mut [DocId; BUFFER_LEN]) -> usize {
6365
if self.doc() == TERMINATED {
6466
return 0;
6567
}
@@ -149,6 +151,11 @@ impl<TDocSet: DocSet + ?Sized> DocSet for Box<TDocSet> {
149151
unboxed.seek(target)
150152
}
151153

154+
fn fill_buffer(&mut self, buffer: &mut [DocId; BUFFER_LEN]) -> usize {
155+
let unboxed: &mut TDocSet = self.borrow_mut();
156+
unboxed.fill_buffer(buffer)
157+
}
158+
152159
fn doc(&self) -> DocId {
153160
let unboxed: &TDocSet = self.borrow();
154161
unboxed.doc()

src/indexer/index_writer.rs

+6-4
Original file line numberDiff line numberDiff line change
@@ -94,10 +94,12 @@ fn compute_deleted_bitset(
9494
// document that were inserted before it.
9595
delete_op
9696
.target
97-
.for_each_no_score(segment_reader, &mut |doc_matching_delete_query| {
98-
if doc_opstamps.is_deleted(doc_matching_delete_query, delete_op.opstamp) {
99-
alive_bitset.remove(doc_matching_delete_query);
100-
might_have_changed = true;
97+
.for_each_no_score(segment_reader, &mut |docs_matching_delete_query| {
98+
for doc_matching_delete_query in docs_matching_delete_query.iter().cloned() {
99+
if doc_opstamps.is_deleted(doc_matching_delete_query, delete_op.opstamp) {
100+
alive_bitset.remove(doc_matching_delete_query);
101+
might_have_changed = true;
102+
}
101103
}
102104
})?;
103105
delete_cursor.advance();

src/query/all_query.rs

+46-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::core::SegmentReader;
2-
use crate::docset::{DocSet, TERMINATED};
2+
use crate::docset::{DocSet, BUFFER_LEN, TERMINATED};
33
use crate::query::boost_query::BoostScorer;
44
use crate::query::explanation::does_not_match;
55
use crate::query::{EnableScoring, Explanation, Query, Scorer, Weight};
@@ -44,6 +44,7 @@ pub struct AllScorer {
4444
}
4545

4646
impl DocSet for AllScorer {
47+
#[inline(always)]
4748
fn advance(&mut self) -> DocId {
4849
if self.doc + 1 >= self.max_doc {
4950
self.doc = TERMINATED;
@@ -53,6 +54,30 @@ impl DocSet for AllScorer {
5354
self.doc
5455
}
5556

57+
fn fill_buffer(&mut self, buffer: &mut [DocId; BUFFER_LEN]) -> usize {
58+
if self.doc() == TERMINATED {
59+
return 0;
60+
}
61+
let is_safe_distance = self.doc() + (buffer.len() as u32) < self.max_doc;
62+
if is_safe_distance {
63+
let num_items = buffer.len();
64+
for buffer_val in buffer {
65+
*buffer_val = self.doc();
66+
self.doc += 1;
67+
}
68+
num_items
69+
} else {
70+
for (i, buffer_val) in buffer.iter_mut().enumerate() {
71+
*buffer_val = self.doc();
72+
if self.advance() == TERMINATED {
73+
return i + 1;
74+
}
75+
}
76+
buffer.len()
77+
}
78+
}
79+
80+
#[inline(always)]
5681
fn doc(&self) -> DocId {
5782
self.doc
5883
}
@@ -71,8 +96,8 @@ impl Scorer for AllScorer {
7196
#[cfg(test)]
7297
mod tests {
7398
use super::AllQuery;
74-
use crate::docset::TERMINATED;
75-
use crate::query::{EnableScoring, Query};
99+
use crate::docset::{DocSet, BUFFER_LEN, TERMINATED};
100+
use crate::query::{AllScorer, EnableScoring, Query};
76101
use crate::schema::{Schema, TEXT};
77102
use crate::Index;
78103

@@ -132,4 +157,22 @@ mod tests {
132157
}
133158
Ok(())
134159
}
160+
161+
#[test]
162+
pub fn test_fill_buffer() {
163+
let mut postings = AllScorer {
164+
doc: 0u32,
165+
max_doc: BUFFER_LEN as u32 * 2 + 9,
166+
};
167+
let mut buffer = [0u32; BUFFER_LEN];
168+
assert_eq!(postings.fill_buffer(&mut buffer), BUFFER_LEN);
169+
for i in 0u32..BUFFER_LEN as u32 {
170+
assert_eq!(buffer[i as usize], i);
171+
}
172+
assert_eq!(postings.fill_buffer(&mut buffer), BUFFER_LEN);
173+
for i in 0u32..BUFFER_LEN as u32 {
174+
assert_eq!(buffer[i as usize], i + BUFFER_LEN as u32);
175+
}
176+
assert_eq!(postings.fill_buffer(&mut buffer), 9);
177+
}
135178
}

src/query/bitset/mod.rs

+1
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ impl From<BitSet> for BitSetDocSet {
4545
}
4646

4747
impl DocSet for BitSetDocSet {
48+
#[inline]
4849
fn advance(&mut self) -> DocId {
4950
if let Some(lower) = self.cursor_tinybitset.pop_lowest() {
5051
self.doc = (self.cursor_bucket * 64u32) | lower;

0 commit comments

Comments (0)