Skip to content

Commit 59460c7

Browse files
authored
delayed column opening during merge (#2132)
* lazy columnar merge This is the first part of addressing #3633 Instead of loading all Column into memory for the merge, only the current column_name group is loaded. This can be done since the sstable streams the columns lexicographically. * refactor * add rustdoc * replace iterator with BTreeMap
1 parent 756156b commit 59460c7

File tree

7 files changed

+279
-159
lines changed

7 files changed

+279
-159
lines changed

columnar/src/columnar/merge/mod.rs

+104-74
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ mod merge_dict_column;
22
mod merge_mapping;
33
mod term_merger;
44

5-
use std::collections::{BTreeMap, HashMap, HashSet};
5+
use std::collections::{BTreeMap, HashSet};
66
use std::io;
77
use std::net::Ipv6Addr;
88
use std::sync::Arc;
@@ -18,7 +18,8 @@ use crate::columnar::writer::CompatibleNumericalTypes;
1818
use crate::columnar::ColumnarReader;
1919
use crate::dynamic_column::DynamicColumn;
2020
use crate::{
21-
BytesColumn, Column, ColumnIndex, ColumnType, ColumnValues, NumericalType, NumericalValue,
21+
BytesColumn, Column, ColumnIndex, ColumnType, ColumnValues, DynamicColumnHandle, NumericalType,
22+
NumericalValue,
2223
};
2324

2425
/// Column types are grouped into different categories.
@@ -28,14 +29,16 @@ use crate::{
2829
/// In practise, today, only Numerical colummns are coerced into one type today.
2930
///
3031
/// See also [README.md].
31-
#[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)]
32+
///
33+
/// The ordering has to match the ordering of the variants in [ColumnType].
34+
#[derive(Copy, Clone, Eq, PartialOrd, Ord, PartialEq, Hash, Debug)]
3235
pub(crate) enum ColumnTypeCategory {
33-
Bool,
34-
Str,
3536
Numerical,
36-
DateTime,
3737
Bytes,
38+
Str,
39+
Bool,
3840
IpAddr,
41+
DateTime,
3942
}
4043

4144
impl From<ColumnType> for ColumnTypeCategory {
@@ -83,9 +86,20 @@ pub fn merge_columnar(
8386
.iter()
8487
.map(|reader| reader.num_rows())
8588
.collect::<Vec<u32>>();
89+
8690
let columns_to_merge =
8791
group_columns_for_merge(columnar_readers, required_columns, &merge_row_order)?;
88-
for ((column_name, column_type), columns) in columns_to_merge {
92+
for res in columns_to_merge {
93+
let ((column_name, _column_type_category), grouped_columns) = res;
94+
let grouped_columns = grouped_columns.open(&merge_row_order)?;
95+
if grouped_columns.is_empty() {
96+
continue;
97+
}
98+
99+
let column_type = grouped_columns.column_type_after_merge();
100+
let mut columns = grouped_columns.columns;
101+
coerce_columns(column_type, &mut columns)?;
102+
89103
let mut column_serializer =
90104
serializer.start_serialize_column(column_name.as_bytes(), column_type);
91105
merge_column(
@@ -97,6 +111,7 @@ pub fn merge_columnar(
97111
)?;
98112
column_serializer.finalize()?;
99113
}
114+
100115
serializer.finalize(merge_row_order.num_rows())?;
101116
Ok(())
102117
}
@@ -210,20 +225,80 @@ fn merge_column(
210225
struct GroupedColumns {
211226
required_column_type: Option<ColumnType>,
212227
columns: Vec<Option<DynamicColumn>>,
213-
column_category: ColumnTypeCategory,
214228
}
215229

216230
impl GroupedColumns {
217-
fn for_category(column_category: ColumnTypeCategory, num_columnars: usize) -> Self {
218-
GroupedColumns {
231+
/// Check is column group can be skipped during serialization.
232+
fn is_empty(&self) -> bool {
233+
self.required_column_type.is_none() && self.columns.iter().all(Option::is_none)
234+
}
235+
236+
/// Returns the column type after merge.
237+
///
238+
/// This method does not check if the column types can actually be coerced to
239+
/// this type.
240+
fn column_type_after_merge(&self) -> ColumnType {
241+
if let Some(required_type) = self.required_column_type {
242+
return required_type;
243+
}
244+
let column_type: HashSet<ColumnType> = self
245+
.columns
246+
.iter()
247+
.flatten()
248+
.map(|column| column.column_type())
249+
.collect();
250+
if column_type.len() == 1 {
251+
return column_type.into_iter().next().unwrap();
252+
}
253+
// At the moment, only the numerical categorical column type has more than one possible
254+
// column type.
255+
assert!(self
256+
.columns
257+
.iter()
258+
.flatten()
259+
.all(|el| ColumnTypeCategory::from(el.column_type()) == ColumnTypeCategory::Numerical));
260+
merged_numerical_columns_type(self.columns.iter().flatten()).into()
261+
}
262+
}
263+
264+
struct GroupedColumnsHandle {
265+
required_column_type: Option<ColumnType>,
266+
columns: Vec<Option<DynamicColumnHandle>>,
267+
}
268+
269+
impl GroupedColumnsHandle {
270+
fn new(num_columnars: usize) -> Self {
271+
GroupedColumnsHandle {
219272
required_column_type: None,
220273
columns: vec![None; num_columnars],
221-
column_category,
222274
}
223275
}
276+
fn open(self, merge_row_order: &MergeRowOrder) -> io::Result<GroupedColumns> {
277+
let mut columns: Vec<Option<DynamicColumn>> = Vec::new();
278+
for (columnar_id, column) in self.columns.iter().enumerate() {
279+
if let Some(column) = column {
280+
let column = column.open()?;
281+
// We skip columns that end up with 0 documents.
282+
// That way, we make sure they don't end up influencing the merge type or
283+
// creating empty columns.
284+
285+
if is_empty_after_merge(merge_row_order, &column, columnar_id) {
286+
columns.push(None);
287+
} else {
288+
columns.push(Some(column));
289+
}
290+
} else {
291+
columns.push(None);
292+
}
293+
}
294+
Ok(GroupedColumns {
295+
required_column_type: self.required_column_type,
296+
columns,
297+
})
298+
}
224299

225300
/// Set the dynamic column for a given columnar.
226-
fn set_column(&mut self, columnar_id: usize, column: DynamicColumn) {
301+
fn set_column(&mut self, columnar_id: usize, column: DynamicColumnHandle) {
227302
self.columns[columnar_id] = Some(column);
228303
}
229304

@@ -245,29 +320,6 @@ impl GroupedColumns {
245320
self.required_column_type = Some(required_type);
246321
Ok(())
247322
}
248-
249-
/// Returns the column type after merge.
250-
///
251-
/// This method does not check if the column types can actually be coerced to
252-
/// this type.
253-
fn column_type_after_merge(&self) -> ColumnType {
254-
if let Some(required_type) = self.required_column_type {
255-
return required_type;
256-
}
257-
let column_type: HashSet<ColumnType> = self
258-
.columns
259-
.iter()
260-
.flatten()
261-
.map(|column| column.column_type())
262-
.collect();
263-
if column_type.len() == 1 {
264-
return column_type.into_iter().next().unwrap();
265-
}
266-
// At the moment, only the numerical categorical column type has more than one possible
267-
// column type.
268-
assert_eq!(self.column_category, ColumnTypeCategory::Numerical);
269-
merged_numerical_columns_type(self.columns.iter().flatten()).into()
270-
}
271323
}
272324

273325
/// Returns the type of the merged numerical column.
@@ -293,7 +345,7 @@ fn merged_numerical_columns_type<'a>(
293345
fn is_empty_after_merge(
294346
merge_row_order: &MergeRowOrder,
295347
column: &DynamicColumn,
296-
columnar_id: usize,
348+
columnar_ord: usize,
297349
) -> bool {
298350
if column.num_values() == 0u32 {
299351
// It was empty before the merge.
@@ -305,7 +357,7 @@ fn is_empty_after_merge(
305357
false
306358
}
307359
MergeRowOrder::Shuffled(shuffled) => {
308-
if let Some(alive_bitset) = &shuffled.alive_bitsets[columnar_id] {
360+
if let Some(alive_bitset) = &shuffled.alive_bitsets[columnar_ord] {
309361
let column_index = column.column_index();
310362
match column_index {
311363
ColumnIndex::Empty { .. } => true,
@@ -348,56 +400,34 @@ fn is_empty_after_merge(
348400
}
349401
}
350402

351-
#[allow(clippy::type_complexity)]
352-
fn group_columns_for_merge(
353-
columnar_readers: &[&ColumnarReader],
354-
required_columns: &[(String, ColumnType)],
355-
merge_row_order: &MergeRowOrder,
356-
) -> io::Result<BTreeMap<(String, ColumnType), Vec<Option<DynamicColumn>>>> {
357-
// Each column name may have multiple types of column associated.
358-
// For merging we are interested in the same column type category since they can be merged.
359-
let mut columns_grouped: HashMap<(String, ColumnTypeCategory), GroupedColumns> = HashMap::new();
403+
/// Iterates over the columns of the columnar readers, grouped by column name.
404+
/// Key functionality is that `open` of the Columns is done lazy per group.
405+
fn group_columns_for_merge<'a>(
406+
columnar_readers: &'a [&'a ColumnarReader],
407+
required_columns: &'a [(String, ColumnType)],
408+
_merge_row_order: &'a MergeRowOrder,
409+
) -> io::Result<BTreeMap<(String, ColumnTypeCategory), GroupedColumnsHandle>> {
410+
let mut columns: BTreeMap<(String, ColumnTypeCategory), GroupedColumnsHandle> = BTreeMap::new();
360411

361412
for &(ref column_name, column_type) in required_columns {
362-
columns_grouped
413+
columns
363414
.entry((column_name.clone(), column_type.into()))
364-
.or_insert_with(|| {
365-
GroupedColumns::for_category(column_type.into(), columnar_readers.len())
366-
})
415+
.or_insert_with(|| GroupedColumnsHandle::new(columnar_readers.len()))
367416
.require_type(column_type)?;
368417
}
369418

370419
for (columnar_id, columnar_reader) in columnar_readers.iter().enumerate() {
371-
let column_name_and_handle = columnar_reader.list_columns()?;
372-
// We skip columns that end up with 0 documents.
373-
// That way, we make sure they don't end up influencing the merge type or
374-
// creating empty columns.
420+
let column_name_and_handle = columnar_reader.iter_columns()?;
375421

376422
for (column_name, handle) in column_name_and_handle {
377423
let column_category: ColumnTypeCategory = handle.column_type().into();
378-
let column = handle.open()?;
379-
if is_empty_after_merge(merge_row_order, &column, columnar_id) {
380-
continue;
381-
}
382-
columns_grouped
424+
columns
383425
.entry((column_name, column_category))
384-
.or_insert_with(|| {
385-
GroupedColumns::for_category(column_category, columnar_readers.len())
386-
})
387-
.set_column(columnar_id, column);
426+
.or_insert_with(|| GroupedColumnsHandle::new(columnar_readers.len()))
427+
.set_column(columnar_id, handle);
388428
}
389429
}
390-
391-
let mut merge_columns: BTreeMap<(String, ColumnType), Vec<Option<DynamicColumn>>> =
392-
Default::default();
393-
394-
for ((column_name, _), mut grouped_columns) in columns_grouped {
395-
let column_type = grouped_columns.column_type_after_merge();
396-
coerce_columns(column_type, &mut grouped_columns.columns)?;
397-
merge_columns.insert((column_name, column_type), grouped_columns.columns);
398-
}
399-
400-
Ok(merge_columns)
430+
Ok(columns)
401431
}
402432

403433
fn coerce_columns(

0 commit comments

Comments
 (0)