Skip to content

Commit 1371944

Browse files
committed
[CDCSDK] [#9019] CDC SDK Tx/xCluster layer changes
Summary: Github Master Ticket: #9019 Design DocumentL https://docs.google.com/document/d/1_xZqU5UgzCu1W--kci3ajU7_iYXXHMQvudmybDI-Xsk/edit Functional Spec: https://docs.google.com/document/u/2/d/1nHuzHQ-qYVPbKi2dqo_drzSXMq00h7w5oi0JDf0GD1U/edit#heading=h.jmqfs7jgvvg8 - Added a new CDC Type EXTERNAL - Have added CDCSDK naming conventions to avoid confusion with common codes with cluster - Read the changes from IntentDB for UPDATE_TRANSACTION_OP op type - Batch the changes from IntentDB depending on the maximum batch size defined by cdc_max_stream_intent_records - Send CDCSDKCheckpoint with every record - CDCSDKCheckpoint will have term, index, reverse_index_key, and write_id - Mark the record as INSERT/UPDATE/DELETE depending on the type of operations that were performed. - An update of the primary key will generate two events DELETE and INSERT - UPDATE of multiple columns is 'broken' into multiple records of single column UPDATE record in case of multi-shard transactions - Send the DDL events found in the WAL to the subscriber Test Plan: We have unit tests as well as tests using the ConsoleSubscriber. - Added CPP Unit tests to verify op types INSERT/UPDATE/DELETE types - Verify the ordering of events of records - Added Java Unit tests using CDC Java Console Client, verifying - Multiple data types (To be enhanced) - Large SQL scripts with varying DMLs with the expected output - Correctness in case of the composite primary key We also have run long-running tests with failover to verify if the number of streamed are expected. Reviewers: bogdan, nicolas, rahuldesirazu, sergei Reviewed By: rahuldesirazu, sergei Subscribers: amartsinchyk, kannan, kgupta, mihnea, iamoncar, sdash, jhe, ybase, vkushwaha Differential Revision: https://phabricator.dev.yugabyte.com/D13838
1 parent 43acbf4 commit 1371944

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

83 files changed

+5208
-504
lines changed

ent/src/yb/cdc/CMakeLists-include.txt

+2-2
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@ set(CDC_SRCS_EXTENSIONS
2525
${YB_ENT_CURRENT_SOURCE_DIR}/cdc_service.cc
2626
${YB_ENT_CURRENT_SOURCE_DIR}/cdc_metrics.cc
2727
${YB_ENT_CURRENT_SOURCE_DIR}/cdc_producer.cc
28-
${YB_ENT_CURRENT_SOURCE_DIR}/cdc_rpc.cc
29-
${YB_ENT_CURRENT_SOURCE_DIR}/cdc_error.cc)
28+
${YB_ENT_CURRENT_SOURCE_DIR}/cdcsdk_producer.cc
29+
${YB_ENT_CURRENT_SOURCE_DIR}/cdc_rpc.cc)
3030

3131
ADD_YB_LIBRARY(
3232
cdc

ent/src/yb/cdc/cdc_common_util.h

+101
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
// Copyright (c) YugaByte, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4+
// in compliance with the License. You may obtain a copy of the License at
5+
//
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
//
8+
// Unless required by applicable law or agreed to in writing, software distributed under the License
9+
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10+
// or implied. See the License for the specific language governing permissions and limitations
11+
// under the License.
12+
//
13+
14+
#ifndef ENT_SRC_YB_CDC_CDC_COMMON_UTIL_H
15+
#define ENT_SRC_YB_CDC_CDC_COMMON_UTIL_H
16+
17+
#include <string>
18+
#include <boost/functional/hash.hpp>
19+
20+
#include "yb/cdc/cdc_service.pb.h"
21+
22+
#include "yb/common/common_fwd.h"
23+
#include "yb/common/wire_protocol.h"
24+
25+
#include "yb/consensus/raft_consensus.h"
26+
#include "yb/consensus/replicate_msgs_holder.h"
27+
28+
#include "yb/docdb/docdb.h"
29+
#include "yb/docdb/primitive_value.h"
30+
#include "yb/docdb/ql_rowwise_iterator_interface.h"
31+
#include "yb/docdb/value_type.h"
32+
33+
#include "yb/tablet/tablet.h"
34+
#include "yb/tablet/tablet_metadata.h"
35+
#include "yb/tablet/tablet_peer.h"
36+
#include "yb/tablet/transaction_participant.h"
37+
38+
#include "yb/tserver/tablet_server.h"
39+
#include "yb/tserver/ts_tablet_manager.h"
40+
41+
#include "yb/util/format.h"
42+
43+
namespace yb {
44+
namespace cdc {
45+
46+
YB_STRONGLY_TYPED_BOOL(ReplicateIntents);
47+
48+
// Use boost::unordered_map instead of std::unordered_map because gcc release build
49+
// fails to compile correctly when TxnStatusMap is used with Result<> (due to what seems like
50+
// a bug in gcc where it tries to incorrectly destroy Status part of Result).
51+
typedef boost::unordered_map<TransactionId,
52+
TransactionStatusResult,
53+
TransactionIdHash> TxnStatusMap;
54+
typedef std::pair<uint64_t, size_t> RecordTimeIndex;
55+
56+
void AddColumnToMap(
57+
const ColumnSchema &col_schema,
58+
const docdb::PrimitiveValue &col,
59+
cdc::KeyValuePairPB *kv_pair);
60+
61+
void AddProtoRecordColumnToMap(
62+
const ColumnSchema &col_schema,
63+
const docdb::PrimitiveValue &col,
64+
cdc::KeyValuePairPB *kv_pair,
65+
bool is_proto_record,
66+
DatumMessagePB *cdc_datum_message = nullptr);
67+
68+
Result<bool> SetCommittedRecordIndexForReplicateMsg(
69+
const consensus::ReplicateMsgPtr &msg,
70+
size_t index,
71+
const TxnStatusMap &txn_map,
72+
ReplicateIntents replicate_intents,
73+
std::vector<RecordTimeIndex> *records);
74+
75+
Result<std::vector<RecordTimeIndex>> GetCommittedRecordIndexes(
76+
const consensus::ReplicateMsgs &msgs,
77+
const TxnStatusMap &txn_map,
78+
ReplicateIntents replicate_intents,
79+
OpId *checkpoint);
80+
81+
Result<consensus::ReplicateMsgs> FilterAndSortWrites(
82+
const consensus::ReplicateMsgs &msgs,
83+
const TxnStatusMap &txn_map,
84+
ReplicateIntents replicate_intents,
85+
OpId *checkpoint);
86+
87+
Result<TransactionStatusResult> GetTransactionStatus(
88+
const TransactionId &txn_id,
89+
const HybridTime &hybrid_time,
90+
tablet::TransactionParticipant *txn_participant);
91+
92+
Result<TxnStatusMap> BuildTxnStatusMap(
93+
const consensus::ReplicateMsgs &messages,
94+
bool more_replicate_msgs,
95+
const std::shared_ptr<tablet::TabletPeer> &tablet_peer,
96+
tablet::TransactionParticipant *txn_participant);
97+
98+
} // namespace cdc
99+
} // namespace yb
100+
101+
#endif // ENT_SRC_YB_CDC_CDC_COMMON_UTIL_H

ent/src/yb/cdc/cdc_producer.cc

+11-23
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
// under the License.
1212

1313
#include "yb/cdc/cdc_producer.h"
14+
#include "yb/cdc/cdc_common_util.h"
1415

1516
#include "yb/cdc/cdc_service.pb.h"
1617
#include "yb/common/schema.h"
@@ -49,17 +50,6 @@ using consensus::ReplicateMsgs;
4950
using docdb::PrimitiveValue;
5051
using tablet::TransactionParticipant;
5152

52-
YB_STRONGLY_TYPED_BOOL(ReplicateIntents);
53-
54-
namespace {
55-
56-
// Use boost::unordered_map instead of std::unordered_map because gcc release build
57-
// fails to compile correctly when TxnStatusMap is used with Result<> (due to what seems like
58-
// a bug in gcc where it tries to incorrectly destroy Status part of Result).
59-
typedef boost::unordered_map<
60-
TransactionId, TransactionStatusResult, TransactionIdHash> TxnStatusMap;
61-
typedef std::pair<uint64_t, size_t> RecordTimeIndex;
62-
6353
void AddColumnToMap(const ColumnSchema& col_schema,
6454
const docdb::PrimitiveValue& col,
6555
cdc::KeyValuePairPB* kv_pair) {
@@ -413,18 +403,16 @@ CHECKED_STATUS PopulateSplitOpRecord(const ReplicateMsgPtr& msg, CDCRecordPB* re
413403
return Status::OK();
414404
}
415405

416-
} // namespace
417-
418-
Status GetChanges(const std::string& stream_id,
419-
const std::string& tablet_id,
420-
const OpId& from_op_id,
421-
const StreamMetadata& stream_metadata,
422-
const std::shared_ptr<tablet::TabletPeer>& tablet_peer,
423-
const MemTrackerPtr& mem_tracker,
424-
consensus::ReplicateMsgsHolder* msgs_holder,
425-
GetChangesResponsePB* resp,
426-
int64_t* last_readable_opid_index,
427-
const CoarseTimePoint deadline) {
406+
Status GetChangesForXCluster(const std::string& stream_id,
407+
const std::string& tablet_id,
408+
const OpId& from_op_id,
409+
const StreamMetadata& stream_metadata,
410+
const std::shared_ptr<tablet::TabletPeer>& tablet_peer,
411+
const MemTrackerPtr& mem_tracker,
412+
consensus::ReplicateMsgsHolder* msgs_holder,
413+
GetChangesResponsePB* resp,
414+
int64_t* last_readable_opid_index,
415+
const CoarseTimePoint deadline) {
428416
auto replicate_intents = ReplicateIntents(GetAtomicFlag(&FLAGS_cdc_enable_replicate_intents));
429417
// Request scope on transaction participant so that transactions are not removed from participant
430418
// while RequestScope is active.

ent/src/yb/cdc/cdc_producer.h

+23-10
Original file line numberDiff line numberDiff line change
@@ -59,17 +59,30 @@ struct StreamMetadata {
5959
}
6060
};
6161

62-
CHECKED_STATUS GetChanges(const std::string& stream_id,
63-
const std::string& tablet_id,
64-
const OpId& op_id,
65-
const StreamMetadata& record,
66-
const std::shared_ptr<tablet::TabletPeer>& tablet_peer,
67-
const std::shared_ptr<MemTracker>& mem_tracker,
68-
consensus::ReplicateMsgsHolder* msgs_holder,
69-
GetChangesResponsePB* resp,
70-
int64_t* last_readable_opid_index = nullptr,
71-
const CoarseTimePoint deadline = CoarseTimePoint::max());
62+
CHECKED_STATUS GetChangesForCDCSDK(const std::string& stream_id,
63+
const std::string& tablet_id,
64+
const CDCSDKCheckpointPB& op_id,
65+
const StreamMetadata& record,
66+
const std::shared_ptr<tablet::TabletPeer>& tablet_peer,
67+
const std::shared_ptr<MemTracker>& mem_tracker,
68+
consensus::ReplicateMsgsHolder* msgs_holder,
69+
GetChangesResponsePB* resp,
70+
std::string* commit_timestamp,
71+
std::shared_ptr<Schema>* cached_schema,
72+
OpId* last_streamed_op_id,
73+
int64_t* last_readable_opid_index = nullptr,
74+
const CoarseTimePoint deadline = CoarseTimePoint::max());
7275

76+
CHECKED_STATUS GetChangesForXCluster(const std::string& stream_id,
77+
const std::string& tablet_id,
78+
const OpId& op_id,
79+
const StreamMetadata& record,
80+
const std::shared_ptr<tablet::TabletPeer>& tablet_peer,
81+
const std::shared_ptr<MemTracker>& mem_tracker,
82+
consensus::ReplicateMsgsHolder* msgs_holder,
83+
GetChangesResponsePB* resp,
84+
int64_t* last_readable_opid_index = nullptr,
85+
const CoarseTimePoint deadline = CoarseTimePoint::max());
7386
} // namespace cdc
7487
} // namespace yb
7588

0 commit comments

Comments
 (0)