Skip to content

Commit 7bf00b3

Browse files
committed
refactor aggregations
1 parent 7f45a6a commit 7bf00b3

File tree

6 files changed

+193
-202
lines changed

6 files changed

+193
-202
lines changed

src/aggregation/agg_result.rs

+7-179
Original file line numberDiff line numberDiff line change
@@ -4,21 +4,15 @@
44
//! intermediate average results, which is the sum and the number of values. The actual average is
55
//! calculated on the step from intermediate to final aggregation result tree.
66
7-
use std::cmp::Ordering;
87
use std::collections::HashMap;
98

109
use serde::{Deserialize, Serialize};
1110

12-
use super::agg_req::{
13-
Aggregations, AggregationsInternal, BucketAggregationInternal, MetricAggregation,
14-
};
15-
use super::bucket::{intermediate_buckets_to_final_buckets, GetDocCount};
16-
use super::intermediate_agg_result::{
17-
IntermediateAggregationResults, IntermediateBucketResult, IntermediateHistogramBucketEntry,
18-
IntermediateMetricResult, IntermediateRangeBucketEntry,
19-
};
11+
use super::agg_req::BucketAggregationInternal;
12+
use super::bucket::GetDocCount;
13+
use super::intermediate_agg_result::{IntermediateBucketResult, IntermediateMetricResult};
2014
use super::metric::{SingleMetricResult, Stats};
21-
use super::{Key, VecWithNames};
15+
use super::Key;
2216
use crate::TantivyError;
2317

2418
#[derive(Clone, Default, Debug, PartialEq, Serialize, Deserialize)]
@@ -41,98 +35,6 @@ impl AggregationResults {
4135
)))
4236
}
4337
}
44-
45-
/// Convert and intermediate result and its aggregation request to the final result
46-
pub fn from_intermediate_and_req(
47-
results: IntermediateAggregationResults,
48-
agg: Aggregations,
49-
) -> crate::Result<Self> {
50-
AggregationResults::from_intermediate_and_req_internal(results, &(agg.into()))
51-
}
52-
53-
/// Convert and intermediate result and its aggregation request to the final result
54-
///
55-
/// Internal function, CollectorAggregations is used instead Aggregations, which is optimized
56-
/// for internal processing, by splitting metric and buckets into seperate groups.
57-
pub(crate) fn from_intermediate_and_req_internal(
58-
intermediate_results: IntermediateAggregationResults,
59-
req: &AggregationsInternal,
60-
) -> crate::Result<Self> {
61-
// Important assumption:
62-
// When the tree contains buckets/metric, we expect it to have all buckets/metrics from the
63-
// request
64-
let mut results: HashMap<String, AggregationResult> = HashMap::new();
65-
66-
if let Some(buckets) = intermediate_results.buckets {
67-
add_coverted_final_buckets_to_result(&mut results, buckets, &req.buckets)?
68-
} else {
69-
// When there are no buckets, we create empty buckets, so that the serialized json
70-
// format is constant
71-
add_empty_final_buckets_to_result(&mut results, &req.buckets)?
72-
};
73-
74-
if let Some(metrics) = intermediate_results.metrics {
75-
add_converted_final_metrics_to_result(&mut results, metrics);
76-
} else {
77-
// When there are no metrics, we create empty metric results, so that the serialized
78-
// json format is constant
79-
add_empty_final_metrics_to_result(&mut results, &req.metrics)?;
80-
}
81-
Ok(Self(results))
82-
}
83-
}
84-
85-
fn add_converted_final_metrics_to_result(
86-
results: &mut HashMap<String, AggregationResult>,
87-
metrics: VecWithNames<IntermediateMetricResult>,
88-
) {
89-
results.extend(
90-
metrics
91-
.into_iter()
92-
.map(|(key, metric)| (key, AggregationResult::MetricResult(metric.into()))),
93-
);
94-
}
95-
96-
fn add_empty_final_metrics_to_result(
97-
results: &mut HashMap<String, AggregationResult>,
98-
req_metrics: &VecWithNames<MetricAggregation>,
99-
) -> crate::Result<()> {
100-
results.extend(req_metrics.iter().map(|(key, req)| {
101-
let empty_bucket = IntermediateMetricResult::empty_from_req(req);
102-
(
103-
key.to_string(),
104-
AggregationResult::MetricResult(empty_bucket.into()),
105-
)
106-
}));
107-
Ok(())
108-
}
109-
110-
fn add_empty_final_buckets_to_result(
111-
results: &mut HashMap<String, AggregationResult>,
112-
req_buckets: &VecWithNames<BucketAggregationInternal>,
113-
) -> crate::Result<()> {
114-
let requested_buckets = req_buckets.iter();
115-
for (key, req) in requested_buckets {
116-
let empty_bucket = AggregationResult::BucketResult(BucketResult::empty_from_req(req)?);
117-
results.insert(key.to_string(), empty_bucket);
118-
}
119-
Ok(())
120-
}
121-
122-
fn add_coverted_final_buckets_to_result(
123-
results: &mut HashMap<String, AggregationResult>,
124-
buckets: VecWithNames<IntermediateBucketResult>,
125-
req_buckets: &VecWithNames<BucketAggregationInternal>,
126-
) -> crate::Result<()> {
127-
assert_eq!(buckets.len(), req_buckets.len());
128-
129-
let buckets_with_request = buckets.into_iter().zip(req_buckets.values());
130-
for ((key, bucket), req) in buckets_with_request {
131-
let result =
132-
AggregationResult::BucketResult(BucketResult::from_intermediate_and_req(bucket, req)?);
133-
results.insert(key, result);
134-
}
135-
Ok(())
13638
}
13739

13840
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
@@ -154,7 +56,8 @@ impl AggregationResult {
15456
match self {
15557
AggregationResult::BucketResult(_bucket) => Err(TantivyError::InternalError(
15658
"Tried to retrieve value from bucket aggregation. This is not supported and \
157-
should not happen during collection, but should be catched during validation"
59+
should not happen during collection phase, but should be catched during \
60+
validation"
15861
.to_string(),
15962
)),
16063
AggregationResult::MetricResult(metric) => metric.get_value(agg_property),
@@ -230,48 +133,7 @@ pub enum BucketResult {
230133
impl BucketResult {
231134
pub(crate) fn empty_from_req(req: &BucketAggregationInternal) -> crate::Result<Self> {
232135
let empty_bucket = IntermediateBucketResult::empty_from_req(&req.bucket_agg);
233-
BucketResult::from_intermediate_and_req(empty_bucket, req)
234-
}
235-
236-
fn from_intermediate_and_req(
237-
bucket_result: IntermediateBucketResult,
238-
req: &BucketAggregationInternal,
239-
) -> crate::Result<Self> {
240-
match bucket_result {
241-
IntermediateBucketResult::Range(range_res) => {
242-
let mut buckets: Vec<RangeBucketEntry> = range_res
243-
.buckets
244-
.into_iter()
245-
.map(|(_, bucket)| {
246-
RangeBucketEntry::from_intermediate_and_req(bucket, &req.sub_aggregation)
247-
})
248-
.collect::<crate::Result<Vec<_>>>()?;
249-
250-
buckets.sort_by(|left, right| {
251-
// TODO use total_cmp next stable rust release
252-
left.from
253-
.unwrap_or(f64::MIN)
254-
.partial_cmp(&right.from.unwrap_or(f64::MIN))
255-
.unwrap_or(Ordering::Equal)
256-
});
257-
Ok(BucketResult::Range { buckets })
258-
}
259-
IntermediateBucketResult::Histogram { buckets } => {
260-
let buckets = intermediate_buckets_to_final_buckets(
261-
buckets,
262-
req.as_histogram()
263-
.expect("unexpected aggregation, expected histogram aggregation"),
264-
&req.sub_aggregation,
265-
)?;
266-
267-
Ok(BucketResult::Histogram { buckets })
268-
}
269-
IntermediateBucketResult::Terms(terms) => terms.into_final_result(
270-
req.as_term()
271-
.expect("unexpected aggregation, expected term aggregation"),
272-
&req.sub_aggregation,
273-
),
274-
}
136+
empty_bucket.into_final_bucket_result(req)
275137
}
276138
}
277139

@@ -311,22 +173,6 @@ pub struct BucketEntry {
311173
/// Sub-aggregations in this bucket.
312174
pub sub_aggregation: AggregationResults,
313175
}
314-
315-
impl BucketEntry {
316-
pub(crate) fn from_intermediate_and_req(
317-
entry: IntermediateHistogramBucketEntry,
318-
req: &AggregationsInternal,
319-
) -> crate::Result<Self> {
320-
Ok(BucketEntry {
321-
key: Key::F64(entry.key),
322-
doc_count: entry.doc_count,
323-
sub_aggregation: AggregationResults::from_intermediate_and_req_internal(
324-
entry.sub_aggregation,
325-
req,
326-
)?,
327-
})
328-
}
329-
}
330176
impl GetDocCount for &BucketEntry {
331177
fn doc_count(&self) -> u64 {
332178
self.doc_count
@@ -384,21 +230,3 @@ pub struct RangeBucketEntry {
384230
#[serde(skip_serializing_if = "Option::is_none")]
385231
pub to: Option<f64>,
386232
}
387-
388-
impl RangeBucketEntry {
389-
fn from_intermediate_and_req(
390-
entry: IntermediateRangeBucketEntry,
391-
req: &AggregationsInternal,
392-
) -> crate::Result<Self> {
393-
Ok(RangeBucketEntry {
394-
key: entry.key,
395-
doc_count: entry.doc_count,
396-
sub_aggregation: AggregationResults::from_intermediate_and_req_internal(
397-
entry.sub_aggregation,
398-
req,
399-
)?,
400-
to: entry.to,
401-
from: entry.from,
402-
})
403-
}
404-
}

src/aggregation/bucket/histogram/histogram.rs

+5-7
Original file line numberDiff line numberDiff line change
@@ -482,14 +482,12 @@ fn intermediate_buckets_to_final_buckets_fill_gaps(
482482
sub_aggregation: empty_sub_aggregation.clone(),
483483
},
484484
})
485-
.map(|intermediate_bucket| {
486-
BucketEntry::from_intermediate_and_req(intermediate_bucket, sub_aggregation)
487-
})
485+
.map(|intermediate_bucket| intermediate_bucket.into_final_bucket_entry(sub_aggregation))
488486
.collect::<crate::Result<Vec<_>>>()
489487
}
490488

491489
// Convert to BucketEntry
492-
pub(crate) fn intermediate_buckets_to_final_buckets(
490+
pub(crate) fn intermediate_histogram_buckets_to_final_buckets(
493491
buckets: Vec<IntermediateHistogramBucketEntry>,
494492
histogram_req: &HistogramAggregation,
495493
sub_aggregation: &AggregationsInternal,
@@ -503,8 +501,8 @@ pub(crate) fn intermediate_buckets_to_final_buckets(
503501
} else {
504502
buckets
505503
.into_iter()
506-
.filter(|bucket| bucket.doc_count >= histogram_req.min_doc_count())
507-
.map(|bucket| BucketEntry::from_intermediate_and_req(bucket, sub_aggregation))
504+
.filter(|histogram_bucket| histogram_bucket.doc_count >= histogram_req.min_doc_count())
505+
.map(|histogram_bucket| histogram_bucket.into_final_bucket_entry(sub_aggregation))
508506
.collect::<crate::Result<Vec<_>>>()
509507
}
510508
}
@@ -546,7 +544,7 @@ pub(crate) fn generate_buckets_with_opt_minmax(
546544
let offset = req.offset.unwrap_or(0.0);
547545
let first_bucket_num = get_bucket_num_f64(min, req.interval, offset) as i64;
548546
let last_bucket_num = get_bucket_num_f64(max, req.interval, offset) as i64;
549-
let mut buckets = vec![];
547+
let mut buckets = Vec::with_capacity((first_bucket_num..=last_bucket_num).count());
550548
for bucket_pos in first_bucket_num..=last_bucket_num {
551549
let bucket_key = bucket_pos as f64 * req.interval + offset;
552550
buckets.push(bucket_key);

src/aggregation/bucket/range.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -317,7 +317,7 @@ fn to_u64_range(range: &RangeAggregationRange, field_type: &Type) -> crate::Resu
317317
}
318318

319319
/// Extends the provided buckets to contain the whole value range, by inserting buckets at the
320-
/// beginning and end.
320+
/// beginning and end and filling gaps.
321321
fn extend_validate_ranges(
322322
buckets: &[RangeAggregationRange],
323323
field_type: &Type,

src/aggregation/collector.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ impl Collector for AggregationCollector {
8787
segment_fruits: Vec<<Self::Child as SegmentCollector>::Fruit>,
8888
) -> crate::Result<Self::Fruit> {
8989
let res = merge_fruits(segment_fruits)?;
90-
AggregationResults::from_intermediate_and_req(res, self.agg.clone())
90+
res.into_final_bucket_result(self.agg.clone())
9191
}
9292
}
9393

0 commit comments

Comments
 (0)