Skip to content

Commit 033e870

Browse files
committed
Add support for multivalues
1 parent a86b104 commit 033e870

15 files changed

+123
-120
lines changed

columnar/src/column/dictionary_encoded.rs

+15-11
Original file line numberDiff line numberDiff line change
@@ -8,24 +8,35 @@ use crate::column::Column;
88
use crate::RowId;
99

1010
/// Dictionary encoded column.
11+
///
12+
/// The column simply gives access to a regular u64-column, in
13+
/// which the values are term-ordinals.
14+
///
15+
/// These ordinals are ids that uniquely identify the bytes that are stored in
16+
/// the column. These ordinals are small, and sorted in the same order
17+
/// as the term_ord_column.
1118
#[derive(Clone)]
1219
pub struct BytesColumn {
1320
pub(crate) dictionary: Arc<Dictionary<VoidSSTable>>,
1421
pub(crate) term_ord_column: Column<u64>,
1522
}
1623

1724
impl BytesColumn {
25+
/// Fills the given `output` buffer with the term associated to the ordinal `ord`.
26+
///
1827
/// Returns `false` if the term does not exist (e.g. `term_ord` is greater or equal to the
1928
/// overall number of terms).
20-
pub fn ord_to_bytes(&self, term_ord: u64, output: &mut Vec<u8>) -> io::Result<bool> {
21-
self.dictionary.ord_to_term(term_ord, output)
29+
pub fn ord_to_bytes(&self, ord: u64, output: &mut Vec<u8>) -> io::Result<bool> {
30+
self.dictionary.ord_to_term(ord, output)
2231
}
2332

33+
/// Returns the number of rows in the column.
2434
pub fn num_rows(&self) -> RowId {
2535
self.term_ord_column.num_rows()
2636
}
2737

28-
pub fn term_ords(&self) -> &Column<u64> {
38+
/// Returns the column of ordinals
39+
pub fn ords(&self) -> &Column<u64> {
2940
&self.term_ord_column
3041
}
3142
}
@@ -40,6 +51,7 @@ impl From<BytesColumn> for StrColumn {
4051
}
4152

4253
impl StrColumn {
54+
/// Fills the given `output` buffer with the string associated to the ordinal `term_ord`.
4355
pub fn ord_to_str(&self, term_ord: u64, output: &mut String) -> io::Result<bool> {
4456
unsafe {
4557
let buf = output.as_mut_vec();
@@ -55,14 +67,6 @@ impl StrColumn {
5567
}
5668
Ok(true)
5769
}
58-
59-
pub fn num_rows(&self) -> RowId {
60-
self.term_ord_column.num_rows()
61-
}
62-
63-
pub fn ordinal_dictionary(&self) -> &Column<u64> {
64-
&self.0.term_ord_column
65-
}
6670
}
6771

6872
impl Deref for StrColumn {

columnar/src/column/mod.rs

+20-12
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,11 @@ use std::ops::Deref;
55
use std::sync::Arc;
66

77
use common::BinarySerializable;
8+
pub use dictionary_encoded::{BytesColumn, StrColumn};
89
pub use serialize::{
9-
open_column_bytes, open_column_u128, open_column_u64, serialize_column_u128,
10+
open_column_bytes, open_column_u128, open_column_u64, serialize_column_mappable_to_u128,
1011
serialize_column_u64,
1112
};
12-
pub use dictionary_encoded::{BytesColumn, StrColumn};
1313

1414
use crate::column_index::ColumnIndex;
1515
use crate::column_values::ColumnValues;
@@ -21,23 +21,31 @@ pub struct Column<T> {
2121
pub values: Arc<dyn ColumnValues<T>>,
2222
}
2323

24-
use crate::column_index::Set;
25-
2624
impl<T: PartialOrd> Column<T> {
27-
pub fn first(&self, row_id: RowId) -> Option<T> {
25+
pub fn num_rows(&self) -> RowId {
2826
match &self.idx {
29-
ColumnIndex::Full => Some(self.values.get_val(row_id)),
30-
ColumnIndex::Optional(opt_idx) => {
31-
let value_row_idx = opt_idx.rank_if_exists(row_id)?;
32-
Some(self.values.get_val(value_row_idx))
33-
}
34-
ColumnIndex::Multivalued(_multivalued_index) => {
35-
todo!();
27+
ColumnIndex::Full => self.values.num_vals() as u32,
28+
ColumnIndex::Optional(optional_index) => optional_index.num_rows(),
29+
ColumnIndex::Multivalued(col_index) => {
30+
// The multivalued index contains the start value index of every row_id,
31+
// and one extra value at the end with the overall number of values.
32+
col_index.num_vals() - 1
3633
}
3734
}
3835
}
3936
}
4037

38+
impl<T: PartialOrd> Column<T> {
39+
pub fn first(&self, row_id: RowId) -> Option<T> {
40+
self.values(row_id).next()
41+
}
42+
43+
pub fn values(&self, row_id: RowId) -> impl Iterator<Item = T> + '_ {
44+
self.value_row_ids(row_id)
45+
.map(|value_row_id: RowId| self.values.get_val(value_row_id))
46+
}
47+
}
48+
4149
impl<T> Deref for Column<T> {
4250
type Target = ColumnIndex<'static>;
4351

columnar/src/column/serialize.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,10 @@ use crate::column_index::{serialize_column_index, SerializableColumnIndex};
1010
use crate::column_values::serialize::serialize_column_values_u128;
1111
use crate::column_values::{
1212
serialize_column_values, ColumnValues, MonotonicallyMappableToU128, MonotonicallyMappableToU64,
13-
ALL_CODEC_TYPES,
13+
FastFieldCodecType,
1414
};
1515

16-
pub fn serialize_column_u128<
16+
pub fn serialize_column_mappable_to_u128<
1717
F: Fn() -> I,
1818
I: Iterator<Item = T>,
1919
T: MonotonicallyMappableToU128,
@@ -39,7 +39,7 @@ pub fn serialize_column_u64<T: MonotonicallyMappableToU64>(
3939
output: &mut impl Write,
4040
) -> io::Result<()> {
4141
let column_index_num_bytes = serialize_column_index(column_index, output)?;
42-
serialize_column_values(column_values, &ALL_CODEC_TYPES[..], output)?;
42+
serialize_column_values(column_values, &[FastFieldCodecType::Bitpacked, FastFieldCodecType::BlockwiseLinear], output)?;
4343
output.write_all(&column_index_num_bytes.to_le_bytes())?;
4444
Ok(())
4545
}

columnar/src/column_index/mod.rs

+20-7
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ mod multivalued_index;
22
mod optional_index;
33
mod serialize;
44

5+
use std::ops::Range;
56
use std::sync::Arc;
67

78
pub use optional_index::{OptionalIndex, SerializableOptionalIndex, Set};
@@ -14,8 +15,12 @@ use crate::{Cardinality, RowId};
1415
pub enum ColumnIndex<'a> {
1516
Full,
1617
Optional(OptionalIndex),
17-
// TODO remove the Arc<dyn> apart from serialization this is not
18-
// dynamic at all.
18+
// TODO Remove the static by fixing the codec if possible.
19+
/// The enclosed column values contain, for every row_id,
20+
/// the value start_index.
21+
///
22+
/// In addition, at index num_rows, an extra value is added
23+
/// containing the overall number of values.
1924
Multivalued(Arc<dyn ColumnValues<RowId> + 'a>),
2025
}
2126

@@ -28,13 +33,21 @@ impl<'a> ColumnIndex<'a> {
2833
}
2934
}
3035

31-
pub fn num_rows(&self) -> RowId {
36+
pub fn value_row_ids(&self, row_id: RowId) -> Range<RowId> {
3237
match self {
33-
ColumnIndex::Full => {
34-
todo!()
38+
ColumnIndex::Full => row_id..row_id + 1,
39+
ColumnIndex::Optional(optional_index) => {
40+
if let Some(val) = optional_index.rank_if_exists(row_id) {
41+
val..val + 1
42+
} else {
43+
0..0
44+
}
45+
}
46+
ColumnIndex::Multivalued(multivalued_index) => {
47+
let start = multivalued_index.get_val(row_id);
48+
let end = multivalued_index.get_val(row_id + 1);
49+
start..end
3550
}
36-
ColumnIndex::Optional(optional_index) => optional_index.num_rows(),
37-
ColumnIndex::Multivalued(multivalued_index) => multivalued_index.num_vals() - 1,
3851
}
3952
}
4053
}

columnar/src/column_index/multivalued_index.rs

+5-3
Original file line numberDiff line numberDiff line change
@@ -11,17 +11,19 @@ use crate::RowId;
1111
pub struct MultivaluedIndex(Arc<dyn ColumnValues<RowId>>);
1212

1313
pub fn serialize_multivalued_index(
14-
multivalued_index: MultivaluedIndex,
14+
multivalued_index: &dyn ColumnValues<RowId>,
1515
output: &mut impl Write,
1616
) -> io::Result<()> {
1717
crate::column_values::serialize_column_values(
18-
&*multivalued_index.0,
18+
&*multivalued_index,
1919
&[FastFieldCodecType::Bitpacked, FastFieldCodecType::Linear],
2020
output,
2121
)?;
2222
Ok(())
2323
}
2424

2525
pub fn open_multivalued_index(bytes: OwnedBytes) -> io::Result<Arc<dyn ColumnValues<RowId>>> {
26-
todo!();
26+
let start_index_column: Arc<dyn ColumnValues<RowId>> =
27+
crate::column_values::open_u64_mapped(bytes)?;
28+
Ok(start_index_column)
2729
}

columnar/src/column_index/serialize.rs

+5-4
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,18 @@ use std::io::Write;
33

44
use common::{CountingWriter, OwnedBytes};
55

6-
use crate::column_index::multivalued_index::{serialize_multivalued_index, MultivaluedIndex};
6+
use crate::column_index::multivalued_index::serialize_multivalued_index;
77
use crate::column_index::optional_index::serialize_optional_index;
88
use crate::column_index::{ColumnIndex, SerializableOptionalIndex};
9-
use crate::Cardinality;
9+
use crate::column_values::ColumnValues;
10+
use crate::{Cardinality, RowId};
1011

1112
pub enum SerializableColumnIndex<'a> {
1213
Full,
1314
Optional(Box<dyn SerializableOptionalIndex<'a> + 'a>),
1415
// TODO remove the Arc<dyn> apart from serialization this is not
1516
// dynamic at all.
16-
Multivalued(MultivaluedIndex),
17+
Multivalued(Box<dyn ColumnValues<RowId> + 'a>),
1718
}
1819

1920
impl<'a> SerializableColumnIndex<'a> {
@@ -39,7 +40,7 @@ pub fn serialize_column_index(
3940
serialize_optional_index(&*optional_index, &mut output)?
4041
}
4142
SerializableColumnIndex::Multivalued(multivalued_index) => {
42-
serialize_multivalued_index(multivalued_index, &mut output)?
43+
serialize_multivalued_index(&*multivalued_index, &mut output)?
4344
}
4445
}
4546
let column_index_num_bytes = output.written_bytes() as u32;

columnar/src/column_values/mod.rs

+3-16
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,9 @@ pub mod serialize;
3838
pub use self::column::{monotonic_map_column, ColumnValues, IterColumn, VecColumn};
3939
pub use self::monotonic_mapping::{MonotonicallyMappableToU64, StrictlyMonotonicFn};
4040
pub use self::monotonic_mapping_u128::MonotonicallyMappableToU128;
41-
pub use self::serialize::{serialize_and_load, serialize_column_values, NormalizedHeader};
41+
pub use self::serialize::{serialize_column_values, NormalizedHeader};
42+
#[cfg(test)]
43+
pub use self::serialize::serialize_and_load;
4244
use crate::column_values::bitpacked::BitpackedCodec;
4345
use crate::column_values::blockwise_linear::BlockwiseLinearCodec;
4446
use crate::column_values::linear::LinearCodec;
@@ -121,21 +123,6 @@ impl U128FastFieldCodecType {
121123
}
122124
}
123125

124-
/// Returns the correct codec reader wrapped in the `Arc` for the data.
125-
// pub fn open_u128<Item: MonotonicallyMappableToU128>(
126-
// bytes: OwnedBytes,
127-
// ) -> io::Result<Arc<dyn Column<Item>>> {
128-
// todo!();
129-
// // let (bytes, _format_version) = read_format_version(bytes)?;
130-
// // let (mut bytes, _null_index_footer) = read_null_index_footer(bytes)?;
131-
// // let header = U128Header::deserialize(&mut bytes)?;
132-
// // assert_eq!(header.codec_type, U128FastFieldCodecType::CompactSpace);
133-
// // let reader = CompactSpaceDecompressor::open(bytes)?;
134-
// // let inverted: StrictlyMonotonicMappingInverter<StrictlyMonotonicMappingToInternal<Item>> =
135-
// // StrictlyMonotonicMappingToInternal::<Item>::new().into();
136-
// // Ok(Arc::new(monotonic_map_column(reader, inverted)))
137-
// }
138-
139126
/// Returns the correct codec reader wrapped in the `Arc` for the data.
140127
pub fn open_u128_mapped<T: MonotonicallyMappableToU128>(
141128
mut bytes: OwnedBytes,

columnar/src/column_values/serialize.rs

-23
Original file line numberDiff line numberDiff line change
@@ -161,28 +161,6 @@ impl BinarySerializable for Header {
161161
}
162162
}
163163

164-
/// Return estimated compression for given codec in the value range [0.0..1.0], where 1.0 means no
165-
/// compression.
166-
pub(crate) fn estimate<T: MonotonicallyMappableToU64>(
167-
typed_column: impl ColumnValues<T>,
168-
codec_type: FastFieldCodecType,
169-
) -> Option<f32> {
170-
let column = monotonic_map_column(typed_column, StrictlyMonotonicMappingToInternal::<T>::new());
171-
let min_value = column.min_value();
172-
let gcd = super::gcd::find_gcd(column.iter().map(|val| val - min_value))
173-
.filter(|gcd| gcd.get() > 1u64);
174-
let mapping = StrictlyMonotonicMappingToInternalGCDBaseval::new(
175-
gcd.map(|gcd| gcd.get()).unwrap_or(1u64),
176-
min_value,
177-
);
178-
let normalized_column = monotonic_map_column(&column, mapping);
179-
match codec_type {
180-
FastFieldCodecType::Bitpacked => BitpackedCodec::estimate(&normalized_column),
181-
FastFieldCodecType::Linear => LinearCodec::estimate(&normalized_column),
182-
FastFieldCodecType::BlockwiseLinear => BlockwiseLinearCodec::estimate(&normalized_column),
183-
}
184-
}
185-
186164
/// Serializes u128 values with the compact space codec.
187165
pub fn serialize_column_values_u128<F: Fn() -> I, I: Iterator<Item = u128>>(
188166
iter_gen: F,
@@ -194,7 +172,6 @@ pub fn serialize_column_values_u128<F: Fn() -> I, I: Iterator<Item = u128>>(
194172
codec_type: U128FastFieldCodecType::CompactSpace,
195173
};
196174
header.serialize(output)?;
197-
198175
let compressor = CompactSpaceCompressor::train_from(iter_gen(), num_vals);
199176
compressor.compress_into(iter_gen(), output)?;
200177

columnar/src/columnar/merge.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,10 @@ pub enum MergeDocOrder {
1515
Complex(()),
1616
}
1717

18-
pub fn merge(
19-
columnar_readers: &[ColumnarReader],
18+
pub fn merge_columnar(
19+
_columnar_readers: &[ColumnarReader],
2020
mapping: MergeDocOrder,
21-
output: &mut impl io::Write,
21+
_output: &mut impl io::Write,
2222
) -> io::Result<()> {
2323
match mapping {
2424
MergeDocOrder::Stack => {

columnar/src/columnar/mod.rs

+1
Original file line numberDiff line numberDiff line change
@@ -5,5 +5,6 @@ mod reader;
55
mod writer;
66

77
pub use column_type::ColumnType;
8+
pub use merge::{merge_columnar, MergeDocOrder};
89
pub use reader::ColumnarReader;
910
pub use writer::ColumnarWriter;

0 commit comments

Comments
 (0)