diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java index 5fba71c6..0a26baf7 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java @@ -167,7 +167,8 @@ public SourceReader createReader(SourceReaderContext reade new KinesisStreamsRecordEmitter<>(deserializationSchema); return new KinesisStreamsSourceReader<>( new SingleThreadFetcherManager<>( - getKinesisShardSplitReaderSupplier(sourceConfig, shardMetricGroupMap)), + getKinesisShardSplitReaderSupplier(sourceConfig, shardMetricGroupMap), + sourceConfig), recordEmitter, sourceConfig, readerContext,