[FLINK-37120][minor] Rename ending chunk to unbounded chunk #3938

Merged · 1 commit · Mar 6, 2025
2 changes: 1 addition & 1 deletion docs/content.zh/docs/connectors/flink-sources/db2-cdc.md
@@ -270,7 +270,7 @@ Db2 server.
       <td style="word-wrap: break-word;">false</td>
       <td>Boolean</td>
       <td>
-        Whether to assign the unbounded chunk first during snapshot reading phase.<br>
+        Whether to assign the unbounded chunks first during snapshot reading phase.<br>
         This might help reduce the risk of the TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of the largest unbounded chunk.<br>
         Experimental option, defaults to false.
       </td>
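The option documented above concerns the final, unbounded chunk produced during snapshot splitting. As a rough standalone sketch (hypothetical names and types, not the connector's real API), evenly sized chunks plus a trailing unbounded chunk can be modeled like this:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of snapshot chunk splitting: bounded [start, end)
// chunks over the key range, plus one final chunk with a null upper bound.
// That unbounded chunk also covers rows inserted beyond `max` while the
// snapshot runs, which is why it can grow largest and risk OOM.
public class ChunkSplitSketch {
    /** Returns chunks as {start, end}; a null end means unbounded. */
    static List<Long[]> splitChunks(long min, long max, long chunkSize) {
        List<Long[]> chunks = new ArrayList<>();
        long start = min;
        while (start + chunkSize <= max) {
            chunks.add(new Long[] {start, start + chunkSize});
            start += chunkSize;
        }
        chunks.add(new Long[] {start, null}); // the unbounded tail chunk
        return chunks;
    }

    public static void main(String[] args) {
        for (Long[] c : splitChunks(0, 250, 100)) {
            System.out.println(c[0] + " -> " + c[1]);
        }
        // prints: 0 -> 100, 100 -> 200, 200 -> null
    }
}
```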
@@ -428,7 +428,7 @@ Connector Options
       <td style="word-wrap: break-word;">false</td>
       <td>Boolean</td>
       <td>
-        Whether to assign the unbounded chunk first during snapshot reading phase.<br>
+        Whether to assign the unbounded chunks first during snapshot reading phase.<br>
         This might help reduce the risk of the TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of the largest unbounded chunk.<br>
         Experimental option, defaults to false.
       </td>
@@ -251,7 +251,7 @@ Connector Options
       <td style="word-wrap: break-word;">false</td>
       <td>Boolean</td>
       <td>
-        Whether to assign the unbounded chunk first during snapshot reading phase.<br>
+        Whether to assign the unbounded chunks first during snapshot reading phase.<br>
         This might help reduce the risk of the TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of the largest unbounded chunk.<br>
         Experimental option, defaults to false.
       </td>
@@ -244,7 +244,7 @@ Connector Options
       <td style="word-wrap: break-word;">false</td>
       <td>Boolean</td>
       <td>
-        Whether to assign the unbounded chunk first during snapshot reading phase.<br>
+        Whether to assign the unbounded chunks first during snapshot reading phase.<br>
         This might help reduce the risk of the TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of the largest unbounded chunk.<br>
         Experimental option, defaults to false.
       </td>
2 changes: 1 addition & 1 deletion docs/content/docs/connectors/flink-sources/db2-cdc.md
@@ -270,7 +270,7 @@ Db2 server.
       <td style="word-wrap: break-word;">false</td>
       <td>Boolean</td>
       <td>
-        Whether to assign the unbounded chunk first during snapshot reading phase.<br>
+        Whether to assign the unbounded chunks first during snapshot reading phase.<br>
         This might help reduce the risk of the TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of the largest unbounded chunk.<br>
         Experimental option, defaults to false.
       </td>
2 changes: 1 addition & 1 deletion docs/content/docs/connectors/flink-sources/mongodb-cdc.md
@@ -327,7 +327,7 @@ Connector Options
       <td style="word-wrap: break-word;">false</td>
       <td>Boolean</td>
       <td>
-        Whether to assign the unbounded chunk first during snapshot reading phase.<br>
+        Whether to assign the unbounded chunks first during snapshot reading phase.<br>
         This might help reduce the risk of the TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of the largest unbounded chunk.<br>
         Experimental option, defaults to false.
       </td>
2 changes: 1 addition & 1 deletion docs/content/docs/connectors/flink-sources/mysql-cdc.md
@@ -421,7 +421,7 @@ During a snapshot operation, the connector will query each included table to pro
       <td style="word-wrap: break-word;">false</td>
       <td>Boolean</td>
       <td>
-        Whether to assign the unbounded chunk first during snapshot reading phase.<br>
+        Whether to assign the unbounded chunks first during snapshot reading phase.<br>
         This might help reduce the risk of the TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of the largest unbounded chunk.<br>
         Experimental option, defaults to false.
       </td>
2 changes: 1 addition & 1 deletion docs/content/docs/connectors/flink-sources/oracle-cdc.md
@@ -429,7 +429,7 @@ Connector Options
       <td style="word-wrap: break-word;">false</td>
       <td>Boolean</td>
       <td>
-        Whether to assign the unbounded chunk first during snapshot reading phase.<br>
+        Whether to assign the unbounded chunks first during snapshot reading phase.<br>
         This might help reduce the risk of the TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of the largest unbounded chunk.<br>
         Experimental option, defaults to false.
       </td>
2 changes: 1 addition & 1 deletion docs/content/docs/connectors/flink-sources/postgres-cdc.md
@@ -248,7 +248,7 @@ SELECT * FROM shipments;
       <td style="word-wrap: break-word;">false</td>
       <td>Boolean</td>
       <td>
-        Whether to assign the unbounded chunk first during snapshot reading phase.<br>
+        Whether to assign the unbounded chunks first during snapshot reading phase.<br>
         This might help reduce the risk of the TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of the largest unbounded chunk.<br>
         Experimental option, defaults to false.
       </td>
@@ -244,7 +244,7 @@ Connector Options
       <td style="word-wrap: break-word;">false</td>
       <td>Boolean</td>
       <td>
-        Whether to assign the unbounded chunk first during snapshot reading phase.<br>
+        Whether to assign the unbounded chunks first during snapshot reading phase.<br>
         This might help reduce the risk of the TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of the largest unbounded chunk.<br>
         Experimental option, defaults to false.
       </td>
2 changes: 1 addition & 1 deletion docs/content/docs/connectors/pipeline-connectors/mysql.md
@@ -338,7 +338,7 @@ pipeline:
       <td style="word-wrap: break-word;">false</td>
       <td>Boolean</td>
       <td>
-        Whether to assign the unbounded chunk first during snapshot reading phase.<br>
+        Whether to assign the unbounded chunks first during snapshot reading phase.<br>
         This might help reduce the risk of the TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of the largest unbounded chunk.<br>
         Experimental option, defaults to false.
       </td>
@@ -151,7 +151,7 @@ public DataSource createDataSource(Context context) {
                 config.get(SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED);
         boolean isParsingOnLineSchemaChanges = config.get(PARSE_ONLINE_SCHEMA_CHANGES);
         boolean useLegacyJsonFormat = config.get(USE_LEGACY_JSON_FORMAT);
-        boolean isAssignEndingChunkFirst =
+        boolean isAssignUnboundedChunkFirst =
                 config.get(SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST);

         validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1);
@@ -205,7 +205,7 @@ public DataSource createDataSource(Context context) {
                 .parseOnLineSchemaChanges(isParsingOnLineSchemaChanges)
                 .treatTinyInt1AsBoolean(treatTinyInt1AsBoolean)
                 .useLegacyJsonFormat(useLegacyJsonFormat)
-                .assignEndingChunkFirst(isAssignEndingChunkFirst);
+                .assignUnboundedChunkFirst(isAssignUnboundedChunkFirst);

         List<TableId> tableIds = MySqlSchemaUtils.listTables(configFactory.createConfig(0), null);
@@ -320,5 +320,5 @@ public class MySqlDataSourceOptions {
                     .booleanType()
                     .defaultValue(false)
                     .withDescription(
-                            "Whether to assign the ending chunk first during snapshot reading phase. This might help reduce the risk of the TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of the largest unbounded chunk. Defaults to false.");
+                            "Whether to assign the unbounded chunks first during snapshot reading phase. This might help reduce the risk of the TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of the largest unbounded chunk. Defaults to false.");
 }
@@ -274,7 +274,7 @@ public void testOptionalOption() {
         MySqlDataSource dataSource = (MySqlDataSource) factory.createDataSource(context);
         assertThat(dataSource.getSourceConfig().isTreatTinyInt1AsBoolean()).isFalse();
         assertThat(dataSource.getSourceConfig().isParseOnLineSchemaChanges()).isTrue();
-        assertThat(dataSource.getSourceConfig().isAssignEndingChunkFirst()).isTrue();
+        assertThat(dataSource.getSourceConfig().isAssignUnboundedChunkFirst()).isTrue();
     }

     @Test
@@ -38,7 +38,7 @@ public abstract class BaseSourceConfig implements SourceConfig {
     protected final boolean closeIdleReaders;
     protected final boolean skipSnapshotBackfill;
     protected final boolean isScanNewlyAddedTableEnabled;
-    protected final boolean assignEndingChunkFirst;
+    protected final boolean assignUnboundedChunkFirst;

     // --------------------------------------------------------------------------------------------
     // Debezium Configurations
@@ -58,7 +58,7 @@ public BaseSourceConfig(
             boolean isScanNewlyAddedTableEnabled,
             Properties dbzProperties,
             Configuration dbzConfiguration,
-            boolean assignEndingChunkFirst) {
+            boolean assignUnboundedChunkFirst) {
         this.startupOptions = startupOptions;
         this.splitSize = splitSize;
         this.splitMetaGroupSize = splitMetaGroupSize;
@@ -70,7 +70,7 @@ public BaseSourceConfig(
         this.isScanNewlyAddedTableEnabled = isScanNewlyAddedTableEnabled;
         this.dbzProperties = dbzProperties;
         this.dbzConfiguration = dbzConfiguration;
-        this.assignEndingChunkFirst = assignEndingChunkFirst;
+        this.assignUnboundedChunkFirst = assignUnboundedChunkFirst;
     }

     @Override
@@ -120,7 +120,7 @@ public boolean isSkipSnapshotBackfill() {
     }

     @Override
-    public boolean isAssignEndingChunkFirst() {
-        return assignEndingChunkFirst;
+    public boolean isAssignUnboundedChunkFirst() {
+        return assignUnboundedChunkFirst;
     }
 }
@@ -74,7 +74,7 @@ public JdbcSourceConfig(
             String chunkKeyColumn,
             boolean skipSnapshotBackfill,
             boolean isScanNewlyAddedTableEnabled,
-            boolean assignEndingChunkFirst) {
+            boolean assignUnboundedChunkFirst) {
         super(
                 startupOptions,
                 splitSize,
@@ -87,7 +87,7 @@ public JdbcSourceConfig(
                 isScanNewlyAddedTableEnabled,
                 dbzProperties,
                 dbzConfiguration,
-                assignEndingChunkFirst);
+                assignUnboundedChunkFirst);
         this.driverClassName = driverClassName;
         this.hostname = hostname;
         this.port = port;
@@ -60,7 +60,7 @@ public abstract class JdbcSourceConfigFactory implements Factory<JdbcSourceConfi
             JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue();
     protected boolean scanNewlyAddedTableEnabled =
             JdbcSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED.defaultValue();
-    protected boolean assignEndingChunkFirst =
+    protected boolean assignUnboundedChunkFirst =
             JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST.defaultValue();

     /** Integer port number of the database server. */
@@ -255,10 +255,11 @@ public JdbcSourceConfigFactory scanNewlyAddedTableEnabled(boolean scanNewlyAdded
     }

     /**
-     * Whether to assign the ending chunk first during snapshot reading phase. Defaults to false.
+     * Whether to assign the unbounded chunks first during snapshot reading phase. Defaults to
+     * false.
      */
-    public JdbcSourceConfigFactory assignEndingChunkFirst(boolean assignEndingChunkFirst) {
-        this.assignEndingChunkFirst = assignEndingChunkFirst;
+    public JdbcSourceConfigFactory assignUnboundedChunkFirst(boolean assignUnboundedChunkFirst) {
+        this.assignUnboundedChunkFirst = assignUnboundedChunkFirst;
         return this;
     }
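The renamed setter keeps the fluent style used throughout the config factory above. A minimal standalone sketch of that pattern (illustrative class name, not the real `JdbcSourceConfigFactory`):

```java
// Minimal sketch of the fluent option-setter pattern used by the config
// factory above. The real factory carries many more options, validation,
// and a create() method; only the chaining mechanic is shown here.
public class FactorySketch {
    private boolean assignUnboundedChunkFirst = false; // option's default

    // Fluent setter: stores the flag and returns this for chaining.
    public FactorySketch assignUnboundedChunkFirst(boolean value) {
        this.assignUnboundedChunkFirst = value;
        return this;
    }

    public boolean isAssignUnboundedChunkFirst() {
        return assignUnboundedChunkFirst;
    }

    public static void main(String[] args) {
        FactorySketch f = new FactorySketch().assignUnboundedChunkFirst(true);
        System.out.println(f.isAssignUnboundedChunkFirst()); // prints: true
    }
}
```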
@@ -40,7 +40,7 @@ public interface SourceConfig extends Serializable {

     boolean isScanNewlyAddedTableEnabled();

-    boolean isAssignEndingChunkFirst();
+    boolean isAssignUnboundedChunkFirst();

     /** Factory for the {@code SourceConfig}. */
     @FunctionalInterface
@@ -144,5 +144,5 @@ public class SourceOptions {
                     .booleanType()
                     .defaultValue(false)
                     .withDescription(
-                            "Whether to assign the ending chunk first during snapshot reading phase. This might help reduce the risk of the TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of the largest unbounded chunk. Defaults to false.");
+                            "Whether to assign the unbounded chunks first during snapshot reading phase. This might help reduce the risk of the TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of the largest unbounded chunk. Defaults to false.");
 }
@@ -469,10 +469,11 @@ private List<ChunkRange> splitEvenlySizedChunks(
                 break;
             }
         }
-        // add the ending split
-        // assign ending split first, both the largest and smallest unbounded chunks are completed
+        // add the unbounded split
+        // assign unbounded split first, both the largest and smallest unbounded chunks are
+        // completed
         // in the first two splits
-        if (sourceConfig.isAssignEndingChunkFirst()) {
+        if (sourceConfig.isAssignUnboundedChunkFirst()) {
             splits.add(0, ChunkRange.of(chunkStart, null));
         } else {
             splits.add(ChunkRange.of(chunkStart, null));
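The branch above only changes where the unbounded split lands in the assignment order. A standalone sketch of that reordering (strings stand in for the real `ChunkRange` splits):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the split-ordering decision above: when unbounded-chunk-first
// is enabled, the unbounded split is inserted at the head of the list, so
// the largest chunk is snapshotted while the reader holds the least state.
public class SplitOrderSketch {
    static List<String> orderSplits(
            List<String> boundedSplits, String unboundedSplit, boolean unboundedFirst) {
        List<String> splits = new ArrayList<>(boundedSplits);
        if (unboundedFirst) {
            splits.add(0, unboundedSplit); // assign the unbounded split first
        } else {
            splits.add(unboundedSplit); // default: unbounded split goes last
        }
        return splits;
    }

    public static void main(String[] args) {
        List<String> bounded = List.of("[0,100)", "[100,200)");
        System.out.println(orderSplits(bounded, "[200,null)", true));
        // prints: [[200,null), [0,100), [100,200)]
    }
}
```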
@@ -233,11 +233,11 @@ public Db2SourceBuilder<T> skipSnapshotBackfill(boolean skipSnapshotBackfill) {
     }

     /**
-     * Whether the {@link Db2IncrementalSource} should assign the ending chunk first or not during
-     * snapshot reading phase.
+     * Whether the {@link Db2IncrementalSource} should assign the unbounded chunks first or not
+     * during snapshot reading phase.
      */
-    public Db2SourceBuilder<T> assignEndingChunkFirst(boolean assignEndingChunkFirst) {
-        this.configFactory.assignEndingChunkFirst(assignEndingChunkFirst);
+    public Db2SourceBuilder<T> assignUnboundedChunkFirst(boolean assignUnboundedChunkFirst) {
+        this.configFactory.assignUnboundedChunkFirst(assignUnboundedChunkFirst);
         return this;
     }
@@ -57,7 +57,7 @@ public Db2SourceConfig(
             int connectionPoolSize,
             String chunkKeyColumn,
             boolean skipSnapshotBackfill,
-            boolean assignEndingChunkFirst) {
+            boolean assignUnboundedChunkFirst) {
         super(
                 startupOptions,
                 databaseList,
@@ -84,7 +84,7 @@ public Db2SourceConfig(
                 chunkKeyColumn,
                 skipSnapshotBackfill,
                 false,
-                assignEndingChunkFirst);
+                assignUnboundedChunkFirst);
     }

     @Override
@@ -104,6 +104,6 @@ public Db2SourceConfig create(int subtask) {
                 connectionPoolSize,
                 chunkKeyColumn,
                 skipSnapshotBackfill,
-                assignEndingChunkFirst);
+                assignUnboundedChunkFirst);
     }
 }
@@ -77,7 +77,7 @@ public class Db2TableSource implements ScanTableSource, SupportsReadingMetadata
     private final String chunkKeyColumn;
     private final boolean closeIdleReaders;
     private final boolean skipSnapshotBackfill;
-    private final boolean assignEndingChunkFirst;
+    private final boolean assignUnboundedChunkFirst;

     /** Metadata that is appended at the end of a physical source row. */
     protected List<String> metadataKeys;
@@ -105,7 +105,7 @@ public Db2TableSource(
             @Nullable String chunkKeyColumn,
             boolean closeIdleReaders,
             boolean skipSnapshotBackfill,
-            boolean assignEndingChunkFirst) {
+            boolean assignUnboundedChunkFirst) {
         this.physicalSchema = physicalSchema;
         this.port = port;
         this.hostname = hostname;
@@ -130,7 +130,7 @@ public Db2TableSource(
         this.chunkKeyColumn = chunkKeyColumn;
         this.closeIdleReaders = closeIdleReaders;
         this.skipSnapshotBackfill = skipSnapshotBackfill;
-        this.assignEndingChunkFirst = assignEndingChunkFirst;
+        this.assignUnboundedChunkFirst = assignUnboundedChunkFirst;
     }

     @Override
@@ -177,7 +177,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
                             .chunkKeyColumn(chunkKeyColumn)
                             .closeIdleReaders(closeIdleReaders)
                             .skipSnapshotBackfill(skipSnapshotBackfill)
-                            .assignEndingChunkFirst(assignEndingChunkFirst)
+                            .assignUnboundedChunkFirst(assignUnboundedChunkFirst)
                             .build();
             return SourceProvider.of(db2ChangeEventSource);
         } else {
@@ -239,7 +239,7 @@ public DynamicTableSource copy() {
                         chunkKeyColumn,
                         closeIdleReaders,
                         skipSnapshotBackfill,
-                        assignEndingChunkFirst);
+                        assignUnboundedChunkFirst);
         source.metadataKeys = metadataKeys;
         source.producedDataType = producedDataType;
         return source;
@@ -139,7 +139,7 @@ public DynamicTableSource createDynamicTableSource(Context context) {
                 config.getOptional(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN).orElse(null);
         boolean closeIdleReaders = config.get(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
         boolean skipSnapshotBackfill = config.get(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
-        boolean assignEndingChunkFirst =
+        boolean assignUnboundedChunkFirst =
                 config.get(SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST);

         if (enableParallelRead) {
@@ -177,7 +177,7 @@ public DynamicTableSource createDynamicTableSource(Context context) {
                 chunkKeyColumn,
                 closeIdleReaders,
                 skipSnapshotBackfill,
-                assignEndingChunkFirst);
+                assignUnboundedChunkFirst);
     }

     @Override
@@ -269,11 +269,11 @@ public MongoDBSourceBuilder<T> scanNewlyAddedTableEnabled(boolean scanNewlyAdded
     }

     /**
-     * Whether the {@link MongoDBSource} should assign the ending chunk first or not during snapshot
-     * reading phase.
+     * Whether the {@link MongoDBSource} should assign the unbounded chunks first or not during
+     * snapshot reading phase.
      */
-    public MongoDBSourceBuilder<T> assignEndingChunkFirst(boolean assignEndingChunkFirst) {
-        this.configFactory.assignEndingChunkFirst(assignEndingChunkFirst);
+    public MongoDBSourceBuilder<T> assignUnboundedChunkFirst(boolean assignUnboundedChunkFirst) {
+        this.configFactory.assignUnboundedChunkFirst(assignUnboundedChunkFirst);
         return this;
     }
@@ -159,7 +159,7 @@ public Collection<SnapshotSplit> split(SplitContext splitContext) {
                         ChunkUtils.maxUpperBoundOfId(),
                         null,
                         schema);
-        if (splitContext.isAssignEndingChunkFirst()) {
+        if (splitContext.isAssignUnboundedChunkFirst()) {
             snapshotSplits.add(0, lastSplit);
         } else {
             snapshotSplits.add(lastSplit);