Skip to content

Commit 7a86ce1

Browse files
committed
handle IP addresses in term aggregation
Stores IP addresses during the segment term aggregation via their u64 representation and converts them to u128 (IPv6 address) via downcast when converting to intermediate results. Enables downcasting on `ColumnValues`. Exposes a u64 variant for u128-encoded data via the `open_u64_lenient` method. Removes the lifetime in `VecColumn` to avoid the 'static lifetime requirement coming from the downcast trait.
1 parent d57622d commit 7a86ce1

19 files changed

+260
-62
lines changed

columnar/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ sstable = { version= "0.2", path = "../sstable", package = "tantivy-sstable" }
1717
common = { version= "0.6", path = "../common", package = "tantivy-common" }
1818
tantivy-bitpacker = { version= "0.5", path = "../bitpacker/" }
1919
serde = "1.0.152"
20+
downcast-rs = "1.2.0"
2021

2122
[dev-dependencies]
2223
proptest = "1"

columnar/src/column/mod.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use std::sync::Arc;
99
use common::BinarySerializable;
1010
pub use dictionary_encoded::{BytesColumn, StrColumn};
1111
pub use serialize::{
12-
open_column_bytes, open_column_str, open_column_u128, open_column_u64,
12+
open_column_bytes, open_column_str, open_column_u128, open_column_u128_as_u64, open_column_u64,
1313
serialize_column_mappable_to_u128, serialize_column_mappable_to_u64,
1414
};
1515

@@ -176,6 +176,7 @@ struct FirstValueWithDefault<T: Copy> {
176176
impl<T: PartialOrd + Debug + Send + Sync + Copy + 'static> ColumnValues<T>
177177
for FirstValueWithDefault<T>
178178
{
179+
#[inline(always)]
179180
fn get_val(&self, idx: u32) -> T {
180181
self.column.first(idx).unwrap_or(self.default_value)
181182
}

columnar/src/column/serialize.rs

+20
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,26 @@ pub fn open_column_u128<T: MonotonicallyMappableToU128>(
7676
})
7777
}
7878

79+
/// Open the column as u64.
80+
///
81+
/// See [`open_u128_as_u64`] for more details.
82+
pub fn open_column_u128_as_u64(bytes: OwnedBytes) -> io::Result<Column<u64>> {
83+
let (body, column_index_num_bytes_payload) = bytes.rsplit(4);
84+
let column_index_num_bytes = u32::from_le_bytes(
85+
column_index_num_bytes_payload
86+
.as_slice()
87+
.try_into()
88+
.unwrap(),
89+
);
90+
let (column_index_data, column_values_data) = body.split(column_index_num_bytes as usize);
91+
let column_index = crate::column_index::open_column_index(column_index_data)?;
92+
let column_values = crate::column_values::open_u128_as_u64(column_values_data)?;
93+
Ok(Column {
94+
index: column_index,
95+
values: column_values,
96+
})
97+
}
98+
7999
pub fn open_column_bytes(data: OwnedBytes) -> io::Result<BytesColumn> {
80100
let (body, dictionary_len_bytes) = data.rsplit(4);
81101
let dictionary_len = u32::from_le_bytes(dictionary_len_bytes.as_slice().try_into().unwrap());

columnar/src/column_values/merge.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ pub(crate) struct MergedColumnValues<'a, T> {
1010
pub(crate) merge_row_order: &'a MergeRowOrder,
1111
}
1212

13-
impl<'a, T: Copy + PartialOrd + Debug> Iterable<T> for MergedColumnValues<'a, T> {
13+
impl<'a, T: Copy + PartialOrd + Debug + 'static> Iterable<T> for MergedColumnValues<'a, T> {
1414
fn boxed_iter(&self) -> Box<dyn Iterator<Item = T> + '_> {
1515
match self.merge_row_order {
1616
MergeRowOrder::Stack(_) => Box::new(

columnar/src/column_values/mod.rs

+12-3
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use std::fmt::Debug;
1010
use std::ops::{Range, RangeInclusive};
1111
use std::sync::Arc;
1212

13+
use downcast_rs::DowncastSync;
1314
pub use monotonic_mapping::{MonotonicallyMappableToU64, StrictlyMonotonicFn};
1415
pub use monotonic_mapping_u128::MonotonicallyMappableToU128;
1516

@@ -25,7 +26,9 @@ mod monotonic_column;
2526

2627
pub(crate) use merge::MergedColumnValues;
2728
pub use stats::ColumnStats;
28-
pub use u128_based::{open_u128_mapped, serialize_column_values_u128};
29+
pub use u128_based::{
30+
open_u128_as_u64, open_u128_mapped, serialize_column_values_u128, CompactSpaceDecompressorU64,
31+
};
2932
pub use u64_based::{
3033
load_u64_based_column_values, serialize_and_load_u64_based_column_values,
3134
serialize_u64_based_column_values, CodecType, ALL_U64_CODEC_TYPES,
@@ -41,7 +44,7 @@ use crate::RowId;
4144
///
4245
/// Any methods with a default and specialized implementation need to be called in the
4346
/// wrappers that implement the trait: Arc and MonotonicMappingColumn
44-
pub trait ColumnValues<T: PartialOrd = u64>: Send + Sync {
47+
pub trait ColumnValues<T: PartialOrd = u64>: Send + Sync + DowncastSync {
4548
/// Return the value associated with the given idx.
4649
///
4750
/// This accessor should return as fast as possible.
@@ -51,6 +54,11 @@ pub trait ColumnValues<T: PartialOrd = u64>: Send + Sync {
5154
/// May panic if `idx` is greater than the column length.
5255
fn get_val(&self, idx: u32) -> T;
5356

57+
// Return the value associated with the given idx as u64 representation.
58+
//
59+
// This is useful as normalization layer between u128 and u64.
60+
// fn u64_to_t(&self, val: u64) -> T;
61+
5462
/// Allows to push down multiple fetch calls, to avoid dynamic dispatch overhead.
5563
///
5664
/// idx and output should have the same length
@@ -139,6 +147,7 @@ pub trait ColumnValues<T: PartialOrd = u64>: Send + Sync {
139147
Box::new((0..self.num_vals()).map(|idx| self.get_val(idx)))
140148
}
141149
}
150+
downcast_rs::impl_downcast!(sync ColumnValues<T> where T: PartialOrd);
142151

143152
/// Empty column of values.
144153
pub struct EmptyColumnValues;
@@ -161,7 +170,7 @@ impl<T: PartialOrd + Default> ColumnValues<T> for EmptyColumnValues {
161170
}
162171
}
163172

164-
impl<T: Copy + PartialOrd + Debug> ColumnValues<T> for Arc<dyn ColumnValues<T>> {
173+
impl<T: Copy + PartialOrd + Debug + 'static> ColumnValues<T> for Arc<dyn ColumnValues<T>> {
165174
#[inline(always)]
166175
fn get_val(&self, idx: u32) -> T {
167176
self.as_ref().get_val(idx)

columnar/src/column_values/monotonic_column.rs

+9-9
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,10 @@ pub fn monotonic_map_column<C, T, Input, Output>(
3131
monotonic_mapping: T,
3232
) -> impl ColumnValues<Output>
3333
where
34-
C: ColumnValues<Input>,
35-
T: StrictlyMonotonicFn<Input, Output> + Send + Sync,
36-
Input: PartialOrd + Debug + Send + Sync + Clone,
37-
Output: PartialOrd + Debug + Send + Sync + Clone,
34+
C: ColumnValues<Input> + 'static,
35+
T: StrictlyMonotonicFn<Input, Output> + Send + Sync + 'static,
36+
Input: PartialOrd + Debug + Send + Sync + Clone + 'static,
37+
Output: PartialOrd + Debug + Send + Sync + Clone + 'static,
3838
{
3939
MonotonicMappingColumn {
4040
from_column,
@@ -45,10 +45,10 @@ where
4545

4646
impl<C, T, Input, Output> ColumnValues<Output> for MonotonicMappingColumn<C, T, Input>
4747
where
48-
C: ColumnValues<Input>,
49-
T: StrictlyMonotonicFn<Input, Output> + Send + Sync,
50-
Input: PartialOrd + Send + Debug + Sync + Clone,
51-
Output: PartialOrd + Send + Debug + Sync + Clone,
48+
C: ColumnValues<Input> + 'static,
49+
T: StrictlyMonotonicFn<Input, Output> + Send + Sync + 'static,
50+
Input: PartialOrd + Send + Debug + Sync + Clone + 'static,
51+
Output: PartialOrd + Send + Debug + Sync + Clone + 'static,
5252
{
5353
#[inline(always)]
5454
fn get_val(&self, idx: u32) -> Output {
@@ -107,7 +107,7 @@ mod tests {
107107
#[test]
108108
fn test_monotonic_mapping_iter() {
109109
let vals: Vec<u64> = (0..100u64).map(|el| el * 10).collect();
110-
let col = VecColumn::from(&vals);
110+
let col = VecColumn::from(vals);
111111
let mapped = monotonic_map_column(
112112
col,
113113
StrictlyMonotonicMappingInverter::from(StrictlyMonotonicMappingToInternal::<i64>::new()),

columnar/src/column_values/u128_based/compact_space/mod.rs

+64-1
Original file line numberDiff line numberDiff line change
@@ -292,6 +292,63 @@ impl BinarySerializable for IPCodecParams {
292292
}
293293
}
294294

295+
/// Exposes the compact space decompressor for u64 values.
296+
///
297+
/// This allows faster access to the values, as u64 is faster to work with than u128.
298+
/// Additionally it allows to handle u128 values like u64, via the `open_u64_lenient` as a common
299+
/// interface.
300+
///
301+
/// When converting from the internal u64 to u128 `compact_to_u128` can be used.
302+
pub struct CompactSpaceDecompressorU64(CompactSpaceDecompressor);
303+
impl CompactSpaceDecompressorU64 {
304+
pub(crate) fn open(data: OwnedBytes) -> io::Result<CompactSpaceDecompressorU64> {
305+
let decompressor = CompactSpaceDecompressorU64(CompactSpaceDecompressor::open(data)?);
306+
Ok(decompressor)
307+
}
308+
/// Convert a compact space value to u128
309+
pub fn compact_to_u128(&self, compact: u32) -> u128 {
310+
self.0.compact_to_u128(compact)
311+
}
312+
}
313+
314+
impl ColumnValues<u64> for CompactSpaceDecompressorU64 {
315+
#[inline]
316+
fn get_val(&self, doc: u32) -> u64 {
317+
let compact = self.0.get_compact(doc);
318+
compact as u64
319+
}
320+
321+
fn min_value(&self) -> u64 {
322+
self.0.u128_to_compact(self.0.min_value()).unwrap() as u64
323+
}
324+
325+
fn max_value(&self) -> u64 {
326+
self.0.u128_to_compact(self.0.max_value()).unwrap() as u64
327+
}
328+
329+
fn num_vals(&self) -> u32 {
330+
self.0.params.num_vals
331+
}
332+
333+
#[inline]
334+
fn iter(&self) -> Box<dyn Iterator<Item = u64> + '_> {
335+
Box::new(self.0.iter_compact().map(|el| el as u64))
336+
}
337+
338+
#[inline]
339+
fn get_row_ids_for_value_range(
340+
&self,
341+
value_range: RangeInclusive<u64>,
342+
position_range: Range<u32>,
343+
positions: &mut Vec<u32>,
344+
) {
345+
let value_range = self.0.compact_to_u128(*value_range.start() as u32)
346+
..=self.0.compact_to_u128(*value_range.end() as u32);
347+
self.0
348+
.get_row_ids_for_value_range(value_range, position_range, positions)
349+
}
350+
}
351+
295352
impl ColumnValues<u128> for CompactSpaceDecompressor {
296353
#[inline]
297354
fn get_val(&self, doc: u32) -> u128 {
@@ -403,8 +460,14 @@ impl CompactSpaceDecompressor {
403460
}
404461

405462
#[inline]
406-
pub fn get(&self, idx: u32) -> u128 {
463+
pub fn get_compact(&self, idx: u32) -> u32 {
407464
let compact = self.params.bit_unpacker.get(idx, &self.data) as u32;
465+
compact
466+
}
467+
468+
#[inline]
469+
pub fn get(&self, idx: u32) -> u128 {
470+
let compact = self.get_compact(idx);
408471
self.compact_to_u128(compact)
409472
}
410473

columnar/src/column_values/u128_based/mod.rs

+20-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@ use std::sync::Arc;
66
mod compact_space;
77

88
use common::{BinarySerializable, OwnedBytes, VInt};
9-
use compact_space::{CompactSpaceCompressor, CompactSpaceDecompressor};
9+
pub use compact_space::{
10+
CompactSpaceCompressor, CompactSpaceDecompressor, CompactSpaceDecompressorU64,
11+
};
1012

1113
use crate::column_values::monotonic_map_column;
1214
use crate::column_values::monotonic_mapping::{
@@ -108,6 +110,23 @@ pub fn open_u128_mapped<T: MonotonicallyMappableToU128 + Debug>(
108110
StrictlyMonotonicMappingToInternal::<T>::new().into();
109111
Ok(Arc::new(monotonic_map_column(reader, inverted)))
110112
}
113+
114+
/// Returns the u64 representation of the u128 data.
115+
/// The internal representation of the data as u64 is useful for faster processing.
116+
///
117+
/// In order to convert to u128 back cast to `CompactSpaceDecompressorU64` and call
118+
/// `compact_to_u128`.
119+
///
120+
/// # Notice
121+
/// In case there are new codecs added, check for usages of `CompactSpaceDecompressorU64` and
122+
/// also handle the new codecs.
123+
pub fn open_u128_as_u64(mut bytes: OwnedBytes) -> io::Result<Arc<dyn ColumnValues<u64>>> {
124+
let header = U128Header::deserialize(&mut bytes)?;
125+
assert_eq!(header.codec_type, U128FastFieldCodecType::CompactSpace);
126+
let reader = CompactSpaceDecompressorU64::open(bytes)?;
127+
Ok(Arc::new(reader))
128+
}
129+
111130
#[cfg(test)]
112131
pub mod tests {
113132
use super::*;

columnar/src/column_values/u64_based/bitpacked.rs

-1
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,6 @@ impl ColumnValues for BitpackedReader {
6363
fn get_val(&self, doc: u32) -> u64 {
6464
self.stats.min_value + self.stats.gcd.get() * self.bit_unpacker.get(doc, &self.data)
6565
}
66-
6766
#[inline]
6867
fn min_value(&self) -> u64 {
6968
self.stats.min_value

columnar/src/column_values/u64_based/blockwise_linear.rs

+5-2
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,10 @@ impl BlockwiseLinearEstimator {
6363
if self.block.is_empty() {
6464
return;
6565
}
66-
let line = Line::train(&VecColumn::from(&self.block));
66+
let column = VecColumn::from(std::mem::take(&mut self.block));
67+
let line = Line::train(&column);
68+
self.block = column.into();
69+
6770
let mut max_value = 0u64;
6871
for (i, buffer_val) in self.block.iter().enumerate() {
6972
let interpolated_val = line.eval(i as u32);
@@ -125,7 +128,7 @@ impl ColumnCodecEstimator for BlockwiseLinearEstimator {
125128
*buffer_val = gcd_divider.divide(*buffer_val - stats.min_value);
126129
}
127130

128-
let line = Line::train(&VecColumn::from(&buffer));
131+
let line = Line::train(&VecColumn::from(buffer.to_vec()));
129132

130133
assert!(!buffer.is_empty());
131134

columnar/src/column_values/u64_based/line.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ mod tests {
184184
}
185185

186186
fn test_eval_max_err(ys: &[u64]) -> Option<u64> {
187-
let line = Line::train(&VecColumn::from(&ys));
187+
let line = Line::train(&VecColumn::from(ys.to_vec()));
188188
ys.iter()
189189
.enumerate()
190190
.map(|(x, y)| y.wrapping_sub(line.eval(x as u32)))

columnar/src/column_values/u64_based/linear.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,9 @@ impl LinearCodecEstimator {
173173
fn collect_before_line_estimation(&mut self, value: u64) {
174174
self.block.push(value);
175175
if self.block.len() == LINE_ESTIMATION_BLOCK_LEN {
176-
let line = Line::train(&VecColumn::from(&self.block));
176+
let column = VecColumn::from(std::mem::take(&mut self.block));
177+
let line = Line::train(&column);
178+
self.block = column.into();
177179
let block = std::mem::take(&mut self.block);
178180
for val in block {
179181
self.collect_after_line_estimation(&line, val);

columnar/src/column_values/vec_column.rs

+11-9
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,14 @@ use tantivy_bitpacker::minmax;
44

55
use crate::ColumnValues;
66

7-
/// VecColumn provides `Column` over a slice.
8-
pub struct VecColumn<'a, T = u64> {
9-
pub(crate) values: &'a [T],
7+
/// VecColumn provides `Column` over a `Vec<T>`.
///
/// The column owns its values; `min_value` and `max_value` are stored alongside them.
pub struct VecColumn<T = u64> {
    // The column values, indexed by position.
    pub(crate) values: Vec<T>,
    // Smallest value; the `From<Vec<T>>` conversion derives it with `minmax`, falling back to
    // the default when that yields nothing.
    pub(crate) min_value: T,
    // Largest value; derived together with `min_value`.
    pub(crate) max_value: T,
}
1313

14-
impl<'a, T: Copy + PartialOrd + Send + Sync + Debug> ColumnValues<T> for VecColumn<'a, T> {
14+
impl<T: Copy + PartialOrd + Send + Sync + Debug + 'static> ColumnValues<T> for VecColumn<T> {
1515
fn get_val(&self, position: u32) -> T {
1616
self.values[position as usize]
1717
}
@@ -37,11 +37,8 @@ impl<'a, T: Copy + PartialOrd + Send + Sync + Debug> ColumnValues<T> for VecColu
3737
}
3838
}
3939

40-
impl<'a, T: Copy + PartialOrd + Default, V> From<&'a V> for VecColumn<'a, T>
41-
where V: AsRef<[T]> + ?Sized
42-
{
43-
fn from(values: &'a V) -> Self {
44-
let values = values.as_ref();
40+
impl<T: Copy + PartialOrd + Default> From<Vec<T>> for VecColumn<T> {
41+
fn from(values: Vec<T>) -> Self {
4542
let (min_value, max_value) = minmax(values.iter().copied()).unwrap_or_default();
4643
Self {
4744
values,
@@ -50,3 +47,8 @@ where V: AsRef<[T]> + ?Sized
5047
}
5148
}
5249
}
50+
impl From<VecColumn> for Vec<u64> {
51+
fn from(column: VecColumn) -> Self {
52+
column.values
53+
}
54+
}

columnar/src/columnar/writer/mod.rs

+3-11
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,7 @@ pub(crate) use serializer::ColumnarSerializer;
1313
use stacker::{Addr, ArenaHashMap, MemoryArena};
1414

1515
use crate::column_index::SerializableColumnIndex;
16-
use crate::column_values::{
17-
ColumnValues, MonotonicallyMappableToU128, MonotonicallyMappableToU64, VecColumn,
18-
};
16+
use crate::column_values::{MonotonicallyMappableToU128, MonotonicallyMappableToU64};
1917
use crate::columnar::column_type::ColumnType;
2018
use crate::columnar::writer::column_writers::{
2119
ColumnWriter, NumericalColumnWriter, StrOrBytesColumnWriter,
@@ -645,10 +643,7 @@ fn send_to_serialize_column_mappable_to_u128<
645643
value_index_builders: &mut PreallocatedIndexBuilders,
646644
values: &mut Vec<T>,
647645
mut wrt: impl io::Write,
648-
) -> io::Result<()>
649-
where
650-
for<'a> VecColumn<'a, T>: ColumnValues<T>,
651-
{
646+
) -> io::Result<()> {
652647
values.clear();
653648
// TODO: split index and values
654649
let serializable_column_index = match cardinality {
@@ -701,10 +696,7 @@ fn send_to_serialize_column_mappable_to_u64(
701696
value_index_builders: &mut PreallocatedIndexBuilders,
702697
values: &mut Vec<u64>,
703698
mut wrt: impl io::Write,
704-
) -> io::Result<()>
705-
where
706-
for<'a> VecColumn<'a, u64>: ColumnValues<u64>,
707-
{
699+
) -> io::Result<()> {
708700
values.clear();
709701
let serializable_column_index = match cardinality {
710702
Cardinality::Full => {

0 commit comments

Comments
 (0)