@@ -16,127 +16,6 @@ namespace NKikimr::NArrow {
16
16
17
17
// Aborts unless `status.ok()` holds, passing the textual form of the status to the abort message.
// The parameter list must follow the macro name with no whitespace in between: with a space the
// preprocessor defines an object-like macro whose replacement text starts with "(status )".
#define Y_VERIFY_OK(status) Y_ABORT_UNLESS(status.ok(), " %s", status.ToString().c_str())
18
18
19
- namespace {
20
- enum class ECompareResult : i8 {
21
- LESS = -1 ,
22
- BORDER = 0 ,
23
- GREATER = 1
24
- };
25
-
26
/// Returns the view of the element stored at position `pos` of a typed array.
///
/// @param array  typed array exposing `GetView(index)`
/// @param pos    element index; 64-bit wide so that positions beyond INT_MAX are not truncated
/// @return whatever `array->GetView(pos)` yields, by value
template <typename TArray>
inline auto GetValue(const std::shared_ptr<TArray>& array, int64_t pos) {
    return array->GetView(pos);
}
30
-
31
- template <typename T>
32
- inline void UpdateCompare (const T& value, const T& border, ECompareResult& res) {
33
- if (res == ECompareResult::BORDER) {
34
- if constexpr (std::is_same_v<T, arrow::util::string_view>) {
35
- size_t minSize = (value.size () < border.size ()) ? value.size () : border.size ();
36
- int cmp = memcmp (value.data (), border.data (), minSize);
37
- if (cmp < 0 ) {
38
- res = ECompareResult::LESS;
39
- } else if (cmp > 0 ) {
40
- res = ECompareResult::GREATER;
41
- } else {
42
- UpdateCompare (value.size (), border.size (), res);
43
- }
44
- } else {
45
- if (value < border) {
46
- res = ECompareResult::LESS;
47
- } else if (value > border) {
48
- res = ECompareResult::GREATER;
49
- }
50
- }
51
- }
52
- }
53
-
54
- template <typename TArray, typename T>
55
- bool CompareImpl (const std::shared_ptr<arrow::Array>& column, const T& border, std::vector<NArrow::ECompareResult>& rowsCmp) {
56
- bool hasBorder = false ;
57
- ECompareResult* res = &rowsCmp[0 ];
58
- auto array = std::static_pointer_cast<TArray>(column);
59
-
60
- for (int i = 0 ; i < array->length (); ++i, ++res) {
61
- UpdateCompare (GetValue (array, i), border, *res);
62
- hasBorder = hasBorder || (*res == ECompareResult::BORDER);
63
- }
64
- return !hasBorder;
65
- }
66
-
67
- template <typename TArray, typename T>
68
- bool CompareImpl (const std::shared_ptr<arrow::ChunkedArray>& column, const T& border, std::vector<NArrow::ECompareResult>& rowsCmp) {
69
- bool hasBorder = false ;
70
- ECompareResult* res = &rowsCmp[0 ];
71
-
72
- for (auto & chunk : column->chunks ()) {
73
- auto array = std::static_pointer_cast<TArray>(chunk);
74
-
75
- for (int i = 0 ; i < chunk->length (); ++i, ++res) {
76
- UpdateCompare (GetValue (array, i), border, *res);
77
- hasBorder = hasBorder || (*res == ECompareResult::BORDER);
78
- }
79
- }
80
- return !hasBorder;
81
- }
82
-
83
- // / @return true in case we have no borders in compare: no need for future keys, allow early exit
84
- template <typename TArray>
85
- bool Compare (const arrow::Datum& column, const std::shared_ptr<arrow::Array>& borderArray, std::vector<NArrow::ECompareResult>& rowsCmp) {
86
- auto border = GetValue (std::static_pointer_cast<TArray>(borderArray), 0 );
87
-
88
- switch (column.kind ()) {
89
- case arrow::Datum::ARRAY:
90
- return CompareImpl<TArray>(column.make_array (), border, rowsCmp);
91
- case arrow::Datum::CHUNKED_ARRAY:
92
- return CompareImpl<TArray>(column.chunked_array (), border, rowsCmp);
93
- default :
94
- break ;
95
- }
96
- Y_ABORT_UNLESS (false );
97
- return false ;
98
- }
99
-
100
- bool SwitchCompare (const arrow::Datum& column, const std::shared_ptr<arrow::Array>& border, std::vector<NArrow::ECompareResult>& rowsCmp) {
101
- Y_ABORT_UNLESS (border->length () == 1 );
102
-
103
- // first time it's empty
104
- if (rowsCmp.empty ()) {
105
- rowsCmp.resize (column.length (), ECompareResult::BORDER);
106
- }
107
-
108
- return SwitchArrayType (column, [&](const auto & type) -> bool {
109
- using TWrap = std::decay_t <decltype (type)>;
110
- using TArray = typename arrow::TypeTraits<typename TWrap::T>::ArrayType;
111
- return Compare<TArray>(column, border, rowsCmp);
112
- });
113
- }
114
-
115
- template <typename T>
116
- void CompositeCompare (std::shared_ptr<T> some, std::shared_ptr<arrow::RecordBatch> borderBatch, std::vector<NArrow::ECompareResult>& rowsCmp) {
117
- AFL_VERIFY (some);
118
- AFL_VERIFY (borderBatch);
119
- auto key = borderBatch->schema ()->fields ();
120
- AFL_VERIFY (key.size ());
121
-
122
- for (size_t i = 0 ; i < key.size (); ++i) {
123
- auto & field = key[i];
124
- auto typeId = field->type ()->id ();
125
- auto column = some->GetColumnByName (field->name ());
126
- std::shared_ptr<arrow::Array> border = borderBatch->GetColumnByName (field->name ());
127
- AFL_VERIFY (column)(" schema1" , some->schema ()->ToString ())(" schema2" , borderBatch->schema ()->ToString ())(" f" , field->name ());
128
- AFL_VERIFY (border)(" schema1" , some->schema ()->ToString ())(" schema2" , borderBatch->schema ()->ToString ())(" f" , field->name ());
129
- AFL_VERIFY (some->schema ()->GetFieldByName (field->name ())->type ()->id () == typeId)(" schema1" , some->schema ()->ToString ())(
130
- " schema2" , borderBatch->schema ()->ToString ())(" f" , field->name ());
131
-
132
- if (SwitchCompare (column, border, rowsCmp)) {
133
- break ; // early exit in case we have all rows compared: no borders, can omit key tail
134
- }
135
- }
136
- }
137
-
138
- } // namespace
139
-
140
19
TColumnFilter::TSlicesIterator::TSlicesIterator (const TColumnFilter& owner, const std::optional<ui32> start, const std::optional<ui32> count)
141
20
: Owner(owner)
142
21
, StartIndex(start)
@@ -307,61 +186,6 @@ ui32 TColumnFilter::CrossSize(const ui32 s1, const ui32 f1, const ui32 s2, const
307
186
return f - s;
308
187
}
309
188
310
- NKikimr::NArrow::TColumnFilter TColumnFilter::MakePredicateFilter (
311
- const arrow::Datum& datum, const arrow::Datum& border, ECompareType compareType) {
312
- std::vector<ECompareResult> cmps;
313
-
314
- switch (datum.kind ()) {
315
- case arrow::Datum::ARRAY:
316
- Y_ABORT_UNLESS (border.kind () == arrow::Datum::ARRAY);
317
- SwitchCompare (datum, border.make_array (), cmps);
318
- break ;
319
- case arrow::Datum::CHUNKED_ARRAY:
320
- Y_ABORT_UNLESS (border.kind () == arrow::Datum::ARRAY);
321
- SwitchCompare (datum, border.make_array (), cmps);
322
- break ;
323
- case arrow::Datum::RECORD_BATCH:
324
- Y_ABORT_UNLESS (border.kind () == arrow::Datum::RECORD_BATCH);
325
- CompositeCompare (datum.record_batch (), border.record_batch (), cmps);
326
- break ;
327
- case arrow::Datum::TABLE:
328
- Y_ABORT_UNLESS (border.kind () == arrow::Datum::RECORD_BATCH);
329
- CompositeCompare (datum.table (), border.record_batch (), cmps);
330
- break ;
331
- default :
332
- Y_ABORT_UNLESS (false );
333
- break ;
334
- }
335
-
336
- std::vector<bool > bits;
337
- bits.reserve (cmps.size ());
338
-
339
- switch (compareType) {
340
- case ECompareType::LESS:
341
- for (size_t i = 0 ; i < cmps.size (); ++i) {
342
- bits.emplace_back (cmps[i] < ECompareResult::BORDER);
343
- }
344
- break ;
345
- case ECompareType::LESS_OR_EQUAL:
346
- for (size_t i = 0 ; i < cmps.size (); ++i) {
347
- bits.emplace_back (cmps[i] <= ECompareResult::BORDER);
348
- }
349
- break ;
350
- case ECompareType::GREATER:
351
- for (size_t i = 0 ; i < cmps.size (); ++i) {
352
- bits.emplace_back (cmps[i] > ECompareResult::BORDER);
353
- }
354
- break ;
355
- case ECompareType::GREATER_OR_EQUAL:
356
- for (size_t i = 0 ; i < cmps.size (); ++i) {
357
- bits.emplace_back (cmps[i] >= ECompareResult::BORDER);
358
- }
359
- break ;
360
- }
361
-
362
- return NArrow::TColumnFilter (std::move (bits));
363
- }
364
-
365
189
template <class TData >
366
190
bool ApplyImpl (const TColumnFilter& filter, std::shared_ptr<TData>& batch, const TColumnFilter::TApplyContext& context) {
367
191
if (!batch || !batch->num_rows ()) {
0 commit comments