Skip to content

Commit a95cd74

Browse files
An empty list of consumers for the background partition (#15889)
1 parent 8ab3977 commit a95cd74

File tree

6 files changed

+75
-2
lines changed

6 files changed

+75
-2
lines changed

Diff for: ydb/core/persqueue/partition_init.cpp

+3-2
Original file line numberDiff line numberDiff line change
@@ -400,6 +400,7 @@ void TInitInfoRangeStep::Handle(TEvKeyValue::TEvResponse::TPtr &ev, const TActor
400400
case NKikimrProto::OVERRUN: {
401401
auto& sourceIdStorage = Partition()->SourceIdStorage;
402402
auto& usersInfoStorage = Partition()->UsersInfoStorage;
403+
const bool isSupportive = Partition()->IsSupportive();
403404

404405
for (ui32 i = 0; i < range.PairSize(); ++i) {
405406
const auto& pair = range.GetPair(i);
@@ -421,9 +422,9 @@ void TInitInfoRangeStep::Handle(TEvKeyValue::TEvResponse::TPtr &ev, const TActor
421422
sourceIdStorage.LoadSourceIdInfo(*key, pair.GetValue(), now);
422423
} else if (type == TKeyPrefix::MarkProtoSourceId) {
423424
sourceIdStorage.LoadSourceIdInfo(*key, pair.GetValue(), now);
424-
} else if (type == TKeyPrefix::MarkUser) {
425+
} else if ((type == TKeyPrefix::MarkUser) && !isSupportive) {
425426
usersInfoStorage->Parse(*key, pair.GetValue(), ctx);
426-
} else if (type == TKeyPrefix::MarkUserDeprecated) {
427+
} else if ((type == TKeyPrefix::MarkUserDeprecated) && !isSupportive) {
427428
usersInfoStorage->ParseDeprecated(*key, pair.GetValue(), ctx);
428429
}
429430
}

Diff for: ydb/core/persqueue/partition_read.cpp

+4
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,10 @@ void TPartition::SendReadingFinished(const TString& consumer) {
5151
}
5252

5353
void TPartition::FillReadFromTimestamps(const TActorContext& ctx) {
54+
if (IsSupportive()) {
55+
return;
56+
}
57+
5458
TSet<TString> hasReadRule;
5559

5660
for (auto& [consumer, userInfo] : UsersInfoStorage->GetAll()) {
Binary file not shown.
Binary file not shown.

Diff for: ydb/public/sdk/cpp/src/client/topic/ut/topic_to_table_ut.cpp

+63
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
#include <library/cpp/logger/stream.h>
1818
#include <library/cpp/testing/unittest/registar.h>
19+
#include <library/cpp/streams/bzip2/bzip2.h>
1920

2021
namespace NYdb::NTopic::NTests {
2122

@@ -150,6 +151,9 @@ class TFixture : public NUnitTest::TBaseFixture {
150151
void RestartLongTxService();
151152
void RestartPQTablet(const TString& topicPath, ui32 partition);
152153
void DumpPQTabletKeys(const TString& topicName, ui32 partition);
154+
void PQTabletPrepareFromResource(const TString& topicPath,
155+
ui32 partitionId,
156+
const TString& resourceName);
153157

154158
void DeleteSupportivePartition(const TString& topicName,
155159
ui32 partition);
@@ -1857,6 +1861,51 @@ void TFixture::DumpPQTabletKeys(const TString& topicName)
18571861
}
18581862
}
18591863

1864+
void TFixture::PQTabletPrepareFromResource(const TString& topicPath,
1865+
ui32 partitionId,
1866+
const TString& resourceName)
1867+
{
1868+
auto& runtime = Setup->GetRuntime();
1869+
TActorId edge = runtime.AllocateEdgeActor();
1870+
ui64 tabletId = GetTopicTabletId(edge, "/Root/" + topicPath, partitionId);
1871+
1872+
auto request = MakeHolder<TEvKeyValue::TEvRequest>();
1873+
size_t count = 0;
1874+
1875+
for (TStringStream stream(NResource::Find(resourceName)); true; ++count) {
1876+
TString key, encoded;
1877+
1878+
if (!stream.ReadTo(key, ' ')) {
1879+
break;
1880+
}
1881+
encoded = stream.ReadLine();
1882+
1883+
auto decoded = Base64Decode(encoded);
1884+
TStringInput decodedStream(decoded);
1885+
TBZipDecompress decompressor(&decodedStream);
1886+
1887+
auto* cmd = request->Record.AddCmdWrite();
1888+
cmd->SetKey(key);
1889+
cmd->SetValue(decompressor.ReadAll());
1890+
}
1891+
1892+
runtime.SendToPipe(tabletId, edge, request.Release(), 0, GetPipeConfigWithRetries());
1893+
1894+
TAutoPtr<IEventHandle> handle;
1895+
auto* response = runtime.GrabEdgeEvent<TEvKeyValue::TEvResponse>(handle);
1896+
UNIT_ASSERT(response);
1897+
UNIT_ASSERT(response->Record.HasStatus());
1898+
UNIT_ASSERT_EQUAL(response->Record.GetStatus(), NMsgBusProxy::MSTATUS_OK);
1899+
1900+
UNIT_ASSERT_VALUES_EQUAL(response->Record.WriteResultSize(), count);
1901+
1902+
for (size_t i = 0; i < response->Record.WriteResultSize(); ++i) {
1903+
const auto &result = response->Record.GetWriteResult(i);
1904+
UNIT_ASSERT(result.HasStatus());
1905+
UNIT_ASSERT_EQUAL(result.GetStatus(), NKikimrProto::OK);
1906+
}
1907+
}
1908+
18601909
void TFixture::TestTheCompletionOfATransaction(const TTransactionCompletionTestDescription& d)
18611910
{
18621911
for (auto& topic : d.Topics) {
@@ -3207,6 +3256,20 @@ Y_UNIT_TEST_F(Transactions_Conflict_On_SeqNo, TFixture)
32073256
UNIT_ASSERT_VALUES_UNEQUAL(successCount, TXS_COUNT);
32083257
}
32093258

3259+
Y_UNIT_TEST_F(The_Transaction_Starts_On_One_Version_And_Ends_On_The_Other, TFixture)
3260+
{
3261+
// In the test, we check the compatibility between versions `24-4-2` and `24-4-*/25-1-*`. To do this, the data
3262+
// obtained on the `24-4-2` version is loaded into the PQ tablets.
3263+
3264+
CreateTopic("topic_A", TEST_CONSUMER, 2);
3265+
3266+
PQTabletPrepareFromResource("topic_A", 0, "topic_A_partition_0_v24-4-2.dat");
3267+
PQTabletPrepareFromResource("topic_A", 1, "topic_A_partition_1_v24-4-2.dat");
3268+
3269+
RestartPQTablet("topic_A", 0);
3270+
RestartPQTablet("topic_A", 1);
3271+
}
3272+
32103273
}
32113274

32123275
}

Diff for: ydb/public/sdk/cpp/src/client/topic/ut/ya.make

+5
Original file line numberDiff line numberDiff line change
@@ -40,4 +40,9 @@ SRCS(
4040
trace_ut.cpp
4141
)
4242

43+
RESOURCE(
44+
ydb/public/sdk/cpp/src/client/topic/ut/resources/topic_A_partition_0_v24-4-2.dat topic_A_partition_0_v24-4-2.dat
45+
ydb/public/sdk/cpp/src/client/topic/ut/resources/topic_A_partition_1_v24-4-2.dat topic_A_partition_1_v24-4-2.dat
46+
)
47+
4348
END()

0 commit comments

Comments
 (0)