Skip to content

Commit dd7e745

Browse files
ditsukePSeitz
authored and committed
feat(aggregators/metric): Add a top_hits aggregator (#2198)
* feat(aggregators/metric): Implement a top_hits aggregator * fix: Expose get_fields * fix: Serializer for top_hits request Also removes the extraneous third-party serialization helper. * chore: Avert panic on parsing invalid top_hits query * refactor: Allow multiple field names from aggregations * perf: Replace binary heap with TopNComputer * fix: Avoid comparator inversion by ComparableDoc * fix: Rank missing field values lower than present values * refactor: Make KeyOrder a struct * feat: Rough attempt at docvalue_fields * feat: Complete stab at docvalue_fields - Rename "SearchResult*" => "Retrieval*" - Revert Vec => HashMap for aggregation accessors. - Split accessors for core aggregation and field retrieval. - Resolve globbed field names in docvalue_fields retrieval. - Handle strings/bytes and other column types with DynamicColumn * test(unit): Add tests for top_hits aggregator * fix: docfield_value field globbing * test(unit): Include dynamic fields * fix: Value -> OwnedValue * fix: Use OwnedValue's native Null variant * chore: Improve readability of test asserts * chore: Remove DocAddress from top_hits result * docs: Update aggregator doc * revert: accidental doc test * chore: enable time macros only for tests * chore: Apply suggestions from review * chore: Apply suggestions from review * fix: Retrieve all values for fields * test(unit): Update for multi-value retrieval * chore: Assert term existence * feat: Include all columns for a column name Since a (name, type) constitutes a unique column. * fix: Resolve json fields Introduces a translation step to bridge the difference between ColumnarReader's null-separated (`\0`) json field keys and the common `.`-separated keys used by SegmentReader. Although this should probably be the default behavior for ColumnarReader's public API. * chore: Address review on mutability * chore: s/segment_id/segment_ordinal instances of SegmentOrdinal * chore: Revert erroneous grammar change
1 parent 714fbda commit dd7e745

17 files changed

+1134
-148
lines changed

Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ futures = "0.3.21"
7777
paste = "1.0.11"
7878
more-asserts = "0.3.1"
7979
rand_distr = "0.4.3"
80+
time = { version = "0.3.10", features = ["serde-well-known", "macros"] }
8081

8182
[target.'cfg(not(windows))'.dev-dependencies]
8283
criterion = { version = "0.5", default-features = false }

src/aggregation/agg_req.rs

+24-15
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ use super::bucket::{
3535
};
3636
use super::metric::{
3737
AverageAggregation, CountAggregation, MaxAggregation, MinAggregation,
38-
PercentilesAggregationReq, StatsAggregation, SumAggregation,
38+
PercentilesAggregationReq, StatsAggregation, SumAggregation, TopHitsAggregation,
3939
};
4040

4141
/// The top-level aggregation request structure, which contains [`Aggregation`] and their user
@@ -93,7 +93,12 @@ impl Aggregation {
9393
}
9494

9595
fn get_fast_field_names(&self, fast_field_names: &mut HashSet<String>) {
96-
fast_field_names.insert(self.agg.get_fast_field_name().to_string());
96+
fast_field_names.extend(
97+
self.agg
98+
.get_fast_field_names()
99+
.iter()
100+
.map(|s| s.to_string()),
101+
);
97102
fast_field_names.extend(get_fast_field_names(&self.sub_aggregation));
98103
}
99104
}
@@ -147,23 +152,27 @@ pub enum AggregationVariants {
147152
/// Computes the percentiles of the extracted values.
148153
#[serde(rename = "percentiles")]
149154
Percentiles(PercentilesAggregationReq),
155+
/// Finds the top k values matching some order
156+
#[serde(rename = "top_hits")]
157+
TopHits(TopHitsAggregation),
150158
}
151159

152160
impl AggregationVariants {
153-
/// Returns the name of the field used by the aggregation.
154-
pub fn get_fast_field_name(&self) -> &str {
161+
/// Returns the names of the fields used by the aggregation.
162+
pub fn get_fast_field_names(&self) -> Vec<&str> {
155163
match self {
156-
AggregationVariants::Terms(terms) => terms.field.as_str(),
157-
AggregationVariants::Range(range) => range.field.as_str(),
158-
AggregationVariants::Histogram(histogram) => histogram.field.as_str(),
159-
AggregationVariants::DateHistogram(histogram) => histogram.field.as_str(),
160-
AggregationVariants::Average(avg) => avg.field_name(),
161-
AggregationVariants::Count(count) => count.field_name(),
162-
AggregationVariants::Max(max) => max.field_name(),
163-
AggregationVariants::Min(min) => min.field_name(),
164-
AggregationVariants::Stats(stats) => stats.field_name(),
165-
AggregationVariants::Sum(sum) => sum.field_name(),
166-
AggregationVariants::Percentiles(per) => per.field_name(),
164+
AggregationVariants::Terms(terms) => vec![terms.field.as_str()],
165+
AggregationVariants::Range(range) => vec![range.field.as_str()],
166+
AggregationVariants::Histogram(histogram) => vec![histogram.field.as_str()],
167+
AggregationVariants::DateHistogram(histogram) => vec![histogram.field.as_str()],
168+
AggregationVariants::Average(avg) => vec![avg.field_name()],
169+
AggregationVariants::Count(count) => vec![count.field_name()],
170+
AggregationVariants::Max(max) => vec![max.field_name()],
171+
AggregationVariants::Min(min) => vec![min.field_name()],
172+
AggregationVariants::Stats(stats) => vec![stats.field_name()],
173+
AggregationVariants::Sum(sum) => vec![sum.field_name()],
174+
AggregationVariants::Percentiles(per) => vec![per.field_name()],
175+
AggregationVariants::TopHits(top_hits) => top_hits.field_names(),
167176
}
168177
}
169178

src/aggregation/agg_req_with_accessor.rs

+133-48
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
//! This will enhance the request tree with access to the fastfield and metadata.
22
3-
use columnar::{Column, ColumnBlockAccessor, ColumnType, StrColumn};
3+
use std::collections::HashMap;
4+
use std::io;
5+
6+
use columnar::{Column, ColumnBlockAccessor, ColumnType, DynamicColumn, StrColumn};
47

58
use super::agg_limits::ResourceLimitGuard;
69
use super::agg_req::{Aggregation, AggregationVariants, Aggregations};
@@ -14,7 +17,7 @@ use super::metric::{
1417
use super::segment_agg_result::AggregationLimits;
1518
use super::VecWithNames;
1619
use crate::aggregation::{f64_to_fastfield_u64, Key};
17-
use crate::SegmentReader;
20+
use crate::{SegmentOrdinal, SegmentReader};
1821

1922
#[derive(Default)]
2023
pub(crate) struct AggregationsWithAccessor {
@@ -32,6 +35,7 @@ impl AggregationsWithAccessor {
3235
}
3336

3437
pub struct AggregationWithAccessor {
38+
pub(crate) segment_ordinal: SegmentOrdinal,
3539
/// In general there can be buckets without fast field access, e.g. buckets that are created
3640
/// based on search terms. That is not the case currently, but eventually this needs to be
3741
/// Option or moved.
@@ -44,10 +48,16 @@ pub struct AggregationWithAccessor {
4448
pub(crate) limits: ResourceLimitGuard,
4549
pub(crate) column_block_accessor: ColumnBlockAccessor<u64>,
4650
/// Used for missing term aggregation, which checks all columns for existence.
51+
/// And also for `top_hits` aggregation, which may sort on multiple fields.
4752
/// By convention the missing aggregation is chosen, when this property is set
4853
/// (instead of being set in `agg`).
4954
/// If this needs to be used by other aggregations, we need to refactor this.
50-
pub(crate) accessors: Vec<Column<u64>>,
55+
// NOTE: we can make all other aggregations use this instead of the `accessor` and `field_type`
56+
// (making them obsolete) But will it have a performance impact?
57+
pub(crate) accessors: Vec<(Column<u64>, ColumnType)>,
58+
/// Map field names to all associated column accessors.
59+
/// This field is used for `docvalue_fields`, which is currently only supported for `top_hits`.
60+
pub(crate) value_accessors: HashMap<String, Vec<DynamicColumn>>,
5161
pub(crate) agg: Aggregation,
5262
}
5363

@@ -57,19 +67,55 @@ impl AggregationWithAccessor {
5767
agg: &Aggregation,
5868
sub_aggregation: &Aggregations,
5969
reader: &SegmentReader,
70+
segment_ordinal: SegmentOrdinal,
6071
limits: AggregationLimits,
6172
) -> crate::Result<Vec<AggregationWithAccessor>> {
62-
let add_agg_with_accessor = |accessor: Column<u64>,
73+
let mut agg = agg.clone();
74+
75+
let add_agg_with_accessor = |agg: &Aggregation,
76+
accessor: Column<u64>,
6377
column_type: ColumnType,
6478
aggs: &mut Vec<AggregationWithAccessor>|
6579
-> crate::Result<()> {
6680
let res = AggregationWithAccessor {
81+
segment_ordinal,
6782
accessor,
68-
accessors: Vec::new(),
83+
accessors: Default::default(),
84+
value_accessors: Default::default(),
6985
field_type: column_type,
7086
sub_aggregation: get_aggs_with_segment_accessor_and_validate(
7187
sub_aggregation,
7288
reader,
89+
segment_ordinal,
90+
&limits,
91+
)?,
92+
agg: agg.clone(),
93+
limits: limits.new_guard(),
94+
missing_value_for_accessor: None,
95+
str_dict_column: None,
96+
column_block_accessor: Default::default(),
97+
};
98+
aggs.push(res);
99+
Ok(())
100+
};
101+
102+
let add_agg_with_accessors = |agg: &Aggregation,
103+
accessors: Vec<(Column<u64>, ColumnType)>,
104+
aggs: &mut Vec<AggregationWithAccessor>,
105+
value_accessors: HashMap<String, Vec<DynamicColumn>>|
106+
-> crate::Result<()> {
107+
let (accessor, field_type) = accessors.first().expect("at least one accessor");
108+
let res = AggregationWithAccessor {
109+
segment_ordinal,
110+
// TODO: We should do away with the `accessor` field altogether
111+
accessor: accessor.clone(),
112+
value_accessors,
113+
field_type: *field_type,
114+
accessors,
115+
sub_aggregation: get_aggs_with_segment_accessor_and_validate(
116+
sub_aggregation,
117+
reader,
118+
segment_ordinal,
73119
&limits,
74120
)?,
75121
agg: agg.clone(),
@@ -84,32 +130,36 @@ impl AggregationWithAccessor {
84130

85131
let mut res: Vec<AggregationWithAccessor> = Vec::new();
86132
use AggregationVariants::*;
87-
match &agg.agg {
133+
134+
match agg.agg {
88135
Range(RangeAggregation {
89-
field: field_name, ..
136+
field: ref field_name,
137+
..
90138
}) => {
91139
let (accessor, column_type) =
92140
get_ff_reader(reader, field_name, Some(get_numeric_or_date_column_types()))?;
93-
add_agg_with_accessor(accessor, column_type, &mut res)?;
141+
add_agg_with_accessor(&agg, accessor, column_type, &mut res)?;
94142
}
95143
Histogram(HistogramAggregation {
96-
field: field_name, ..
144+
field: ref field_name,
145+
..
97146
}) => {
98147
let (accessor, column_type) =
99148
get_ff_reader(reader, field_name, Some(get_numeric_or_date_column_types()))?;
100-
add_agg_with_accessor(accessor, column_type, &mut res)?;
149+
add_agg_with_accessor(&agg, accessor, column_type, &mut res)?;
101150
}
102151
DateHistogram(DateHistogramAggregationReq {
103-
field: field_name, ..
152+
field: ref field_name,
153+
..
104154
}) => {
105155
let (accessor, column_type) =
106156
// Only DateTime is supported for DateHistogram
107157
get_ff_reader(reader, field_name, Some(&[ColumnType::DateTime]))?;
108-
add_agg_with_accessor(accessor, column_type, &mut res)?;
158+
add_agg_with_accessor(&agg, accessor, column_type, &mut res)?;
109159
}
110160
Terms(TermsAggregation {
111-
field: field_name,
112-
missing,
161+
field: ref field_name,
162+
ref missing,
113163
..
114164
}) => {
115165
let str_dict_column = reader.fast_fields().str(field_name)?;
@@ -162,24 +212,11 @@ impl AggregationWithAccessor {
162212
let column_and_types =
163213
get_all_ff_reader_or_empty(reader, field_name, None, fallback_type)?;
164214

165-
let accessors: Vec<Column> =
166-
column_and_types.iter().map(|(a, _)| a.clone()).collect();
167-
let agg_wit_acc = AggregationWithAccessor {
168-
missing_value_for_accessor: None,
169-
accessor: accessors[0].clone(),
170-
accessors,
171-
field_type: ColumnType::U64,
172-
sub_aggregation: get_aggs_with_segment_accessor_and_validate(
173-
sub_aggregation,
174-
reader,
175-
&limits,
176-
)?,
177-
agg: agg.clone(),
178-
str_dict_column: str_dict_column.clone(),
179-
limits: limits.new_guard(),
180-
column_block_accessor: Default::default(),
181-
};
182-
res.push(agg_wit_acc);
215+
let accessors = column_and_types
216+
.iter()
217+
.map(|c_t| (c_t.0.clone(), c_t.1))
218+
.collect();
219+
add_agg_with_accessors(&agg, accessors, &mut res, Default::default())?;
183220
}
184221

185222
for (accessor, column_type) in column_and_types {
@@ -189,21 +226,25 @@ impl AggregationWithAccessor {
189226
missing.clone()
190227
};
191228

192-
let missing_value_for_accessor =
193-
if let Some(missing) = missing_value_term_agg.as_ref() {
194-
get_missing_val(column_type, missing, agg.agg.get_fast_field_name())?
195-
} else {
196-
None
197-
};
229+
let missing_value_for_accessor = if let Some(missing) =
230+
missing_value_term_agg.as_ref()
231+
{
232+
get_missing_val(column_type, missing, agg.agg.get_fast_field_names()[0])?
233+
} else {
234+
None
235+
};
198236

199237
let agg = AggregationWithAccessor {
238+
segment_ordinal,
200239
missing_value_for_accessor,
201240
accessor,
202-
accessors: Vec::new(),
241+
accessors: Default::default(),
242+
value_accessors: Default::default(),
203243
field_type: column_type,
204244
sub_aggregation: get_aggs_with_segment_accessor_and_validate(
205245
sub_aggregation,
206246
reader,
247+
segment_ordinal,
207248
&limits,
208249
)?,
209250
agg: agg.clone(),
@@ -215,34 +256,63 @@ impl AggregationWithAccessor {
215256
}
216257
}
217258
Average(AverageAggregation {
218-
field: field_name, ..
259+
field: ref field_name,
260+
..
219261
})
220262
| Count(CountAggregation {
221-
field: field_name, ..
263+
field: ref field_name,
264+
..
222265
})
223266
| Max(MaxAggregation {
224-
field: field_name, ..
267+
field: ref field_name,
268+
..
225269
})
226270
| Min(MinAggregation {
227-
field: field_name, ..
271+
field: ref field_name,
272+
..
228273
})
229274
| Stats(StatsAggregation {
230-
field: field_name, ..
275+
field: ref field_name,
276+
..
231277
})
232278
| Sum(SumAggregation {
233-
field: field_name, ..
279+
field: ref field_name,
280+
..
234281
}) => {
235282
let (accessor, column_type) =
236283
get_ff_reader(reader, field_name, Some(get_numeric_or_date_column_types()))?;
237-
add_agg_with_accessor(accessor, column_type, &mut res)?;
284+
add_agg_with_accessor(&agg, accessor, column_type, &mut res)?;
238285
}
239-
Percentiles(percentiles) => {
286+
Percentiles(ref percentiles) => {
240287
let (accessor, column_type) = get_ff_reader(
241288
reader,
242289
percentiles.field_name(),
243290
Some(get_numeric_or_date_column_types()),
244291
)?;
245-
add_agg_with_accessor(accessor, column_type, &mut res)?;
292+
add_agg_with_accessor(&agg, accessor, column_type, &mut res)?;
293+
}
294+
TopHits(ref mut top_hits) => {
295+
top_hits.validate_and_resolve(reader.fast_fields().columnar())?;
296+
let accessors: Vec<(Column<u64>, ColumnType)> = top_hits
297+
.field_names()
298+
.iter()
299+
.map(|field| {
300+
get_ff_reader(reader, field, Some(get_numeric_or_date_column_types()))
301+
})
302+
.collect::<crate::Result<_>>()?;
303+
304+
let value_accessors = top_hits
305+
.value_field_names()
306+
.iter()
307+
.map(|field_name| {
308+
Ok((
309+
field_name.to_string(),
310+
get_dynamic_columns(reader, field_name)?,
311+
))
312+
})
313+
.collect::<crate::Result<_>>()?;
314+
315+
add_agg_with_accessors(&agg, accessors, &mut res, value_accessors)?;
246316
}
247317
};
248318

@@ -284,6 +354,7 @@ fn get_numeric_or_date_column_types() -> &'static [ColumnType] {
284354
pub(crate) fn get_aggs_with_segment_accessor_and_validate(
285355
aggs: &Aggregations,
286356
reader: &SegmentReader,
357+
segment_ordinal: SegmentOrdinal,
287358
limits: &AggregationLimits,
288359
) -> crate::Result<AggregationsWithAccessor> {
289360
let mut aggss = Vec::new();
@@ -292,6 +363,7 @@ pub(crate) fn get_aggs_with_segment_accessor_and_validate(
292363
agg,
293364
agg.sub_aggregation(),
294365
reader,
366+
segment_ordinal,
295367
limits.clone(),
296368
)?;
297369
for agg in aggs {
@@ -321,6 +393,19 @@ fn get_ff_reader(
321393
Ok(ff_field_with_type)
322394
}
323395

396+
fn get_dynamic_columns(
397+
reader: &SegmentReader,
398+
field_name: &str,
399+
) -> crate::Result<Vec<columnar::DynamicColumn>> {
400+
let ff_fields = reader.fast_fields().dynamic_column_handles(field_name)?;
401+
let cols = ff_fields
402+
.iter()
403+
.map(|h| h.open())
404+
.collect::<io::Result<_>>()?;
405+
assert!(!ff_fields.is_empty(), "field {} not found", field_name);
406+
Ok(cols)
407+
}
408+
324409
/// Get all fast field reader or empty as default.
325410
///
326411
/// Is guaranteed to return at least one column.

0 commit comments

Comments
 (0)