Skip to content

Commit 6ca5f77

Browse files
authored
Merge pull request #1363 from quickwit-oss/refactor_aggregation
Add aggregation bucket limit
2 parents de178a1 + 2e2822f commit 6ca5f77

11 files changed

+455
-299
lines changed

examples/aggregation.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ fn main() -> tantivy::Result<()> {
117117
.into_iter()
118118
.collect();
119119

120-
let collector = AggregationCollector::from_aggs(agg_req_1);
120+
let collector = AggregationCollector::from_aggs(agg_req_1, None);
121121

122122
let searcher = reader.searcher();
123123
let agg_res: AggregationResults = searcher.search(&term_query, &collector).unwrap();

src/aggregation/agg_req_with_accessor.rs

+20-1
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
//! This will enhance the request tree with access to the fastfield and metadata.
22
3+
use std::rc::Rc;
4+
use std::sync::atomic::AtomicU32;
35
use std::sync::Arc;
46

57
use super::agg_req::{Aggregation, Aggregations, BucketAggregationType, MetricAggregation};
68
use super::bucket::{HistogramAggregation, RangeAggregation, TermsAggregation};
79
use super::metric::{AverageAggregation, StatsAggregation};
10+
use super::segment_agg_result::BucketCount;
811
use super::VecWithNames;
912
use crate::fastfield::{
1013
type_and_cardinality, DynamicFastFieldReader, FastType, MultiValuedFastFieldReader,
@@ -60,13 +63,16 @@ pub struct BucketAggregationWithAccessor {
6063
pub(crate) field_type: Type,
6164
pub(crate) bucket_agg: BucketAggregationType,
6265
pub(crate) sub_aggregation: AggregationsWithAccessor,
66+
pub(crate) bucket_count: BucketCount,
6367
}
6468

6569
impl BucketAggregationWithAccessor {
6670
fn try_from_bucket(
6771
bucket: &BucketAggregationType,
6872
sub_aggregation: &Aggregations,
6973
reader: &SegmentReader,
74+
bucket_count: Rc<AtomicU32>,
75+
max_bucket_count: u32,
7076
) -> crate::Result<BucketAggregationWithAccessor> {
7177
let mut inverted_index = None;
7278
let (accessor, field_type) = match &bucket {
@@ -92,9 +98,18 @@ impl BucketAggregationWithAccessor {
9298
Ok(BucketAggregationWithAccessor {
9399
accessor,
94100
field_type,
95-
sub_aggregation: get_aggs_with_accessor_and_validate(&sub_aggregation, reader)?,
101+
sub_aggregation: get_aggs_with_accessor_and_validate(
102+
&sub_aggregation,
103+
reader,
104+
bucket_count.clone(),
105+
max_bucket_count,
106+
)?,
96107
bucket_agg: bucket.clone(),
97108
inverted_index,
109+
bucket_count: BucketCount {
110+
bucket_count,
111+
max_bucket_count,
112+
},
98113
})
99114
}
100115
}
@@ -134,6 +149,8 @@ impl MetricAggregationWithAccessor {
134149
pub(crate) fn get_aggs_with_accessor_and_validate(
135150
aggs: &Aggregations,
136151
reader: &SegmentReader,
152+
bucket_count: Rc<AtomicU32>,
153+
max_bucket_count: u32,
137154
) -> crate::Result<AggregationsWithAccessor> {
138155
let mut metrics = vec![];
139156
let mut buckets = vec![];
@@ -145,6 +162,8 @@ pub(crate) fn get_aggs_with_accessor_and_validate(
145162
&bucket.bucket_agg,
146163
&bucket.sub_aggregation,
147164
reader,
165+
Rc::clone(&bucket_count),
166+
max_bucket_count,
148167
)?,
149168
)),
150169
Aggregation::Metric(metric) => metrics.push((

src/aggregation/agg_result.rs

+7-179
Original file line numberDiff line numberDiff line change
@@ -4,21 +4,15 @@
44
//! intermediate average results, which is the sum and the number of values. The actual average is
55
//! calculated on the step from intermediate to final aggregation result tree.
66
7-
use std::cmp::Ordering;
87
use std::collections::HashMap;
98

109
use serde::{Deserialize, Serialize};
1110

12-
use super::agg_req::{
13-
Aggregations, AggregationsInternal, BucketAggregationInternal, MetricAggregation,
14-
};
15-
use super::bucket::{intermediate_buckets_to_final_buckets, GetDocCount};
16-
use super::intermediate_agg_result::{
17-
IntermediateAggregationResults, IntermediateBucketResult, IntermediateHistogramBucketEntry,
18-
IntermediateMetricResult, IntermediateRangeBucketEntry,
19-
};
11+
use super::agg_req::BucketAggregationInternal;
12+
use super::bucket::GetDocCount;
13+
use super::intermediate_agg_result::{IntermediateBucketResult, IntermediateMetricResult};
2014
use super::metric::{SingleMetricResult, Stats};
21-
use super::{Key, VecWithNames};
15+
use super::Key;
2216
use crate::TantivyError;
2317

2418
#[derive(Clone, Default, Debug, PartialEq, Serialize, Deserialize)]
@@ -41,98 +35,6 @@ impl AggregationResults {
4135
)))
4236
}
4337
}
44-
45-
/// Convert and intermediate result and its aggregation request to the final result
46-
pub fn from_intermediate_and_req(
47-
results: IntermediateAggregationResults,
48-
agg: Aggregations,
49-
) -> crate::Result<Self> {
50-
AggregationResults::from_intermediate_and_req_internal(results, &(agg.into()))
51-
}
52-
53-
/// Convert and intermediate result and its aggregation request to the final result
54-
///
55-
/// Internal function, CollectorAggregations is used instead Aggregations, which is optimized
56-
/// for internal processing, by splitting metric and buckets into seperate groups.
57-
pub(crate) fn from_intermediate_and_req_internal(
58-
intermediate_results: IntermediateAggregationResults,
59-
req: &AggregationsInternal,
60-
) -> crate::Result<Self> {
61-
// Important assumption:
62-
// When the tree contains buckets/metric, we expect it to have all buckets/metrics from the
63-
// request
64-
let mut results: HashMap<String, AggregationResult> = HashMap::new();
65-
66-
if let Some(buckets) = intermediate_results.buckets {
67-
add_coverted_final_buckets_to_result(&mut results, buckets, &req.buckets)?
68-
} else {
69-
// When there are no buckets, we create empty buckets, so that the serialized json
70-
// format is constant
71-
add_empty_final_buckets_to_result(&mut results, &req.buckets)?
72-
};
73-
74-
if let Some(metrics) = intermediate_results.metrics {
75-
add_converted_final_metrics_to_result(&mut results, metrics);
76-
} else {
77-
// When there are no metrics, we create empty metric results, so that the serialized
78-
// json format is constant
79-
add_empty_final_metrics_to_result(&mut results, &req.metrics)?;
80-
}
81-
Ok(Self(results))
82-
}
83-
}
84-
85-
fn add_converted_final_metrics_to_result(
86-
results: &mut HashMap<String, AggregationResult>,
87-
metrics: VecWithNames<IntermediateMetricResult>,
88-
) {
89-
results.extend(
90-
metrics
91-
.into_iter()
92-
.map(|(key, metric)| (key, AggregationResult::MetricResult(metric.into()))),
93-
);
94-
}
95-
96-
fn add_empty_final_metrics_to_result(
97-
results: &mut HashMap<String, AggregationResult>,
98-
req_metrics: &VecWithNames<MetricAggregation>,
99-
) -> crate::Result<()> {
100-
results.extend(req_metrics.iter().map(|(key, req)| {
101-
let empty_bucket = IntermediateMetricResult::empty_from_req(req);
102-
(
103-
key.to_string(),
104-
AggregationResult::MetricResult(empty_bucket.into()),
105-
)
106-
}));
107-
Ok(())
108-
}
109-
110-
fn add_empty_final_buckets_to_result(
111-
results: &mut HashMap<String, AggregationResult>,
112-
req_buckets: &VecWithNames<BucketAggregationInternal>,
113-
) -> crate::Result<()> {
114-
let requested_buckets = req_buckets.iter();
115-
for (key, req) in requested_buckets {
116-
let empty_bucket = AggregationResult::BucketResult(BucketResult::empty_from_req(req)?);
117-
results.insert(key.to_string(), empty_bucket);
118-
}
119-
Ok(())
120-
}
121-
122-
fn add_coverted_final_buckets_to_result(
123-
results: &mut HashMap<String, AggregationResult>,
124-
buckets: VecWithNames<IntermediateBucketResult>,
125-
req_buckets: &VecWithNames<BucketAggregationInternal>,
126-
) -> crate::Result<()> {
127-
assert_eq!(buckets.len(), req_buckets.len());
128-
129-
let buckets_with_request = buckets.into_iter().zip(req_buckets.values());
130-
for ((key, bucket), req) in buckets_with_request {
131-
let result =
132-
AggregationResult::BucketResult(BucketResult::from_intermediate_and_req(bucket, req)?);
133-
results.insert(key, result);
134-
}
135-
Ok(())
13638
}
13739

13840
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
@@ -154,7 +56,8 @@ impl AggregationResult {
15456
match self {
15557
AggregationResult::BucketResult(_bucket) => Err(TantivyError::InternalError(
15658
"Tried to retrieve value from bucket aggregation. This is not supported and \
157-
should not happen during collection, but should be catched during validation"
59+
should not happen during collection phase, but should be catched during \
60+
validation"
15861
.to_string(),
15962
)),
16063
AggregationResult::MetricResult(metric) => metric.get_value(agg_property),
@@ -230,48 +133,7 @@ pub enum BucketResult {
230133
impl BucketResult {
231134
pub(crate) fn empty_from_req(req: &BucketAggregationInternal) -> crate::Result<Self> {
232135
let empty_bucket = IntermediateBucketResult::empty_from_req(&req.bucket_agg);
233-
BucketResult::from_intermediate_and_req(empty_bucket, req)
234-
}
235-
236-
fn from_intermediate_and_req(
237-
bucket_result: IntermediateBucketResult,
238-
req: &BucketAggregationInternal,
239-
) -> crate::Result<Self> {
240-
match bucket_result {
241-
IntermediateBucketResult::Range(range_res) => {
242-
let mut buckets: Vec<RangeBucketEntry> = range_res
243-
.buckets
244-
.into_iter()
245-
.map(|(_, bucket)| {
246-
RangeBucketEntry::from_intermediate_and_req(bucket, &req.sub_aggregation)
247-
})
248-
.collect::<crate::Result<Vec<_>>>()?;
249-
250-
buckets.sort_by(|left, right| {
251-
// TODO use total_cmp next stable rust release
252-
left.from
253-
.unwrap_or(f64::MIN)
254-
.partial_cmp(&right.from.unwrap_or(f64::MIN))
255-
.unwrap_or(Ordering::Equal)
256-
});
257-
Ok(BucketResult::Range { buckets })
258-
}
259-
IntermediateBucketResult::Histogram { buckets } => {
260-
let buckets = intermediate_buckets_to_final_buckets(
261-
buckets,
262-
req.as_histogram()
263-
.expect("unexpected aggregation, expected histogram aggregation"),
264-
&req.sub_aggregation,
265-
)?;
266-
267-
Ok(BucketResult::Histogram { buckets })
268-
}
269-
IntermediateBucketResult::Terms(terms) => terms.into_final_result(
270-
req.as_term()
271-
.expect("unexpected aggregation, expected term aggregation"),
272-
&req.sub_aggregation,
273-
),
274-
}
136+
empty_bucket.into_final_bucket_result(req)
275137
}
276138
}
277139

@@ -311,22 +173,6 @@ pub struct BucketEntry {
311173
/// Sub-aggregations in this bucket.
312174
pub sub_aggregation: AggregationResults,
313175
}
314-
315-
impl BucketEntry {
316-
pub(crate) fn from_intermediate_and_req(
317-
entry: IntermediateHistogramBucketEntry,
318-
req: &AggregationsInternal,
319-
) -> crate::Result<Self> {
320-
Ok(BucketEntry {
321-
key: Key::F64(entry.key),
322-
doc_count: entry.doc_count,
323-
sub_aggregation: AggregationResults::from_intermediate_and_req_internal(
324-
entry.sub_aggregation,
325-
req,
326-
)?,
327-
})
328-
}
329-
}
330176
impl GetDocCount for &BucketEntry {
331177
fn doc_count(&self) -> u64 {
332178
self.doc_count
@@ -384,21 +230,3 @@ pub struct RangeBucketEntry {
384230
#[serde(skip_serializing_if = "Option::is_none")]
385231
pub to: Option<f64>,
386232
}
387-
388-
impl RangeBucketEntry {
389-
fn from_intermediate_and_req(
390-
entry: IntermediateRangeBucketEntry,
391-
req: &AggregationsInternal,
392-
) -> crate::Result<Self> {
393-
Ok(RangeBucketEntry {
394-
key: entry.key,
395-
doc_count: entry.doc_count,
396-
sub_aggregation: AggregationResults::from_intermediate_and_req_internal(
397-
entry.sub_aggregation,
398-
req,
399-
)?,
400-
to: entry.to,
401-
from: entry.from,
402-
})
403-
}
404-
}

0 commit comments

Comments
 (0)