4
4
//! intermediate average results, which is the sum and the number of values. The actual average is
5
5
//! calculated on the step from intermediate to final aggregation result tree.
6
6
7
- use std:: cmp:: Ordering ;
8
7
use std:: collections:: HashMap ;
9
8
10
9
use serde:: { Deserialize , Serialize } ;
11
10
12
- use super :: agg_req:: {
13
- Aggregations , AggregationsInternal , BucketAggregationInternal , MetricAggregation ,
14
- } ;
15
- use super :: bucket:: { intermediate_buckets_to_final_buckets, GetDocCount } ;
16
- use super :: intermediate_agg_result:: {
17
- IntermediateAggregationResults , IntermediateBucketResult , IntermediateHistogramBucketEntry ,
18
- IntermediateMetricResult , IntermediateRangeBucketEntry ,
19
- } ;
11
+ use super :: agg_req:: BucketAggregationInternal ;
12
+ use super :: bucket:: GetDocCount ;
13
+ use super :: intermediate_agg_result:: { IntermediateBucketResult , IntermediateMetricResult } ;
20
14
use super :: metric:: { SingleMetricResult , Stats } ;
21
- use super :: { Key , VecWithNames } ;
15
+ use super :: Key ;
22
16
use crate :: TantivyError ;
23
17
24
18
#[ derive( Clone , Default , Debug , PartialEq , Serialize , Deserialize ) ]
@@ -41,98 +35,6 @@ impl AggregationResults {
41
35
) ) )
42
36
}
43
37
}
44
-
45
- /// Convert and intermediate result and its aggregation request to the final result
46
- pub fn from_intermediate_and_req (
47
- results : IntermediateAggregationResults ,
48
- agg : Aggregations ,
49
- ) -> crate :: Result < Self > {
50
- AggregationResults :: from_intermediate_and_req_internal ( results, & ( agg. into ( ) ) )
51
- }
52
-
53
- /// Convert and intermediate result and its aggregation request to the final result
54
- ///
55
- /// Internal function, CollectorAggregations is used instead Aggregations, which is optimized
56
- /// for internal processing, by splitting metric and buckets into seperate groups.
57
- pub ( crate ) fn from_intermediate_and_req_internal (
58
- intermediate_results : IntermediateAggregationResults ,
59
- req : & AggregationsInternal ,
60
- ) -> crate :: Result < Self > {
61
- // Important assumption:
62
- // When the tree contains buckets/metric, we expect it to have all buckets/metrics from the
63
- // request
64
- let mut results: HashMap < String , AggregationResult > = HashMap :: new ( ) ;
65
-
66
- if let Some ( buckets) = intermediate_results. buckets {
67
- add_coverted_final_buckets_to_result ( & mut results, buckets, & req. buckets ) ?
68
- } else {
69
- // When there are no buckets, we create empty buckets, so that the serialized json
70
- // format is constant
71
- add_empty_final_buckets_to_result ( & mut results, & req. buckets ) ?
72
- } ;
73
-
74
- if let Some ( metrics) = intermediate_results. metrics {
75
- add_converted_final_metrics_to_result ( & mut results, metrics) ;
76
- } else {
77
- // When there are no metrics, we create empty metric results, so that the serialized
78
- // json format is constant
79
- add_empty_final_metrics_to_result ( & mut results, & req. metrics ) ?;
80
- }
81
- Ok ( Self ( results) )
82
- }
83
- }
84
-
85
- fn add_converted_final_metrics_to_result (
86
- results : & mut HashMap < String , AggregationResult > ,
87
- metrics : VecWithNames < IntermediateMetricResult > ,
88
- ) {
89
- results. extend (
90
- metrics
91
- . into_iter ( )
92
- . map ( |( key, metric) | ( key, AggregationResult :: MetricResult ( metric. into ( ) ) ) ) ,
93
- ) ;
94
- }
95
-
96
- fn add_empty_final_metrics_to_result (
97
- results : & mut HashMap < String , AggregationResult > ,
98
- req_metrics : & VecWithNames < MetricAggregation > ,
99
- ) -> crate :: Result < ( ) > {
100
- results. extend ( req_metrics. iter ( ) . map ( |( key, req) | {
101
- let empty_bucket = IntermediateMetricResult :: empty_from_req ( req) ;
102
- (
103
- key. to_string ( ) ,
104
- AggregationResult :: MetricResult ( empty_bucket. into ( ) ) ,
105
- )
106
- } ) ) ;
107
- Ok ( ( ) )
108
- }
109
-
110
- fn add_empty_final_buckets_to_result (
111
- results : & mut HashMap < String , AggregationResult > ,
112
- req_buckets : & VecWithNames < BucketAggregationInternal > ,
113
- ) -> crate :: Result < ( ) > {
114
- let requested_buckets = req_buckets. iter ( ) ;
115
- for ( key, req) in requested_buckets {
116
- let empty_bucket = AggregationResult :: BucketResult ( BucketResult :: empty_from_req ( req) ?) ;
117
- results. insert ( key. to_string ( ) , empty_bucket) ;
118
- }
119
- Ok ( ( ) )
120
- }
121
-
122
- fn add_coverted_final_buckets_to_result (
123
- results : & mut HashMap < String , AggregationResult > ,
124
- buckets : VecWithNames < IntermediateBucketResult > ,
125
- req_buckets : & VecWithNames < BucketAggregationInternal > ,
126
- ) -> crate :: Result < ( ) > {
127
- assert_eq ! ( buckets. len( ) , req_buckets. len( ) ) ;
128
-
129
- let buckets_with_request = buckets. into_iter ( ) . zip ( req_buckets. values ( ) ) ;
130
- for ( ( key, bucket) , req) in buckets_with_request {
131
- let result =
132
- AggregationResult :: BucketResult ( BucketResult :: from_intermediate_and_req ( bucket, req) ?) ;
133
- results. insert ( key, result) ;
134
- }
135
- Ok ( ( ) )
136
38
}
137
39
138
40
#[ derive( Clone , Debug , PartialEq , Serialize , Deserialize ) ]
@@ -154,7 +56,7 @@ impl AggregationResult {
154
56
match self {
155
57
AggregationResult :: BucketResult ( _bucket) => Err ( TantivyError :: InternalError (
156
58
"Tried to retrieve value from bucket aggregation. This is not supported and \
157
- should not happen during collection, but should be catched during validation"
59
+ should not happen during collection phase , but should be catched during validation"
158
60
. to_string ( ) ,
159
61
) ) ,
160
62
AggregationResult :: MetricResult ( metric) => metric. get_value ( agg_property) ,
@@ -230,48 +132,7 @@ pub enum BucketResult {
230
132
impl BucketResult {
231
133
pub ( crate ) fn empty_from_req ( req : & BucketAggregationInternal ) -> crate :: Result < Self > {
232
134
let empty_bucket = IntermediateBucketResult :: empty_from_req ( & req. bucket_agg ) ;
233
- BucketResult :: from_intermediate_and_req ( empty_bucket, req)
234
- }
235
-
236
- fn from_intermediate_and_req (
237
- bucket_result : IntermediateBucketResult ,
238
- req : & BucketAggregationInternal ,
239
- ) -> crate :: Result < Self > {
240
- match bucket_result {
241
- IntermediateBucketResult :: Range ( range_res) => {
242
- let mut buckets: Vec < RangeBucketEntry > = range_res
243
- . buckets
244
- . into_iter ( )
245
- . map ( |( _, bucket) | {
246
- RangeBucketEntry :: from_intermediate_and_req ( bucket, & req. sub_aggregation )
247
- } )
248
- . collect :: < crate :: Result < Vec < _ > > > ( ) ?;
249
-
250
- buckets. sort_by ( |left, right| {
251
- // TODO use total_cmp next stable rust release
252
- left. from
253
- . unwrap_or ( f64:: MIN )
254
- . partial_cmp ( & right. from . unwrap_or ( f64:: MIN ) )
255
- . unwrap_or ( Ordering :: Equal )
256
- } ) ;
257
- Ok ( BucketResult :: Range { buckets } )
258
- }
259
- IntermediateBucketResult :: Histogram { buckets } => {
260
- let buckets = intermediate_buckets_to_final_buckets (
261
- buckets,
262
- req. as_histogram ( )
263
- . expect ( "unexpected aggregation, expected histogram aggregation" ) ,
264
- & req. sub_aggregation ,
265
- ) ?;
266
-
267
- Ok ( BucketResult :: Histogram { buckets } )
268
- }
269
- IntermediateBucketResult :: Terms ( terms) => terms. into_final_result (
270
- req. as_term ( )
271
- . expect ( "unexpected aggregation, expected term aggregation" ) ,
272
- & req. sub_aggregation ,
273
- ) ,
274
- }
135
+ empty_bucket. into_final_bucket_result ( req)
275
136
}
276
137
}
277
138
@@ -311,22 +172,6 @@ pub struct BucketEntry {
311
172
/// Sub-aggregations in this bucket.
312
173
pub sub_aggregation : AggregationResults ,
313
174
}
314
-
315
- impl BucketEntry {
316
- pub ( crate ) fn from_intermediate_and_req (
317
- entry : IntermediateHistogramBucketEntry ,
318
- req : & AggregationsInternal ,
319
- ) -> crate :: Result < Self > {
320
- Ok ( BucketEntry {
321
- key : Key :: F64 ( entry. key ) ,
322
- doc_count : entry. doc_count ,
323
- sub_aggregation : AggregationResults :: from_intermediate_and_req_internal (
324
- entry. sub_aggregation ,
325
- req,
326
- ) ?,
327
- } )
328
- }
329
- }
330
175
impl GetDocCount for & BucketEntry {
331
176
fn doc_count ( & self ) -> u64 {
332
177
self . doc_count
@@ -384,21 +229,3 @@ pub struct RangeBucketEntry {
384
229
#[ serde( skip_serializing_if = "Option::is_none" ) ]
385
230
pub to : Option < f64 > ,
386
231
}
387
-
388
- impl RangeBucketEntry {
389
- fn from_intermediate_and_req (
390
- entry : IntermediateRangeBucketEntry ,
391
- req : & AggregationsInternal ,
392
- ) -> crate :: Result < Self > {
393
- Ok ( RangeBucketEntry {
394
- key : entry. key ,
395
- doc_count : entry. doc_count ,
396
- sub_aggregation : AggregationResults :: from_intermediate_and_req_internal (
397
- entry. sub_aggregation ,
398
- req,
399
- ) ?,
400
- to : entry. to ,
401
- from : entry. from ,
402
- } )
403
- }
404
- }
0 commit comments