Skip to content

Commit 07093a4

Browse files
authored
Speed up take_bytes (-35% to -69%) by precalculating capacity (#7422)
* Speedup take_bytes
* Speedup take_bytes 2
* Speedup take_bytes 3
* WIP
* WIP
* WIP
* Refactor
* Refactor
* Refactor
* Refactor
* Fix capacity
* Fix
* Fix
* Fix
* Refactor and speedup null indices / null values
* Fix
* Fix
* Fmt
1 parent 5f0aed6 commit 07093a4

File tree

1 file changed

+61
-50
lines changed

1 file changed

+61
-50
lines changed

arrow-select/src/take.rs

+61-50
Original file line number | Diff line number | Diff line change
@@ -24,9 +24,10 @@ use arrow_array::cast::AsArray;
2424
use arrow_array::types::*;
2525
use arrow_array::*;
2626
use arrow_buffer::{
27-
bit_util, ArrowNativeType, BooleanBuffer, Buffer, MutableBuffer, NullBuffer, ScalarBuffer,
27+
bit_util, ArrowNativeType, BooleanBuffer, Buffer, MutableBuffer, NullBuffer, OffsetBuffer,
28+
ScalarBuffer,
2829
};
29-
use arrow_data::{ArrayData, ArrayDataBuilder};
30+
use arrow_data::ArrayDataBuilder;
3031
use arrow_schema::{ArrowError, DataType, FieldRef, UnionMode};
3132

3233
use num::{One, Zero};
@@ -465,83 +466,92 @@ fn take_bytes<T: ByteArrayType, IndexType: ArrowPrimitiveType>(
465466
array: &GenericByteArray<T>,
466467
indices: &PrimitiveArray<IndexType>,
467468
) -> Result<GenericByteArray<T>, ArrowError> {
468-
let data_len = indices.len();
469-
470-
let bytes_offset = (data_len + 1) * std::mem::size_of::<T::Offset>();
471-
let mut offsets = MutableBuffer::new(bytes_offset);
469+
let mut offsets = Vec::with_capacity(indices.len() + 1);
472470
offsets.push(T::Offset::default());
473471

474-
let mut values = MutableBuffer::new(0);
472+
let input_offsets = array.value_offsets();
473+
let mut capacity = 0;
474+
let nulls = take_nulls(array.nulls(), indices);
475475

476-
let nulls;
477-
if array.null_count() == 0 && indices.null_count() == 0 {
476+
let (offsets, values) = if array.null_count() == 0 && indices.null_count() == 0 {
478477
offsets.extend(indices.values().iter().map(|index| {
479-
let s: &[u8] = array.value(index.as_usize()).as_ref();
480-
values.extend_from_slice(s);
481-
T::Offset::usize_as(values.len())
478+
let index = index.as_usize();
479+
capacity += input_offsets[index + 1].as_usize() - input_offsets[index].as_usize();
480+
T::Offset::from_usize(capacity).expect("overflow")
482481
}));
483-
nulls = None
484-
} else if indices.null_count() == 0 {
485-
let num_bytes = bit_util::ceil(data_len, 8);
482+
let mut values = Vec::with_capacity(capacity);
486483

487-
let mut null_buf = MutableBuffer::new(num_bytes).with_bitset(num_bytes, true);
488-
let null_slice = null_buf.as_slice_mut();
489-
offsets.extend(indices.values().iter().enumerate().map(|(i, index)| {
484+
for index in indices.values() {
485+
values.extend_from_slice(array.value(index.as_usize()).as_ref());
486+
}
487+
(offsets, values)
488+
} else if indices.null_count() == 0 {
489+
offsets.extend(indices.values().iter().map(|index| {
490490
let index = index.as_usize();
491491
if array.is_valid(index) {
492-
let s: &[u8] = array.value(index).as_ref();
493-
values.extend_from_slice(s.as_ref());
494-
} else {
495-
bit_util::unset_bit(null_slice, i);
492+
capacity += input_offsets[index + 1].as_usize() - input_offsets[index].as_usize();
496493
}
497-
T::Offset::usize_as(values.len())
494+
T::Offset::from_usize(capacity).expect("overflow")
498495
}));
499-
nulls = Some(null_buf.into());
496+
let mut values = Vec::with_capacity(capacity);
497+
498+
for index in indices.values() {
499+
let index = index.as_usize();
500+
if array.is_valid(index) {
501+
values.extend_from_slice(array.value(index).as_ref());
502+
}
503+
}
504+
(offsets, values)
500505
} else if array.null_count() == 0 {
501506
offsets.extend(indices.values().iter().enumerate().map(|(i, index)| {
507+
let index = index.as_usize();
502508
if indices.is_valid(i) {
503-
let s: &[u8] = array.value(index.as_usize()).as_ref();
504-
values.extend_from_slice(s);
509+
capacity += input_offsets[index + 1].as_usize() - input_offsets[index].as_usize();
505510
}
506-
T::Offset::usize_as(values.len())
511+
T::Offset::from_usize(capacity).expect("overflow")
507512
}));
508-
nulls = indices.nulls().map(|b| b.inner().sliced());
509-
} else {
510-
let num_bytes = bit_util::ceil(data_len, 8);
513+
let mut values = Vec::with_capacity(capacity);
511514

512-
let mut null_buf = MutableBuffer::new(num_bytes).with_bitset(num_bytes, true);
513-
let null_slice = null_buf.as_slice_mut();
515+
for (i, index) in indices.values().iter().enumerate() {
516+
if indices.is_valid(i) {
517+
values.extend_from_slice(array.value(index.as_usize()).as_ref());
518+
}
519+
}
520+
(offsets, values)
521+
} else {
522+
let nulls = nulls.as_ref().unwrap();
514523
offsets.extend(indices.values().iter().enumerate().map(|(i, index)| {
524+
let index = index.as_usize();
525+
if nulls.is_valid(i) {
526+
capacity += input_offsets[index + 1].as_usize() - input_offsets[index].as_usize();
527+
}
528+
T::Offset::from_usize(capacity).expect("overflow")
529+
}));
530+
let mut values = Vec::with_capacity(capacity);
531+
532+
for (i, index) in indices.values().iter().enumerate() {
515533
// check index is valid before using index. The value in
516534
// NULL index slots may not be within bounds of array
517535
let index = index.as_usize();
518-
if indices.is_valid(i) && array.is_valid(index) {
519-
let s: &[u8] = array.value(index).as_ref();
520-
values.extend_from_slice(s);
521-
} else {
522-
// set null bit
523-
bit_util::unset_bit(null_slice, i);
536+
if nulls.is_valid(i) {
537+
values.extend_from_slice(array.value(index).as_ref());
524538
}
525-
T::Offset::usize_as(values.len())
526-
}));
527-
nulls = Some(null_buf.into())
528-
}
539+
}
540+
(offsets, values)
541+
};
529542

530543
T::Offset::from_usize(values.len()).ok_or(ArrowError::ComputeError(format!(
531544
"Offset overflow for {}BinaryArray: {}",
532545
T::Offset::PREFIX,
533546
values.len()
534547
)))?;
535548

536-
let array_data = ArrayData::builder(T::DATA_TYPE)
537-
.len(data_len)
538-
.add_buffer(offsets.into())
539-
.add_buffer(values.into())
540-
.null_bit_buffer(nulls);
541-
542-
let array_data = unsafe { array_data.build_unchecked() };
549+
let array = unsafe {
550+
let offsets = OffsetBuffer::new_unchecked(offsets.into());
551+
GenericByteArray::<T>::new_unchecked(offsets, values.into(), nulls)
552+
};
543553

544-
Ok(GenericByteArray::from(array_data))
554+
Ok(array)
545555
}
546556

547557
/// `take` implementation for byte view arrays
@@ -949,6 +959,7 @@ mod tests {
949959
use super::*;
950960
use arrow_array::builder::*;
951961
use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano};
962+
use arrow_data::ArrayData;
952963
use arrow_schema::{Field, Fields, TimeUnit, UnionFields};
953964

954965
fn test_take_decimal_arrays(

0 commit comments

Comments
 (0)