Skip to content

Commit 2dfe379

Browse files
authored
handle multiple types in term aggregation (#2041)
1 parent e248a49 commit 2dfe379

File tree

6 files changed

+242
-34
lines changed

6 files changed

+242
-34
lines changed

src/aggregation/agg_bench.rs

+39-1
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ mod bench {
4040
)
4141
.set_stored();
4242
let text_field = schema_builder.add_text_field("text", text_fieldtype);
43+
let json_field = schema_builder.add_json_field("json", FAST);
4344
let text_field_many_terms = schema_builder.add_text_field("text_many_terms", STRING | FAST);
4445
let text_field_few_terms = schema_builder.add_text_field("text_few_terms", STRING | FAST);
4546
let score_fieldtype = crate::schema::NumericOptions::default().set_fast();
@@ -56,14 +57,16 @@ mod bench {
5657
.collect::<Vec<_>>();
5758
{
5859
let mut rng = StdRng::from_seed([1u8; 32]);
59-
let mut index_writer = index.writer_with_num_threads(1, 100_000_000)?;
60+
let mut index_writer = index.writer_with_num_threads(1, 200_000_000)?;
6061
// To make the different test cases comparable we just change one doc to force the
6162
// cardinality
6263
if cardinality == Cardinality::Optional {
6364
index_writer.add_document(doc!())?;
6465
}
6566
if cardinality == Cardinality::Multivalued {
6667
index_writer.add_document(doc!(
68+
json_field => json!({"mixed_type": 10.0}),
69+
json_field => json!({"mixed_type": 10.0}),
6770
text_field => "cool",
6871
text_field => "cool",
6972
text_field_many_terms => "cool",
@@ -82,10 +85,18 @@ mod bench {
8285
if cardinality == Cardinality::Sparse {
8386
doc_with_value /= 20;
8487
}
88+
let val_max = 1_000_000.0;
8589
for _ in 0..doc_with_value {
8690
let val: f64 = rng.gen_range(0.0..1_000_000.0);
91+
let json = if rng.gen_bool(0.1) {
92+
// 10% are numeric values
93+
json!({ "mixed_type": val })
94+
} else {
95+
json!({"mixed_type": many_terms_data.choose(&mut rng).unwrap().to_string()})
96+
};
8797
index_writer.add_document(doc!(
8898
text_field => "cool",
99+
json_field => json,
89100
text_field_many_terms => many_terms_data.choose(&mut rng).unwrap().to_string(),
90101
text_field_few_terms => few_terms_data.choose(&mut rng).unwrap().to_string(),
91102
score_field => val as u64,
@@ -303,6 +314,33 @@ mod bench {
303314
});
304315
}
305316

317+
bench_all_cardinalities!(bench_aggregation_terms_many_json_mixed_type_with_sub_agg);
318+
319+
/// Benchmarks a `terms` aggregation on the JSON path `json.mixed_type` with an `avg`
/// sub-aggregation on `score_f64`, for the given `cardinality` of the benchmark index.
///
/// The index and reader are created once outside of `b.iter`; the aggregation request is
/// deserialized, turned into a collector and searched with `AllQuery` on every iteration.
// NOTE(review): the index-building hunk in this diff writes `mixed_type` as a number in
// roughly 10% of the docs and as a string otherwise; presumably that is the index returned
// by `get_test_index_bench` — confirm.
fn bench_aggregation_terms_many_json_mixed_type_with_sub_agg_card(
320+
b: &mut Bencher,
321+
cardinality: Cardinality,
322+
) {
323+
let index = get_test_index_bench(cardinality).unwrap();
324+
let reader = index.reader().unwrap();
325+
326+
b.iter(|| {
327+
let agg_req: Aggregations = serde_json::from_value(json!({
328+
"my_texts": {
329+
"terms": { "field": "json.mixed_type" },
330+
"aggs": {
331+
"average_f64": { "avg": { "field": "score_f64" } }
332+
}
333+
},
334+
}))
335+
.unwrap();
336+
337+
let collector = get_collector(agg_req);
338+
339+
let searcher = reader.searcher();
340+
searcher.search(&AllQuery, &collector).unwrap()
341+
});
342+
}
343+
306344
bench_all_cardinalities!(bench_aggregation_terms_many2);
307345

308346
fn bench_aggregation_terms_many2_card(b: &mut Bencher, cardinality: Cardinality) {

src/aggregation/agg_req_with_accessor.rs

+48-24
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,9 @@ pub struct AggregationWithAccessor {
3636
pub(crate) accessor: Column<u64>,
3737
pub(crate) str_dict_column: Option<StrColumn>,
3838
pub(crate) field_type: ColumnType,
39+
/// In case there are multiple types of fast fields, e.g. string and numeric.
40+
/// Only used for term aggregations
41+
pub(crate) accessor2: Option<(Column<u64>, ColumnType)>,
3942
pub(crate) sub_aggregation: AggregationsWithAccessor,
4043
pub(crate) limits: ResourceLimitGuard,
4144
pub(crate) column_block_accessor: ColumnBlockAccessor<u64>,
@@ -50,51 +53,51 @@ impl AggregationWithAccessor {
5053
limits: AggregationLimits,
5154
) -> crate::Result<AggregationWithAccessor> {
5255
let mut str_dict_column = None;
56+
let mut accessor2 = None;
5357
use AggregationVariants::*;
5458
let (accessor, field_type) = match &agg.agg {
5559
Range(RangeAggregation {
5660
field: field_name, ..
57-
}) => get_ff_reader_and_validate(
58-
reader,
59-
field_name,
60-
Some(get_numeric_or_date_column_types()),
61-
)?,
61+
}) => get_ff_reader(reader, field_name, Some(get_numeric_or_date_column_types()))?,
6262
Histogram(HistogramAggregation {
6363
field: field_name, ..
64-
}) => get_ff_reader_and_validate(
65-
reader,
66-
field_name,
67-
Some(get_numeric_or_date_column_types()),
68-
)?,
64+
}) => get_ff_reader(reader, field_name, Some(get_numeric_or_date_column_types()))?,
6965
DateHistogram(DateHistogramAggregationReq {
7066
field: field_name, ..
71-
}) => get_ff_reader_and_validate(
72-
reader,
73-
field_name,
74-
Some(get_numeric_or_date_column_types()),
75-
)?,
67+
}) => get_ff_reader(reader, field_name, Some(get_numeric_or_date_column_types()))?,
7668
Terms(TermsAggregation {
7769
field: field_name, ..
7870
}) => {
7971
str_dict_column = reader.fast_fields().str(field_name)?;
80-
get_ff_reader_and_validate(reader, field_name, None)?
72+
let allowed_column_types = [
73+
ColumnType::I64,
74+
ColumnType::U64,
75+
ColumnType::F64,
76+
ColumnType::Bytes,
77+
ColumnType::Str,
78+
// ColumnType::Bool Unsupported
79+
// ColumnType::IpAddr Unsupported
80+
// ColumnType::DateTime Unsupported
81+
];
82+
let mut columns =
83+
get_all_ff_reader(reader, field_name, Some(&allowed_column_types))?;
84+
let first = columns.pop().unwrap();
85+
accessor2 = columns.pop();
86+
first
8187
}
8288
Average(AverageAggregation { field: field_name })
8389
| Count(CountAggregation { field: field_name })
8490
| Max(MaxAggregation { field: field_name })
8591
| Min(MinAggregation { field: field_name })
8692
| Stats(StatsAggregation { field: field_name })
8793
| Sum(SumAggregation { field: field_name }) => {
88-
let (accessor, field_type) = get_ff_reader_and_validate(
89-
reader,
90-
field_name,
91-
Some(get_numeric_or_date_column_types()),
92-
)?;
94+
let (accessor, field_type) =
95+
get_ff_reader(reader, field_name, Some(get_numeric_or_date_column_types()))?;
9396

9497
(accessor, field_type)
9598
}
9699
Percentiles(percentiles) => {
97-
let (accessor, field_type) = get_ff_reader_and_validate(
100+
let (accessor, field_type) = get_ff_reader(
98101
reader,
99102
percentiles.field_name(),
100103
Some(get_numeric_or_date_column_types()),
@@ -105,6 +108,7 @@ impl AggregationWithAccessor {
105108
let sub_aggregation = sub_aggregation.clone();
106109
Ok(AggregationWithAccessor {
107110
accessor,
111+
accessor2,
108112
field_type,
109113
sub_aggregation: get_aggs_with_segment_accessor_and_validate(
110114
&sub_aggregation,
@@ -150,8 +154,8 @@ pub(crate) fn get_aggs_with_segment_accessor_and_validate(
150154
))
151155
}
152156

153-
/// Get fast field reader with given cardinatility.
154-
fn get_ff_reader_and_validate(
157+
/// Get fast field reader or empty as default.
158+
fn get_ff_reader(
155159
reader: &SegmentReader,
156160
field_name: &str,
157161
allowed_column_types: Option<&[ColumnType]>,
@@ -167,3 +171,23 @@ fn get_ff_reader_and_validate(
167171
});
168172
Ok(ff_field_with_type)
169173
}
174+
175+
/// Get all fast field readers for `field_name`, or a single empty column as default.
176+
///
177+
/// The lookup is delegated to `u64_lenient_for_type_all`, forwarding `allowed_column_types`
/// and `field_name` unchanged. If that lookup yields no column, one empty column built for
/// `reader.num_docs()` documents is returned with type `ColumnType::U64`, so the result
/// is guaranteed to contain at least one column.
178+
fn get_all_ff_reader(
179+
reader: &SegmentReader,
180+
field_name: &str,
181+
allowed_column_types: Option<&[ColumnType]>,
182+
) -> crate::Result<Vec<(columnar::Column<u64>, ColumnType)>> {
183+
let ff_fields = reader.fast_fields();
184+
let mut ff_field_with_type =
185+
ff_fields.u64_lenient_for_type_all(allowed_column_types, field_name)?;
186+
// No matching column in this segment: fall back to an empty placeholder so callers can
// always take the first element.
if ff_field_with_type.is_empty() {
187+
ff_field_with_type.push((
188+
Column::build_empty_column(reader.num_docs()),
189+
ColumnType::U64,
190+
));
191+
}
192+
Ok(ff_field_with_type)
193+
}

src/aggregation/agg_tests.rs

+2-3
Original file line numberDiff line numberDiff line change
@@ -826,10 +826,9 @@ fn test_aggregation_on_json_object_mixed_types() {
826826
"buckets": [
827827
{ "doc_count": 1, "key": 10.0, "min_price": { "value": 10.0 } },
828828
{ "doc_count": 1, "key": -20.5, "min_price": { "value": -20.5 } },
829-
// TODO red is missing since there is no multi aggregation within one
830-
// segment for multiple types
831829
// TODO bool is also not yet handled in aggregation
832-
{ "doc_count": 1, "key": "blue", "min_price": { "value": null } }
830+
{ "doc_count": 1, "key": "blue", "min_price": { "value": null } },
831+
{ "doc_count": 1, "key": "red", "min_price": { "value": null } },
833832
],
834833
"sum_other_doc_count": 0
835834
}

src/aggregation/bucket/term_agg.rs

+104
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,110 @@ impl TermBuckets {
224224
}
225225
}
226226

227+
/// Composite term collector, used when one field has columns of two different types, so
228+
/// that a term aggregation can be run on both of them.
///
/// It wraps two [`SegmentTermCollector`]s, one per column type, and drives both for every
/// collected document.
229+
#[derive(Clone, Debug)]
230+
pub struct SegmentTermCollectorComposite {
231+
term_agg1: SegmentTermCollector, // field type 1, e.g. strings
232+
term_agg2: SegmentTermCollector, // field type 2, e.g. u64
233+
// Position of this aggregation in `AggregationsWithAccessor::aggs` (used to index both
// `keys` and `values`).
accessor_idx: usize,
234+
}
235+
impl SegmentAggregationCollector for SegmentTermCollectorComposite {
236+
/// Converts both wrapped collectors into intermediate bucket results and pushes each of
/// them into `results` under the same aggregation name.
// NOTE(review): both pushes use the same key; presumably `results.push` merges entries
// with an existing name — verify against `IntermediateAggregationResults::push`.
// NOTE(review): unlike `collect`/`collect_block`/`flush`, no accessor swap happens here, so
// `term_agg2` is converted while the primary accessor and field type are in place —
// confirm `into_intermediate_bucket_result` does not depend on them.
fn add_intermediate_aggregation_result(
237+
self: Box<Self>,
238+
agg_with_accessor: &AggregationsWithAccessor,
239+
results: &mut IntermediateAggregationResults,
240+
) -> crate::Result<()> {
241+
let name = agg_with_accessor.aggs.keys[self.accessor_idx].to_string();
242+
let agg_with_accessor = &agg_with_accessor.aggs.values[self.accessor_idx];
243+
244+
let bucket = self
245+
.term_agg1
246+
.into_intermediate_bucket_result(agg_with_accessor)?;
247+
results.push(
248+
name.to_string(),
249+
IntermediateAggregationResult::Bucket(bucket),
250+
)?;
251+
let bucket = self
252+
.term_agg2
253+
.into_intermediate_bucket_result(agg_with_accessor)?;
254+
results.push(name, IntermediateAggregationResult::Bucket(bucket))?;
255+
256+
Ok(())
257+
}
258+
259+
/// Collects a single document into both wrapped collectors.
///
/// The first collector runs against the primary accessor; the accessor is then swapped
/// with `accessor2`, the second collector runs, and the accessor is swapped back.
// NOTE(review): if the second `collect_block` returns an error, `?` returns before the
// swap-back, leaving the accessors exchanged — confirm this is acceptable.
#[inline]
260+
fn collect(
261+
&mut self,
262+
doc: crate::DocId,
263+
agg_with_accessor: &mut AggregationsWithAccessor,
264+
) -> crate::Result<()> {
265+
self.term_agg1.collect_block(&[doc], agg_with_accessor)?;
266+
self.swap_accessor(&mut agg_with_accessor.aggs.values[self.accessor_idx]);
267+
self.term_agg2.collect_block(&[doc], agg_with_accessor)?;
268+
self.swap_accessor(&mut agg_with_accessor.aggs.values[self.accessor_idx]);
269+
Ok(())
270+
}
271+
272+
/// Collects a block of documents into both wrapped collectors, using the same
/// swap / collect / swap-back sequence as `collect`.
#[inline]
273+
fn collect_block(
274+
&mut self,
275+
docs: &[crate::DocId],
276+
agg_with_accessor: &mut AggregationsWithAccessor,
277+
) -> crate::Result<()> {
278+
self.term_agg1.collect_block(docs, agg_with_accessor)?;
279+
self.swap_accessor(&mut agg_with_accessor.aggs.values[self.accessor_idx]);
280+
self.term_agg2.collect_block(docs, agg_with_accessor)?;
281+
self.swap_accessor(&mut agg_with_accessor.aggs.values[self.accessor_idx]);
282+
283+
Ok(())
284+
}
285+
286+
/// Flushes both wrapped collectors, swapping in the second accessor for the second
/// collector and restoring the primary accessor afterwards.
fn flush(&mut self, agg_with_accessor: &mut AggregationsWithAccessor) -> crate::Result<()> {
287+
self.term_agg1.flush(agg_with_accessor)?;
288+
self.swap_accessor(&mut agg_with_accessor.aggs.values[self.accessor_idx]);
289+
self.term_agg2.flush(agg_with_accessor)?;
290+
self.swap_accessor(&mut agg_with_accessor.aggs.values[self.accessor_idx]);
291+
292+
Ok(())
293+
}
294+
}
295+
296+
impl SegmentTermCollectorComposite {
297+
/// Swaps the accessor and field type with the second accessor and field type.
298+
/// This way we can use the same code for both aggregations.
///
/// Calling it twice restores the original state. Does nothing when `accessor2` is `None`.
299+
fn swap_accessor(&self, aggregations: &mut AggregationWithAccessor) {
300+
if let Some(accessor) = aggregations.accessor2.as_mut() {
301+
std::mem::swap(&mut accessor.0, &mut aggregations.accessor);
302+
std::mem::swap(&mut accessor.1, &mut aggregations.field_type);
303+
}
304+
}
305+
306+
/// Builds the composite from one terms request by creating two `SegmentTermCollector`s
/// with the same request, sub-aggregations and `accessor_idx`: the first for `field_type`,
/// the second for `field_type2`.
///
/// Returns the error of whichever inner `from_req_and_validate` call fails first.
pub(crate) fn from_req_and_validate(
307+
req: &TermsAggregation,
308+
sub_aggregations: &mut AggregationsWithAccessor,
309+
field_type: ColumnType,
310+
field_type2: ColumnType,
311+
accessor_idx: usize,
312+
) -> crate::Result<Self> {
313+
Ok(Self {
314+
term_agg1: SegmentTermCollector::from_req_and_validate(
315+
req,
316+
sub_aggregations,
317+
field_type,
318+
accessor_idx,
319+
)?,
320+
term_agg2: SegmentTermCollector::from_req_and_validate(
321+
req,
322+
sub_aggregations,
323+
field_type2,
324+
accessor_idx,
325+
)?,
326+
accessor_idx,
327+
})
328+
}
329+
}
330+
227331
/// The collector puts values from the fast field into the correct buckets and does a conversion to
228332
/// the correct datatype.
229333
#[derive(Clone, Debug)]

src/aggregation/segment_agg_result.rs

+21-6
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use super::metric::{
1515
SegmentPercentilesCollector, SegmentStatsCollector, SegmentStatsType, StatsAggregation,
1616
SumAggregation,
1717
};
18+
use crate::aggregation::bucket::SegmentTermCollectorComposite;
1819

1920
pub(crate) trait SegmentAggregationCollector: CollectorClone + Debug {
2021
fn add_intermediate_aggregation_result(
@@ -80,12 +81,26 @@ pub(crate) fn build_single_agg_segment_collector(
8081
) -> crate::Result<Box<dyn SegmentAggregationCollector>> {
8182
use AggregationVariants::*;
8283
match &req.agg.agg {
83-
Terms(terms_req) => Ok(Box::new(SegmentTermCollector::from_req_and_validate(
84-
terms_req,
85-
&mut req.sub_aggregation,
86-
req.field_type,
87-
accessor_idx,
88-
)?)),
84+
Terms(terms_req) => {
85+
if let Some(acc2) = req.accessor2.as_ref() {
86+
Ok(Box::new(
87+
SegmentTermCollectorComposite::from_req_and_validate(
88+
terms_req,
89+
&mut req.sub_aggregation,
90+
req.field_type,
91+
acc2.1,
92+
accessor_idx,
93+
)?,
94+
))
95+
} else {
96+
Ok(Box::new(SegmentTermCollector::from_req_and_validate(
97+
terms_req,
98+
&mut req.sub_aggregation,
99+
req.field_type,
100+
accessor_idx,
101+
)?))
102+
}
103+
}
89104
Range(range_req) => Ok(Box::new(SegmentRangeCollector::from_req_and_validate(
90105
range_req,
91106
&mut req.sub_aggregation,

0 commit comments

Comments
 (0)