From b7a9ba42901eef55c43874211da441cc7270b9ce Mon Sep 17 00:00:00 2001 From: Leonard Xu Date: Thu, 6 Mar 2025 12:15:32 +0800 Subject: [PATCH] [FLINK-37120][minor] Rename ending chunk to unbounded chunk --- .../docs/connectors/flink-sources/db2-cdc.md | 2 +- .../connectors/flink-sources/oracle-cdc.md | 2 +- .../connectors/flink-sources/postgres-cdc.md | 2 +- .../connectors/flink-sources/sqlserver-cdc.md | 2 +- .../docs/connectors/flink-sources/db2-cdc.md | 2 +- .../connectors/flink-sources/mongodb-cdc.md | 2 +- .../connectors/flink-sources/mysql-cdc.md | 2 +- .../connectors/flink-sources/oracle-cdc.md | 2 +- .../connectors/flink-sources/postgres-cdc.md | 2 +- .../connectors/flink-sources/sqlserver-cdc.md | 2 +- .../connectors/pipeline-connectors/mysql.md | 2 +- .../mysql/factory/MySqlDataSourceFactory.java | 4 ++-- .../mysql/source/MySqlDataSourceOptions.java | 2 +- .../source/MySqlDataSourceFactoryTest.java | 2 +- .../base/config/BaseSourceConfig.java | 10 ++++---- .../base/config/JdbcSourceConfig.java | 4 ++-- .../base/config/JdbcSourceConfigFactory.java | 9 +++---- .../connectors/base/config/SourceConfig.java | 2 +- .../base/options/SourceOptions.java | 2 +- .../splitter/JdbcSourceChunkSplitter.java | 7 +++--- .../db2/source/Db2SourceBuilder.java | 8 +++---- .../db2/source/config/Db2SourceConfig.java | 4 ++-- .../source/config/Db2SourceConfigFactory.java | 2 +- .../connectors/db2/table/Db2TableSource.java | 10 ++++---- .../db2/table/Db2TableSourceFactory.java | 4 ++-- .../mongodb/source/MongoDBSourceBuilder.java | 8 +++---- .../splitters/SampleBucketSplitStrategy.java | 2 +- .../assigners/splitters/SplitContext.java | 12 +++++----- .../splitters/SplitVectorSplitStrategy.java | 2 +- .../source/config/MongoDBSourceConfig.java | 10 ++++---- .../config/MongoDBSourceConfigFactory.java | 11 +++++---- .../mongodb/table/MongoDBTableSource.java | 10 ++++---- .../table/MongoDBTableSourceFactory.java | 4 ++-- ...pshotSplitReaderAssignEndingFirstTest.java | 2 +- .../mysql/source/MySqlSourceBuilder.java | 8 +++---- .../source/assigners/MySqlChunkSplitter.java | 7 +++--- .../source/config/MySqlSourceConfig.java | 10 ++++---- .../config/MySqlSourceConfigFactory.java | 11 +++++---- .../source/config/MySqlSourceOptions.java | 4 ++-- .../mysql/table/MySqlTableSource.java | 10 ++++---- .../mysql/table/MySqlTableSourceFactory.java | 8 +++---- .../assigners/MySqlChunkSplitterTest.java | 6 ++--- .../table/MySqlTableSourceFactoryTest.java | 24 +++++++++---------- .../oracle/source/OracleSourceBuilder.java | 6 ++--- .../source/config/OracleSourceConfig.java | 4 ++-- .../config/OracleSourceConfigFactory.java | 2 +- .../oracle/table/OracleTableSource.java | 10 ++++---- .../table/OracleTableSourceFactory.java | 4 ++-- .../source/PostgresSourceBuilder.java | 6 ++--- .../source/config/PostgresSourceConfig.java | 4 ++-- .../config/PostgresSourceConfigFactory.java | 2 +- .../table/PostgreSQLTableFactory.java | 4 ++-- .../postgres/table/PostgreSQLTableSource.java | 10 ++++---- .../table/MockPostgreSQLTableSource.java | 2 +- .../source/SqlServerSourceBuilder.java | 8 +++---- .../source/config/SqlServerSourceConfig.java | 4 ++-- .../config/SqlServerSourceConfigFactory.java | 2 +- .../table/SqlServerTableFactory.java | 4 ++-- .../sqlserver/table/SqlServerTableSource.java | 10 ++++---- 59 files changed, 164 insertions(+), 159 deletions(-) diff --git a/docs/content.zh/docs/connectors/flink-sources/db2-cdc.md b/docs/content.zh/docs/connectors/flink-sources/db2-cdc.md index 20ddbe160df..25e17d327af 100644 --- a/docs/content.zh/docs/connectors/flink-sources/db2-cdc.md +++ b/docs/content.zh/docs/connectors/flink-sources/db2-cdc.md @@ -270,7 +270,7 @@ Db2 server. false Boolean - Whether to assign the unbounded chunk first during snapshot reading phase.
+ Whether to assign the unbounded chunks first during snapshot reading phase.
This might help reduce the risk of the TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of the largest unbounded chunk.
Experimental option, defaults to false. diff --git a/docs/content.zh/docs/connectors/flink-sources/oracle-cdc.md b/docs/content.zh/docs/connectors/flink-sources/oracle-cdc.md index c5bc79bd92f..368b8fe182a 100644 --- a/docs/content.zh/docs/connectors/flink-sources/oracle-cdc.md +++ b/docs/content.zh/docs/connectors/flink-sources/oracle-cdc.md @@ -428,7 +428,7 @@ Connector Options false Boolean - Whether to assign the unbounded chunk first during snapshot reading phase.
+ Whether to assign the unbounded chunks first during snapshot reading phase.
This might help reduce the risk of the TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of the largest unbounded chunk.
Experimental option, defaults to false. diff --git a/docs/content.zh/docs/connectors/flink-sources/postgres-cdc.md b/docs/content.zh/docs/connectors/flink-sources/postgres-cdc.md index a862326b95f..d3082146bec 100644 --- a/docs/content.zh/docs/connectors/flink-sources/postgres-cdc.md +++ b/docs/content.zh/docs/connectors/flink-sources/postgres-cdc.md @@ -251,7 +251,7 @@ Connector Options false Boolean - Whether to assign the unbounded chunk first during snapshot reading phase.
+ Whether to assign the unbounded chunks first during snapshot reading phase.
This might help reduce the risk of the TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of the largest unbounded chunk.
Experimental option, defaults to false. diff --git a/docs/content.zh/docs/connectors/flink-sources/sqlserver-cdc.md b/docs/content.zh/docs/connectors/flink-sources/sqlserver-cdc.md index 5602beabe73..49b97f2e233 100644 --- a/docs/content.zh/docs/connectors/flink-sources/sqlserver-cdc.md +++ b/docs/content.zh/docs/connectors/flink-sources/sqlserver-cdc.md @@ -244,7 +244,7 @@ Connector Options false Boolean - Whether to assign the unbounded chunk first during snapshot reading phase.
+ Whether to assign the unbounded chunks first during snapshot reading phase.
This might help reduce the risk of the TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of the largest unbounded chunk.
Experimental option, defaults to false. diff --git a/docs/content/docs/connectors/flink-sources/db2-cdc.md b/docs/content/docs/connectors/flink-sources/db2-cdc.md index 01ff29ef1d9..09d7895fee2 100644 --- a/docs/content/docs/connectors/flink-sources/db2-cdc.md +++ b/docs/content/docs/connectors/flink-sources/db2-cdc.md @@ -270,7 +270,7 @@ Db2 server. false Boolean - Whether to assign the unbounded chunk first during snapshot reading phase.
+ Whether to assign the unbounded chunks first during snapshot reading phase.
This might help reduce the risk of the TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of the largest unbounded chunk.
Experimental option, defaults to false. diff --git a/docs/content/docs/connectors/flink-sources/mongodb-cdc.md b/docs/content/docs/connectors/flink-sources/mongodb-cdc.md index eabac840d0d..aebbbf08d82 100644 --- a/docs/content/docs/connectors/flink-sources/mongodb-cdc.md +++ b/docs/content/docs/connectors/flink-sources/mongodb-cdc.md @@ -327,7 +327,7 @@ Connector Options false Boolean - Whether to assign the unbounded chunk first during snapshot reading phase.
+ Whether to assign the unbounded chunks first during snapshot reading phase.
This might help reduce the risk of the TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of the largest unbounded chunk.
Experimental option, defaults to false. diff --git a/docs/content/docs/connectors/flink-sources/mysql-cdc.md b/docs/content/docs/connectors/flink-sources/mysql-cdc.md index 7e8a9fba0f4..45988afd983 100644 --- a/docs/content/docs/connectors/flink-sources/mysql-cdc.md +++ b/docs/content/docs/connectors/flink-sources/mysql-cdc.md @@ -421,7 +421,7 @@ During a snapshot operation, the connector will query each included table to pro false Boolean - Whether to assign the unbounded chunk first during snapshot reading phase.
+ Whether to assign the unbounded chunks first during snapshot reading phase.
This might help reduce the risk of the TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of the largest unbounded chunk.
Experimental option, defaults to false. diff --git a/docs/content/docs/connectors/flink-sources/oracle-cdc.md b/docs/content/docs/connectors/flink-sources/oracle-cdc.md index e98272e8bdd..7694218994f 100644 --- a/docs/content/docs/connectors/flink-sources/oracle-cdc.md +++ b/docs/content/docs/connectors/flink-sources/oracle-cdc.md @@ -429,7 +429,7 @@ Connector Options false Boolean - Whether to assign the unbounded chunk first during snapshot reading phase.
+ Whether to assign the unbounded chunks first during snapshot reading phase.
This might help reduce the risk of the TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of the largest unbounded chunk.
Experimental option, defaults to false. diff --git a/docs/content/docs/connectors/flink-sources/postgres-cdc.md b/docs/content/docs/connectors/flink-sources/postgres-cdc.md index d7598584cda..a1ced51a6c2 100644 --- a/docs/content/docs/connectors/flink-sources/postgres-cdc.md +++ b/docs/content/docs/connectors/flink-sources/postgres-cdc.md @@ -248,7 +248,7 @@ SELECT * FROM shipments; false Boolean - Whether to assign the unbounded chunk first during snapshot reading phase.
+ Whether to assign the unbounded chunks first during snapshot reading phase.
This might help reduce the risk of the TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of the largest unbounded chunk.
Experimental option, defaults to false. diff --git a/docs/content/docs/connectors/flink-sources/sqlserver-cdc.md b/docs/content/docs/connectors/flink-sources/sqlserver-cdc.md index 10e6f4a52ce..683ed15a3b8 100644 --- a/docs/content/docs/connectors/flink-sources/sqlserver-cdc.md +++ b/docs/content/docs/connectors/flink-sources/sqlserver-cdc.md @@ -244,7 +244,7 @@ Connector Options false Boolean - Whether to assign the unbounded chunk first during snapshot reading phase.
+ Whether to assign the unbounded chunks first during snapshot reading phase.
This might help reduce the risk of the TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of the largest unbounded chunk.
Experimental option, defaults to false. diff --git a/docs/content/docs/connectors/pipeline-connectors/mysql.md b/docs/content/docs/connectors/pipeline-connectors/mysql.md index cf5a56ec22b..a5e74f6e32e 100644 --- a/docs/content/docs/connectors/pipeline-connectors/mysql.md +++ b/docs/content/docs/connectors/pipeline-connectors/mysql.md @@ -338,7 +338,7 @@ pipeline: false Boolean - Whether to assign the unbounded chunk first during snapshot reading phase.
+ Whether to assign the unbounded chunks first during snapshot reading phase.
This might help reduce the risk of the TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of the largest unbounded chunk.
Experimental option, defaults to false. diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java index b6c48167474..de47d236788 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java @@ -151,7 +151,7 @@ public DataSource createDataSource(Context context) { config.get(SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED); boolean isParsingOnLineSchemaChanges = config.get(PARSE_ONLINE_SCHEMA_CHANGES); boolean useLegacyJsonFormat = config.get(USE_LEGACY_JSON_FORMAT); - boolean isAssignEndingChunkFirst = + boolean isAssignUnboundedChunkFirst = config.get(SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST); validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1); @@ -205,7 +205,7 @@ public DataSource createDataSource(Context context) { .parseOnLineSchemaChanges(isParsingOnLineSchemaChanges) .treatTinyInt1AsBoolean(treatTinyInt1AsBoolean) .useLegacyJsonFormat(useLegacyJsonFormat) - .assignEndingChunkFirst(isAssignEndingChunkFirst); + .assignUnboundedChunkFirst(isAssignUnboundedChunkFirst); List tableIds = MySqlSchemaUtils.listTables(configFactory.createConfig(0), null); diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java index fb878547454..bb7d7b86635 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java @@ -320,5 +320,5 @@ public class MySqlDataSourceOptions { .booleanType() .defaultValue(false) .withDescription( - "Whether to assign the ending chunk first during snapshot reading phase. This might help reduce the risk of the TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of the largest unbounded chunk. Defaults to false."); + "Whether to assign the unbounded chunks first during snapshot reading phase. This might help reduce the risk of the TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of the largest unbounded chunk. Defaults to false."); } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java index 1ced1f740c8..86bd01d7575 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java @@ -274,7 +274,7 @@ public void testOptionalOption() { MySqlDataSource dataSource = (MySqlDataSource) factory.createDataSource(context); assertThat(dataSource.getSourceConfig().isTreatTinyInt1AsBoolean()).isFalse(); assertThat(dataSource.getSourceConfig().isParseOnLineSchemaChanges()).isTrue(); - assertThat(dataSource.getSourceConfig().isAssignEndingChunkFirst()).isTrue(); + assertThat(dataSource.getSourceConfig().isAssignUnboundedChunkFirst()).isTrue(); } @Test diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/config/BaseSourceConfig.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/config/BaseSourceConfig.java index a67d6b46d8b..b8ce8dd4448 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/config/BaseSourceConfig.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/config/BaseSourceConfig.java @@ -38,7 +38,7 @@ public abstract class BaseSourceConfig implements SourceConfig { protected final boolean closeIdleReaders; protected final boolean skipSnapshotBackfill; protected final boolean isScanNewlyAddedTableEnabled; - protected final boolean assignEndingChunkFirst; + protected final boolean assignUnboundedChunkFirst; // -------------------------------------------------------------------------------------------- // Debezium Configurations @@ -58,7 +58,7 @@ public BaseSourceConfig( boolean isScanNewlyAddedTableEnabled, Properties dbzProperties, Configuration dbzConfiguration, - boolean assignEndingChunkFirst) { + boolean assignUnboundedChunkFirst) { this.startupOptions = startupOptions; this.splitSize = splitSize; this.splitMetaGroupSize = splitMetaGroupSize; @@ -70,7 +70,7 @@ public BaseSourceConfig( this.isScanNewlyAddedTableEnabled = isScanNewlyAddedTableEnabled; this.dbzProperties = dbzProperties; this.dbzConfiguration = dbzConfiguration; - this.assignEndingChunkFirst = assignEndingChunkFirst; + this.assignUnboundedChunkFirst = assignUnboundedChunkFirst; } @Override @@ -120,7 +120,7 @@ public boolean isSkipSnapshotBackfill() { } @Override - public boolean isAssignEndingChunkFirst() { - return assignEndingChunkFirst; + public boolean isAssignUnboundedChunkFirst() { + return assignUnboundedChunkFirst; } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/config/JdbcSourceConfig.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/config/JdbcSourceConfig.java index b3cc6ea80ea..afa730667e0 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/config/JdbcSourceConfig.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/config/JdbcSourceConfig.java @@ -74,7 +74,7 @@ public JdbcSourceConfig( String chunkKeyColumn, boolean skipSnapshotBackfill, boolean isScanNewlyAddedTableEnabled, - boolean assignEndingChunkFirst) { + boolean assignUnboundedChunkFirst) { super( startupOptions, splitSize, @@ -87,7 +87,7 @@ public JdbcSourceConfig( isScanNewlyAddedTableEnabled, dbzProperties, dbzConfiguration, - assignEndingChunkFirst); + assignUnboundedChunkFirst); this.driverClassName = driverClassName; this.hostname = hostname; this.port = port; diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/config/JdbcSourceConfigFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/config/JdbcSourceConfigFactory.java index 5ff9211d726..3dffd13eeb0 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/config/JdbcSourceConfigFactory.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/config/JdbcSourceConfigFactory.java @@ -60,7 +60,7 @@ public abstract class JdbcSourceConfigFactory implements Factory splitEvenlySizedChunks( break; } } - // add the ending split - // assign ending split first, both the largest and smallest unbounded chunks are completed + // add the unbounded split + // assign unbounded split first, both the largest and smallest unbounded chunks are + // completed // in the first two splits - if (sourceConfig.isAssignEndingChunkFirst()) { + if (sourceConfig.isAssignUnboundedChunkFirst()) { splits.add(0, ChunkRange.of(chunkStart, null)); } else { splits.add(ChunkRange.of(chunkStart, null)); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/Db2SourceBuilder.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/Db2SourceBuilder.java index 3875a87f90e..8048adfbdfd 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/Db2SourceBuilder.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/Db2SourceBuilder.java @@ -233,11 +233,11 @@ public Db2SourceBuilder skipSnapshotBackfill(boolean skipSnapshotBackfill) { } /** - * Whether the {@link Db2IncrementalSource} should assign the ending chunk first or not during - * snapshot reading phase. + * Whether the {@link Db2IncrementalSource} should assign the unbounded chunks first or not + * during snapshot reading phase. */ - public Db2SourceBuilder assignEndingChunkFirst(boolean assignEndingChunkFirst) { - this.configFactory.assignEndingChunkFirst(assignEndingChunkFirst); + public Db2SourceBuilder assignUnboundedChunkFirst(boolean assignUnboundedChunkFirst) { + this.configFactory.assignUnboundedChunkFirst(assignUnboundedChunkFirst); return this; } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/config/Db2SourceConfig.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/config/Db2SourceConfig.java index 272f5393572..ab35aa409ea 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/config/Db2SourceConfig.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/config/Db2SourceConfig.java @@ -57,7 +57,7 @@ public Db2SourceConfig( int connectionPoolSize, String chunkKeyColumn, boolean skipSnapshotBackfill, - boolean assignEndingChunkFirst) { + boolean assignUnboundedChunkFirst) { super( startupOptions, databaseList, @@ -84,7 +84,7 @@ public Db2SourceConfig( chunkKeyColumn, skipSnapshotBackfill, false, - assignEndingChunkFirst); + assignUnboundedChunkFirst); } @Override diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/config/Db2SourceConfigFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/config/Db2SourceConfigFactory.java index b5ba50a6f46..365e7871099 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/config/Db2SourceConfigFactory.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/config/Db2SourceConfigFactory.java @@ -104,6 +104,6 @@ public Db2SourceConfig create(int subtask) { connectionPoolSize, chunkKeyColumn, skipSnapshotBackfill, - assignEndingChunkFirst); + assignUnboundedChunkFirst); } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/table/Db2TableSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/table/Db2TableSource.java index 46afd81b310..7c64110019f 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/table/Db2TableSource.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/table/Db2TableSource.java @@ -77,7 +77,7 @@ public class Db2TableSource implements ScanTableSource, SupportsReadingMetadata private final String chunkKeyColumn; private final boolean closeIdleReaders; private final boolean skipSnapshotBackfill; - private final boolean assignEndingChunkFirst; + private final boolean assignUnboundedChunkFirst; /** Metadata that is appended at the end of a physical source row. */ protected List metadataKeys; @@ -105,7 +105,7 @@ public Db2TableSource( @Nullable String chunkKeyColumn, boolean closeIdleReaders, boolean skipSnapshotBackfill, - boolean assignEndingChunkFirst) { + boolean assignUnboundedChunkFirst) { this.physicalSchema = physicalSchema; this.port = port; this.hostname = hostname; @@ -130,7 +130,7 @@ public Db2TableSource( this.chunkKeyColumn = chunkKeyColumn; this.closeIdleReaders = closeIdleReaders; this.skipSnapshotBackfill = skipSnapshotBackfill; - this.assignEndingChunkFirst = assignEndingChunkFirst; + this.assignUnboundedChunkFirst = assignUnboundedChunkFirst; } @Override @@ -177,7 +177,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { .chunkKeyColumn(chunkKeyColumn) .closeIdleReaders(closeIdleReaders) .skipSnapshotBackfill(skipSnapshotBackfill) - .assignEndingChunkFirst(assignEndingChunkFirst) + .assignUnboundedChunkFirst(assignUnboundedChunkFirst) .build(); return SourceProvider.of(db2ChangeEventSource); } else { @@ -239,7 +239,7 @@ public DynamicTableSource copy() { chunkKeyColumn, closeIdleReaders, skipSnapshotBackfill, - assignEndingChunkFirst); + assignUnboundedChunkFirst); source.metadataKeys = metadataKeys; source.producedDataType = producedDataType; return source; diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/table/Db2TableSourceFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/table/Db2TableSourceFactory.java index 7b84aa82676..b3ea3189c0d 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/table/Db2TableSourceFactory.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/table/Db2TableSourceFactory.java @@ -139,7 +139,7 @@ public DynamicTableSource createDynamicTableSource(Context context) { config.getOptional(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN).orElse(null); boolean closeIdleReaders = config.get(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED); boolean skipSnapshotBackfill = config.get(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP); - boolean assignEndingChunkFirst = + boolean assignUnboundedChunkFirst = config.get(SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST); if (enableParallelRead) { @@ -177,7 +177,7 @@ public DynamicTableSource createDynamicTableSource(Context context) { chunkKeyColumn, closeIdleReaders, skipSnapshotBackfill, - assignEndingChunkFirst); + assignUnboundedChunkFirst); } @Override diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBSourceBuilder.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBSourceBuilder.java index 0181204d379..c61a5d0ce99 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBSourceBuilder.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBSourceBuilder.java @@ -269,11 +269,11 @@ public MongoDBSourceBuilder scanNewlyAddedTableEnabled(boolean scanNewlyAdded } /** - * Whether the {@link MongoDBSource} should assign the ending chunk first or not during snapshot - * reading phase. + * Whether the {@link MongoDBSource} should assign the unbounded chunks first or not during + * snapshot reading phase. */ - public MongoDBSourceBuilder assignEndingChunkFirst(boolean assignEndingChunkFirst) { - this.configFactory.assignEndingChunkFirst(assignEndingChunkFirst); + public MongoDBSourceBuilder assignUnboundedChunkFirst(boolean assignUnboundedChunkFirst) { + this.configFactory.assignUnboundedChunkFirst(assignUnboundedChunkFirst); return this; } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/assigners/splitters/SampleBucketSplitStrategy.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/assigners/splitters/SampleBucketSplitStrategy.java index 096eda3d437..497cefd157e 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/assigners/splitters/SampleBucketSplitStrategy.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/assigners/splitters/SampleBucketSplitStrategy.java @@ -159,7 +159,7 @@ public Collection split(SplitContext splitContext) { ChunkUtils.maxUpperBoundOfId(), null, schema); - if (splitContext.isAssignEndingChunkFirst()) { + if (splitContext.isAssignUnboundedChunkFirst()) { snapshotSplits.add(0, lastSplit); } else { snapshotSplits.add(lastSplit); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/assigners/splitters/SplitContext.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/assigners/splitters/SplitContext.java index 0e21ac4adfa..0bfe297f37d 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/assigners/splitters/SplitContext.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/assigners/splitters/SplitContext.java @@ -41,7 +41,7 @@ public class SplitContext { private final BsonDocument collectionStats; private final int chunkSizeMB; private final int samplesPerChunk; - private final boolean assignEndingChunkFirst; + private final boolean assignUnboundedChunkFirst; public SplitContext( MongoClient mongoClient, @@ -49,13 +49,13 @@ public SplitContext( BsonDocument collectionStats, int chunkSizeMB, int samplesPerChunk, - boolean assignEndingChunkFirst) { + boolean assignUnboundedChunkFirst) { this.mongoClient = mongoClient; this.collectionId = collectionId; this.collectionStats = collectionStats; this.chunkSizeMB = chunkSizeMB; this.samplesPerChunk = samplesPerChunk; - this.assignEndingChunkFirst = assignEndingChunkFirst; + this.assignUnboundedChunkFirst = assignUnboundedChunkFirst; } public static SplitContext of(MongoDBSourceConfig sourceConfig, TableId collectionId) { @@ -66,7 +66,7 @@ public static SplitContext of(MongoDBSourceConfig sourceConfig, TableId collecti collStats(mongoClient, collectionId), sourceConfig.getSplitSize(), sourceConfig.getSamplesPerChunk(), - sourceConfig.isAssignEndingChunkFirst()); + sourceConfig.isAssignUnboundedChunkFirst()); } public MongoClient getMongoClient() { @@ -105,7 +105,7 @@ public boolean isShardedCollection() { return collectionStats.getBoolean("sharded", BsonBoolean.FALSE).getValue(); } - public boolean isAssignEndingChunkFirst() { - return assignEndingChunkFirst; + public boolean isAssignUnboundedChunkFirst() { + return assignUnboundedChunkFirst; } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/assigners/splitters/SplitVectorSplitStrategy.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/assigners/splitters/SplitVectorSplitStrategy.java index 1468680af42..f2b6947f3f3 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/assigners/splitters/SplitVectorSplitStrategy.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/assigners/splitters/SplitVectorSplitStrategy.java @@ -131,7 +131,7 @@ public Collection split(SplitContext splitContext) { ChunkUtils.maxUpperBoundOfId(), null, schema); - if (splitContext.isAssignEndingChunkFirst()) { + if (splitContext.isAssignUnboundedChunkFirst()) { snapshotSplits.add(0, lastSplit); } else { snapshotSplits.add(lastSplit); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/config/MongoDBSourceConfig.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/config/MongoDBSourceConfig.java index 67bbec91103..03562618822 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/config/MongoDBSourceConfig.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/config/MongoDBSourceConfig.java @@ -55,7 +55,7 @@ public class MongoDBSourceConfig implements SourceConfig { private final boolean disableCursorTimeout; private final boolean skipSnapshotBackfill; private final boolean isScanNewlyAddedTableEnabled; - private final boolean assignEndingChunkFirst; + private final boolean assignUnboundedChunkFirst; MongoDBSourceConfig( String scheme, @@ -79,7 +79,7 @@ public class MongoDBSourceConfig implements SourceConfig { boolean disableCursorTimeout, boolean skipSnapshotBackfill, boolean isScanNewlyAddedTableEnabled, - boolean assignEndingChunkFirst) { + boolean assignUnboundedChunkFirst) { this.scheme = checkNotNull(scheme); this.hosts = checkNotNull(hosts); this.username = username; @@ -102,7 +102,7 @@ public class MongoDBSourceConfig implements SourceConfig { this.disableCursorTimeout = disableCursorTimeout; this.skipSnapshotBackfill = skipSnapshotBackfill; this.isScanNewlyAddedTableEnabled = isScanNewlyAddedTableEnabled; - this.assignEndingChunkFirst = assignEndingChunkFirst; + this.assignUnboundedChunkFirst = assignUnboundedChunkFirst; } public String getScheme() { @@ -203,8 +203,8 @@ public boolean isScanNewlyAddedTableEnabled() { return isScanNewlyAddedTableEnabled; } - public boolean isAssignEndingChunkFirst() { - return assignEndingChunkFirst; + public boolean isAssignUnboundedChunkFirst() { + return assignUnboundedChunkFirst; } @Override diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/config/MongoDBSourceConfigFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/config/MongoDBSourceConfigFactory.java index 16a064e99e5..691f3712a1a 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/config/MongoDBSourceConfigFactory.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/config/MongoDBSourceConfigFactory.java @@ -62,7 +62,7 @@ public class MongoDBSourceConfigFactory implements Factory private boolean disableCursorTimeout = true; protected boolean skipSnapshotBackfill = false; protected boolean scanNewlyAddedTableEnabled = false; - protected boolean assignEndingChunkFirst = false; + protected boolean assignUnboundedChunkFirst = false; /** The protocol connected to MongoDB. For example mongodb or mongodb+srv. */ public MongoDBSourceConfigFactory scheme(String scheme) { @@ -272,10 +272,11 @@ public MongoDBSourceConfigFactory scanNewlyAddedTableEnabled( } /** - * Whether to assign the ending chunk first during snapshot reading phase. Defaults to false. + * Whether to assign the unbounded chunks first during snapshot reading phase. Defaults to + * false. */ - public MongoDBSourceConfigFactory assignEndingChunkFirst(boolean assignEndingChunkFirst) { - this.assignEndingChunkFirst = assignEndingChunkFirst; + public MongoDBSourceConfigFactory assignUnboundedChunkFirst(boolean assignUnboundedChunkFirst) { + this.assignUnboundedChunkFirst = assignUnboundedChunkFirst; return this; } @@ -305,6 +306,6 @@ public MongoDBSourceConfig create(int subtaskId) { disableCursorTimeout, skipSnapshotBackfill, scanNewlyAddedTableEnabled, - assignEndingChunkFirst); + assignUnboundedChunkFirst); } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTableSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTableSource.java index 21662cee615..217a70cabd0 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTableSource.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTableSource.java @@ -86,7 +86,7 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad private final boolean noCursorTimeout; private final boolean skipSnapshotBackfill; private final boolean scanNewlyAddedTableEnabled; - private final boolean assignEndingChunkFirst; + private final boolean assignUnboundedChunkFirst; // -------------------------------------------------------------------------------------------- // Mutable attributes @@ -123,7 +123,7 @@ public MongoDBTableSource( boolean noCursorTimeout, boolean skipSnapshotBackfill, boolean scanNewlyAddedTableEnabled, - boolean assignEndingChunkFirst) { + boolean assignUnboundedChunkFirst) { this.physicalSchema = physicalSchema; this.scheme = checkNotNull(scheme); this.hosts = checkNotNull(hosts); @@ -150,7 +150,7 @@ public MongoDBTableSource( this.noCursorTimeout = noCursorTimeout; this.skipSnapshotBackfill = skipSnapshotBackfill; this.scanNewlyAddedTableEnabled = scanNewlyAddedTableEnabled; - this.assignEndingChunkFirst = assignEndingChunkFirst; + this.assignUnboundedChunkFirst = assignUnboundedChunkFirst; } @Override @@ -211,7 +211,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { .scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled) .deserializer(deserializer) .disableCursorTimeout(noCursorTimeout) - .assignEndingChunkFirst(assignEndingChunkFirst); + .assignUnboundedChunkFirst(assignUnboundedChunkFirst); Optional.ofNullable(databaseList).ifPresent(builder::databaseList); Optional.ofNullable(collectionList).ifPresent(builder::collectionList); @@ -312,7 +312,7 @@ public DynamicTableSource copy() { noCursorTimeout, skipSnapshotBackfill, scanNewlyAddedTableEnabled, - assignEndingChunkFirst); + assignUnboundedChunkFirst); source.metadataKeys = metadataKeys; source.producedDataType = producedDataType; return source; diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTableSourceFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTableSourceFactory.java index 0d4966bc9f3..1b40ff5898d 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTableSourceFactory.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTableSourceFactory.java @@ -107,7 +107,7 @@ public DynamicTableSource createDynamicTableSource(Context context) { boolean enableCloseIdleReaders = config.get(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED); boolean skipSnapshotBackfill = config.get(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP); boolean scanNewlyAddedTableEnabled = config.get(SCAN_NEWLY_ADDED_TABLE_ENABLED); - boolean assignEndingChunkFirst = + boolean assignUnboundedChunkFirst = config.get(SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST); int splitSizeMB = config.get(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE_MB); @@ -150,7 +150,7 @@ public DynamicTableSource createDynamicTableSource(Context context) { noCursorTimeout, skipSnapshotBackfill, scanNewlyAddedTableEnabled, - assignEndingChunkFirst); + assignUnboundedChunkFirst); } private void checkPrimaryKey(UniqueConstraint pk, String message) { diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/reader/MongoDBSnapshotSplitReaderAssignEndingFirstTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/reader/MongoDBSnapshotSplitReaderAssignEndingFirstTest.java index c60b65da362..e41040d9d4a 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/reader/MongoDBSnapshotSplitReaderAssignEndingFirstTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/reader/MongoDBSnapshotSplitReaderAssignEndingFirstTest.java @@ -80,7 +80,7 @@ public void before() { .splitSizeMB(1) .samplesPerChunk(10) .pollAwaitTimeMillis(500) - .assignEndingChunkFirst(true); + .assignUnboundedChunkFirst(true); sourceConfig = configFactory.create(0); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java index 97817b189fc..caf316d1b4a 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java @@ -284,11 +284,11 @@ public MySqlSourceBuilder parseOnLineSchemaChanges(boolean parseOnLineSchemaC } /** - * Whether the {@link MySqlSource} should assign the ending chunk first or not during snapshot - * reading phase. + * Whether the {@link MySqlSource} should assign the unbounded chunks first or not during + * snapshot reading phase. */ - public MySqlSourceBuilder assignEndingChunkFirst(boolean assignEndingChunkFirst) { - this.configFactory.assignEndingChunkFirst(assignEndingChunkFirst); + public MySqlSourceBuilder assignUnboundedChunkFirst(boolean assignUnboundedChunkFirst) { + this.configFactory.assignUnboundedChunkFirst(assignUnboundedChunkFirst); return this; } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java index cc1560a5af8..b986b5e8195 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java @@ -316,10 +316,11 @@ public List splitEvenlySizedChunks( break; } } - // add the ending split - // assign ending split first, both the largest and smallest unbounded chunks are completed + // add the unbounded split + // assign unbounded split first, both the largest and smallest unbounded chunks are + // completed // in the first two splits - if (sourceConfig.isAssignEndingChunkFirst()) { + if (sourceConfig.isAssignUnboundedChunkFirst()) { splits.add(0, ChunkRange.of(chunkStart, null)); } else { splits.add(ChunkRange.of(chunkStart, null)); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java index 27e9ec56548..2ce586dd081 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java @@ -68,7 +68,7 @@ public class MySqlSourceConfig implements Serializable { private final boolean skipSnapshotBackfill; private final boolean parseOnLineSchemaChanges; public static boolean useLegacyJsonFormat = true; - private final boolean assignEndingChunkFirst; + private final boolean assignUnboundedChunkFirst; // -------------------------------------------------------------------------------------------- // Debezium Configurations @@ -107,7 +107,7 @@ public class MySqlSourceConfig implements Serializable { boolean parseOnLineSchemaChanges, boolean treatTinyInt1AsBoolean, boolean useLegacyJsonFormat, - boolean assignEndingChunkFirst) { + boolean assignUnboundedChunkFirst) { this.hostname = checkNotNull(hostname); this.port = port; this.username = checkNotNull(username); @@ -138,7 +138,7 @@ public class MySqlSourceConfig implements Serializable { this.parseOnLineSchemaChanges = parseOnLineSchemaChanges; this.treatTinyInt1AsBoolean = treatTinyInt1AsBoolean; this.useLegacyJsonFormat = useLegacyJsonFormat; - this.assignEndingChunkFirst = assignEndingChunkFirst; + this.assignUnboundedChunkFirst = assignUnboundedChunkFirst; } public String getHostname() { @@ -226,8 +226,8 @@ public boolean isParseOnLineSchemaChanges() { return parseOnLineSchemaChanges; } - public boolean isAssignEndingChunkFirst() { - return assignEndingChunkFirst; + public boolean isAssignUnboundedChunkFirst() { + return assignUnboundedChunkFirst; } public Properties getDbzProperties() { diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java index 3d176e910c4..427115edea7 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java @@ -73,7 +73,7 @@ public class MySqlSourceConfigFactory implements Serializable { private boolean parseOnLineSchemaChanges = false; private boolean treatTinyInt1AsBoolean = true; private boolean useLegacyJsonFormat = true; - private boolean assignEndingChunkFirst = false; + private boolean assignUnboundedChunkFirst = false; public MySqlSourceConfigFactory hostname(String hostname) { this.hostname = hostname; @@ -316,10 +316,11 @@ public MySqlSourceConfigFactory treatTinyInt1AsBoolean(boolean treatTinyInt1AsBo } /** - * Whether to assign the ending chunk first during snapshot reading phase. Defaults to false. + * Whether to assign the unbounded chunks first during snapshot reading phase. Defaults to + * false. */ - public MySqlSourceConfigFactory assignEndingChunkFirst(boolean assignEndingChunkFirst) { - this.assignEndingChunkFirst = assignEndingChunkFirst; + public MySqlSourceConfigFactory assignUnboundedChunkFirst(boolean assignUnboundedChunkFirst) { + this.assignUnboundedChunkFirst = assignUnboundedChunkFirst; return this; } @@ -420,6 +421,6 @@ public MySqlSourceConfig createConfig(int subtaskId, String serverName) { parseOnLineSchemaChanges, treatTinyInt1AsBoolean, useLegacyJsonFormat, - assignEndingChunkFirst); + assignUnboundedChunkFirst); } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java index d973f29a93d..71e50d88326 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java @@ -280,10 +280,10 @@ public class MySqlSourceOptions { "Whether to use legacy json format. The default value is true, which means there is no whitespace before value and after comma in json format."); @Experimental - public static final ConfigOption SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST = + public static final ConfigOption SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST = ConfigOptions.key("scan.incremental.snapshot.unbounded-chunk-first.enabled") .booleanType() .defaultValue(false) .withDescription( - "Whether to assign the ending chunk first during snapshot reading phase. This might help reduce the risk of the TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of the largest unbounded chunk. Defaults to false."); + "Whether to assign the unbounded chunks first during snapshot reading phase. This might help reduce the risk of the TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of the largest unbounded chunk. Defaults to false."); } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSource.java index b4be2214104..f13a8a495d7 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSource.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSource.java @@ -100,7 +100,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat final boolean skipSnapshotBackFill; final boolean parseOnlineSchemaChanges; private final boolean useLegacyJsonFormat; - private final boolean assignEndingChunkFirst; + private final boolean assignUnboundedChunkFirst; // -------------------------------------------------------------------------------------------- // Mutable attributes @@ -141,7 +141,7 @@ public MySqlTableSource( boolean skipSnapshotBackFill, boolean parseOnlineSchemaChanges, boolean useLegacyJsonFormat, - boolean assignEndingChunkFirst) { + boolean assignUnboundedChunkFirst) { this.physicalSchema = physicalSchema; this.port = port; this.hostname = checkNotNull(hostname); @@ -173,7 +173,7 @@ public MySqlTableSource( this.chunkKeyColumn = chunkKeyColumn; this.skipSnapshotBackFill = skipSnapshotBackFill; this.useLegacyJsonFormat = useLegacyJsonFormat; - this.assignEndingChunkFirst = assignEndingChunkFirst; + this.assignUnboundedChunkFirst = assignUnboundedChunkFirst; } @Override @@ -231,7 +231,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { .skipSnapshotBackfill(skipSnapshotBackFill) .parseOnLineSchemaChanges(parseOnlineSchemaChanges) .useLegacyJsonFormat(useLegacyJsonFormat) - .assignEndingChunkFirst(assignEndingChunkFirst) + .assignUnboundedChunkFirst(assignUnboundedChunkFirst) .build(); return SourceProvider.of(parallelSource); } else { @@ -320,7 +320,7 @@ public DynamicTableSource copy() { skipSnapshotBackFill, parseOnlineSchemaChanges, useLegacyJsonFormat, - assignEndingChunkFirst); + assignUnboundedChunkFirst); source.metadataKeys = metadataKeys; source.producedDataType = producedDataType; return source; diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactory.java index a7ccb40550e..8839290698a 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactory.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactory.java @@ -105,8 +105,8 @@ public DynamicTableSource createDynamicTableSource(Context context) { boolean parseOnLineSchemaChanges = config.get(MySqlSourceOptions.PARSE_ONLINE_SCHEMA_CHANGES); boolean useLegacyJsonFormat = config.get(MySqlSourceOptions.USE_LEGACY_JSON_FORMAT); - boolean assignEndingChunkFirst = - config.get(MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST); + boolean assignUnboundedChunkFirst = + config.get(MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST); if (enableParallelRead) { validatePrimaryKeyIfEnableParallel(physicalSchema, chunkKeyColumn); @@ -153,7 +153,7 @@ public DynamicTableSource createDynamicTableSource(Context context) { skipSnapshotBackFill, parseOnLineSchemaChanges, useLegacyJsonFormat, - assignEndingChunkFirst); + assignUnboundedChunkFirst); } @Override @@ -201,7 +201,7 @@ public Set> optionalOptions() { options.add(MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP); options.add(MySqlSourceOptions.PARSE_ONLINE_SCHEMA_CHANGES); options.add(MySqlSourceOptions.USE_LEGACY_JSON_FORMAT); - options.add(MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST); + options.add(MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST); return options; } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitterTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitterTest.java index e26244a1248..b04fa61ad73 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitterTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitterTest.java @@ -43,7 +43,7 @@ public void testSplitEvenlySizedChunksOverflow() { .username("") .password("") .serverTimeZone(ZoneId.of("UTC").toString()) - .assignEndingChunkFirst(false) + .assignUnboundedChunkFirst(false) .createConfig(0); MySqlChunkSplitter splitter = new MySqlChunkSplitter(null, sourceConfig); List res = @@ -70,7 +70,7 @@ public void testSplitEvenlySizedChunksNormal() { .username("") .password("") .serverTimeZone(ZoneId.of("UTC").toString()) - .assignEndingChunkFirst(false) + .assignUnboundedChunkFirst(false) .createConfig(0); MySqlChunkSplitter splitter = new MySqlChunkSplitter(null, sourceConfig); List res = @@ -98,7 +98,7 @@ public void testSplitEvenlySizedChunksEndingFirst() { .username("") .password("") .serverTimeZone(ZoneId.of("UTC").toString()) - .assignEndingChunkFirst(true) + .assignUnboundedChunkFirst(true) .createConfig(0); MySqlChunkSplitter splitter = new MySqlChunkSplitter(null, sourceConfig); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java index db7c4bc3fff..deb06116c08 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java @@ -52,10 +52,10 @@ import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.CONNECT_TIMEOUT; import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.HEARTBEAT_INTERVAL; import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.PARSE_ONLINE_SCHEMA_CHANGES; -import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST; import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP; import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE; import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED; +import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST; import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE; import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.USE_LEGACY_JSON_FORMAT; import static org.apache.flink.core.testutils.FlinkMatchers.containsMessage; @@ -133,7 +133,7 @@ public void testCommonProperties() { SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(), PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(), USE_LEGACY_JSON_FORMAT.defaultValue(), - SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST.defaultValue()); + SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue()); assertEquals(expectedSource, actualSource); } @@ -182,7 +182,7 @@ public void testEnableParallelReadSource() { SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(), PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(), USE_LEGACY_JSON_FORMAT.defaultValue(), - SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST.defaultValue()); + SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue()); assertEquals(expectedSource, actualSource); } @@ -227,7 +227,7 @@ public void testEnableParallelReadSourceWithSingleServerId() { SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(), PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(), USE_LEGACY_JSON_FORMAT.defaultValue(), - SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST.defaultValue()); + SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue()); assertEquals(expectedSource, actualSource); } @@ -270,7 +270,7 @@ public void testEnableParallelReadSourceLatestOffset() { SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(), PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(), USE_LEGACY_JSON_FORMAT.defaultValue(), - SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST.defaultValue()); + SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue()); assertEquals(expectedSource, actualSource); } @@ -330,7 +330,7 @@ public void testOptionalProperties() { true, PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(), true, - SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST.defaultValue()); + SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue()); assertEquals(expectedSource, actualSource); assertTrue(actualSource instanceof MySqlTableSource); MySqlTableSource actualMySqlTableSource = (MySqlTableSource) actualSource; @@ -387,7 +387,7 @@ public void testStartupFromSpecificOffset() { SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(), PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(), USE_LEGACY_JSON_FORMAT.defaultValue(), - SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST.defaultValue()); + SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue()); assertEquals(expectedSource, actualSource); } @@ -428,7 +428,7 @@ public void testStartupFromInitial() { SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(), PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(), USE_LEGACY_JSON_FORMAT.defaultValue(), - SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST.defaultValue()); + SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue()); assertEquals(expectedSource, actualSource); } @@ -470,7 +470,7 @@ public void testStartupFromEarliestOffset() { SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(), PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(), USE_LEGACY_JSON_FORMAT.defaultValue(), - SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST.defaultValue()); + SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue()); assertEquals(expectedSource, actualSource); } @@ -513,7 +513,7 @@ public void testStartupFromSpecificTimestamp() { SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(), PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(), USE_LEGACY_JSON_FORMAT.defaultValue(), - SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST.defaultValue()); + SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue()); assertEquals(expectedSource, actualSource); } @@ -554,7 +554,7 @@ public void testStartupFromLatestOffset() { SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(), PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(), USE_LEGACY_JSON_FORMAT.defaultValue(), - SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST.defaultValue()); + SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue()); assertEquals(expectedSource, actualSource); } @@ -600,7 +600,7 @@ public void testMetadataColumns() { SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(), PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(), USE_LEGACY_JSON_FORMAT.defaultValue(), - SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST.defaultValue()); + SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue()); expectedSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType(); expectedSource.metadataKeys = Arrays.asList("op_ts", "database_name"); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/OracleSourceBuilder.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/OracleSourceBuilder.java index 66ebd83995c..97bc18c7dda 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/OracleSourceBuilder.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/OracleSourceBuilder.java @@ -250,11 +250,11 @@ public OracleSourceBuilder scanNewlyAddedTableEnabled(boolean scanNewlyAddedT } /** - * Whether the {@link OracleIncrementalSource} should assign the ending chunk first or not + * Whether the {@link OracleIncrementalSource} should assign the unbounded chunks first or not * during snapshot reading phase. */ - public OracleSourceBuilder assignEndingChunkFirst(boolean assignEndingChunkFirst) { - this.configFactory.assignEndingChunkFirst(assignEndingChunkFirst); + public OracleSourceBuilder assignUnboundedChunkFirst(boolean assignUnboundedChunkFirst) { + this.configFactory.assignUnboundedChunkFirst(assignUnboundedChunkFirst); return this; } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/config/OracleSourceConfig.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/config/OracleSourceConfig.java index 25456e102df..26152d52c09 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/config/OracleSourceConfig.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/config/OracleSourceConfig.java @@ -65,7 +65,7 @@ public OracleSourceConfig( String chunkKeyColumn, boolean skipSnapshotBackfill, boolean scanNewlyAddedTableEnabled, - boolean assignEndingChunkFirst) { + boolean assignUnboundedChunkFirst) { super( startupOptions, databaseList, @@ -92,7 +92,7 @@ public OracleSourceConfig( chunkKeyColumn, skipSnapshotBackfill, scanNewlyAddedTableEnabled, - assignEndingChunkFirst); + assignUnboundedChunkFirst); this.url = url; } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/config/OracleSourceConfigFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/config/OracleSourceConfigFactory.java index 2899641db97..a255b380626 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/config/OracleSourceConfigFactory.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/config/OracleSourceConfigFactory.java @@ -135,6 +135,6 @@ public OracleSourceConfig create(int subtaskId) { chunkKeyColumn, skipSnapshotBackfill, scanNewlyAddedTableEnabled, - assignEndingChunkFirst); + assignUnboundedChunkFirst); } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/table/OracleTableSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/table/OracleTableSource.java index d86eb439dda..d9a0ab87e33 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/table/OracleTableSource.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/table/OracleTableSource.java @@ -80,7 +80,7 @@ public class OracleTableSource implements ScanTableSource, SupportsReadingMetada private final boolean closeIdleReaders; private final boolean skipSnapshotBackfill; private final boolean scanNewlyAddedTableEnabled; - private final boolean assignEndingChunkFirst; + private final boolean assignUnboundedChunkFirst; // -------------------------------------------------------------------------------------------- // Mutable attributes @@ -117,7 +117,7 @@ public OracleTableSource( boolean closeIdleReaders, boolean skipSnapshotBackfill, boolean scanNewlyAddedTableEnabled, - boolean assignEndingChunkFirst) { + boolean assignUnboundedChunkFirst) { this.physicalSchema = physicalSchema; this.url = url; this.port = port; @@ -144,7 +144,7 @@ public OracleTableSource( this.closeIdleReaders = closeIdleReaders; this.skipSnapshotBackfill = skipSnapshotBackfill; this.scanNewlyAddedTableEnabled = scanNewlyAddedTableEnabled; - this.assignEndingChunkFirst = assignEndingChunkFirst; + this.assignUnboundedChunkFirst = assignUnboundedChunkFirst; } @Override @@ -194,7 +194,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { .skipSnapshotBackfill(skipSnapshotBackfill) .chunkKeyColumn(chunkKeyColumn) .scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled) - .assignEndingChunkFirst(assignEndingChunkFirst) + .assignUnboundedChunkFirst(assignUnboundedChunkFirst) .build(); return SourceProvider.of(oracleChangeEventSource); @@ -262,7 +262,7 @@ public DynamicTableSource copy() { closeIdleReaders, skipSnapshotBackfill, scanNewlyAddedTableEnabled, - assignEndingChunkFirst); + assignUnboundedChunkFirst); source.metadataKeys = metadataKeys; source.producedDataType = producedDataType; return source; diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/table/OracleTableSourceFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/table/OracleTableSourceFactory.java index 72418e70f26..279dafe8197 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/table/OracleTableSourceFactory.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/table/OracleTableSourceFactory.java @@ -109,7 +109,7 @@ public DynamicTableSource createDynamicTableSource(Context context) { boolean closeIdlerReaders = config.get(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED); boolean skipSnapshotBackfill = config.get(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP); boolean scanNewlyAddedTableEnabled = config.get(SCAN_NEWLY_ADDED_TABLE_ENABLED); - boolean assignEndingChunkFirst = + boolean assignUnboundedChunkFirst = config.get(SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST); if (enableParallelRead) { @@ -149,7 +149,7 @@ public DynamicTableSource createDynamicTableSource(Context context) { closeIdlerReaders, skipSnapshotBackfill, scanNewlyAddedTableEnabled, - assignEndingChunkFirst); + assignUnboundedChunkFirst); } @Override diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java index b97626450c5..5ce690f0a3e 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java @@ -275,11 +275,11 @@ public PostgresSourceBuilder scanNewlyAddedTableEnabled(boolean scanNewlyAdde } /** - * Whether the {@link PostgresSourceEnumerator} should assign the ending chunk first or not + * Whether the {@link PostgresSourceEnumerator} should assign the unbounded chunks first or not * during snapshot reading phase. */ - public PostgresSourceBuilder assignEndingChunkFirst(boolean assignEndingChunkFirst) { - this.configFactory.assignEndingChunkFirst(assignEndingChunkFirst); + public PostgresSourceBuilder assignUnboundedChunkFirst(boolean assignUnboundedChunkFirst) { + this.configFactory.assignUnboundedChunkFirst(assignUnboundedChunkFirst); return this; } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfig.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfig.java index 9455db5ffe7..2be7e2637df 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfig.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfig.java @@ -67,7 +67,7 @@ public PostgresSourceConfig( boolean skipSnapshotBackfill, boolean isScanNewlyAddedTableEnabled, int lsnCommitCheckpointsDelay, - boolean assignEndingChunkFirst) { + boolean assignUnboundedChunkFirst) { super( startupOptions, databaseList, @@ -94,7 +94,7 @@ public PostgresSourceConfig( chunkKeyColumn, skipSnapshotBackfill, isScanNewlyAddedTableEnabled, - assignEndingChunkFirst); + assignUnboundedChunkFirst); this.subtaskId = subtaskId; this.lsnCommitCheckpointsDelay = lsnCommitCheckpointsDelay; } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java index 0917309112f..69dea146e5c 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java @@ -133,7 +133,7 @@ public PostgresSourceConfig create(int subtaskId) { skipSnapshotBackfill, scanNewlyAddedTableEnabled, lsnCommitCheckpointsDelay, - assignEndingChunkFirst); + assignUnboundedChunkFirst); } /** diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableFactory.java index 7ad15f530a1..e6d4cd1ffff 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableFactory.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableFactory.java @@ -117,7 +117,7 @@ public DynamicTableSource createDynamicTableSource(DynamicTableFactory.Context c boolean skipSnapshotBackfill = config.get(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP); boolean isScanNewlyAddedTableEnabled = config.get(SCAN_NEWLY_ADDED_TABLE_ENABLED); int lsnCommitCheckpointsDelay = config.get(SCAN_LSN_COMMIT_CHECKPOINTS_DELAY); - boolean assignEndingChunkFirst = + boolean assignUnboundedChunkFirst = config.get(SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST); if (enableParallelRead) { @@ -165,7 +165,7 @@ public DynamicTableSource createDynamicTableSource(DynamicTableFactory.Context c skipSnapshotBackfill, isScanNewlyAddedTableEnabled, lsnCommitCheckpointsDelay, - assignEndingChunkFirst); + assignUnboundedChunkFirst); } @Override diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableSource.java index 58edda9ab38..95405df7e8c 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableSource.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableSource.java @@ -85,7 +85,7 @@ public class PostgreSQLTableSource implements ScanTableSource, SupportsReadingMe private final boolean skipSnapshotBackfill; private final boolean scanNewlyAddedTableEnabled; private final int lsnCommitCheckpointsDelay; - private final boolean assignEndingChunkFirst; + private final boolean assignUnboundedChunkFirst; // -------------------------------------------------------------------------------------------- // Mutable attributes @@ -126,7 +126,7 @@ public PostgreSQLTableSource( boolean skipSnapshotBackfill, boolean isScanNewlyAddedTableEnabled, int lsnCommitCheckpointsDelay, - boolean assignEndingChunkFirst) { + boolean assignUnboundedChunkFirst) { this.physicalSchema = physicalSchema; this.port = port; this.hostname = checkNotNull(hostname); @@ -158,7 +158,7 @@ public PostgreSQLTableSource( this.skipSnapshotBackfill = skipSnapshotBackfill; this.scanNewlyAddedTableEnabled = isScanNewlyAddedTableEnabled; this.lsnCommitCheckpointsDelay = lsnCommitCheckpointsDelay; - this.assignEndingChunkFirst = assignEndingChunkFirst; + this.assignUnboundedChunkFirst = assignUnboundedChunkFirst; } @Override @@ -221,7 +221,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { .skipSnapshotBackfill(skipSnapshotBackfill) .scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled) .lsnCommitCheckpointsDelay(lsnCommitCheckpointsDelay) - .assignEndingChunkFirst(assignEndingChunkFirst) + .assignUnboundedChunkFirst(assignUnboundedChunkFirst) .build(); return SourceProvider.of(parallelSource); } else { @@ -291,7 +291,7 @@ public DynamicTableSource copy() { skipSnapshotBackfill, scanNewlyAddedTableEnabled, lsnCommitCheckpointsDelay, - assignEndingChunkFirst); + assignUnboundedChunkFirst); source.metadataKeys = metadataKeys; source.producedDataType = producedDataType; return source; diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/MockPostgreSQLTableSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/MockPostgreSQLTableSource.java index fd39a942a18..b19f206f544 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/MockPostgreSQLTableSource.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/MockPostgreSQLTableSource.java @@ -66,7 +66,7 @@ public MockPostgreSQLTableSource(PostgreSQLTableSource postgreSQLTableSource) { (boolean) get(postgreSQLTableSource, "skipSnapshotBackfill"), (boolean) get(postgreSQLTableSource, "scanNewlyAddedTableEnabled"), (int) get(postgreSQLTableSource, "lsnCommitCheckpointsDelay"), - (boolean) get(postgreSQLTableSource, "assignEndingChunkFirst")); + (boolean) get(postgreSQLTableSource, "assignUnboundedChunkFirst")); } @Override diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/SqlServerSourceBuilder.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/SqlServerSourceBuilder.java index 4f6e0694ac2..64141f05b51 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/SqlServerSourceBuilder.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/SqlServerSourceBuilder.java @@ -230,11 +230,11 @@ public SqlServerSourceBuilder skipSnapshotBackfill(boolean skipSnapshotBackfi } /** - * Whether the {@link SqlServerSourceBuilder} should assign the ending chunk first or not during - * snapshot reading phase. + * Whether the {@link SqlServerSourceBuilder} should assign the unbounded chunks first or not + * during snapshot reading phase. */ - public SqlServerSourceBuilder assignEndingChunkFirst(boolean assignEndingChunkFirst) { - this.configFactory.assignEndingChunkFirst(assignEndingChunkFirst); + public SqlServerSourceBuilder assignUnboundedChunkFirst(boolean assignUnboundedChunkFirst) { + this.configFactory.assignUnboundedChunkFirst(assignUnboundedChunkFirst); return this; } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/config/SqlServerSourceConfig.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/config/SqlServerSourceConfig.java index 962fd87ccb2..356b6a0135a 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/config/SqlServerSourceConfig.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/config/SqlServerSourceConfig.java @@ -57,7 +57,7 @@ public SqlServerSourceConfig( int connectionPoolSize, String chunkKeyColumn, boolean skipSnapshotBackfill, - boolean assignEndingChunkFirst) { + boolean assignUnboundedChunkFirst) { super( startupOptions, databaseList, @@ -84,7 +84,7 @@ public SqlServerSourceConfig( chunkKeyColumn, skipSnapshotBackfill, false, - assignEndingChunkFirst); + assignUnboundedChunkFirst); } @Override diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/config/SqlServerSourceConfigFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/config/SqlServerSourceConfigFactory.java index 0d5fe4bd5ed..661094a8942 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/config/SqlServerSourceConfigFactory.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/config/SqlServerSourceConfigFactory.java @@ -104,6 +104,6 @@ public SqlServerSourceConfig create(int subtask) { connectionPoolSize, chunkKeyColumn, skipSnapshotBackfill, - assignEndingChunkFirst); + assignUnboundedChunkFirst); } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/table/SqlServerTableFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/table/SqlServerTableFactory.java index c71d82cc6f9..3a43fba8a4d 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/table/SqlServerTableFactory.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/table/SqlServerTableFactory.java @@ -140,7 +140,7 @@ public DynamicTableSource createDynamicTableSource(Context context) { config.getOptional(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN).orElse(null); boolean closeIdleReaders = config.get(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED); boolean skipSnapshotBackfill = config.get(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP); - boolean assignEndingChunkFirst = + boolean assignUnboundedChunkFirst = config.get(SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST); if (enableParallelRead) { @@ -178,7 +178,7 @@ public DynamicTableSource createDynamicTableSource(Context context) { chunkKeyColumn, closeIdleReaders, skipSnapshotBackfill, - assignEndingChunkFirst); + assignUnboundedChunkFirst); } @Override diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/table/SqlServerTableSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/table/SqlServerTableSource.java index d95ad492f79..d43846192ce 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/table/SqlServerTableSource.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/table/SqlServerTableSource.java @@ -79,7 +79,7 @@ public class SqlServerTableSource implements ScanTableSource, SupportsReadingMet private final String chunkKeyColumn; private final boolean closeIdleReaders; private final boolean skipSnapshotBackfill; - private final boolean assignEndingChunkFirst; + private final boolean assignUnboundedChunkFirst; // -------------------------------------------------------------------------------------------- // Mutable attributes @@ -114,7 +114,7 @@ public SqlServerTableSource( @Nullable String chunkKeyColumn, boolean closeIdleReaders, boolean skipSnapshotBackfill, - boolean assignEndingChunkFirst) { + boolean assignUnboundedChunkFirst) { this.physicalSchema = physicalSchema; this.port = port; this.hostname = checkNotNull(hostname); @@ -139,7 +139,7 @@ public SqlServerTableSource( this.chunkKeyColumn = chunkKeyColumn; this.closeIdleReaders = closeIdleReaders; this.skipSnapshotBackfill = skipSnapshotBackfill; - this.assignEndingChunkFirst = assignEndingChunkFirst; + this.assignUnboundedChunkFirst = assignUnboundedChunkFirst; } @Override @@ -188,7 +188,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { .chunkKeyColumn(chunkKeyColumn) .closeIdleReaders(closeIdleReaders) .skipSnapshotBackfill(skipSnapshotBackfill) - .assignEndingChunkFirst(assignEndingChunkFirst) + .assignUnboundedChunkFirst(assignUnboundedChunkFirst) .build(); return SourceProvider.of(sqlServerChangeEventSource); } else { @@ -250,7 +250,7 @@ public DynamicTableSource copy() { chunkKeyColumn, closeIdleReaders, skipSnapshotBackfill, - assignEndingChunkFirst); + assignUnboundedChunkFirst); source.metadataKeys = metadataKeys; source.producedDataType = producedDataType; return source;