Skip to content

Commit 5408e30

Browse files
committed
[#11729][DocDB][xCluster] Fix for replication not working if user upgrades to a branch with CDCSDK code changes
Summary: With the changes for CDCSDK, we have separate `source_type` values i.e. `XCLUSTER` for xCluster replication and `CDCSDK` for the new changes. Similarly there is another option i.e. `checkpoint_type` which can have `IMPLICIT` and `EXPLICIT` values. If a stream for replication has been created before upgrading, it was unable to continue replication after upgrading to the latest version since the `source_type` and `checkpoint_type` options were missing from it as it has only been introduced with the CDCSDK changes only. Test Plan: * Manually tested with a custom build on dev portal Reviewers: sergei, jhe, mkantimath, skumar, rahuldesirazu Reviewed By: skumar, rahuldesirazu Subscribers: rahuldesirazu, iamoncar, sdash, ybase, bogdan Differential Revision: https://phabricator.dev.yugabyte.com/D15989
1 parent 6b14282 commit 5408e30

File tree

2 files changed

+49
-0
lines changed

2 files changed

+49
-0
lines changed

ent/src/yb/cdc/cdc_service.cc

+17
Original file line numberDiff line numberDiff line change
@@ -646,6 +646,20 @@ CHECKED_STATUS VerifyArg(const SetCDCCheckpointRequestPB& req) {
646646
return Status::OK();
647647
}
648648

649+
// This function is to handle the upgrade scenario where the DB is upgraded from a version
650+
// without CDCSDK changes to the one with it. So in case, some required options are missing,
651+
// the default values will be added for the same.
652+
void AddDefaultOptionsIfMissing(std::unordered_map<std::string, std::string>* options) {
653+
if ((*options).find(cdc::kSourceType) == (*options).end()) {
654+
(*options).emplace(cdc::kSourceType, CDCRequestSource_Name(cdc::CDCRequestSource::XCLUSTER));
655+
}
656+
657+
if ((*options).find(cdc::kCheckpointType) == (*options).end()) {
658+
(*options).emplace(cdc::kCheckpointType,
659+
CDCCheckpointType_Name(cdc::CDCCheckpointType::IMPLICIT));
660+
}
661+
}
662+
649663
} // namespace
650664

651665
template <class ReqType, class RespType>
@@ -2107,6 +2121,9 @@ Result<std::shared_ptr<StreamMetadata>> CDCServiceImpl::GetStream(const std::str
21072121
RETURN_NOT_OK(client()->GetCDCStream(stream_id, &ns_id, &object_ids, &options));
21082122

21092123
auto stream_metadata = std::make_shared<StreamMetadata>();
2124+
2125+
AddDefaultOptionsIfMissing(&options);
2126+
21102127
for (const auto& option : options) {
21112128
if (option.first == kRecordType) {
21122129
SCHECK(CDCRecordType_Parse(option.second, &stream_metadata->record_type),

ent/src/yb/master/catalog_manager_ent.cc

+32
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,34 @@ class CDCStreamLoader : public Visitor<PersistentCDCStreamInfo> {
210210
public:
211211
explicit CDCStreamLoader(CatalogManager* catalog_manager) : catalog_manager_(catalog_manager) {}
212212

213+
void AddDefaultValuesIfMissing(const SysCDCStreamEntryPB& metadata,
214+
CDCStreamInfo::WriteLock* l) {
215+
bool source_type_present = false;
216+
bool checkpoint_type_present = false;
217+
218+
// Iterate over all the options to check if checkpoint_type and source_type are present.
219+
for (auto option : metadata.options()) {
220+
if (option.key() == cdc::kSourceType) {
221+
source_type_present = true;
222+
}
223+
if (option.key() == cdc::kCheckpointType) {
224+
checkpoint_type_present = true;
225+
}
226+
}
227+
228+
if (!source_type_present) {
229+
auto source_type_opt = l->mutable_data()->pb.add_options();
230+
source_type_opt->set_key(cdc::kSourceType);
231+
source_type_opt->set_value(cdc::CDCRequestSource_Name(cdc::XCLUSTER));
232+
}
233+
234+
if (!checkpoint_type_present) {
235+
auto checkpoint_type_opt = l->mutable_data()->pb.add_options();
236+
checkpoint_type_opt->set_key(cdc::kCheckpointType);
237+
checkpoint_type_opt->set_value(cdc::CDCCheckpointType_Name(cdc::IMPLICIT));
238+
}
239+
}
240+
213241
Status Visit(const CDCStreamId& stream_id, const SysCDCStreamEntryPB& metadata)
214242
REQUIRES(catalog_manager_->mutex_) {
215243
DCHECK(!ContainsKey(catalog_manager_->cdc_stream_map_, stream_id))
@@ -245,6 +273,10 @@ class CDCStreamLoader : public Visitor<PersistentCDCStreamInfo> {
245273
auto l = stream->LockForWrite();
246274
l.mutable_data()->pb.CopyFrom(metadata);
247275

276+
// If no source_type and checkpoint_type is present, that means the stream was created in
277+
// a previous version where these options were not present.
278+
AddDefaultValuesIfMissing(metadata, &l);
279+
248280
// If the table has been deleted, then mark this stream as DELETING so it can be deleted by the
249281
// catalog manager background thread. Otherwise if this stream is missing an entry
250282
// for state, then mark its state as Active.

0 commit comments

Comments
 (0)