[FLINK-36611][pipeline-connector][kafka] Add schema info to output of Kafka sink #3791

Open
wants to merge 6 commits into
base: master
88 changes: 86 additions & 2 deletions docs/content.zh/docs/connectors/pipeline-connectors/kafka.md
@@ -150,6 +150,13 @@ Pipeline Connector Options
<td>String</td>
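<td>Custom mappings from upstream table names to downstream Kafka topic names. Mappings are separated by `;`, and within each mapping the upstream TableId and the downstream Kafka topic name are separated by `:`. For example, `sink.tableId-to-topic.mapping` can be set to `mydb.mytable1:topic1;mydb.mytable2:topic2`. </td>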
</tr>
<tr>
<td>sink.debezium-json.include-schema.enabled</td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>If set to true, each Debezium record includes Debezium schema information. Only takes effect when `value.format` is `debezium-json`. </td>
</tr>
</tbody>
</table>
</div>
@@ -180,6 +187,63 @@ Pipeline Connector Options
}
}
```
When `sink.debezium-json.include-schema.enabled=true`, an output example is:
```json
{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":true,
"field":"col1"
},
{
"type":"string",
"optional":true,
"field":"col2"
}
],
"optional":true,
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":true,
"field":"col1"
},
{
"type":"string",
"optional":true,
"field":"col2"
}
],
"optional":true,
"field":"after"
}
],
"optional":false
},
"payload":{
"before": null,
"after": {
"col1": "1",
"col2": "1"
},
"op": "c",
"source": {
"db": "default_namespace",
"table": "table1"
}
}
}
```
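
The envelope above separates the declared column types (`schema`) from the row data (`payload`). As a minimal sketch of how a consumer might read both, the Python below parses a trimmed copy of that record and pairs each column of `after` with its declared type. The helper name `column_types` is hypothetical, and the trimmed record keeps only the `after` struct for brevity.

```python
import json

# A trimmed record in the shape shown above (only the "after" struct is kept).
RECORD = """
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          {"type": "string", "optional": true, "field": "col1"},
          {"type": "string", "optional": true, "field": "col2"}
        ],
        "optional": true,
        "field": "after"
      }
    ],
    "optional": false
  },
  "payload": {
    "before": null,
    "after": {"col1": "1", "col2": "1"},
    "op": "c",
    "source": {"db": "default_namespace", "table": "table1"}
  }
}
"""


def column_types(record: dict, struct_name: str = "after") -> dict:
    """Return {column: declared type} for one struct ("before"/"after") in the schema.

    Hypothetical helper for illustration only.
    """
    for struct in record["schema"]["fields"]:
        if struct.get("field") == struct_name:
            return {col["field"]: col["type"] for col in struct["fields"]}
    return {}  # the requested struct is not declared in the schema


record = json.loads(RECORD)
types = column_types(record)
row = record["payload"]["after"]
for name, value in row.items():
    print(f"{name}: {value!r} ({types.get(name, 'unknown')})")
```

Looking types up by column name, rather than by position, keeps the sketch independent of field order in the schema.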

#### canal-json
Refer to [Canal | Apache Flink](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/canal/#available-metadata). The canal-json format contains the `old`, `data`, `type`, `database`, `table`, and `pkNames` elements, but `ts` is not included.
@@ -204,79 +268,99 @@ Pipeline Connector Options

Data Type Mapping
----------------
[Literal type](https://debezium.io/documentation/reference/3.1/connectors/mysql.html#mysql-data-types): the actual storage type of the data (the `type` field in the Debezium schema)<br>
[Semantic type](https://debezium.io/documentation/reference/3.1/connectors/mysql.html#mysql-data-types): the logical type of the data (the `name` field in the Debezium schema).
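
The semantic type `org.apache.kafka.connect.data.Decimal` in the table below pairs a BYTES literal with a scale. The sketch below shows how such a value could be decoded, assuming the usual Kafka Connect JSON encoding: a base64 string holding the unscaled value as a big-endian two's-complement integer. Whether this sink emits exactly that encoding is an assumption to verify against real output. `decode_connect_decimal` is a hypothetical helper name, and the scale is assumed to be known from the schema.

```python
import base64
from decimal import Decimal


def decode_connect_decimal(encoded: str, scale: int) -> Decimal:
    """Decode base64 text holding a big-endian two's-complement unscaled integer.

    Hypothetical helper; assumes the Kafka Connect Decimal JSON encoding.
    """
    raw = base64.b64decode(encoded)
    unscaled = int.from_bytes(raw, byteorder="big", signed=True)
    # Build the Decimal from (sign, digits, exponent) so no rounding can occur.
    sign = 1 if unscaled < 0 else 0
    digits = tuple(int(d) for d in str(abs(unscaled)))
    return Decimal((sign, digits, -scale))


# 123.45 with scale 2 has the unscaled value 12345.
encoded = base64.b64encode((12345).to_bytes(2, "big", signed=True)).decode("ascii")
print(decode_connect_decimal(encoded, 2))
```
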
<div class="wy-table-responsive">
<table class="colwidths-auto docutils">
<thead>
<tr>
<th class="text-left">CDC type</th>
<th class="text-left">JSON type</th>
<th class="text-left">Literal type</th>
<th class="text-left">Semantic type</th>
<th class="text-left" style="width:60%;">NOTE</th>
</tr>
</thead>
<tbody>
<tr>
<td>TINYINT</td>
<td>TINYINT</td>
<td>INT16</td>
<td></td>
</tr>
<tr>
<td>SMALLINT</td>
<td>SMALLINT</td>
<td>INT16</td>
<td></td>
</tr>
<tr>
<td>INT</td>
<td>INT</td>
<td>INT32</td>
<td></td>
</tr>
<tr>
<td>BIGINT</td>
<td>BIGINT</td>
<td>INT64</td>
<td></td>
</tr>
<tr>
<td>FLOAT</td>
<td>FLOAT</td>
<td>FLOAT</td>
<td></td>
</tr>
<tr>
<td>DOUBLE</td>
<td>DOUBLE</td>
<td>DOUBLE</td>
<td></td>
</tr>
<tr>
<td>DECIMAL(p, s)</td>
<td>DECIMAL(p, s)</td>
<td>BYTES</td>
<td>org.apache.kafka.connect.data.Decimal</td>
<td></td>
</tr>
<tr>
<td>BOOLEAN</td>
<td>BOOLEAN</td>
<td>BOOLEAN</td>
<td></td>
</tr>
<tr>
<td>DATE</td>
<td>DATE</td>
<td>INT32</td>
<td>io.debezium.time.Date</td>
<td></td>
</tr>
<tr>
<td>TIMESTAMP</td>
<td>DATETIME</td>
<td>TIMESTAMP(p)</td>
<td>TIMESTAMP(p)</td>
<td>INT64</td>
<td>p &lt;= 3: io.debezium.time.Timestamp <br>p &gt; 3: io.debezium.time.MicroTimestamp </td>
<td></td>
</tr>
<tr>
<td>TIMESTAMP_LTZ</td>
<td>TIMESTAMP_LTZ</td>
<td>STRING</td>
<td>io.debezium.time.ZonedTimestamp</td>
<td></td>
</tr>
<tr>
<td>CHAR(n)</td>
<td>CHAR(n)</td>
<td>STRING</td>
<td></td>
</tr>
<tr>
<td>VARCHAR(n)</td>
<td>VARCHAR(n)</td>
<td>STRING</td>
<td></td>
</tr>
</tbody>
89 changes: 87 additions & 2 deletions docs/content/docs/connectors/pipeline-connectors/kafka.md
@@ -148,6 +148,13 @@ Pipeline Connector Options
<td>String</td>
<td>Custom mappings from upstream tableId to downstream Kafka topic. Mappings are separated by `;`, and within each mapping the upstream tableId and the downstream Kafka topic are separated by `:`. For example, `sink.tableId-to-topic.mapping` can be set to `mydb.mytable1:topic1;mydb.mytable2:topic2`. </td>
</tr>
<tr>
<td>sink.debezium-json.include-schema.enabled</td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>If set to true, each Debezium record includes Debezium schema information. Only takes effect when `value.format` is `debezium-json`. </td>
</tr>
</tbody>
</table>
</div>
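
The `sink.tableId-to-topic.mapping` format described in the table above can be illustrated with a minimal Python sketch. The function name `parse_table_topic_mapping` is hypothetical and not part of the connector. It only shows how a value such as `mydb.mytable1:topic1;mydb.mytable2:topic2` breaks down into table-to-topic pairs, assuming neither the tableId nor the topic name contains `;` or `:`.

```python
def parse_table_topic_mapping(value: str) -> dict:
    """Split 'tableId:topic;tableId:topic' into a {tableId: topic} dict.

    Hypothetical helper for illustration; the connector's own parsing
    may differ, for example in how it treats whitespace or bad entries.
    """
    mapping = {}
    for entry in value.split(";"):
        entry = entry.strip()
        if not entry:
            continue  # tolerate a trailing ';'
        table_id, sep, topic = entry.partition(":")
        if not sep or not table_id.strip() or not topic.strip():
            raise ValueError(f"malformed mapping entry: {entry!r}")
        mapping[table_id.strip()] = topic.strip()
    return mapping


mapping = parse_table_topic_mapping("mydb.mytable1:topic1;mydb.mytable2:topic2")
print(mapping)
```
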
@@ -178,6 +185,63 @@ An output example is:
}
}
```
When `sink.debezium-json.include-schema.enabled` is `true`, an output example is:
```json
{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":true,
"field":"col1"
},
{
"type":"string",
"optional":true,
"field":"col2"
}
],
"optional":true,
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":true,
"field":"col1"
},
{
"type":"string",
"optional":true,
"field":"col2"
}
],
"optional":true,
"field":"after"
}
],
"optional":false
},
"payload":{
"before": null,
"after": {
"col1": "1",
"col2": "1"
},
"op": "c",
"source": {
"db": "default_namespace",
"table": "table1"
}
}
}
```
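
Because the option changes the top-level shape of each message, a consumer that may see both shapes needs to tell them apart. The sketch below is one way to do that. It assumes that a record written without schema information has the same fields as the `payload` object above; the helper name `unwrap` is hypothetical.

```python
import json


def unwrap(message: str) -> dict:
    """Return the change event whether or not the schema envelope is present.

    Hypothetical helper for illustration only.
    """
    record = json.loads(message)
    # With include-schema enabled, the event sits under "payload" next to "schema".
    if isinstance(record, dict) and "schema" in record and "payload" in record:
        return record["payload"]
    return record


# Assumed shape of a record written without schema information.
plain = '{"before": null, "after": {"col1": "1", "col2": "1"}, "op": "c"}'
# The same event wrapped in a (shortened) schema envelope.
wrapped = (
    '{"schema": {"type": "struct", "fields": [], "optional": false}, '
    '"payload": ' + plain + "}"
)

event = unwrap(wrapped)
print(event["op"], event["after"])
```

Checking for both keys, rather than `schema` alone, reduces the chance of misreading an unrelated message that merely has a `schema` field.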

#### canal-json
Refer to [Canal | Apache Flink](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/canal/#available-metadata). The canal-json format contains the `old`, `data`, `type`, `database`, `table`, and `pkNames` elements, but `ts` is not included.
@@ -202,79 +266,100 @@ An output example is:

Data Type Mapping
----------------
[Literal type](https://debezium.io/documentation/reference/3.1/connectors/mysql.html#mysql-data-types): defines the physical storage format of the data (the `type` field of the Debezium schema)<br>
[Semantic type](https://debezium.io/documentation/reference/3.1/connectors/mysql.html#mysql-data-types): defines the logical meaning of the data (the `name` field of the Debezium schema).
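
To illustrate how a literal type and a semantic type combine, the sketch below converts the integer literals of the temporal types listed in the table below into Python values. It relies on the Debezium definitions of these semantic types (days, milliseconds, or microseconds since 1970-01-01). The helper name `decode_temporal` is hypothetical, and results are returned as naive values with no time zone attached.

```python
from datetime import date, datetime, timedelta

EPOCH_DATE = date(1970, 1, 1)
EPOCH_DATETIME = datetime(1970, 1, 1)


def decode_temporal(value: int, semantic_type: str):
    """Convert an integer literal to a date/datetime according to its semantic type.

    Hypothetical helper for illustration only.
    """
    if semantic_type == "io.debezium.time.Date":
        return EPOCH_DATE + timedelta(days=value)  # days since epoch
    if semantic_type == "io.debezium.time.Timestamp":
        return EPOCH_DATETIME + timedelta(milliseconds=value)  # ms since epoch
    if semantic_type == "io.debezium.time.MicroTimestamp":
        return EPOCH_DATETIME + timedelta(microseconds=value)  # microseconds since epoch
    raise ValueError(f"unsupported semantic type: {semantic_type}")


print(decode_temporal(19000, "io.debezium.time.Date"))
print(decode_temporal(1_700_000_000_000, "io.debezium.time.Timestamp"))
```
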
<div class="wy-table-responsive">
<table class="colwidths-auto docutils">
<thead>
<tr>
<th class="text-left">CDC type</th>
<th class="text-left">JSON type</th>
<th class="text-left">Literal type</th>
<th class="text-left">Semantic type</th>
Review comment (Contributor, Author):

Do we need to add the Literal/Semantic type to the documentation? It seems users won't care about it. @lvyanquan

Review comment (lvyanquan, Contributor, Mar 12, 2025):

Yes, I suggest not adding this, as users won't pay much attention to it.

I think adding more information is not a disadvantage, so we can keep them.

<th class="text-left" style="width:60%;">NOTE</th>
</tr>
</thead>
<tbody>
<tr>
<td>TINYINT</td>
<td>TINYINT</td>
<td>INT16</td>
<td></td>
</tr>
<tr>
<td>SMALLINT</td>
<td>SMALLINT</td>
<td>INT16</td>
<td></td>
</tr>
<tr>
<td>INT</td>
<td>INT</td>
<td>INT32</td>
<td></td>
</tr>
<tr>
<td>BIGINT</td>
<td>BIGINT</td>
<td>INT64</td>
<td></td>
</tr>
<tr>
<td>FLOAT</td>
<td>FLOAT</td>
<td>FLOAT</td>
<td></td>
</tr>
<tr>
<td>DOUBLE</td>
<td>DOUBLE</td>
<td>DOUBLE</td>
<td></td>
</tr>
<tr>
<td>DECIMAL(p, s)</td>
<td>DECIMAL(p, s)</td>
<td>BYTES</td>
<td>org.apache.kafka.connect.data.Decimal</td>
<td></td>
</tr>
<tr>
<td>BOOLEAN</td>
<td>BOOLEAN</td>
<td>BOOLEAN</td>
<td></td>
</tr>
<tr>
<td>DATE</td>
<td>DATE</td>
<td>INT32</td>
<td>io.debezium.time.Date</td>
<td></td>
</tr>
<tr>
<td>TIMESTAMP</td>
<td>TIMESTAMP</td>
<td>TIMESTAMP(p)</td>
<td>TIMESTAMP(p)</td>
<td>INT64</td>
<td>p &lt;= 3: io.debezium.time.Timestamp <br>p &gt; 3: io.debezium.time.MicroTimestamp </td>
<td></td>
</tr>
<tr>
<td>TIMESTAMP_LTZ</td>
<td>TIMESTAMP_LTZ</td>
<td>STRING</td>
<td>io.debezium.time.ZonedTimestamp</td>
<td></td>
</tr>
<tr>
<td>CHAR(n)</td>
<td>CHAR(n)</td>
<td>STRING</td>
<td></td>
</tr>
<tr>
<td>VARCHAR(n)</td>
<td>VARCHAR(n)</td>
<td>STRING</td>
<td></td>
</tr>
</tbody>