Skip to content

Commit edcda2d

Browse files
committed
lazy columnar merge
This is the first part of addressing #3633 Instead of loading all Column into memory for the merge, only the current column_name group is loaded. This can be done since the sstable streams the columns lexicographically.
1 parent 820f126 commit edcda2d

File tree

7 files changed

+347
-111
lines changed

7 files changed

+347
-111
lines changed

columnar/src/columnar/merge/mod.rs

+165-54
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,14 @@ mod merge_dict_column;
22
mod merge_mapping;
33
mod term_merger;
44

5-
use std::collections::{BTreeMap, HashMap, HashSet};
5+
use std::collections::{HashMap, HashSet};
66
use std::io;
77
use std::net::Ipv6Addr;
8+
use std::rc::Rc;
89
use std::sync::Arc;
910

10-
use itertools::Itertools;
11+
use common::GroupByIteratorExtended;
12+
use itertools::{EitherOrBoth, Itertools};
1113
pub use merge_mapping::{MergeRowOrder, ShuffleMergeOrder, StackMergeOrder};
1214

1315
use super::writer::ColumnarSerializer;
@@ -18,7 +20,8 @@ use crate::columnar::writer::CompatibleNumericalTypes;
1820
use crate::columnar::ColumnarReader;
1921
use crate::dynamic_column::DynamicColumn;
2022
use crate::{
21-
BytesColumn, Column, ColumnIndex, ColumnType, ColumnValues, NumericalType, NumericalValue,
23+
BytesColumn, Column, ColumnIndex, ColumnType, ColumnValues, DynamicColumnHandle, NumericalType,
24+
NumericalValue,
2225
};
2326

2427
/// Column types are grouped into different categories.
@@ -83,9 +86,13 @@ pub fn merge_columnar(
8386
.iter()
8487
.map(|reader| reader.num_rows())
8588
.collect::<Vec<u32>>();
86-
let columns_to_merge =
87-
group_columns_for_merge(columnar_readers, required_columns, &merge_row_order)?;
88-
for ((column_name, column_type), columns) in columns_to_merge {
89+
90+
let columns_to_merge_iter =
91+
group_columns_for_merge_iter(columnar_readers, required_columns, &merge_row_order)?;
92+
for res in columns_to_merge_iter {
93+
let (column_name, column_type, grouped_columns) = res?;
94+
let columns = grouped_columns.columns;
95+
8996
let mut column_serializer =
9097
serializer.start_serialize_column(column_name.as_bytes(), column_type);
9198
merge_column(
@@ -97,6 +104,7 @@ pub fn merge_columnar(
97104
)?;
98105
column_serializer.finalize()?;
99106
}
107+
100108
serializer.finalize(merge_row_order.num_rows())?;
101109
Ok(())
102110
}
@@ -210,15 +218,13 @@ fn merge_column(
210218
struct GroupedColumns {
211219
required_column_type: Option<ColumnType>,
212220
columns: Vec<Option<DynamicColumn>>,
213-
column_category: ColumnTypeCategory,
214221
}
215222

216223
impl GroupedColumns {
217-
fn for_category(column_category: ColumnTypeCategory, num_columnars: usize) -> Self {
224+
fn new(num_columnars: usize) -> Self {
218225
GroupedColumns {
219226
required_column_type: None,
220227
columns: vec![None; num_columnars],
221-
column_category,
222228
}
223229
}
224230

@@ -265,7 +271,11 @@ impl GroupedColumns {
265271
}
266272
// At the moment, only the numerical categorical column type has more than one possible
267273
// column type.
268-
assert_eq!(self.column_category, ColumnTypeCategory::Numerical);
274+
assert!(self
275+
.columns
276+
.iter()
277+
.flatten()
278+
.all(|el| ColumnTypeCategory::from(el.column_type()) == ColumnTypeCategory::Numerical));
269279
merged_numerical_columns_type(self.columns.iter().flatten()).into()
270280
}
271281
}
@@ -293,7 +303,7 @@ fn merged_numerical_columns_type<'a>(
293303
fn is_empty_after_merge(
294304
merge_row_order: &MergeRowOrder,
295305
column: &DynamicColumn,
296-
columnar_id: usize,
306+
columnar_ord: usize,
297307
) -> bool {
298308
if column.num_values() == 0u32 {
299309
// It was empty before the merge.
@@ -305,7 +315,7 @@ fn is_empty_after_merge(
305315
false
306316
}
307317
MergeRowOrder::Shuffled(shuffled) => {
308-
if let Some(alive_bitset) = &shuffled.alive_bitsets[columnar_id] {
318+
if let Some(alive_bitset) = &shuffled.alive_bitsets[columnar_ord] {
309319
let column_index = column.column_index();
310320
match column_index {
311321
ColumnIndex::Empty { .. } => true,
@@ -348,56 +358,157 @@ fn is_empty_after_merge(
348358
}
349359
}
350360

351-
#[allow(clippy::type_complexity)]
352-
fn group_columns_for_merge(
353-
columnar_readers: &[&ColumnarReader],
354-
required_columns: &[(String, ColumnType)],
355-
merge_row_order: &MergeRowOrder,
356-
) -> io::Result<BTreeMap<(String, ColumnType), Vec<Option<DynamicColumn>>>> {
357-
// Each column name may have multiple types of column associated.
358-
// For merging we are interested in the same column type category since they can be merged.
359-
let mut columns_grouped: HashMap<(String, ColumnTypeCategory), GroupedColumns> = HashMap::new();
360-
361-
for &(ref column_name, column_type) in required_columns {
362-
columns_grouped
363-
.entry((column_name.clone(), column_type.into()))
364-
.or_insert_with(|| {
365-
GroupedColumns::for_category(column_type.into(), columnar_readers.len())
366-
})
367-
.require_type(column_type)?;
361+
type MergeIter<'a> =
362+
Box<dyn Iterator<Item = io::Result<(Rc<str>, ColumnType, GroupedColumns)>> + 'a>;
363+
364+
#[derive(Debug, Clone)]
365+
struct MergeColumn {
366+
column_name: Rc<str>,
367+
reader_ord: usize,
368+
column: DynamicColumnHandle,
369+
}
370+
impl MergeColumn {
371+
fn new(column_name: Rc<str>, reader_ord: usize, column: DynamicColumnHandle) -> Self {
372+
MergeColumn {
373+
column_name,
374+
reader_ord,
375+
column,
376+
}
368377
}
378+
}
369379

370-
for (columnar_id, columnar_reader) in columnar_readers.iter().enumerate() {
371-
let column_name_and_handle = columnar_reader.list_columns()?;
372-
// We skip columns that end up with 0 documents.
373-
// That way, we make sure they don't end up influencing the merge type or
374-
// creating empty columns.
375-
376-
for (column_name, handle) in column_name_and_handle {
377-
let column_category: ColumnTypeCategory = handle.column_type().into();
378-
let column = handle.open()?;
379-
if is_empty_after_merge(merge_row_order, &column, columnar_id) {
380-
continue;
380+
/// Iterates over the columns of the columnar readers, grouped by column name.
381+
/// Key functionality is that `open` of the Columns is done lazy per group.
382+
fn group_columns_for_merge_iter<'a>(
383+
columnar_readers: &'a [&'a ColumnarReader],
384+
required_columns: &'a [(String, ColumnType)],
385+
merge_row_order: &'a MergeRowOrder,
386+
) -> io::Result<impl Iterator<Item = io::Result<(Rc<str>, ColumnType, GroupedColumns)>> + 'a> {
387+
let column_iters: Vec<_> = columnar_readers
388+
.iter()
389+
.enumerate()
390+
.map(|(reader_ord, reader)| {
391+
Ok(reader
392+
.iter_columns()?
393+
.map(move |el| MergeColumn::new(Rc::from(el.0), reader_ord, el.1)))
394+
})
395+
.collect::<io::Result<_>>()?;
396+
let required_columns_map: HashMap<String, ColumnType> = required_columns
397+
.iter()
398+
.map(|(col_name, typ)| (col_name.to_string(), *typ))
399+
.collect::<HashMap<String, _>>();
400+
let mut required_columns_list: Vec<String> = required_columns
401+
.iter()
402+
.map(|(col_name, _)| col_name.to_string())
403+
.collect();
404+
required_columns_list.sort();
405+
406+
// Kmerge and group on column_name.
407+
let group_iter = GroupByIteratorExtended::group_by(
408+
column_iters
409+
.into_iter()
410+
.kmerge_by(|a, b| a.column_name < b.column_name),
411+
|el| el.column_name.clone(),
412+
);
413+
414+
// Weave in the required columns into the sorted by column name iterator.
415+
let groups_with_required = required_columns_list
416+
.into_iter()
417+
.merge_join_by(group_iter, |a, b| (a.as_str()).cmp(&b.0));
418+
419+
Ok(groups_with_required.flat_map(move |either| {
420+
// It should be possible to do the grouping also on the column type in one pass, but some
421+
// tests are failing.
422+
let mut force_type: Option<ColumnType> = None;
423+
let (column_name, group) = match either {
424+
// set required column
425+
EitherOrBoth::Both(_required, (key, group)) => {
426+
force_type = required_columns_map.get(&*key).cloned();
427+
(key, group)
381428
}
382-
columns_grouped
383-
.entry((column_name, column_category))
384-
.or_insert_with(|| {
385-
GroupedColumns::for_category(column_category, columnar_readers.len())
429+
// Only required - Return artificial empty column
430+
EitherOrBoth::Left(column_name) => {
431+
return generate_require_column(
432+
Rc::from(column_name),
433+
columnar_readers,
434+
&required_columns_map,
435+
);
436+
}
437+
// no required column
438+
EitherOrBoth::Right((key, group)) => (key, group),
439+
};
440+
let mut group: Vec<MergeColumn> = group.collect();
441+
// We need to create an iterator that returns the columns in the order of `to_code` of
442+
// ColumnType
443+
group.sort_by_key(|el| el.column.column_type);
444+
let group_iter = GroupByIteratorExtended::group_by(group.into_iter(), |el| {
445+
let cat_type: ColumnTypeCategory = el.column.column_type().into();
446+
cat_type
447+
});
448+
let group_column_iter = group_iter.map(move |(_cat, group)| {
449+
group_columns_iter(
450+
column_name.clone(),
451+
columnar_readers,
452+
force_type,
453+
merge_row_order,
454+
group,
455+
)
456+
});
457+
let iter = group_column_iter.filter(move |res| {
458+
// Filter out empty columns.
459+
res.as_ref()
460+
.map(|(_, _, group)| {
461+
let column_is_required = force_type.is_some();
462+
if column_is_required {
463+
return true;
464+
}
465+
let all_columns_none = group.columns.iter().all(|column| column.is_none());
466+
!all_columns_none
386467
})
387-
.set_column(columnar_id, column);
388-
}
389-
}
468+
.unwrap_or(true)
469+
});
470+
Box::new(iter)
471+
}))
472+
}
390473

391-
let mut merge_columns: BTreeMap<(String, ColumnType), Vec<Option<DynamicColumn>>> =
392-
Default::default();
474+
fn generate_require_column<'a>(
475+
column_name: Rc<str>,
476+
columnar_readers: &'a [&'a ColumnarReader],
477+
required_columns_map: &HashMap<String, ColumnType>,
478+
) -> MergeIter<'a> {
479+
let mut grouped_columns = GroupedColumns::new(columnar_readers.len());
480+
let force_type: ColumnType = required_columns_map.get(&*column_name).cloned().unwrap();
481+
grouped_columns.require_type(force_type).unwrap(); // Can't panic
482+
Box::new(std::iter::once(Ok((
483+
column_name,
484+
force_type,
485+
grouped_columns,
486+
)))) as MergeIter<'a>
487+
}
393488

394-
for ((column_name, _), mut grouped_columns) in columns_grouped {
395-
let column_type = grouped_columns.column_type_after_merge();
396-
coerce_columns(column_type, &mut grouped_columns.columns)?;
397-
merge_columns.insert((column_name, column_type), grouped_columns.columns);
489+
fn group_columns_iter<'a>(
490+
column_name: Rc<str>,
491+
columnar_readers: &'a [&'a ColumnarReader],
492+
force_type: Option<ColumnType>,
493+
merge_row_order: &'a MergeRowOrder,
494+
group: impl Iterator<Item = MergeColumn>,
495+
) -> io::Result<(Rc<str>, ColumnType, GroupedColumns)> {
496+
let mut grouped_columns = GroupedColumns::new(columnar_readers.len());
497+
if let Some(force_type) = force_type {
498+
grouped_columns.require_type(force_type)?;
398499
}
500+
for col in group {
501+
let columnar_ord = col.reader_ord;
502+
let column = col.column.open()?;
503+
if !is_empty_after_merge(merge_row_order, &column, columnar_ord) {
504+
grouped_columns.set_column(col.reader_ord, column);
505+
}
506+
}
507+
508+
let column_type = grouped_columns.column_type_after_merge();
509+
coerce_columns(column_type, &mut grouped_columns.columns)?;
399510

400-
Ok(merge_columns)
511+
Ok((column_name, column_type, grouped_columns))
401512
}
402513

403514
fn coerce_columns(

0 commit comments

Comments
 (0)