Skip to content

Commit cf5fead

Browse files
committed
[#11779][CDCSDK] Add option to send a DDL record based on a flag value in GetChangesRequest
Summary: Previously, if some data had already been consumed for a stream ID and a client came up with the same stream ID and requested changes, it received only the change records. This was a problem for `Debezium`: when the connector was restarted, it received the changes directly, without any DDL record. Debezium needs that DDL record to process the schema information for the columns; when it was missing, the client hit a `NullPointerException`, which effectively crashed the connector. This change adds a `need_schema_info` flag to `GetChangesRequest` so that a client can request that the DDL record be sent. Test Plan: Tested the complete CDC pipeline with Debezium with the specified change applied. Command to run the test: `ybd --cxx-test integration-tests_cdcsdk_ysql-test --gtest_filter CDCSDKYsqlTest.TestNeedSchemaInfoFlag` Reviewers: rahuldesirazu, jhe, skumar, sergei Reviewed By: sergei Subscribers: iamoncar, sdash, ybase Differential Revision: https://phabricator.dev.yugabyte.com/D16057
1 parent 7898813 commit cf5fead

File tree

9 files changed

+133
-13
lines changed

9 files changed

+133
-13
lines changed

ent/src/yb/cdc/cdc_service.cc

+6-2
Original file line numberDiff line numberDiff line change
@@ -257,11 +257,15 @@ class CDCServiceImpl::Impl {
257257
it->last_streamed_op_id = op_id;
258258
}
259259

260-
std::shared_ptr<Schema> GetOrAddSchema(const ProducerTabletInfo& producer_tablet) {
260+
std::shared_ptr<Schema> GetOrAddSchema(const ProducerTabletInfo &producer_tablet,
261+
const bool need_schema_info) {
261262
std::lock_guard<decltype(mutex_)> l(mutex_);
262263
auto it = cdc_state_metadata_.find(producer_tablet);
263264

264265
if (it != cdc_state_metadata_.end()) {
266+
if (need_schema_info) {
267+
it->current_schema = std::make_shared<Schema>();
268+
}
265269
return it->current_schema;
266270
}
267271
CDCStateMetadataInfo info = CDCStateMetadataInfo {
@@ -1116,7 +1120,7 @@ void CDCServiceImpl::GetChanges(const GetChangesRequestPB* req,
11161120
std::string commit_timestamp;
11171121
OpId last_streamed_op_id;
11181122

1119-
auto cached_schema = impl_->GetOrAddSchema(producer_tablet);
1123+
auto cached_schema = impl_->GetOrAddSchema(producer_tablet, req->need_schema_info());
11201124
s = cdc::GetChangesForCDCSDK(
11211125
req->stream_id(), req->tablet_id(), cdc_sdk_op_id, record, tablet_peer, mem_tracker,
11221126
&msgs_holder, resp, &commit_timestamp, &cached_schema,

ent/src/yb/integration-tests/cdcsdk_ysql-test.cc

+88-1
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ class CDCSDKYsqlTest : public CDCSDKTestBase {
123123
return resp.cluster_config().cluster_uuid();
124124
}
125125

126-
// the range is exclusive of end i.e. [start, end)
126+
// The range is exclusive of end i.e. [start, end)
127127
void WriteRows(uint32_t start, uint32_t end, Cluster* cluster) {
128128
auto conn = EXPECT_RESULT(cluster->ConnectToDB(kNamespaceName));
129129
LOG(INFO) << "Writing " << end - start << " row(s)";
@@ -159,6 +159,18 @@ class CDCSDKYsqlTest : public CDCSDKTestBase {
159159
change_req->mutable_from_cdc_sdk_checkpoint()->set_write_id(0);
160160
}
161161

162+
void PrepareChangeRequest(
163+
GetChangesRequestPB* change_req, const CDCStreamId& stream_id,
164+
const google::protobuf::RepeatedPtrField<master::TabletLocationsPB>& tablets,
165+
const CDCSDKCheckpointPB& cp) {
166+
change_req->set_stream_id(stream_id);
167+
change_req->set_tablet_id(tablets.Get(0).tablet_id());
168+
change_req->mutable_from_cdc_sdk_checkpoint()->set_index(cp.index());
169+
change_req->mutable_from_cdc_sdk_checkpoint()->set_term(cp.term());
170+
change_req->mutable_from_cdc_sdk_checkpoint()->set_key(cp.key());
171+
change_req->mutable_from_cdc_sdk_checkpoint()->set_write_id(cp.write_id());
172+
}
173+
162174
void PrepareSetCheckpointRequest(
163175
SetCDCCheckpointRequestPB* set_checkpoint_req,
164176
const CDCStreamId stream_id,
@@ -253,6 +265,44 @@ class CDCSDKYsqlTest : public CDCSDKTestBase {
253265
LOG(INFO) << "Got " << ins_count << " insert records";
254266
ASSERT_EQ(expected_records_size, ins_count);
255267
}
268+
269+
Result<GetChangesResponsePB> VerifyIfDDLRecordPresent(
270+
const CDCStreamId& stream_id,
271+
const google::protobuf::RepeatedPtrField<master::TabletLocationsPB>& tablets,
272+
bool expect_ddl_record, bool is_first_call, const CDCSDKCheckpointPB* cp = nullptr) {
273+
GetChangesRequestPB req;
274+
GetChangesResponsePB resp;
275+
276+
if (cp == nullptr) {
277+
PrepareChangeRequest(&req, stream_id, tablets);
278+
} else {
279+
PrepareChangeRequest(&req, stream_id, tablets, *cp);
280+
}
281+
282+
// The default value for need_schema_info is false.
283+
if (expect_ddl_record) {
284+
req.set_need_schema_info(true);
285+
}
286+
287+
RpcController get_changes_rpc;
288+
RETURN_NOT_OK(cdc_proxy_->GetChanges(req, &resp, &get_changes_rpc));
289+
290+
if (resp.has_error()) {
291+
return StatusFromPB(resp.error().status());
292+
}
293+
294+
auto record = resp.cdc_sdk_proto_records(0);
295+
296+
// If it's the first call to GetChanges, we will get a DDL record irrespective of the
297+
// value of need_schema_info.
298+
if (is_first_call || expect_ddl_record) {
299+
EXPECT_EQ(record.row_message().op(), RowMessage::DDL);
300+
} else {
301+
EXPECT_NE(record.row_message().op(), RowMessage::DDL);
302+
}
303+
304+
return resp;
305+
}
256306
};
257307

258308
TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestBaseFunctions)) {
@@ -328,6 +378,43 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(MultiRowInsertion)) {
328378
LOG(INFO) << "Got " << ins_count << " insert records";
329379
ASSERT_EQ(expected_records_size, ins_count);
330380
}
381+
382+
TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestNeedSchemaInfoFlag)) {
383+
ASSERT_OK(SetUpWithParams(3, 1, false));
384+
385+
auto table = ASSERT_RESULT(CreateTable(&test_cluster_, kNamespaceName, kTableName));
386+
387+
google::protobuf::RepeatedPtrField<master::TabletLocationsPB> tablets;
388+
ASSERT_OK(test_client()->GetTablets(
389+
table, 0, &tablets, /* partition_list_version = */ nullptr));
390+
391+
std::string table_id = ASSERT_RESULT(GetTableId(&test_cluster_, kNamespaceName, kTableName));
392+
CDCStreamId stream_id = ASSERT_RESULT(CreateDBStream());
393+
394+
ASSERT_OK(SetInitialCheckpoint(stream_id, tablets));
395+
396+
// This will write one row with PK = 0.
397+
WriteRows(0 /* start */, 1 /* end */, &test_cluster_);
398+
399+
// This is the first call to GetChanges, we will get a DDL record.
400+
GetChangesResponsePB resp = ASSERT_RESULT(VerifyIfDDLRecordPresent(stream_id, tablets, false,
401+
true));
402+
403+
// Write another row to the database with PK = 1.
404+
WriteRows(1 /* start */, 2 /* end */, &test_cluster_);
405+
406+
// We will not get any DDL record here since this is not the first call and the flag
407+
// need_schema_info is also unset.
408+
resp = ASSERT_RESULT(VerifyIfDDLRecordPresent(stream_id, tablets, false, false,
409+
&resp.cdc_sdk_checkpoint()));
410+
411+
// Write another row to the database with PK = 2.
412+
WriteRows(2 /* start */, 3 /* end */, &test_cluster_);
413+
414+
// We will get a DDL record since we have enabled the need_schema_info flag.
415+
resp = ASSERT_RESULT(VerifyIfDDLRecordPresent(stream_id, tablets, true, false,
416+
&resp.cdc_sdk_checkpoint()));
417+
}
331418
} // namespace enterprise
332419
} // namespace cdc
333420
} // namespace yb

java/yb-cdc/src/main/java/org/yb/cdc/ConcurrentPoller.java

+10-1
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,9 @@ public class ConcurrentPoller {
4949

5050
YBClient synClient;
5151

52+
// We need the schema information in a DDL the very first time we send a getChanges request.
53+
boolean needSchemaInfo = true;
54+
5255
public ConcurrentPoller(YBClient synClient,
5356
AsyncYBClient client,
5457
OutputClient outputClient,
@@ -111,6 +114,7 @@ public void poll() throws Exception {
111114
final List result = new ArrayList();
112115
queue.addAll(listTabletIdTableIdPair);
113116
queue.add(END_PAIR);
117+
114118
while (true) {
115119
if (stopExecution) {
116120
// This signals the CDCConsoleSubscriber to stop polling further and exit.
@@ -134,12 +138,17 @@ public void poll() throws Exception {
134138

135139
Deferred<GetChangesResponse> response = asyncYBClient.getChangesCDCSDK(
136140
table, streamId, entry.getKey() /*tabletId*/,
137-
cp.getTerm(), cp.getIndex(), cp.getKey(), cp.getWriteId(), cp.getSnapshotTime());
141+
cp.getTerm(), cp.getIndex(), cp.getKey(), cp.getWriteId(), cp.getSnapshotTime(),
142+
needSchemaInfo);
143+
144+
// Once we got the response, we do not need the schema in further calls so unset the flag.
145+
needSchemaInfo = false;
138146

139147
response.addCallback(resCallback);
140148
response.addErrback(errCallback);
141149

142150
deferredList.add(response);
151+
143152
}
144153

145154
AtomicInteger totalException = new AtomicInteger();

java/yb-cdc/src/test/java/org/yb/cdc/TestGetChanges.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,8 @@ public void testGettingChangesWithNegativeIndex() {
7777
// An exception would be thrown for an index less than 0.
7878
try {
7979
GetChangesResponse changesResponse =
80-
myClient.getChangesCDCSDK(table, dbStreamId, tabletId, 0, -1, new byte[]{}, 0, 0L);
80+
myClient.getChangesCDCSDK(
81+
table, dbStreamId, tabletId, 0, -1, new byte[]{}, 0, 0L, false);
8182
} catch (Exception e) {
8283
exceptionThrown = true;
8384
break;

java/yb-cdc/src/test/java/org/yb/cdc/util/CDCSubscriber.java

+14-2
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ public class CDCSubscriber {
4545

4646
private Checkpoint checkpoint;
4747

48+
private boolean needSchemaInfo = false;
49+
4850
/**
4951
* This is the default number of tablets as specified in AsyncYBClient
5052
* @see AsyncYBClient
@@ -88,6 +90,14 @@ public void setTableName(String tableName) {
8890
this.tableName = tableName;
8991
}
9092

93+
public boolean shouldSendSchema() {
94+
return needSchemaInfo;
95+
}
96+
97+
public void setNeedSchemaInfo(boolean needSchemaInfo) {
98+
this.needSchemaInfo = needSchemaInfo;
99+
}
100+
91101
public void setNumberOfTablets(int numberOfTablets) {
92102
this.numberOfTablets = numberOfTablets;
93103
}
@@ -461,8 +471,10 @@ public void getResponseFromCDC(List records, Checkpoint cp) throws Exception {
461471

462472
for (String tabletId : tabletIds) {
463473
GetChangesResponse changesResponse =
464-
syncClient.getChangesCDCSDK(table, dbStreamId, tabletId,
465-
cp.getTerm(), cp.getIndex(), cp.getKey(), cp.getWriteId(), cp.getSnapshotTime());
474+
syncClient.getChangesCDCSDK(
475+
table, dbStreamId, tabletId,
476+
cp.getTerm(), cp.getIndex(), cp.getKey(), cp.getWriteId(), cp.getSnapshotTime(),
477+
shouldSendSchema());
466478

467479
if (FORMAT.equalsIgnoreCase("PROTO")) {
468480
// Add records in proto.

java/yb-client/src/main/java/org/yb/client/AsyncYBClient.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -432,10 +432,11 @@ public Object call(Object o) throws Exception {
432432
public Deferred<GetChangesResponse> getChangesCDCSDK(YBTable table, String streamId,
433433
String tabletId, long term,
434434
long index, byte[] key,
435-
int write_id, long time) {
435+
int write_id, long time,
436+
boolean needSchemaInfo) {
436437
checkIsClosed();
437438
GetChangesRequest rpc = new GetChangesRequest(table, streamId, tabletId, term,
438-
index, key, write_id, time);
439+
index, key, write_id, time, needSchemaInfo);
439440
Deferred d = rpc.getDeferred();
440441
d.addErrback(new Callback<Exception, Exception>() {
441442
@Override

java/yb-client/src/main/java/org/yb/client/GetChangesRequest.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,10 @@ public String getTabletId() {
3535
private final byte[] key;
3636
private final int write_id;
3737
private final long time;
38+
private final boolean needSchemaInfo;
3839

3940
public GetChangesRequest(YBTable table, String streamId, String tabletId,
40-
long term, long index, byte[] key, int write_id, long time) {
41+
long term, long index, byte[] key, int write_id, long time, boolean needSchemaInfo) {
4142
super(table);
4243
this.streamId = streamId;
4344
this.tabletId = tabletId;
@@ -46,6 +47,7 @@ public GetChangesRequest(YBTable table, String streamId, String tabletId,
4647
this.key = key;
4748
this.write_id = write_id;
4849
this.time = time;
50+
this.needSchemaInfo = needSchemaInfo;
4951
}
5052

5153
@Override
@@ -54,6 +56,7 @@ ChannelBuffer serialize(Message header) {
5456
final GetChangesRequestPB.Builder builder = GetChangesRequestPB.newBuilder();
5557
builder.setDbStreamId(ByteString.copyFromUtf8(this.streamId));
5658
builder.setTabletId(ByteString.copyFromUtf8(this.tabletId));
59+
builder.setNeedSchemaInfo(this.needSchemaInfo);
5760
if (term != 0 || index != 0) {
5861
CdcService.CDCSDKCheckpointPB.Builder checkpointBuilder =
5962
CdcService.CDCSDKCheckpointPB.newBuilder();

java/yb-client/src/main/java/org/yb/client/YBClient.java

+4-3
Original file line numberDiff line numberDiff line change
@@ -1430,9 +1430,10 @@ public IsSetupUniverseReplicationDoneResponse isAlterUniverseReplicationDone(
14301430
public GetChangesResponse getChangesCDCSDK(YBTable table, String streamId,
14311431
String tabletId, long term,
14321432
long index, byte[] key,
1433-
int write_id, long time) throws Exception {
1434-
Deferred<GetChangesResponse> d = asyncClient
1435-
.getChangesCDCSDK(table, streamId, tabletId, term, index, key, write_id, time);
1433+
int write_id, long time,
1434+
boolean needSchemaInfo) throws Exception {
1435+
Deferred<GetChangesResponse> d = asyncClient.getChangesCDCSDK(
1436+
table, streamId, tabletId, term, index, key, write_id, time, needSchemaInfo);
14361437
return d.join(2*getDefaultAdminOperationTimeoutMs());
14371438
}
14381439

src/yb/cdc/cdc_service.proto

+2
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,8 @@ message GetChangesRequestPB {
195195
optional bytes table_id = 7;
196196

197197
optional CDCSDKCheckpointPB from_cdc_sdk_checkpoint = 8;
198+
199+
optional bool need_schema_info = 9 [default = false];
198200
}
199201

200202
message KeyValuePairPB {

0 commit comments

Comments
 (0)