Skip to content

Commit e1b94ec

Browse files
authored
Merge fda2720 into 41df44f
2 parents 41df44f + fda2720 commit e1b94ec

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

48 files changed

+1306
-277
lines changed

Diff for: ydb/core/formats/arrow/accessor/abstract/accessor.cpp

+40
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,37 @@ std::shared_ptr<arrow::Array> IChunkedArray::TReader::CopyRecord(const ui64 reco
2323
return NArrow::CopyRecords(address.GetArray(), { address.GetPosition() });
2424
}
2525

26+
IChunkedArray::TRowRange IChunkedArray::TReader::EqualRange(const TAddress& value, const TRowRange& range) const {
27+
const TRowRange clippedRange = range.Intersect({0, GetRecordsCount()});
28+
const auto localIndexes = std::ranges::iota_view(clippedRange.GetBegin(), clippedRange.GetEnd());
29+
30+
const ui64 begin = [&]() {
31+
const auto findBound =
32+
std::lower_bound(localIndexes.begin(), localIndexes.end(), value, [this](const ui64 index, const TAddress& bound) {
33+
return GetReadChunk(index).Compare(bound) == std::partial_ordering::less;
34+
});
35+
if (findBound == localIndexes.end()) {
36+
return clippedRange.GetEnd();
37+
} else {
38+
return *findBound;
39+
}
40+
}();
41+
42+
const ui64 end = [&]() {
43+
const auto findBound =
44+
std::upper_bound(localIndexes.begin(), localIndexes.end(), value, [this](const TAddress& bound, const ui64 index) {
45+
return GetReadChunk(index).Compare(bound) == std::partial_ordering::greater;
46+
});
47+
if (findBound == localIndexes.end()) {
48+
return clippedRange.GetEnd();
49+
} else {
50+
return *findBound;
51+
}
52+
}();
53+
54+
return { begin, end };
55+
}
56+
2657
std::shared_ptr<arrow::ChunkedArray> IChunkedArray::Slice(const ui32 offset, const ui32 count) const {
2758
AFL_VERIFY(offset + count <= (ui64)GetRecordsCount())("offset", offset)("count", count)("length", GetRecordsCount());
2859
ui32 currentOffset = offset;
@@ -145,6 +176,15 @@ std::shared_ptr<arrow::ChunkedArray> IChunkedArray::GetChunkedArray() const {
145176
return std::make_shared<arrow::ChunkedArray>(chunks, GetDataType());
146177
}
147178

179+
TColumnFilter IChunkedArray::TRowRange::MakeFilter(const ui64 recordsCount) const {
180+
AFL_VERIFY(End <= recordsCount)("end", End)("count", recordsCount);
181+
TColumnFilter result = TColumnFilter::BuildAllowFilter();
182+
result.Add(false, Begin);
183+
result.Add(true, Size());
184+
result.Add(false, recordsCount - End);
185+
return result;
186+
}
187+
148188
TString IChunkedArray::TReader::DebugString(const ui32 position) const {
149189
auto address = GetReadChunk(position);
150190
return NArrow::DebugString(address.GetArray(), address.GetPosition());

Diff for: ydb/core/formats/arrow/accessor/abstract/accessor.h

+42
Original file line numberDiff line numberDiff line change
@@ -387,6 +387,47 @@ class IChunkedArray {
387387
return DoGetLocalData(chunkCurrent, position);
388388
}
389389

390+
class TRowRange {
391+
private:
392+
YDB_READONLY_DEF(ui64, Begin);
393+
YDB_READONLY_DEF(ui64, End);
394+
395+
public:
396+
TRowRange Intersect(const TRowRange& other) const {
397+
const ui64 begin = Max(Begin, other.Begin);
398+
const ui64 end = Min(End, other.End);
399+
if (begin > end) {
400+
return {0, 0};
401+
}
402+
return {begin, end};
403+
}
404+
405+
ui64 Size() const {
406+
return End - Begin;
407+
}
408+
409+
bool Empty() const {
410+
return Begin == End;
411+
}
412+
413+
TColumnFilter MakeFilter(const ui64 recordsCount) const;
414+
415+
TString DebugString() const {
416+
return TStringBuilder() << "[" << Begin << ";" << End << ")";
417+
}
418+
419+
TRowRange(const ui64 begin, const ui64 end)
420+
: Begin(begin)
421+
, End(end) {
422+
AFL_VERIFY(Begin <= End)("begin", Begin)("end", End);
423+
}
424+
425+
TRowRange(const ui64 end)
426+
: Begin(0)
427+
, End(end) {
428+
}
429+
};
430+
390431
class TReader {
391432
private:
392433
std::shared_ptr<IChunkedArray> ChunkedArray;
@@ -408,6 +449,7 @@ class IChunkedArray {
408449
void AppendPositionTo(arrow::ArrayBuilder& builder, const ui64 position, ui64* recordSize) const;
409450
std::shared_ptr<arrow::Array> CopyRecord(const ui64 recordIndex) const;
410451
TString DebugString(const ui32 position) const;
452+
TRowRange EqualRange(const TAddress& value, const TRowRange& range) const;
411453
};
412454

413455
std::shared_ptr<arrow::Scalar> GetScalar(const ui32 index) const {

Diff for: ydb/core/formats/arrow/arrow_filter.cpp

-176
Original file line numberDiff line numberDiff line change
@@ -16,127 +16,6 @@ namespace NKikimr::NArrow {
1616

1717
#define Y_VERIFY_OK(status) Y_ABORT_UNLESS(status.ok(), "%s", status.ToString().c_str())
1818

19-
namespace {
20-
enum class ECompareResult : i8 {
21-
LESS = -1,
22-
BORDER = 0,
23-
GREATER = 1
24-
};
25-
26-
template <typename TArray>
27-
inline auto GetValue(const std::shared_ptr<TArray>& array, int pos) {
28-
return array->GetView(pos);
29-
}
30-
31-
template <typename T>
32-
inline void UpdateCompare(const T& value, const T& border, ECompareResult& res) {
33-
if (res == ECompareResult::BORDER) {
34-
if constexpr (std::is_same_v<T, arrow::util::string_view>) {
35-
size_t minSize = (value.size() < border.size()) ? value.size() : border.size();
36-
int cmp = memcmp(value.data(), border.data(), minSize);
37-
if (cmp < 0) {
38-
res = ECompareResult::LESS;
39-
} else if (cmp > 0) {
40-
res = ECompareResult::GREATER;
41-
} else {
42-
UpdateCompare(value.size(), border.size(), res);
43-
}
44-
} else {
45-
if (value < border) {
46-
res = ECompareResult::LESS;
47-
} else if (value > border) {
48-
res = ECompareResult::GREATER;
49-
}
50-
}
51-
}
52-
}
53-
54-
template <typename TArray, typename T>
55-
bool CompareImpl(const std::shared_ptr<arrow::Array>& column, const T& border, std::vector<NArrow::ECompareResult>& rowsCmp) {
56-
bool hasBorder = false;
57-
ECompareResult* res = &rowsCmp[0];
58-
auto array = std::static_pointer_cast<TArray>(column);
59-
60-
for (int i = 0; i < array->length(); ++i, ++res) {
61-
UpdateCompare(GetValue(array, i), border, *res);
62-
hasBorder = hasBorder || (*res == ECompareResult::BORDER);
63-
}
64-
return !hasBorder;
65-
}
66-
67-
template <typename TArray, typename T>
68-
bool CompareImpl(const std::shared_ptr<arrow::ChunkedArray>& column, const T& border, std::vector<NArrow::ECompareResult>& rowsCmp) {
69-
bool hasBorder = false;
70-
ECompareResult* res = &rowsCmp[0];
71-
72-
for (auto& chunk : column->chunks()) {
73-
auto array = std::static_pointer_cast<TArray>(chunk);
74-
75-
for (int i = 0; i < chunk->length(); ++i, ++res) {
76-
UpdateCompare(GetValue(array, i), border, *res);
77-
hasBorder = hasBorder || (*res == ECompareResult::BORDER);
78-
}
79-
}
80-
return !hasBorder;
81-
}
82-
83-
/// @return true in case we have no borders in compare: no need for future keys, allow early exit
84-
template <typename TArray>
85-
bool Compare(const arrow::Datum& column, const std::shared_ptr<arrow::Array>& borderArray, std::vector<NArrow::ECompareResult>& rowsCmp) {
86-
auto border = GetValue(std::static_pointer_cast<TArray>(borderArray), 0);
87-
88-
switch (column.kind()) {
89-
case arrow::Datum::ARRAY:
90-
return CompareImpl<TArray>(column.make_array(), border, rowsCmp);
91-
case arrow::Datum::CHUNKED_ARRAY:
92-
return CompareImpl<TArray>(column.chunked_array(), border, rowsCmp);
93-
default:
94-
break;
95-
}
96-
Y_ABORT_UNLESS(false);
97-
return false;
98-
}
99-
100-
bool SwitchCompare(const arrow::Datum& column, const std::shared_ptr<arrow::Array>& border, std::vector<NArrow::ECompareResult>& rowsCmp) {
101-
Y_ABORT_UNLESS(border->length() == 1);
102-
103-
// first time it's empty
104-
if (rowsCmp.empty()) {
105-
rowsCmp.resize(column.length(), ECompareResult::BORDER);
106-
}
107-
108-
return SwitchArrayType(column, [&](const auto& type) -> bool {
109-
using TWrap = std::decay_t<decltype(type)>;
110-
using TArray = typename arrow::TypeTraits<typename TWrap::T>::ArrayType;
111-
return Compare<TArray>(column, border, rowsCmp);
112-
});
113-
}
114-
115-
template <typename T>
116-
void CompositeCompare(std::shared_ptr<T> some, std::shared_ptr<arrow::RecordBatch> borderBatch, std::vector<NArrow::ECompareResult>& rowsCmp) {
117-
AFL_VERIFY(some);
118-
AFL_VERIFY(borderBatch);
119-
auto key = borderBatch->schema()->fields();
120-
AFL_VERIFY(key.size());
121-
122-
for (size_t i = 0; i < key.size(); ++i) {
123-
auto& field = key[i];
124-
auto typeId = field->type()->id();
125-
auto column = some->GetColumnByName(field->name());
126-
std::shared_ptr<arrow::Array> border = borderBatch->GetColumnByName(field->name());
127-
AFL_VERIFY(column)("schema1", some->schema()->ToString())("schema2", borderBatch->schema()->ToString())("f", field->name());
128-
AFL_VERIFY(border)("schema1", some->schema()->ToString())("schema2", borderBatch->schema()->ToString())("f", field->name());
129-
AFL_VERIFY(some->schema()->GetFieldByName(field->name())->type()->id() == typeId)("schema1", some->schema()->ToString())(
130-
"schema2", borderBatch->schema()->ToString())("f", field->name());
131-
132-
if (SwitchCompare(column, border, rowsCmp)) {
133-
break; // early exit in case we have all rows compared: no borders, can omit key tail
134-
}
135-
}
136-
}
137-
138-
} // namespace
139-
14019
TColumnFilter::TSlicesIterator::TSlicesIterator(const TColumnFilter& owner, const std::optional<ui32> start, const std::optional<ui32> count)
14120
: Owner(owner)
14221
, StartIndex(start)
@@ -307,61 +186,6 @@ ui32 TColumnFilter::CrossSize(const ui32 s1, const ui32 f1, const ui32 s2, const
307186
return f - s;
308187
}
309188

310-
NKikimr::NArrow::TColumnFilter TColumnFilter::MakePredicateFilter(
311-
const arrow::Datum& datum, const arrow::Datum& border, ECompareType compareType) {
312-
std::vector<ECompareResult> cmps;
313-
314-
switch (datum.kind()) {
315-
case arrow::Datum::ARRAY:
316-
Y_ABORT_UNLESS(border.kind() == arrow::Datum::ARRAY);
317-
SwitchCompare(datum, border.make_array(), cmps);
318-
break;
319-
case arrow::Datum::CHUNKED_ARRAY:
320-
Y_ABORT_UNLESS(border.kind() == arrow::Datum::ARRAY);
321-
SwitchCompare(datum, border.make_array(), cmps);
322-
break;
323-
case arrow::Datum::RECORD_BATCH:
324-
Y_ABORT_UNLESS(border.kind() == arrow::Datum::RECORD_BATCH);
325-
CompositeCompare(datum.record_batch(), border.record_batch(), cmps);
326-
break;
327-
case arrow::Datum::TABLE:
328-
Y_ABORT_UNLESS(border.kind() == arrow::Datum::RECORD_BATCH);
329-
CompositeCompare(datum.table(), border.record_batch(), cmps);
330-
break;
331-
default:
332-
Y_ABORT_UNLESS(false);
333-
break;
334-
}
335-
336-
std::vector<bool> bits;
337-
bits.reserve(cmps.size());
338-
339-
switch (compareType) {
340-
case ECompareType::LESS:
341-
for (size_t i = 0; i < cmps.size(); ++i) {
342-
bits.emplace_back(cmps[i] < ECompareResult::BORDER);
343-
}
344-
break;
345-
case ECompareType::LESS_OR_EQUAL:
346-
for (size_t i = 0; i < cmps.size(); ++i) {
347-
bits.emplace_back(cmps[i] <= ECompareResult::BORDER);
348-
}
349-
break;
350-
case ECompareType::GREATER:
351-
for (size_t i = 0; i < cmps.size(); ++i) {
352-
bits.emplace_back(cmps[i] > ECompareResult::BORDER);
353-
}
354-
break;
355-
case ECompareType::GREATER_OR_EQUAL:
356-
for (size_t i = 0; i < cmps.size(); ++i) {
357-
bits.emplace_back(cmps[i] >= ECompareResult::BORDER);
358-
}
359-
break;
360-
}
361-
362-
return NArrow::TColumnFilter(std::move(bits));
363-
}
364-
365189
template <class TData>
366190
bool ApplyImpl(const TColumnFilter& filter, std::shared_ptr<TData>& batch, const TColumnFilter::TApplyContext& context) {
367191
if (!batch || !batch->num_rows()) {

Diff for: ydb/core/formats/arrow/arrow_filter.h

-3
Original file line numberDiff line numberDiff line change
@@ -266,9 +266,6 @@ class TColumnFilter {
266266
TColumnFilter And(const TColumnFilter& extFilter) const Y_WARN_UNUSED_RESULT;
267267
TColumnFilter Or(const TColumnFilter& extFilter) const Y_WARN_UNUSED_RESULT;
268268

269-
// It makes a filter using composite predicate
270-
static TColumnFilter MakePredicateFilter(const arrow::Datum& datum, const arrow::Datum& border, ECompareType compareType);
271-
272269
class TApplyContext {
273270
private:
274271
YDB_READONLY_DEF(std::optional<ui32>, StartPos);

Diff for: ydb/core/formats/arrow/common/container.cpp

+22
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
#include <ydb/library/actors/core/log.h>
88
#include <ydb/library/formats/arrow/common/vector_operations.h>
9+
#include <ydb/library/formats/arrow/replace_key.h>
910
#include <ydb/library/formats/arrow/simple_arrays_cache.h>
1011

1112
namespace NKikimr::NArrow {
@@ -266,4 +267,25 @@ TConclusion<std::shared_ptr<arrow::Scalar>> IFieldsConstructor::GetDefaultColumn
266267
return TConclusionStatus::Fail("have not default value for column " + field->name());
267268
}
268269

270+
NAccessor::IChunkedArray::TRowRange TGeneralContainer::EqualRange(const arrow::RecordBatch& border) const {
271+
AFL_VERIFY(border.num_columns());
272+
AFL_VERIFY(border.num_columns() <= (i64)Columns.size())("expected", Columns.size())("actual", border.num_columns());
273+
AFL_VERIFY(border.num_rows() == 1)("rows", border.num_rows());
274+
275+
NAccessor::IChunkedArray::TRowRange range(GetRecordsCount());
276+
for (ui64 i = 0; (i64)i < border.num_columns(); ++i) {
277+
AFL_VERIFY(border.schema()->field(i)->Equals(Schema->field(i)))("expected", Schema->field(i)->ToString())(
278+
"actual", border.schema()->field(i)->ToString());
279+
const auto column = GetColumnVerified(i);
280+
const NAccessor::IChunkedArray::TReader reader(column);
281+
auto array = NAccessor::TTrivialArray::BuildArrayFromScalar(NArrow::TStatusValidator::GetValid(border.column(i)->GetScalar(0)));
282+
range = reader.EqualRange(NAccessor::IChunkedArray::TAddress(array, 0), range);
283+
if (range.Empty()) {
284+
return range;
285+
}
286+
}
287+
288+
return range;
289+
}
290+
269291
} // namespace NKikimr::NArrow

Diff for: ydb/core/formats/arrow/common/container.h

+2
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,8 @@ class TGeneralContainer {
133133

134134
std::shared_ptr<NAccessor::IChunkedArray> GetAccessorByNameOptional(const std::string& fieldId) const;
135135
std::shared_ptr<NAccessor::IChunkedArray> GetAccessorByNameVerified(const std::string& fieldId) const;
136+
137+
NAccessor::IChunkedArray::TRowRange EqualRange(const arrow::RecordBatch& border) const;
136138
};
137139

138140
} // namespace NKikimr::NArrow

Diff for: ydb/core/formats/arrow/reader/batch_iterator.h

+5-2
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ namespace NKikimr::NArrow::NMerger {
66

77
class TBatchIterator {
88
private:
9+
YDB_READONLY_DEF(ui64, SourceId);
910
bool ControlPointFlag;
1011
TRWSortableBatchPosition KeyColumns;
1112
TRWSortableBatchPosition VersionColumns;
@@ -51,8 +52,10 @@ class TBatchIterator {
5152

5253
template <class TDataContainer>
5354
TBatchIterator(std::shared_ptr<TDataContainer> batch, std::shared_ptr<NArrow::TColumnFilter> filter,
54-
const std::vector<std::string>& keyColumns, const std::vector<std::string>& dataColumns, const bool reverseSort, const std::vector<std::string>& versionColumnNames)
55-
: ControlPointFlag(false)
55+
const std::vector<std::string>& keyColumns, const std::vector<std::string>& dataColumns, const bool reverseSort,
56+
const std::vector<std::string>& versionColumnNames, const ui64 sourceId)
57+
: SourceId(sourceId)
58+
, ControlPointFlag(false)
5659
, KeyColumns(batch, 0, keyColumns, dataColumns, reverseSort)
5760
, VersionColumns(batch, 0, versionColumnNames, {}, false)
5861
, RecordsCount(batch->num_rows())

0 commit comments

Comments
 (0)