Skip to content

Commit 162041a

Browse files
committed
Add support for multivalues
1 parent a86b104 commit 162041a

File tree

14 files changed

+118
-94
lines changed

14 files changed

+118
-94
lines changed

columnar/src/column/dictionary_encoded.rs

+15-11
Original file line numberDiff line numberDiff line change
@@ -8,24 +8,35 @@ use crate::column::Column;
88
use crate::RowId;
99

1010
/// Dictionary encoded column.
11+
///
12+
/// The column simply gives access to a regular u64-column in
13+
/// which the values are term-ordinals.
14+
///
15+
/// These ordinals are ids that uniquely identify the bytes that are stored in
16+
/// the column. These ordinals are small, and sorted in the same order
17+
/// as the term_ord_column.
1118
#[derive(Clone)]
1219
pub struct BytesColumn {
1320
pub(crate) dictionary: Arc<Dictionary<VoidSSTable>>,
1421
pub(crate) term_ord_column: Column<u64>,
1522
}
1623

1724
impl BytesColumn {
25+
/// Fills the given `output` buffer with the term associated to the ordinal `ord`.
26+
///
1827
/// Returns `false` if the term does not exist (e.g. `term_ord` is greater than or equal to the
1928
/// overall number of terms).
20-
pub fn ord_to_bytes(&self, term_ord: u64, output: &mut Vec<u8>) -> io::Result<bool> {
21-
self.dictionary.ord_to_term(term_ord, output)
29+
pub fn ord_to_bytes(&self, ord: u64, output: &mut Vec<u8>) -> io::Result<bool> {
30+
self.dictionary.ord_to_term(ord, output)
2231
}
2332

33+
/// Returns the number of rows in the column.
2434
pub fn num_rows(&self) -> RowId {
2535
self.term_ord_column.num_rows()
2636
}
2737

28-
pub fn term_ords(&self) -> &Column<u64> {
38+
/// Returns the column of ordinals.
39+
pub fn ords(&self) -> &Column<u64> {
2940
&self.term_ord_column
3041
}
3142
}
@@ -40,6 +51,7 @@ impl From<BytesColumn> for StrColumn {
4051
}
4152

4253
impl StrColumn {
54+
/// Fills the given `output` string with the term associated to the ordinal `term_ord`.
4355
pub fn ord_to_str(&self, term_ord: u64, output: &mut String) -> io::Result<bool> {
4456
unsafe {
4557
let buf = output.as_mut_vec();
@@ -55,14 +67,6 @@ impl StrColumn {
5567
}
5668
Ok(true)
5769
}
58-
59-
pub fn num_rows(&self) -> RowId {
60-
self.term_ord_column.num_rows()
61-
}
62-
63-
pub fn ordinal_dictionary(&self) -> &Column<u64> {
64-
&self.0.term_ord_column
65-
}
6670
}
6771

6872
impl Deref for StrColumn {

columnar/src/column/mod.rs

+20-12
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,11 @@ use std::ops::Deref;
55
use std::sync::Arc;
66

77
use common::BinarySerializable;
8+
pub use dictionary_encoded::{BytesColumn, StrColumn};
89
pub use serialize::{
9-
open_column_bytes, open_column_u128, open_column_u64, serialize_column_u128,
10+
open_column_bytes, open_column_u128, open_column_u64, serialize_column_mappable_to_u128,
1011
serialize_column_u64,
1112
};
12-
pub use dictionary_encoded::{BytesColumn, StrColumn};
1313

1414
use crate::column_index::ColumnIndex;
1515
use crate::column_values::ColumnValues;
@@ -21,23 +21,31 @@ pub struct Column<T> {
2121
pub values: Arc<dyn ColumnValues<T>>,
2222
}
2323

24-
use crate::column_index::Set;
25-
2624
impl<T: PartialOrd> Column<T> {
27-
pub fn first(&self, row_id: RowId) -> Option<T> {
25+
pub fn num_rows(&self) -> RowId {
2826
match &self.idx {
29-
ColumnIndex::Full => Some(self.values.get_val(row_id)),
30-
ColumnIndex::Optional(opt_idx) => {
31-
let value_row_idx = opt_idx.rank_if_exists(row_id)?;
32-
Some(self.values.get_val(value_row_idx))
33-
}
34-
ColumnIndex::Multivalued(_multivalued_index) => {
35-
todo!();
27+
ColumnIndex::Full => self.values.num_vals() as u32,
28+
ColumnIndex::Optional(optional_index) => optional_index.num_rows(),
29+
ColumnIndex::Multivalued(col_index) => {
30+
// The multivalued index contains the value start index of every row_id,
31+
// and one extra value at the end with the overall number of values.
32+
col_index.num_vals() - 1
3633
}
3734
}
3835
}
3936
}
4037

38+
impl<T: PartialOrd> Column<T> {
39+
pub fn first(&self, row_id: RowId) -> Option<T> {
40+
self.values(row_id).next()
41+
}
42+
43+
pub fn values(&self, row_id: RowId) -> impl Iterator<Item = T> + '_ {
44+
self.value_row_ids(row_id)
45+
.map(|value_row_id: RowId| self.values.get_val(value_row_id))
46+
}
47+
}
48+
4149
impl<T> Deref for Column<T> {
4250
type Target = ColumnIndex<'static>;
4351

columnar/src/column/serialize.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use crate::column_values::{
1313
ALL_CODEC_TYPES,
1414
};
1515

16-
pub fn serialize_column_u128<
16+
pub fn serialize_column_mappable_to_u128<
1717
F: Fn() -> I,
1818
I: Iterator<Item = T>,
1919
T: MonotonicallyMappableToU128,

columnar/src/column_index/mod.rs

+20-7
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ mod multivalued_index;
22
mod optional_index;
33
mod serialize;
44

5+
use std::ops::Range;
56
use std::sync::Arc;
67

78
pub use optional_index::{OptionalIndex, SerializableOptionalIndex, Set};
@@ -14,8 +15,12 @@ use crate::{Cardinality, RowId};
1415
pub enum ColumnIndex<'a> {
1516
Full,
1617
Optional(OptionalIndex),
17-
// TODO remove the Arc<dyn> apart from serialization this is not
18-
// dynamic at all.
18+
// TODO Remove the static by fixing the codec if possible.
19+
/// The enclosed column values contain, for every row_id,
20+
/// the value start_index.
21+
///
22+
/// In addition, at index num_rows, an extra value is added
23+
/// containing the overall number of values.
1924
Multivalued(Arc<dyn ColumnValues<RowId> + 'a>),
2025
}
2126

@@ -28,13 +33,21 @@ impl<'a> ColumnIndex<'a> {
2833
}
2934
}
3035

31-
pub fn num_rows(&self) -> RowId {
36+
pub fn value_row_ids(&self, row_id: RowId) -> Range<RowId> {
3237
match self {
33-
ColumnIndex::Full => {
34-
todo!()
38+
ColumnIndex::Full => row_id..row_id + 1,
39+
ColumnIndex::Optional(optional_index) => {
40+
if let Some(val) = optional_index.rank_if_exists(row_id) {
41+
val..val + 1
42+
} else {
43+
0..0
44+
}
45+
}
46+
ColumnIndex::Multivalued(multivalued_index) => {
47+
let start = multivalued_index.get_val(row_id);
48+
let end = multivalued_index.get_val(row_id + 1);
49+
start..end
3550
}
36-
ColumnIndex::Optional(optional_index) => optional_index.num_rows(),
37-
ColumnIndex::Multivalued(multivalued_index) => multivalued_index.num_vals() - 1,
3851
}
3952
}
4053
}

columnar/src/column_index/multivalued_index.rs

+5-3
Original file line numberDiff line numberDiff line change
@@ -11,17 +11,19 @@ use crate::RowId;
1111
pub struct MultivaluedIndex(Arc<dyn ColumnValues<RowId>>);
1212

1313
pub fn serialize_multivalued_index(
14-
multivalued_index: MultivaluedIndex,
14+
multivalued_index: &dyn ColumnValues<RowId>,
1515
output: &mut impl Write,
1616
) -> io::Result<()> {
1717
crate::column_values::serialize_column_values(
18-
&*multivalued_index.0,
18+
&*multivalued_index,
1919
&[FastFieldCodecType::Bitpacked, FastFieldCodecType::Linear],
2020
output,
2121
)?;
2222
Ok(())
2323
}
2424

2525
pub fn open_multivalued_index(bytes: OwnedBytes) -> io::Result<Arc<dyn ColumnValues<RowId>>> {
26-
todo!();
26+
let start_index_column: Arc<dyn ColumnValues<RowId>> =
27+
crate::column_values::open_u64_mapped(bytes)?;
28+
Ok(start_index_column)
2729
}

columnar/src/column_index/serialize.rs

+5-4
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,18 @@ use std::io::Write;
33

44
use common::{CountingWriter, OwnedBytes};
55

6-
use crate::column_index::multivalued_index::{serialize_multivalued_index, MultivaluedIndex};
6+
use crate::column_index::multivalued_index::serialize_multivalued_index;
77
use crate::column_index::optional_index::serialize_optional_index;
88
use crate::column_index::{ColumnIndex, SerializableOptionalIndex};
9-
use crate::Cardinality;
9+
use crate::column_values::ColumnValues;
10+
use crate::{Cardinality, RowId};
1011

1112
pub enum SerializableColumnIndex<'a> {
1213
Full,
1314
Optional(Box<dyn SerializableOptionalIndex<'a> + 'a>),
1415
// TODO remove the Arc<dyn>: apart from serialization, this is not
1516
// dynamic at all.
16-
Multivalued(MultivaluedIndex),
17+
Multivalued(Box<dyn ColumnValues<RowId> + 'a>),
1718
}
1819

1920
impl<'a> SerializableColumnIndex<'a> {
@@ -39,7 +40,7 @@ pub fn serialize_column_index(
3940
serialize_optional_index(&*optional_index, &mut output)?
4041
}
4142
SerializableColumnIndex::Multivalued(multivalued_index) => {
42-
serialize_multivalued_index(multivalued_index, &mut output)?
43+
serialize_multivalued_index(&*multivalued_index, &mut output)?
4344
}
4445
}
4546
let column_index_num_bytes = output.written_bytes() as u32;

columnar/src/column_values/mod.rs

-15
Original file line numberDiff line numberDiff line change
@@ -121,21 +121,6 @@ impl U128FastFieldCodecType {
121121
}
122122
}
123123

124-
/// Returns the correct codec reader wrapped in the `Arc` for the data.
125-
// pub fn open_u128<Item: MonotonicallyMappableToU128>(
126-
// bytes: OwnedBytes,
127-
// ) -> io::Result<Arc<dyn Column<Item>>> {
128-
// todo!();
129-
// // let (bytes, _format_version) = read_format_version(bytes)?;
130-
// // let (mut bytes, _null_index_footer) = read_null_index_footer(bytes)?;
131-
// // let header = U128Header::deserialize(&mut bytes)?;
132-
// // assert_eq!(header.codec_type, U128FastFieldCodecType::CompactSpace);
133-
// // let reader = CompactSpaceDecompressor::open(bytes)?;
134-
// // let inverted: StrictlyMonotonicMappingInverter<StrictlyMonotonicMappingToInternal<Item>> =
135-
// // StrictlyMonotonicMappingToInternal::<Item>::new().into();
136-
// // Ok(Arc::new(monotonic_map_column(reader, inverted)))
137-
// }
138-
139124
/// Returns the correct codec reader wrapped in the `Arc` for the data.
140125
pub fn open_u128_mapped<T: MonotonicallyMappableToU128>(
141126
mut bytes: OwnedBytes,

columnar/src/columnar/merge.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,10 @@ pub enum MergeDocOrder {
1515
Complex(()),
1616
}
1717

18-
pub fn merge(
19-
columnar_readers: &[ColumnarReader],
18+
pub fn merge_columnar(
19+
_columnar_readers: &[ColumnarReader],
2020
mapping: MergeDocOrder,
21-
output: &mut impl io::Write,
21+
_output: &mut impl io::Write,
2222
) -> io::Result<()> {
2323
match mapping {
2424
MergeDocOrder::Stack => {

columnar/src/columnar/mod.rs

+1
Original file line numberDiff line numberDiff line change
@@ -5,5 +5,6 @@ mod reader;
55
mod writer;
66

77
pub use column_type::ColumnType;
8+
pub use merge::{merge_columnar, MergeDocOrder};
89
pub use reader::ColumnarReader;
910
pub use writer::ColumnarWriter;

columnar/src/columnar/writer/mod.rs

+12-14
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use crate::columnar::writer::column_writers::{
2222
use crate::columnar::writer::value_index::{IndexBuilder, PreallocatedIndexBuilders};
2323
use crate::dictionary::{DictionaryBuilder, TermIdMapping, UnorderedId};
2424
use crate::value::{Coerce, NumericalType, NumericalValue};
25-
use crate::{column, Cardinality, RowId};
25+
use crate::{Cardinality, RowId};
2626

2727
/// This is a set of buffers that are used to temporarily write the values into before passing them
2828
/// to the fast field codecs.
@@ -310,7 +310,7 @@ fn serialize_bytes_or_str_column(
310310
ColumnOperation::NewDoc(doc) => ColumnOperation::NewDoc(doc),
311311
}
312312
});
313-
serialize_column(
313+
serialize_column_mappable_to_u64(
314314
operation_iterator,
315315
cardinality,
316316
num_docs,
@@ -339,7 +339,7 @@ fn serialize_numerical_column(
339339
} = buffers;
340340
match numerical_type {
341341
NumericalType::I64 => {
342-
serialize_column(
342+
serialize_column_mappable_to_u64(
343343
coerce_numerical_symbol::<i64>(op_iterator),
344344
cardinality,
345345
num_docs,
@@ -349,7 +349,7 @@ fn serialize_numerical_column(
349349
)?;
350350
}
351351
NumericalType::U64 => {
352-
serialize_column(
352+
serialize_column_mappable_to_u64(
353353
coerce_numerical_symbol::<u64>(op_iterator),
354354
cardinality,
355355
num_docs,
@@ -359,7 +359,7 @@ fn serialize_numerical_column(
359359
)?;
360360
}
361361
NumericalType::F64 => {
362-
serialize_column(
362+
serialize_column_mappable_to_u64(
363363
coerce_numerical_symbol::<f64>(op_iterator),
364364
cardinality,
365365
num_docs,
@@ -384,7 +384,7 @@ fn serialize_bool_column(
384384
bool_values,
385385
..
386386
} = buffers;
387-
serialize_column(
387+
serialize_column_mappable_to_u64(
388388
column_operations_it,
389389
cardinality,
390390
num_docs,
@@ -451,12 +451,11 @@ where
451451
Cardinality::Multivalued => {
452452
let multivalued_index_builder = value_index_builders.borrow_multivalued_index_builder();
453453
consume_operation_iterator(op_iterator, multivalued_index_builder, values);
454-
let _multivalued_index = multivalued_index_builder.finish(num_docs);
455-
todo!();
456-
// SerializableColumnIndex::Multivalued(Box::new(multivalued_index))
454+
let multivalued_index = multivalued_index_builder.finish(num_docs);
455+
SerializableColumnIndex::Multivalued(Box::new(multivalued_index))
457456
}
458457
};
459-
crate::column::serialize_column_u128(
458+
crate::column::serialize_column_mappable_to_u128(
460459
serializable_column_index,
461460
|| values.iter().cloned(),
462461
values.len() as u32,
@@ -465,7 +464,7 @@ where
465464
Ok(())
466465
}
467466

468-
fn serialize_column<
467+
fn serialize_column_mappable_to_u64<
469468
T: Copy + Default + std::fmt::Debug + Send + Sync + MonotonicallyMappableToU64 + PartialOrd,
470469
>(
471470
op_iterator: impl Iterator<Item = ColumnOperation<T>>,
@@ -497,9 +496,8 @@ where
497496
Cardinality::Multivalued => {
498497
let multivalued_index_builder = value_index_builders.borrow_multivalued_index_builder();
499498
consume_operation_iterator(op_iterator, multivalued_index_builder, values);
500-
let _multivalued_index = multivalued_index_builder.finish(num_docs);
501-
todo!();
502-
// SerializableColumnIndex::Multivalued(Box::new(multivalued_index))
499+
let multivalued_index = multivalued_index_builder.finish(num_docs);
500+
SerializableColumnIndex::Multivalued(Box::new(multivalued_index))
503501
}
504502
};
505503
crate::column::serialize_column_u64(

columnar/src/columnar/writer/serializer.rs

-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ use common::CountingWriter;
55
use sstable::value::RangeValueWriter;
66
use sstable::RangeSSTable;
77

8-
use crate::column;
98
use crate::columnar::ColumnType;
109

1110
pub struct ColumnarSerializer<W: io::Write> {

0 commit comments

Comments
 (0)