Skip to content

Commit 2029e08

Browse files
committed
handle ip adresses in term aggregation (#2319)
* handle ip adresses in term aggregation Stores IpAdresses during the segment term aggregation via u64 representation and convert to u128(IpV6Adress) via downcast when converting to intermediate results. Enable Downcasting on `ColumnValues` Expose u64 variant for u128 encoded data via `open_u64_lenient` method. Remove lifetime in VecColumn, to avoid 'static lifetime requirement coming from downcast trait. * rename method
1 parent d00ea85 commit 2029e08

19 files changed

+256
-63
lines changed

columnar/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ sstable = { version= "0.2", path = "../sstable", package = "tantivy-sstable" }
1717
common = { version= "0.6", path = "../common", package = "tantivy-common" }
1818
tantivy-bitpacker = { version= "0.5", path = "../bitpacker/" }
1919
serde = "1.0.152"
20+
downcast-rs = "1.2.0"
2021

2122
[dev-dependencies]
2223
proptest = "1"

columnar/src/column/mod.rs

+3-2
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@ use std::sync::Arc;
99
use common::BinarySerializable;
1010
pub use dictionary_encoded::{BytesColumn, StrColumn};
1111
pub use serialize::{
12-
open_column_bytes, open_column_str, open_column_u128, open_column_u64,
13-
serialize_column_mappable_to_u128, serialize_column_mappable_to_u64,
12+
open_column_bytes, open_column_str, open_column_u128, open_column_u128_as_compact_u64,
13+
open_column_u64, serialize_column_mappable_to_u128, serialize_column_mappable_to_u64,
1414
};
1515

1616
use crate::column_index::ColumnIndex;
@@ -169,6 +169,7 @@ struct FirstValueWithDefault<T: Copy> {
169169
impl<T: PartialOrd + Debug + Send + Sync + Copy + 'static> ColumnValues<T>
170170
for FirstValueWithDefault<T>
171171
{
172+
#[inline(always)]
172173
fn get_val(&self, idx: u32) -> T {
173174
self.column.first(idx).unwrap_or(self.default_value)
174175
}

columnar/src/column/serialize.rs

+20
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,26 @@ pub fn open_column_u128<T: MonotonicallyMappableToU128>(
7676
})
7777
}
7878

79+
/// Open the column as u64.
80+
///
81+
/// See [`open_u128_as_compact_u64`] for more details.
82+
pub fn open_column_u128_as_compact_u64(bytes: OwnedBytes) -> io::Result<Column<u64>> {
83+
let (body, column_index_num_bytes_payload) = bytes.rsplit(4);
84+
let column_index_num_bytes = u32::from_le_bytes(
85+
column_index_num_bytes_payload
86+
.as_slice()
87+
.try_into()
88+
.unwrap(),
89+
);
90+
let (column_index_data, column_values_data) = body.split(column_index_num_bytes as usize);
91+
let column_index = crate::column_index::open_column_index(column_index_data)?;
92+
let column_values = crate::column_values::open_u128_as_compact_u64(column_values_data)?;
93+
Ok(Column {
94+
index: column_index,
95+
values: column_values,
96+
})
97+
}
98+
7999
pub fn open_column_bytes(data: OwnedBytes) -> io::Result<BytesColumn> {
80100
let (body, dictionary_len_bytes) = data.rsplit(4);
81101
let dictionary_len = u32::from_le_bytes(dictionary_len_bytes.as_slice().try_into().unwrap());

columnar/src/column_values/merge.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ pub(crate) struct MergedColumnValues<'a, T> {
1010
pub(crate) merge_row_order: &'a MergeRowOrder,
1111
}
1212

13-
impl<'a, T: Copy + PartialOrd + Debug> Iterable<T> for MergedColumnValues<'a, T> {
13+
impl<'a, T: Copy + PartialOrd + Debug + 'static> Iterable<T> for MergedColumnValues<'a, T> {
1414
fn boxed_iter(&self) -> Box<dyn Iterator<Item = T> + '_> {
1515
match self.merge_row_order {
1616
MergeRowOrder::Stack(_) => Box::new(

columnar/src/column_values/mod.rs

+8-3
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use std::fmt::Debug;
1010
use std::ops::{Range, RangeInclusive};
1111
use std::sync::Arc;
1212

13+
use downcast_rs::DowncastSync;
1314
pub use monotonic_mapping::{MonotonicallyMappableToU64, StrictlyMonotonicFn};
1415
pub use monotonic_mapping_u128::MonotonicallyMappableToU128;
1516

@@ -25,7 +26,10 @@ mod monotonic_column;
2526

2627
pub(crate) use merge::MergedColumnValues;
2728
pub use stats::ColumnStats;
28-
pub use u128_based::{open_u128_mapped, serialize_column_values_u128};
29+
pub use u128_based::{
30+
open_u128_as_compact_u64, open_u128_mapped, serialize_column_values_u128,
31+
CompactSpaceU64Accessor,
32+
};
2933
pub use u64_based::{
3034
load_u64_based_column_values, serialize_and_load_u64_based_column_values,
3135
serialize_u64_based_column_values, CodecType, ALL_U64_CODEC_TYPES,
@@ -41,7 +45,7 @@ use crate::RowId;
4145
///
4246
/// Any methods with a default and specialized implementation need to be called in the
4347
/// wrappers that implement the trait: Arc and MonotonicMappingColumn
44-
pub trait ColumnValues<T: PartialOrd = u64>: Send + Sync {
48+
pub trait ColumnValues<T: PartialOrd = u64>: Send + Sync + DowncastSync {
4549
/// Return the value associated with the given idx.
4650
///
4751
/// This accessor should return as fast as possible.
@@ -139,6 +143,7 @@ pub trait ColumnValues<T: PartialOrd = u64>: Send + Sync {
139143
Box::new((0..self.num_vals()).map(|idx| self.get_val(idx)))
140144
}
141145
}
146+
downcast_rs::impl_downcast!(sync ColumnValues<T> where T: PartialOrd);
142147

143148
/// Empty column of values.
144149
pub struct EmptyColumnValues;
@@ -161,7 +166,7 @@ impl<T: PartialOrd + Default> ColumnValues<T> for EmptyColumnValues {
161166
}
162167
}
163168

164-
impl<T: Copy + PartialOrd + Debug> ColumnValues<T> for Arc<dyn ColumnValues<T>> {
169+
impl<T: Copy + PartialOrd + Debug + 'static> ColumnValues<T> for Arc<dyn ColumnValues<T>> {
165170
#[inline(always)]
166171
fn get_val(&self, idx: u32) -> T {
167172
self.as_ref().get_val(idx)

columnar/src/column_values/monotonic_column.rs

+9-9
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,10 @@ pub fn monotonic_map_column<C, T, Input, Output>(
3131
monotonic_mapping: T,
3232
) -> impl ColumnValues<Output>
3333
where
34-
C: ColumnValues<Input>,
35-
T: StrictlyMonotonicFn<Input, Output> + Send + Sync,
36-
Input: PartialOrd + Debug + Send + Sync + Clone,
37-
Output: PartialOrd + Debug + Send + Sync + Clone,
34+
C: ColumnValues<Input> + 'static,
35+
T: StrictlyMonotonicFn<Input, Output> + Send + Sync + 'static,
36+
Input: PartialOrd + Debug + Send + Sync + Clone + 'static,
37+
Output: PartialOrd + Debug + Send + Sync + Clone + 'static,
3838
{
3939
MonotonicMappingColumn {
4040
from_column,
@@ -45,10 +45,10 @@ where
4545

4646
impl<C, T, Input, Output> ColumnValues<Output> for MonotonicMappingColumn<C, T, Input>
4747
where
48-
C: ColumnValues<Input>,
49-
T: StrictlyMonotonicFn<Input, Output> + Send + Sync,
50-
Input: PartialOrd + Send + Debug + Sync + Clone,
51-
Output: PartialOrd + Send + Debug + Sync + Clone,
48+
C: ColumnValues<Input> + 'static,
49+
T: StrictlyMonotonicFn<Input, Output> + Send + Sync + 'static,
50+
Input: PartialOrd + Send + Debug + Sync + Clone + 'static,
51+
Output: PartialOrd + Send + Debug + Sync + Clone + 'static,
5252
{
5353
#[inline(always)]
5454
fn get_val(&self, idx: u32) -> Output {
@@ -107,7 +107,7 @@ mod tests {
107107
#[test]
108108
fn test_monotonic_mapping_iter() {
109109
let vals: Vec<u64> = (0..100u64).map(|el| el * 10).collect();
110-
let col = VecColumn::from(&vals);
110+
let col = VecColumn::from(vals);
111111
let mapped = monotonic_map_column(
112112
col,
113113
StrictlyMonotonicMappingInverter::from(StrictlyMonotonicMappingToInternal::<i64>::new()),

columnar/src/column_values/u128_based/compact_space/mod.rs

+63-1
Original file line numberDiff line numberDiff line change
@@ -292,6 +292,63 @@ impl BinarySerializable for IPCodecParams {
292292
}
293293
}
294294

295+
/// Exposes the compact space compressed values as u64.
296+
///
297+
/// This allows faster access to the values, as u64 is faster to work with than u128.
298+
/// It also allows to handle u128 values like u64, via the `open_u64_lenient` as a uniform
299+
/// access interface.
300+
///
301+
/// When converting from the internal u64 to u128 `compact_to_u128` can be used.
302+
pub struct CompactSpaceU64Accessor(CompactSpaceDecompressor);
303+
impl CompactSpaceU64Accessor {
304+
pub(crate) fn open(data: OwnedBytes) -> io::Result<CompactSpaceU64Accessor> {
305+
let decompressor = CompactSpaceU64Accessor(CompactSpaceDecompressor::open(data)?);
306+
Ok(decompressor)
307+
}
308+
/// Convert a compact space value to u128
309+
pub fn compact_to_u128(&self, compact: u32) -> u128 {
310+
self.0.compact_to_u128(compact)
311+
}
312+
}
313+
314+
impl ColumnValues<u64> for CompactSpaceU64Accessor {
315+
#[inline]
316+
fn get_val(&self, doc: u32) -> u64 {
317+
let compact = self.0.get_compact(doc);
318+
compact as u64
319+
}
320+
321+
fn min_value(&self) -> u64 {
322+
self.0.u128_to_compact(self.0.min_value()).unwrap() as u64
323+
}
324+
325+
fn max_value(&self) -> u64 {
326+
self.0.u128_to_compact(self.0.max_value()).unwrap() as u64
327+
}
328+
329+
fn num_vals(&self) -> u32 {
330+
self.0.params.num_vals
331+
}
332+
333+
#[inline]
334+
fn iter(&self) -> Box<dyn Iterator<Item = u64> + '_> {
335+
Box::new(self.0.iter_compact().map(|el| el as u64))
336+
}
337+
338+
#[inline]
339+
fn get_row_ids_for_value_range(
340+
&self,
341+
value_range: RangeInclusive<u64>,
342+
position_range: Range<u32>,
343+
positions: &mut Vec<u32>,
344+
) {
345+
let value_range = self.0.compact_to_u128(*value_range.start() as u32)
346+
..=self.0.compact_to_u128(*value_range.end() as u32);
347+
self.0
348+
.get_row_ids_for_value_range(value_range, position_range, positions)
349+
}
350+
}
351+
295352
impl ColumnValues<u128> for CompactSpaceDecompressor {
296353
#[inline]
297354
fn get_val(&self, doc: u32) -> u128 {
@@ -402,9 +459,14 @@ impl CompactSpaceDecompressor {
402459
.map(|compact| self.compact_to_u128(compact))
403460
}
404461

462+
#[inline]
463+
pub fn get_compact(&self, idx: u32) -> u32 {
464+
self.params.bit_unpacker.get(idx, &self.data) as u32
465+
}
466+
405467
#[inline]
406468
pub fn get(&self, idx: u32) -> u128 {
407-
let compact = self.params.bit_unpacker.get(idx, &self.data) as u32;
469+
let compact = self.get_compact(idx);
408470
self.compact_to_u128(compact)
409471
}
410472

columnar/src/column_values/u128_based/mod.rs

+20-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@ use std::sync::Arc;
66
mod compact_space;
77

88
use common::{BinarySerializable, OwnedBytes, VInt};
9-
use compact_space::{CompactSpaceCompressor, CompactSpaceDecompressor};
9+
pub use compact_space::{
10+
CompactSpaceCompressor, CompactSpaceDecompressor, CompactSpaceU64Accessor,
11+
};
1012

1113
use crate::column_values::monotonic_map_column;
1214
use crate::column_values::monotonic_mapping::{
@@ -108,6 +110,23 @@ pub fn open_u128_mapped<T: MonotonicallyMappableToU128 + Debug>(
108110
StrictlyMonotonicMappingToInternal::<T>::new().into();
109111
Ok(Arc::new(monotonic_map_column(reader, inverted)))
110112
}
113+
114+
/// Returns the u64 representation of the u128 data.
115+
/// The internal representation of the data as u64 is useful for faster processing.
116+
///
117+
/// In order to convert to u128 back cast to `CompactSpaceU64Accessor` and call
118+
/// `compact_to_u128`.
119+
///
120+
/// # Notice
121+
/// In case there are new codecs added, check for usages of `CompactSpaceDecompressorU64` and
122+
/// also handle the new codecs.
123+
pub fn open_u128_as_compact_u64(mut bytes: OwnedBytes) -> io::Result<Arc<dyn ColumnValues<u64>>> {
124+
let header = U128Header::deserialize(&mut bytes)?;
125+
assert_eq!(header.codec_type, U128FastFieldCodecType::CompactSpace);
126+
let reader = CompactSpaceU64Accessor::open(bytes)?;
127+
Ok(Arc::new(reader))
128+
}
129+
111130
#[cfg(test)]
112131
pub mod tests {
113132
use super::*;

columnar/src/column_values/u64_based/bitpacked.rs

-1
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,6 @@ impl ColumnValues for BitpackedReader {
6363
fn get_val(&self, doc: u32) -> u64 {
6464
self.stats.min_value + self.stats.gcd.get() * self.bit_unpacker.get(doc, &self.data)
6565
}
66-
6766
#[inline]
6867
fn min_value(&self) -> u64 {
6968
self.stats.min_value

columnar/src/column_values/u64_based/blockwise_linear.rs

+5-2
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,10 @@ impl BlockwiseLinearEstimator {
6363
if self.block.is_empty() {
6464
return;
6565
}
66-
let line = Line::train(&VecColumn::from(&self.block));
66+
let column = VecColumn::from(std::mem::take(&mut self.block));
67+
let line = Line::train(&column);
68+
self.block = column.into();
69+
6770
let mut max_value = 0u64;
6871
for (i, buffer_val) in self.block.iter().enumerate() {
6972
let interpolated_val = line.eval(i as u32);
@@ -125,7 +128,7 @@ impl ColumnCodecEstimator for BlockwiseLinearEstimator {
125128
*buffer_val = gcd_divider.divide(*buffer_val - stats.min_value);
126129
}
127130

128-
let line = Line::train(&VecColumn::from(&buffer));
131+
let line = Line::train(&VecColumn::from(buffer.to_vec()));
129132

130133
assert!(!buffer.is_empty());
131134

columnar/src/column_values/u64_based/line.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ mod tests {
184184
}
185185

186186
fn test_eval_max_err(ys: &[u64]) -> Option<u64> {
187-
let line = Line::train(&VecColumn::from(&ys));
187+
let line = Line::train(&VecColumn::from(ys.to_vec()));
188188
ys.iter()
189189
.enumerate()
190190
.map(|(x, y)| y.wrapping_sub(line.eval(x as u32)))

columnar/src/column_values/u64_based/linear.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,9 @@ impl LinearCodecEstimator {
173173
fn collect_before_line_estimation(&mut self, value: u64) {
174174
self.block.push(value);
175175
if self.block.len() == LINE_ESTIMATION_BLOCK_LEN {
176-
let line = Line::train(&VecColumn::from(&self.block));
176+
let column = VecColumn::from(std::mem::take(&mut self.block));
177+
let line = Line::train(&column);
178+
self.block = column.into();
177179
let block = std::mem::take(&mut self.block);
178180
for val in block {
179181
self.collect_after_line_estimation(&line, val);

columnar/src/column_values/vec_column.rs

+11-9
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,14 @@ use tantivy_bitpacker::minmax;
44

55
use crate::ColumnValues;
66

7-
/// VecColumn provides `Column` over a slice.
8-
pub struct VecColumn<'a, T = u64> {
9-
pub(crate) values: &'a [T],
7+
/// VecColumn provides `Column` over a `Vec<T>`.
8+
pub struct VecColumn<T = u64> {
9+
pub(crate) values: Vec<T>,
1010
pub(crate) min_value: T,
1111
pub(crate) max_value: T,
1212
}
1313

14-
impl<'a, T: Copy + PartialOrd + Send + Sync + Debug> ColumnValues<T> for VecColumn<'a, T> {
14+
impl<T: Copy + PartialOrd + Send + Sync + Debug + 'static> ColumnValues<T> for VecColumn<T> {
1515
fn get_val(&self, position: u32) -> T {
1616
self.values[position as usize]
1717
}
@@ -37,11 +37,8 @@ impl<'a, T: Copy + PartialOrd + Send + Sync + Debug> ColumnValues<T> for VecColu
3737
}
3838
}
3939

40-
impl<'a, T: Copy + PartialOrd + Default, V> From<&'a V> for VecColumn<'a, T>
41-
where V: AsRef<[T]> + ?Sized
42-
{
43-
fn from(values: &'a V) -> Self {
44-
let values = values.as_ref();
40+
impl<T: Copy + PartialOrd + Default> From<Vec<T>> for VecColumn<T> {
41+
fn from(values: Vec<T>) -> Self {
4542
let (min_value, max_value) = minmax(values.iter().copied()).unwrap_or_default();
4643
Self {
4744
values,
@@ -50,3 +47,8 @@ where V: AsRef<[T]> + ?Sized
5047
}
5148
}
5249
}
50+
impl From<VecColumn> for Vec<u64> {
51+
fn from(column: VecColumn) -> Self {
52+
column.values
53+
}
54+
}

columnar/src/columnar/writer/mod.rs

+3-11
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,7 @@ pub(crate) use serializer::ColumnarSerializer;
1313
use stacker::{Addr, ArenaHashMap, MemoryArena};
1414

1515
use crate::column_index::SerializableColumnIndex;
16-
use crate::column_values::{
17-
ColumnValues, MonotonicallyMappableToU128, MonotonicallyMappableToU64, VecColumn,
18-
};
16+
use crate::column_values::{MonotonicallyMappableToU128, MonotonicallyMappableToU64};
1917
use crate::columnar::column_type::ColumnType;
2018
use crate::columnar::writer::column_writers::{
2119
ColumnWriter, NumericalColumnWriter, StrOrBytesColumnWriter,
@@ -645,10 +643,7 @@ fn send_to_serialize_column_mappable_to_u128<
645643
value_index_builders: &mut PreallocatedIndexBuilders,
646644
values: &mut Vec<T>,
647645
mut wrt: impl io::Write,
648-
) -> io::Result<()>
649-
where
650-
for<'a> VecColumn<'a, T>: ColumnValues<T>,
651-
{
646+
) -> io::Result<()> {
652647
values.clear();
653648
// TODO: split index and values
654649
let serializable_column_index = match cardinality {
@@ -701,10 +696,7 @@ fn send_to_serialize_column_mappable_to_u64(
701696
value_index_builders: &mut PreallocatedIndexBuilders,
702697
values: &mut Vec<u64>,
703698
mut wrt: impl io::Write,
704-
) -> io::Result<()>
705-
where
706-
for<'a> VecColumn<'a, u64>: ColumnValues<u64>,
707-
{
699+
) -> io::Result<()> {
708700
values.clear();
709701
let serializable_column_index = match cardinality {
710702
Cardinality::Full => {

0 commit comments

Comments
 (0)