Skip to content

Commit ab139ad

Browse files
committed
scheduler
1 parent 9b364a6 commit ab139ad

File tree

17 files changed

+433
-226
lines changed

17 files changed

+433
-226
lines changed

Diff for: ydb/core/kqp/ut/olap/kqp_olap_ut.cpp

+3-2
Original file line numberDiff line numberDiff line change
@@ -911,6 +911,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
911911
auto settings = TKikimrSettings()
912912
.SetWithSampleTables(false);
913913
TKikimrRunner kikimr(settings);
914+
kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::TX_COLUMNSHARD_SCAN, NActors::NLog::PRI_DEBUG);
914915

915916
TLocalHelper(kikimr).CreateTestOlapTable();
916917
auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>();
@@ -947,7 +948,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
947948
UNIT_ASSERT(results.erase(ts.GetValue()));
948949
tsPrev = ts;
949950
}
950-
UNIT_ASSERT(rows.size() == 4);
951+
UNIT_ASSERT_VALUES_EQUAL(rows.size(), 4);
951952
}
952953

953954
Y_UNIT_TEST(ExtractRangesSimpleLimit) {
@@ -989,7 +990,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
989990
UNIT_ASSERT(results.erase(ts.GetValue()));
990991
tsPrev = ts;
991992
}
992-
UNIT_ASSERT(rows.size() == 1);
993+
UNIT_ASSERT_VALUES_EQUAL(rows.size(), 1);
993994
}
994995

995996
Y_UNIT_TEST(ExtractRanges) {

Diff for: ydb/core/tx/columnshard/engines/reader/actor/actor.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,7 @@ bool TColumnShardScan::ProduceResults() noexcept {
243243
Result->LastKey = ConvertLastKey(CurrentLastReadKey->GetPKCursor());
244244
Result->LastCursorProto = CurrentLastReadKey->SerializeToProto();
245245
SendResult(false, false);
246-
ScanIterator->OnSentDataFromInterval(result.GetNotFinishedIntervalIdx());
246+
ScanIterator->OnSentDataFromInterval(result.GetNextIntervalHint());
247247
ACFL_DEBUG("stage", "finished")("iterator", ScanIterator->DebugString());
248248
return true;
249249
}

Diff for: ydb/core/tx/columnshard/engines/reader/common/result.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ TPartialReadResult::TPartialReadResult(const std::vector<std::shared_ptr<NGroupe
6060
, GroupGuard(gGuard)
6161
, ResultBatch(batch)
6262
, ScanCursor(scanCursor)
63-
, NotFinishedIntervalIdx(notFinishedIntervalIdx)
63+
, NextIntervalHint(notFinishedIntervalIdx)
6464
, Guard(TValidator::CheckNotNull(context)->GetCounters().GetResultsForReplyGuard()) {
6565
Y_ABORT_UNLESS(ResultBatch.GetRecordsCount());
6666
Y_ABORT_UNLESS(ScanCursor);

Diff for: ydb/core/tx/columnshard/engines/reader/common/result.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ class TPartialReadResult: public TNonCopyable {
2222
// This 1-row batch contains the last key that was read while producing the ResultBatch.
2323
// NOTE: it might be different from the Key of last row in ResulBatch in case of filtering/aggregation/limit
2424
std::shared_ptr<IScanCursor> ScanCursor;
25-
YDB_READONLY_DEF(std::optional<ui32>, NotFinishedIntervalIdx);
25+
YDB_READONLY_DEF(std::optional<ui32>, NextIntervalHint);
2626
const NColumnShard::TCounterGuard Guard;
2727

2828
public:

Diff for: ydb/core/tx/columnshard/engines/reader/duplicates/manager.cpp

+39-22
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
#include "manager.h"
2+
// TODO: move to simple_reader/
23

34
#include <ydb/core/tx/columnshard/engines/reader/duplicates/merge.h>
5+
#include <ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.h>
6+
#include <ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/scanner.h>
47
#include <ydb/core/tx/conveyor/usage/service.h>
58

69
#include <bit>
@@ -35,14 +38,14 @@ void TIntervalCounter::PropagateDelta(const TPosition& node) {
3538

3639
void TIntervalCounter::Update(const TPosition& node, const TModification& modification, TZerosCollector* callback) {
3740
if (modification.GetLeft() <= node.GetLeft() && modification.GetRight() >= node.GetRight()) {
38-
if (callback) {
39-
callback->OnUpdate(node.GetLeft(), node.GetRight(), GetCount(node), modification.GetDelta());
40-
}
4141
if (node.GetLeft() == node.GetRight()) {
4242
Count[node.GetIndex()] += modification.GetDelta();
4343
} else {
4444
PropagatedDeltas[node.GetIndex()] += modification.GetDelta();
4545
}
46+
if (callback) {
47+
callback->OnUpdate(node, GetCount(node), modification.GetDelta());
48+
}
4649
} else {
4750
PropagateDelta(node.GetIndex());
4851
if (modification.GetLeft() <= node.LeftChild().GetRight()) {
@@ -55,7 +58,7 @@ void TIntervalCounter::Update(const TPosition& node, const TModification& modifi
5558
}
5659

5760
void TIntervalCounter::Inc(const ui32 l, const ui32 r) {
58-
Update(TPosition(MaxIndex), TModification(l, r, 1), nullptr);
61+
Update(GetRoot(), TModification(l, r, 1), nullptr);
5962
}
6063

6164
TIntervalCounter::TIntervalCounter(const std::vector<std::pair<ui32, ui32>>& intervals) {
@@ -67,21 +70,21 @@ TIntervalCounter::TIntervalCounter(const std::vector<std::pair<ui32, ui32>>& int
6770
}
6871
}
6972
MaxIndex = std::bit_ceil(maxValue);
70-
Count.resize(MaxIndex * 2 - 1);
71-
PropagatedDeltas.resize(MaxIndex * 2 - 1);
73+
Count.resize(MaxIndex * 2 + 1);
74+
PropagatedDeltas.resize(MaxIndex * 2 + 1);
7275

7376
for (const auto& [l, r] : intervals) {
7477
Inc(l, r);
7578
}
7679
}
7780

7881
bool TIntervalCounter::IsAllZeros() const {
79-
return GetCount(TPosition(MaxIndex)) == 0;
82+
return GetCount(GetRoot()) == 0;
8083
}
8184

8285
std::vector<ui32> TIntervalCounter::DecAndGetZeros(const ui32 l, const ui32 r) {
8386
TZerosCollector callback;
84-
Update(TPosition(MaxIndex), TModification(l, r, -1), &callback);
87+
Update(GetRoot(), TModification(l, r, -1), &callback);
8588
return callback.ExtractValues();
8689
}
8790

@@ -160,20 +163,30 @@ TDuplicateFilterConstructor::TSourceIntervals::TSourceIntervals(const std::vecto
160163
}
161164

162165
void TDuplicateFilterConstructor::Handle(const TEvRequestFilter::TPtr& ev) {
166+
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "request_duplicates_filter")("source_id", ev->Get()->GetSource()->GetSourceId());
167+
// TODO: handle step restarts (completely avoid them or fix verify here)
163168
// TODO: add counter: max volume of redundant memory for merges
164169
const auto& source = ev->Get()->GetSource();
165170
TIntervalsRange range = Intervals.GetRangeVerified(source->GetSourceId());
166-
AFL_VERIFY(AvailableSources.emplace(source->GetSourceId(), TSourceFilterConstructor(source, ev->Get()->GetSubscriber(), Intervals)).second);
171+
auto [constructionInfo, emplaced] =
172+
AvailableSources.emplace(source->GetSourceId(), TSourceFilterConstructor(source, ev->Get()->GetSubscriber(), Intervals));
173+
AFL_VERIFY(emplaced);
167174
AvailableSourcesCount.AddRange(range.GetFirstIdx(), range.GetLastIdx(), source->GetSourceId());
168175
std::vector<ui32> readyIntervals = AwaitedSourcesCount.DecAndGetZeros(range.GetFirstIdx(), range.GetLastIdx());
169176

177+
if (readyIntervals.size() != range.NumIntervals()) {
178+
AFL_VERIFY(readyIntervals.size() < range.NumIntervals());
179+
constructionInfo->second.SetBlockGuard(NSimple::TSourceFetchingScheduler::SetBlocked(
180+
source->GetSourceIdx(), source->GetContextAsVerified<NSimple::TSpecialReadContext>()->GetScheduler()));
181+
}
182+
170183
for (const ui32 intervalIdx : readyIntervals) {
171184
auto sourceIds = AvailableSourcesCount.FindIntersections(intervalIdx);
172185
AFL_VERIFY(sourceIds.size());
173186
const std::shared_ptr<NCommon::TSpecialReadContext> readContext =
174187
TValidator::CheckNotNull(AvailableSources.FindPtr(sourceIds.front()))->GetSource()->GetContext();
175-
const std::shared_ptr<TBuildDuplicateFilters> task =
176-
std::make_shared<TBuildDuplicateFilters>(readContext->GetReadMetadata()->GetReplaceKey(), IIndexInfo::GetSnapshotColumnNames());
188+
const std::shared_ptr<TBuildDuplicateFilters> task = std::make_shared<TBuildDuplicateFilters>(
189+
readContext->GetReadMetadata()->GetReplaceKey(), IIndexInfo::GetSnapshotColumnNames(), intervalIdx, SelfId());
177190
for (const ui64 sourceId : sourceIds) {
178191
const TSourceFilterConstructor* constructionInfo = AvailableSources.FindPtr(sourceId);
179192
AFL_VERIFY(constructionInfo)("source", sourceId);
@@ -184,27 +197,31 @@ void TDuplicateFilterConstructor::Handle(const TEvRequestFilter::TPtr& ev) {
184197
std::make_shared<NArrow::TGeneralContainer>(source->GetStageData()
185198
.ToGeneralContainer(source->GetContext()->GetCommonContext()->GetResolver())
186199
->Slice(intervalRange.GetBegin(), intervalRange.Size()));
187-
task->AddSource(slice, source->GetStageData().GetNotAppliedFilter(),
188-
std::make_shared<TInternalFilterSubscriber>(intervalIdx, source->GetSourceId(), SelfId()));
200+
task->AddSource(slice, source->GetStageData().GetNotAppliedFilter(), source->GetSourceId());
189201
}
190202
NConveyor::TScanServiceOperator::SendTaskToExecute(task, readContext->GetCommonContext()->GetConveyorProcessId());
191203
}
192204
}
193205

194206
void TDuplicateFilterConstructor::Handle(const TEvDuplicateFilterPartialResult::TPtr& ev) {
195207
if (ev->Get()->GetResult().IsFail()) {
208+
AFL_INFO(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "interval_merging_error")("error", ev->Get()->GetResult().GetErrorMessage());
196209
AbortConstruction(ev->Get()->GetResult().GetErrorMessage());
197210
return;
198211
}
199-
TSourceFilterConstructor* constructor = AvailableSources.FindPtr(ev->Get()->GetSourceId());
200-
AFL_VERIFY(constructor)("portion", ev->Get()->GetSourceId());
201-
// TODO: avoid copying filters
202-
constructor->SetFilter(ev->Get()->GetIntervalIdx(), ev->Get()->ExtractResult().DetachResult());
203-
if (constructor->IsReady()) {
204-
std::move(*constructor).Finish();
205-
AFL_VERIFY(AvailableSources.erase(ev->Get()->GetSourceId()));
206-
if (AvailableSources.empty() && AwaitedSourcesCount.IsAllZeros()) {
207-
PassAway();
212+
for (auto&& [sourceId, filter] : ev->Get()->DetachResult()) {
213+
TSourceFilterConstructor* constructor = AvailableSources.FindPtr(sourceId);
214+
AFL_VERIFY(constructor)("portion", sourceId);
215+
// TODO: avoid copying filters
216+
constructor->SetFilter(ev->Get()->GetIntervalIdx(), std::move(filter));
217+
if (constructor->IsReady()) {
218+
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "build_duplicates_filter")(
219+
"source_id", constructor->GetSource()->GetSourceId());
220+
std::move(*constructor).Finish();
221+
AFL_VERIFY(AvailableSources.erase(sourceId));
222+
if (AvailableSources.empty() && AwaitedSourcesCount.IsAllZeros()) {
223+
PassAway();
224+
}
208225
}
209226
}
210227
}

Diff for: ydb/core/tx/columnshard/engines/reader/duplicates/manager.h

+32-48
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,30 @@
33
#include "events.h"
44

55
#include <ydb/core/tx/columnshard/engines/portions/portion_info.h>
6+
#include <ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/scheduler.h>
67

78
#include <ydb/library/actors/core/actor_bootstrapped.h>
89

910
namespace NKikimr::NOlap::NReader {
1011

12+
class TEvDuplicateFilterPartialResult
13+
: public NActors::TEventLocal<TEvDuplicateFilterPartialResult, NColumnShard::TEvPrivate::EvDuplicateFilterPartialResult> {
14+
private:
15+
using TFilterBySourceId = THashMap<ui64, NArrow::TColumnFilter>;
16+
YDB_READONLY(TConclusion<TFilterBySourceId>, Result, TFilterBySourceId());
17+
YDB_READONLY_DEF(ui32, IntervalIdx);
18+
19+
public:
20+
TEvDuplicateFilterPartialResult(TConclusion<TFilterBySourceId>&& result, const ui32 intervalIdx)
21+
: Result(std::move(result))
22+
, IntervalIdx(intervalIdx) {
23+
}
24+
25+
TFilterBySourceId&& DetachResult() {
26+
return Result.DetachResult();
27+
}
28+
};
29+
1130
class TRangeIndex {
1231
private:
1332
// TODO: optimize implementation
@@ -75,10 +94,10 @@ class TIntervalCounter {
7594
std::vector<ui32> FormerOnes;
7695

7796
public:
78-
void OnUpdate(const ui32 l, const ui32 r, const ui64 formerValue, const i64 delta) {
97+
void OnUpdate(const TPosition& node, const ui64 newValue, const i64 delta) {
7998
AFL_VERIFY(delta == -1);
80-
if (formerValue == 1) {
81-
for (ui32 i = l; i <= r; ++i) {
99+
if (newValue == 0) {
100+
for (ui32 i = node.GetLeft(); i <= node.GetRight(); ++i) {
82101
FormerOnes.emplace_back(i);
83102
}
84103
}
@@ -89,7 +108,7 @@ class TIntervalCounter {
89108
}
90109
};
91110

92-
// Segment tree: Count[i] = Count[i * 2 + 1] + PropagatedDeltas[i * 2 + 1] * intervalSize(i) + Count[i * 2 + 2] + PropagateDelta[i * 2 + 2] * intervalSize(i)
111+
// Segment tree: Count[i] = GetCount(i * 2 + 1) + GetCount(i * 2 + 2)
93112
std::vector<ui64> Count;
94113
std::vector<i64> PropagatedDeltas;
95114
ui32 MaxIndex = 0;
@@ -104,10 +123,12 @@ class TIntervalCounter {
104123
AFL_VERIFY(PropagatedDeltas[node.GetIndex()] * node.IntervalSize() >= (i64)Count[node.GetIndex()]);
105124
return Count[node.GetIndex()] + PropagatedDeltas[node.GetIndex()] * node.IntervalSize();
106125
}
126+
TPosition GetRoot() const {
127+
return TPosition(MaxIndex);
128+
}
107129

108130
public:
109131
TIntervalCounter(const std::vector<std::pair<ui32, ui32>>& intervals);
110-
111132
std::vector<ui32> DecAndGetZeros(const ui32 l, const ui32 r);
112133
bool IsAllZeros() const;
113134
};
@@ -165,6 +186,7 @@ class TDuplicateFilterConstructor: public NActors::TActor<TDuplicateFilterConstr
165186
std::shared_ptr<IFilterSubscriber> Subscriber;
166187
std::vector<ui64> IntervalOffsets;
167188
ui64 ReadyFilterCount = 0;
189+
std::optional<NSimple::ISourceFetchingScheduler::TSourceBlockedGuard> BlockGuard;
168190

169191
public:
170192
TSourceFilterConstructor(const std::shared_ptr<NCommon::IDataSource>& source, const std::shared_ptr<IFilterSubscriber>& subscriber,
@@ -202,51 +224,13 @@ class TDuplicateFilterConstructor: public NActors::TActor<TDuplicateFilterConstr
202224
}
203225
}
204226

205-
void Finish() &&;
206-
void AbortConstruction(const TString& reason) &&;
207-
};
208-
209-
class TEvDuplicateFilterPartialResult
210-
: public NActors::TEventLocal<TEvDuplicateFilterPartialResult, NColumnShard::TEvPrivate::EvDuplicateFilterPartialResult> {
211-
private:
212-
YDB_READONLY(TConclusion<NArrow::TColumnFilter>, Result, NArrow::TColumnFilter::BuildAllowFilter());
213-
YDB_READONLY_DEF(ui32, IntervalIdx);
214-
YDB_READONLY_DEF(ui64, SourceId);
215-
216-
public:
217-
TEvDuplicateFilterPartialResult(TConclusion<NArrow::TColumnFilter>&& result, const ui32 intervalIdx, const ui64 sourceId)
218-
: Result(std::move(result))
219-
, IntervalIdx(intervalIdx)
220-
, SourceId(sourceId) {
221-
}
222-
223-
TConclusion<NArrow::TColumnFilter>&& ExtractResult() {
224-
return std::move(Result);
227+
void SetBlockGuard(NSimple::ISourceFetchingScheduler::TSourceBlockedGuard&& guard) {
228+
AFL_VERIFY(!BlockGuard);
229+
BlockGuard.emplace(std::move(guard));
225230
}
226-
};
227-
228-
class TInternalFilterSubscriber: public IFilterSubscriber {
229-
private:
230-
ui32 IntervalIdx;
231-
ui32 SourceId;
232-
TActorId Owner;
233231

234-
virtual void OnFilterReady(const NArrow::TColumnFilter& result) override {
235-
TActorContext::AsActorContext().Send(
236-
Owner, new TDuplicateFilterConstructor::TEvDuplicateFilterPartialResult(result, IntervalIdx, SourceId));
237-
}
238-
239-
virtual void OnFailure(const TString& reason) override {
240-
TActorContext::AsActorContext().Send(
241-
Owner, new TDuplicateFilterConstructor::TEvDuplicateFilterPartialResult(TConclusionStatus::Fail(reason), IntervalIdx, SourceId));
242-
}
243-
244-
public:
245-
TInternalFilterSubscriber(const ui32 intervalIdx, const ui64 sourceId, const TActorId& owner)
246-
: IntervalIdx(intervalIdx)
247-
, SourceId(sourceId)
248-
, Owner(owner) {
249-
}
232+
void Finish() &&;
233+
void AbortConstruction(const TString& reason) &&;
250234
};
251235

252236
private:
+30-1
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,32 @@
1+
#include "manager.h"
12
#include "merge.h"
23

3-
namespace NKikimr::NOlap::NReader {}
4+
namespace NKikimr::NOlap::NReader {
5+
6+
TConclusionStatus TBuildDuplicateFilters::DoExecute(const std::shared_ptr<ITask>& /*taskPtr*/) {
7+
NArrow::NMerger::TMergePartialStream merger(PKSchema, nullptr, false, VersionColumnNames);
8+
for (ui64 i = 0; i < Sources.size(); ++i) {
9+
const auto& source = Sources[i];
10+
merger.AddSource(source.GetData(), source.GetFilter(), i);
11+
}
12+
TFiltersBuilder filtersBuilder(Sources.size());
13+
merger.DrainAll(filtersBuilder);
14+
std::vector<NArrow::TColumnFilter> filters = std::move(filtersBuilder).ExtractFilters();
15+
AFL_VERIFY(filters.size() == Sources.size());
16+
// TODO: avoid copying filters
17+
THashMap<ui64, NArrow::TColumnFilter> result;
18+
for (ui64 i = 0; i < filters.size(); ++i) {
19+
AFL_VERIFY(Sources[i].GetData()->GetRecordsCount() == filters[i].GetRecordsCountVerified())(
20+
"data", Sources[i].GetData()->GetRecordsCount())(
21+
"filter", filters[i].GetRecordsCountVerified());
22+
result.emplace(Sources[i].GetSourceId(), std::move(filters[i]));
23+
}
24+
TActorContext::AsActorContext().Send(Owner, new TEvDuplicateFilterPartialResult(std::move(result), IntervalIdx));
25+
return TConclusionStatus::Success();
26+
}
27+
28+
void TBuildDuplicateFilters::DoOnCannotExecute(const TString& reason) {
29+
TActorContext::AsActorContext().Send(Owner, new TEvDuplicateFilterPartialResult(TConclusionStatus::Fail(reason), IntervalIdx));
30+
}
31+
32+
} // namespace NKikimr::NOlap::NReader

0 commit comments

Comments
 (0)