From 4718dc93f149141618ba5919d2f081d04571910c Mon Sep 17 00:00:00 2001
From: Shuyi Chen
Date: Tue, 25 Feb 2025 23:50:30 -0800
Subject: [PATCH] [FLINK-37366] Allow configurable retry for Kafka topic
 metadata fetch

---
 .../kafka/source/KafkaSourceOptions.java       | 14 +++++
 .../enumerator/KafkaSourceEnumerator.java      |  2 +-
 .../subscriber/KafkaSubscriber.java            |  5 +-
 .../subscriber/KafkaSubscriberUtils.java       | 58 +++++++++++++++----
 .../subscriber/PartitionSetSubscriber.java     |  6 +-
 .../subscriber/TopicListSubscriber.java        |  6 +-
 .../subscriber/TopicPatternSubscriber.java     |  6 +-
 .../kafka/source/KafkaSourceBuilderTest.java   |  4 +-
 .../kafka/source/KafkaSourceTest.java          |  5 +-
 .../subscriber/KafkaSubscriberTest.java        | 13 +++--
 10 files changed, 93 insertions(+), 26 deletions(-)
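Note: the two new options are read from the enumerator's source properties, so
they can be set through KafkaSourceBuilder#setProperty. A minimal usage sketch,
assuming a typical builder setup; the bootstrap servers and topic name below
are placeholders, not part of this patch:

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.connector.kafka.source.KafkaSourceOptions;

    // Sketch only: tune the metadata-fetch retry behavior via source properties.
    KafkaSource<String> source =
            KafkaSource.<String>builder()
                    .setBootstrapServers("broker:9092") // placeholder
                    .setTopics("input-topic") // placeholder
                    .setValueOnlyDeserializer(new SimpleStringSchema())
                    // Retry the topic metadata request up to 3 times before failing.
                    .setProperty(
                            KafkaSourceOptions.TOPIC_METADATA_REQUEST_MAX_RETRY.key(), "3")
                    // Sleep 10 seconds between attempts (default: 30 seconds).
                    .setProperty(
                            KafkaSourceOptions.TOPIC_METADATA_REQUEST_RETRY_INTERVAL_MS.key(),
                            "10000")
                    .build();

With these values the enumerator blocks for at most 3 * 10000 ms = 30 seconds
of retry delay before the source fails. The defaults preserve the previous
behavior: topic.metadata.max.retry = 0 (fail on the first error) and
topic.metadata.retry.interval.ms = 30000.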
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceOptions.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceOptions.java
index f96cd3ea0..b3f754bfb 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceOptions.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceOptions.java
@@ -58,6 +58,20 @@ public class KafkaSourceOptions {
                     .defaultValue(true)
                     .withDescription("Whether to commit consuming offset on checkpoint.");
 
+    public static final ConfigOption<Integer> TOPIC_METADATA_REQUEST_MAX_RETRY =
+            ConfigOptions.key("topic.metadata.max.retry")
+                    .intType()
+                    .defaultValue(0)
+                    .withDescription(
+                            "Max number of retries for topic metadata request before failing the source.");
+
+    public static final ConfigOption<Long> TOPIC_METADATA_REQUEST_RETRY_INTERVAL_MS =
+            ConfigOptions.key("topic.metadata.retry.interval.ms")
+                    .longType()
+                    .defaultValue(Duration.ofSeconds(30).toMillis())
+                    .withDescription(
+                            "The interval in milliseconds between retries for topic metadata request.");
+
     @SuppressWarnings("unchecked")
     public static <T> T getOption(
             Properties props, ConfigOption<?> configOption, Function<String, T> parser) {
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
index 10025fa2a..ad1d8d225 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
@@ -230,7 +230,7 @@ public void close() {
      * @return Set of subscribed {@link TopicPartition}s
      */
     private Set<TopicPartition> getSubscribedTopicPartitions() {
-        return subscriber.getSubscribedTopicPartitions(adminClient);
+        return subscriber.getSubscribedTopicPartitions(adminClient, properties);
     }
 
     /**
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriber.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriber.java
index 37de884af..d1cc1be3d 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriber.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriber.java
@@ -25,6 +25,7 @@
 
 import java.io.Serializable;
 import java.util.List;
+import java.util.Properties;
 import java.util.Set;
 import java.util.regex.Pattern;
 
@@ -51,9 +52,11 @@ public interface KafkaSubscriber extends Serializable {
      * Get a set of subscribed {@link TopicPartition}s.
      *
      * @param adminClient The admin client used to retrieve subscribed topic partitions.
+     * @param properties The properties for the configuration.
      * @return A set of subscribed {@link TopicPartition}s
      */
-    Set<TopicPartition> getSubscribedTopicPartitions(AdminClient adminClient);
+    Set<TopicPartition> getSubscribedTopicPartitions(
+            AdminClient adminClient, Properties properties);
 
     // ----------------- factory methods --------------
 
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriberUtils.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriberUtils.java
index 72e7f64d0..ed82d5349 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriberUtils.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriberUtils.java
@@ -18,37 +18,45 @@
 
 package org.apache.flink.connector.kafka.source.enumerator.subscriber;
 
+import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
+
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.TopicDescription;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Map;
+import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
 /** The base implementations of {@link KafkaSubscriber}. */
 class KafkaSubscriberUtils {
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaSubscriberUtils.class);
 
     private KafkaSubscriberUtils() {}
 
-    static Map<String, TopicDescription> getAllTopicMetadata(AdminClient adminClient) {
+    static Map<String, TopicDescription> getAllTopicMetadata(
+            AdminClient adminClient, Properties properties) {
         try {
             Set<String> allTopicNames = adminClient.listTopics().names().get();
-            return getTopicMetadata(adminClient, allTopicNames);
+            return getTopicMetadata(adminClient, allTopicNames, properties);
         } catch (Exception e) {
             throw new RuntimeException("Failed to get metadata for all topics.", e);
         }
     }
 
     static Map<String, TopicDescription> getTopicMetadata(
-            AdminClient adminClient, Pattern topicPattern) {
+            AdminClient adminClient, Pattern topicPattern, Properties properties) {
         try {
             Set<String> allTopicNames = adminClient.listTopics().names().get();
             Set<String> matchedTopicNames =
                     allTopicNames.stream()
                             .filter(name -> topicPattern.matcher(name).matches())
                             .collect(Collectors.toSet());
-            return getTopicMetadata(adminClient, matchedTopicNames);
+            return getTopicMetadata(adminClient, matchedTopicNames, properties);
         } catch (Exception e) {
             throw new RuntimeException(
                     String.format("Failed to get metadata for %s topics.", topicPattern.pattern()),
@@ -57,12 +65,42 @@ static Map<String, TopicDescription> getTopicMetadata(
     }
 
     static Map<String, TopicDescription> getTopicMetadata(
-            AdminClient adminClient, Set<String> topicNames) {
-        try {
-            return adminClient.describeTopics(topicNames).allTopicNames().get();
-        } catch (Exception e) {
-            throw new RuntimeException(
-                    String.format("Failed to get metadata for topics %s.", topicNames), e);
+            AdminClient adminClient, Set<String> topicNames, Properties properties) {
+        int maxRetries =
+                KafkaSourceOptions.getOption(
+                        properties,
+                        KafkaSourceOptions.TOPIC_METADATA_REQUEST_MAX_RETRY,
+                        Integer::parseInt);
+        long retryDelay =
+                KafkaSourceOptions.getOption(
+                        properties,
+                        KafkaSourceOptions.TOPIC_METADATA_REQUEST_RETRY_INTERVAL_MS,
+                        Long::parseLong);
+        for (int attempt = 0; attempt <= maxRetries; attempt++) {
+            try {
+                return adminClient.describeTopics(topicNames).allTopicNames().get();
+            } catch (Exception e) {
+                if (attempt == maxRetries) {
+                    throw new RuntimeException(
+                            String.format("Failed to get metadata for topics %s.", topicNames), e);
+                } else {
+                    LOG.warn(
+                            "Attempt {} to get metadata for topics {} failed. Retrying in {} ms...",
+                            attempt,
+                            topicNames,
+                            retryDelay);
+                    try {
+                        TimeUnit.MILLISECONDS.sleep(retryDelay);
+                    } catch (InterruptedException ie) {
+                        Thread.currentThread().interrupt(); // Restore interrupted state
+                        LOG.error("Thread was interrupted during metadata fetch retry delay.", ie);
+                    }
+                }
+            }
         }
+        // This statement will never be reached because either a valid result is returned or an
+        // exception is thrown.
+        throw new IllegalStateException(
+                "Unexpected error in getTopicMetadata: reached unreachable code.");
     }
 }
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/PartitionSetSubscriber.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/PartitionSetSubscriber.java
index 9cd50fb20..05c412c43 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/PartitionSetSubscriber.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/PartitionSetSubscriber.java
@@ -30,6 +30,7 @@
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Properties;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -46,7 +47,8 @@ class PartitionSetSubscriber implements KafkaSubscriber, KafkaDatasetIdentifierProvider {
     }
 
     @Override
-    public Set<TopicPartition> getSubscribedTopicPartitions(AdminClient adminClient) {
+    public Set<TopicPartition> getSubscribedTopicPartitions(
+            AdminClient adminClient, Properties properties) {
         final Set<String> topicNames =
                 subscribedPartitions.stream()
                         .map(TopicPartition::topic)
@@ -54,7 +56,7 @@ public Set<TopicPartition> getSubscribedTopicPartitions(AdminClient adminClient)
         LOG.debug("Fetching descriptions for topics: {}", topicNames);
 
         final Map<String, TopicDescription> topicMetadata =
-                getTopicMetadata(adminClient, topicNames);
+                getTopicMetadata(adminClient, topicNames, properties);
 
         Set<TopicPartition> existingSubscribedPartitions = new HashSet<>();
 
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/TopicListSubscriber.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/TopicListSubscriber.java
index e86ade0fa..7d8012186 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/TopicListSubscriber.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/TopicListSubscriber.java
@@ -32,6 +32,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Properties;
 import java.util.Set;
 
 import static org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriberUtils.getTopicMetadata;
@@ -50,10 +51,11 @@ class TopicListSubscriber implements KafkaSubscriber, KafkaDatasetIdentifierProvider {
     }
 
     @Override
-    public Set<TopicPartition> getSubscribedTopicPartitions(AdminClient adminClient) {
+    public Set<TopicPartition> getSubscribedTopicPartitions(
+            AdminClient adminClient, Properties properties) {
         LOG.debug("Fetching descriptions for topics: {}", topics);
         final Map<String, TopicDescription> topicMetadata =
-                getTopicMetadata(adminClient, new HashSet<>(topics));
+                getTopicMetadata(adminClient, new HashSet<>(topics), properties);
 
         Set<TopicPartition> subscribedPartitions = new HashSet<>();
         for (TopicDescription topic : topicMetadata.values()) {
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/TopicPatternSubscriber.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/TopicPatternSubscriber.java
index 208959e27..175e4c5ec 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/TopicPatternSubscriber.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/TopicPatternSubscriber.java
@@ -31,6 +31,7 @@
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Properties;
 import java.util.Set;
 import java.util.regex.Pattern;
 
@@ -47,10 +48,11 @@ class TopicPatternSubscriber implements KafkaSubscriber, KafkaDatasetIdentifierProvider {
     }
 
     @Override
-    public Set<TopicPartition> getSubscribedTopicPartitions(AdminClient adminClient) {
+    public Set<TopicPartition> getSubscribedTopicPartitions(
+            AdminClient adminClient, Properties properties) {
         LOG.debug("Fetching descriptions for {} topics on Kafka cluster", topicPattern.pattern());
         final Map<String, TopicDescription> matchedTopicMetadata =
-                getTopicMetadata(adminClient, topicPattern);
+                getTopicMetadata(adminClient, topicPattern, properties);
 
         Set<TopicPartition> subscribedTopicPartitions = new HashSet<>();
 
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilderTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilderTest.java
index ca777bc73..443ae6a6e 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilderTest.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilderTest.java
@@ -38,6 +38,7 @@
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Properties;
 import java.util.Set;
 import java.util.regex.Pattern;
 import java.util.stream.Stream;
@@ -251,7 +252,8 @@ private KafkaSourceBuilder<String> getBasicBuilder() {
     private static class ExampleCustomSubscriber implements KafkaSubscriber {
 
         @Override
-        public Set<TopicPartition> getSubscribedTopicPartitions(AdminClient adminClient) {
+        public Set<TopicPartition> getSubscribedTopicPartitions(
+                AdminClient adminClient, Properties properties) {
             return Collections.singleton(new TopicPartition("topic", 0));
         }
     }
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceTest.java
index 259668c5d..2a0936dba 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceTest.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceTest.java
@@ -61,7 +61,7 @@ public void testGetLineageVertexWhenSubscriberNotAnKafkaDatasetFacetProvider() {
                         new KafkaSubscriber() {
                             @Override
                             public Set<TopicPartition> getSubscribedTopicPartitions(
-                                    AdminClient adminClient) {
+                                    AdminClient adminClient, Properties properties) {
                                 return null;
                             }
                         })
@@ -176,7 +176,8 @@ public Optional<DefaultKafkaDatasetIdentifier> getDatasetIdentifier() {
         }
 
         @Override
-        public Set<TopicPartition> getSubscribedTopicPartitions(AdminClient adminClient) {
+        public Set<TopicPartition> getSubscribedTopicPartitions(
+                AdminClient adminClient, Properties properties) {
             return null;
         }
     }
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriberTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriberTest.java
index 4c5a50243..f763bae76 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriberTest.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriberTest.java
@@ -33,6 +33,7 @@
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Properties;
 import java.util.Set;
 import java.util.regex.Pattern;
 
@@ -46,6 +47,7 @@ public class KafkaSubscriberTest {
     private static final String TOPIC2 = "pattern-topic";
     private static final TopicPartition NON_EXISTING_TOPIC = new TopicPartition("removed", 0);
     private static AdminClient adminClient;
+    private static Properties properties;
 
     @BeforeClass
     public static void setup() throws Throwable {
@@ -53,6 +55,7 @@ public static void setup() throws Throwable {
         KafkaSourceTestEnv.createTestTopic(TOPIC1);
         KafkaSourceTestEnv.createTestTopic(TOPIC2);
         adminClient = KafkaSourceTestEnv.getAdminClient();
+        properties = new Properties();
     }
 
     @AfterClass
@@ -67,7 +70,7 @@ public void testTopicListSubscriber() {
         KafkaSubscriber subscriber =
                 KafkaSubscriber.getTopicListSubscriber(Arrays.asList(TOPIC1, TOPIC2));
         final Set<TopicPartition> subscribedPartitions =
-                subscriber.getSubscribedTopicPartitions(adminClient);
+                subscriber.getSubscribedTopicPartitions(adminClient, properties);
 
         final Set<TopicPartition> expectedSubscribedPartitions =
                 new HashSet<>(KafkaSourceTestEnv.getPartitionsForTopics(topics));
@@ -83,7 +86,7 @@ public void testNonExistingTopic() {
                 KafkaSubscriber.getTopicListSubscriber(
                         Collections.singletonList(NON_EXISTING_TOPIC.topic()));
 
-        assertThatThrownBy(() -> subscriber.getSubscribedTopicPartitions(adminClient))
+        assertThatThrownBy(() -> subscriber.getSubscribedTopicPartitions(adminClient, properties))
                 .isInstanceOf(RuntimeException.class)
                 .satisfies(anyCauseMatches(UnknownTopicOrPartitionException.class));
     }
@@ -93,7 +96,7 @@ public void testTopicPatternSubscriber() {
         Pattern pattern = Pattern.compile("pattern.*");
         KafkaSubscriber subscriber = KafkaSubscriber.getTopicPatternSubscriber(pattern);
         final Set<TopicPartition> subscribedPartitions =
-                subscriber.getSubscribedTopicPartitions(adminClient);
+                subscriber.getSubscribedTopicPartitions(adminClient, properties);
 
         final Set<TopicPartition> expectedSubscribedPartitions =
                 new HashSet<>(
@@ -114,7 +117,7 @@ public void testPartitionSetSubscriber() {
         KafkaSubscriber subscriber = KafkaSubscriber.getPartitionSetSubscriber(partitions);
 
         final Set<TopicPartition> subscribedPartitions =
-                subscriber.getSubscribedTopicPartitions(adminClient);
+                subscriber.getSubscribedTopicPartitions(adminClient, properties);
 
         assertThat(subscribedPartitions).isEqualTo(partitions);
         assertThat(((KafkaDatasetIdentifierProvider) subscriber).getDatasetIdentifier().get())
@@ -128,7 +131,7 @@ public void testNonExistingPartition() {
                 KafkaSubscriber.getPartitionSetSubscriber(
                         Collections.singleton(nonExistingPartition));
 
-        assertThatThrownBy(() -> subscriber.getSubscribedTopicPartitions(adminClient))
+        assertThatThrownBy(() -> subscriber.getSubscribedTopicPartitions(adminClient, properties))
                 .isInstanceOf(RuntimeException.class)
                 .hasMessage(
                         String.format(