[FLINK-36611][pipeline-connector][kafka] Add schema info to output of Kafka sink #3791
Conversation
...um/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumEventDeserializationSchema.java
// The schema value contains escape characters such as "\";
// re-parse it so the schema is embedded as real JSON rather than an escaped string.
String schemaValue = node.get("schema").asText();
JsonNode schemaNode = mapper.readTree(schemaValue);
node.set("schema", schemaNode);
Because the schema is passed downstream as a string, and the schema itself contains nested JSON, putting the raw JSON string into a JsonNode would leave escaped quotes (\") in the output. Reading the value with JsonNode.asText() and re-parsing it solves this problem well.
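To make the escaping issue concrete, here is a minimal, self-contained sketch using Jackson (the class and field names are illustrative, not the PR's code):

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

public class SchemaEscapeDemo {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        String schemaJson = "{\"type\":\"struct\",\"fields\":[]}";

        // Storing the schema as a plain string keeps it escaped:
        // {"schema":"{\"type\":\"struct\",\"fields\":[]}"}
        ObjectNode escaped = mapper.createObjectNode();
        escaped.put("schema", schemaJson);
        System.out.println(mapper.writeValueAsString(escaped));

        // Re-parsing the string (as the snippet above does with
        // node.get("schema").asText()) embeds it as real JSON:
        // {"schema":{"type":"struct","fields":[]}}
        ObjectNode nested = mapper.createObjectNode();
        nested.set("schema", mapper.readTree(schemaJson));
        System.out.println(mapper.writeValueAsString(nested));
    }
}
```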
Hi, @MOBIN-F. Is there any blocker for this being ready?
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/DataChangeEvent.java
...mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java
...sql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlReadableMetadata.java
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/pom.xml
...ava/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchema.java
...tor-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkOptions.java
...tor-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkFactory.java
...fka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/ChangeLogJsonFormatFactory.java
...ava/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchema.java
case VARCHAR:
case VARBINARY:
default:
    field = SchemaBuilder.string();
This includes ARRAY/MAP/ROW; there may be some issues if all of them are converted to SchemaBuilder.string().
You can check whether there is a better type, and if not, this is also acceptable.
debezium-json does not seem to receive ARRAY/MAP/ROW type data, because MySQL does not support these types.
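For context, the fallback under discussion typically lives in a switch over the CDC type roots. A minimal sketch using Kafka Connect's SchemaBuilder (the toConnectSchema helper and the string-based dispatch are illustrative, not the PR's exact code):

```java
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

public class CdcTypeToConnectSchema {
    // Hypothetical helper: map a CDC type root name to a Connect schema.
    static Schema toConnectSchema(String typeRoot) {
        switch (typeRoot) {
            case "BOOLEAN":
                return SchemaBuilder.bool().build();
            case "INT":
                return SchemaBuilder.int32().build();
            case "BIGINT":
                return SchemaBuilder.int64().build();
            case "DOUBLE":
                return SchemaBuilder.float64().build();
            case "VARCHAR":
            case "VARBINARY":
            default:
                // Fallback discussed above: types without a direct Connect
                // equivalent (including ARRAY/MAP/ROW) become strings.
                return SchemaBuilder.string().build();
        }
    }
}
```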
Thanks @MOBIN-F for this update. Additionally, you can manually test the data types with the Paimon action to verify correctness.
<div class="wy-table-responsive">
  <table class="colwidths-auto docutils">
    <thead>
      <tr>
        <th class="text-left">CDC type</th>
        <th class="text-left">JSON type</th>
        <th class="text-left">Literal type</th>
        <th class="text-left">Semantic type</th>
Do we need to add Literal/Semantic type to the documentation? It seems like users won't care about it. @lvyanquan
Yes, I suggest not adding this, as users won't pay much attention to it.
I think adding more information is not a disadvantage, so we can keep them.
I tested pipeline-kafka -> kafka -> paimon action -> paimon table, and it worked as expected. Maybe I can add some e2e tests later.
.../apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonRowDataSerializationSchema.java
LGTM.
Left some minor comments.
LGTM.
Hi @leonardBang @ruanhang1993, could you help check this?
Thanks @MOBIN-F and @lvyanquan for the contribution. LGTM; waiting for the CI to turn green.
CI failed, but it has nothing to do with this PR, because this PR only touches the kafka-pipeline module.
Currently, the output of the Kafka sink in debezium format contains the record data with full before/after images and database info, but no schema info.
However, in some scenarios we need this information to determine the type of the data. For example, Paimon's Kafka CDC source requires this type information; otherwise all types are treated as String, see https://paimon.apache.org/docs/0.9/flink/cdc-ingestion/kafka-cdc/#supported-formats.
Considering that this increases the data payload, I suggest adding a parameter to configure whether to enable it.
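For illustration, here is a Java sketch that builds a schema-plus-payload envelope with Jackson. It follows Debezium's standard schema/payload convention; the exact field layout and option name in this PR may differ:

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

public class DebeziumEnvelopeDemo {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();

        // Payload: the record data the sink already emits today.
        ObjectNode payload = mapper.createObjectNode();
        payload.putNull("before");
        payload.putObject("after").put("id", 1).put("name", "flink");
        payload.put("op", "c");

        // Schema: the type information this PR proposes to add, in
        // Debezium's schema+payload envelope style (illustrative layout).
        ObjectNode schema = mapper.createObjectNode();
        schema.put("type", "struct");
        schema.putArray("fields")
                .addObject().put("field", "after").put("type", "struct");

        ObjectNode envelope = mapper.createObjectNode();
        envelope.set("schema", schema);
        envelope.set("payload", payload);
        System.out.println(mapper.writeValueAsString(envelope));
    }
}
```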