[FLINK-37366] Allow configurable retry for Kafka topic metadata fetch #155

Open · wants to merge 1 commit into main
@@ -58,6 +58,20 @@ public class KafkaSourceOptions {
.defaultValue(true)
.withDescription("Whether to commit consuming offset on checkpoint.");

public static final ConfigOption<Integer> TOPIC_METADATA_REQUEST_MAX_RETRY =
ConfigOptions.key("topic.metadata.max.retry")
.intType()
.defaultValue(0)
.withDescription(
"Max number of retries for topic metadata request before failing the source.");

public static final ConfigOption<Long> TOPIC_METADATA_REQUEST_RETRY_INTERVAL_MS =
ConfigOptions.key("topic.metadata.retry.interval.ms")
.longType()
.defaultValue(Duration.ofSeconds(30).toMillis())
.withDescription(
"The interval in milliseconds between retries for topic metadata request.");

@SuppressWarnings("unchecked")
public static <T> T getOption(
Properties props, ConfigOption<?> configOption, Function<String, T> parser) {
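
For context, both new options are read from the source properties, so a job opts into the retry behavior simply by setting them on the source. A minimal usage sketch, assuming the standard KafkaSourceBuilder API; the broker address and topic name are placeholders:

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.kafka.source.KafkaSource;

    KafkaSource<String> source =
            KafkaSource.<String>builder()
                    .setBootstrapServers("broker:9092")                  // placeholder address
                    .setTopics("input-topic")                            // placeholder topic
                    .setValueOnlyDeserializer(new SimpleStringSchema())
                    // Retry the topic metadata request up to 5 times before failing the source.
                    .setProperty("topic.metadata.max.retry", "5")
                    // Wait 10 seconds between attempts instead of the 30-second default.
                    .setProperty("topic.metadata.retry.interval.ms", "10000")
                    .build();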
@@ -230,7 +230,7 @@ public void close() {
* @return Set of subscribed {@link TopicPartition}s
*/
private Set<TopicPartition> getSubscribedTopicPartitions() {
return subscriber.getSubscribedTopicPartitions(adminClient);
return subscriber.getSubscribedTopicPartitions(adminClient, properties);
}

/**
@@ -25,6 +25,7 @@

import java.io.Serializable;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.regex.Pattern;

@@ -51,9 +52,11 @@ public interface KafkaSubscriber extends Serializable {
* Get a set of subscribed {@link TopicPartition}s.
*
* @param adminClient The admin client used to retrieve subscribed topic partitions.
* @param properties The properties for the configuration.
* @return A set of subscribed {@link TopicPartition}s
*/
Set<TopicPartition> getSubscribedTopicPartitions(AdminClient adminClient);
Set<TopicPartition> getSubscribedTopicPartitions(
AdminClient adminClient, Properties properties);

// ----------------- factory methods --------------

@@ -18,37 +18,45 @@

package org.apache.flink.connector.kafka.source.enumerator.subscriber;

import org.apache.flink.connector.kafka.source.KafkaSourceOptions;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.TopicDescription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

/** The base implementations of {@link KafkaSubscriber}. */
class KafkaSubscriberUtils {
private static final Logger LOG = LoggerFactory.getLogger(KafkaSubscriberUtils.class);

private KafkaSubscriberUtils() {}

static Map<String, TopicDescription> getAllTopicMetadata(AdminClient adminClient) {
static Map<String, TopicDescription> getAllTopicMetadata(
AdminClient adminClient, Properties properties) {
try {
Set<String> allTopicNames = adminClient.listTopics().names().get();
return getTopicMetadata(adminClient, allTopicNames);
return getTopicMetadata(adminClient, allTopicNames, properties);
} catch (Exception e) {
throw new RuntimeException("Failed to get metadata for all topics.", e);
}
}

static Map<String, TopicDescription> getTopicMetadata(
AdminClient adminClient, Pattern topicPattern) {
AdminClient adminClient, Pattern topicPattern, Properties properties) {
try {
Set<String> allTopicNames = adminClient.listTopics().names().get();
Set<String> matchedTopicNames =
allTopicNames.stream()
.filter(name -> topicPattern.matcher(name).matches())
.collect(Collectors.toSet());
return getTopicMetadata(adminClient, matchedTopicNames);
return getTopicMetadata(adminClient, matchedTopicNames, properties);
} catch (Exception e) {
throw new RuntimeException(
String.format("Failed to get metadata for %s topics.", topicPattern.pattern()),
@@ -57,12 +65,42 @@ static Map<String, TopicDescription> getTopicMetadata(
}

static Map<String, TopicDescription> getTopicMetadata(
AdminClient adminClient, Set<String> topicNames) {
try {
return adminClient.describeTopics(topicNames).allTopicNames().get();
} catch (Exception e) {
throw new RuntimeException(
String.format("Failed to get metadata for topics %s.", topicNames), e);
AdminClient adminClient, Set<String> topicNames, Properties properties) {
int maxRetries =
KafkaSourceOptions.getOption(
properties,
KafkaSourceOptions.TOPIC_METADATA_REQUEST_MAX_RETRY,
Integer::parseInt);
long retryDelay =
KafkaSourceOptions.getOption(
properties,
KafkaSourceOptions.TOPIC_METADATA_REQUEST_RETRY_INTERVAL_MS,
Long::parseLong);
for (int attempt = 0; attempt <= maxRetries; attempt++) {
try {
return adminClient.describeTopics(topicNames).allTopicNames().get();
} catch (Exception e) {
if (attempt == maxRetries) {
throw new RuntimeException(
String.format("Failed to get metadata for topics %s.", topicNames), e);
} else {
LOG.warn(
"Attempt {} to get metadata for topics {} failed. Retrying in {} ms...",
attempt,
topicNames,
retryDelay);
try {
TimeUnit.MILLISECONDS.sleep(retryDelay);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt(); // Restore interrupted state
LOG.error("Thread was interrupted during metadata fetch retry delay.", ie);
}
}
}
Comment on lines +68 to +99
Contributor
Instead of introducing our own retry logic, can't we reuse what's already implemented in the KafkaAdmin?

    public static final String RETRIES_CONFIG = "retries";
    public static final String RETRIES_DOC = "Setting a value greater than zero will cause the client to resend any request that fails with a potentially transient error." +
        " It is recommended to set the value to either zero or `MAX_VALUE` and use corresponding timeout parameters to control how long a client should retry a request.";

    public static final String RETRY_BACKOFF_MS_CONFIG = "retry.backoff.ms";
    public static final String RETRY_BACKOFF_MS_DOC = "The amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios.";

You should be able to set it even without your PR like this:

  properties.retries = '10',
  properties.retry.backoff.ms = '30000',

But that also influences the consumer retry behavior.

We could think about supporting properties.admin.retry = 10. WDYT?
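
Spelled out as source properties, the suggestion above would look roughly like this (a sketch only; these are the Kafka client's own keys, so they affect the consumer as well as the admin client):

    Properties props = new Properties();
    // Shared Kafka client settings: the consumer picks these up too, not only the AdminClient.
    props.setProperty("retries", "10");
    props.setProperty("retry.backoff.ms", "30000");
    // The properties are then handed to the source, e.g. via the builder's setProperties(props).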

Author
Thanks for the suggestion, @AHeise. However, the default value for the retries config is already set to MAX_INT (see the code and the Confluent doc here), and I believe Flink does not overwrite that value. Without my PR, the Flink job fails as soon as the metadata request fails, so I don't think this config controls the behavior for failed metadata requests from the AdminClient.

Contributor
Could you please share the failure? If it's something that the AdminClient doesn't deem to be retriable, then it's obvious that we need your solution.

It may actually be worth drafting a test that fails without your fix and succeeds with it. We do have some tests that remove the Kafka broker (search for stopBroker).
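
The kind of test being suggested might look roughly like the sketch below, written against the KafkaSubscriberTest fixture in this PR; stopBroker()/restartBrokerLater() and the timings are hypothetical placeholders for whatever broker-control helpers the test environment actually provides:

    // Hypothetical sketch, not part of the PR: stopBroker()/restartBrokerLater() are placeholders.
    @Test
    public void testTopicMetadataFetchRetriesWhileBrokerIsDown() throws Throwable {
        Properties retryProps = new Properties();
        retryProps.setProperty("topic.metadata.max.retry", "5");
        retryProps.setProperty("topic.metadata.retry.interval.ms", "1000");

        KafkaSubscriber subscriber =
                KafkaSubscriber.getTopicListSubscriber(Collections.singletonList(TOPIC1));

        stopBroker();              // placeholder: take the broker offline
        restartBrokerLater(3_000); // placeholder: bring it back within the retry window

        // Without the new options this call fails on the first describeTopics() error;
        // with them it should keep retrying until the broker is reachable again.
        Set<TopicPartition> partitions =
                subscriber.getSubscribedTopicPartitions(adminClient, retryProps);
        assertThat(partitions).isNotEmpty();
    }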

}
// This statement will never be reached because either a valid result is returned or an
// exception is thrown.
throw new IllegalStateException(
"Unexpected error in getTopicMetadata: reached unreachable code.");
}
}
@@ -30,6 +30,7 @@
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;

@@ -46,15 +47,16 @@ class PartitionSetSubscriber implements KafkaSubscriber, KafkaDatasetIdentifierP
}

@Override
public Set<TopicPartition> getSubscribedTopicPartitions(AdminClient adminClient) {
public Set<TopicPartition> getSubscribedTopicPartitions(
AdminClient adminClient, Properties properties) {
final Set<String> topicNames =
subscribedPartitions.stream()
.map(TopicPartition::topic)
.collect(Collectors.toSet());

LOG.debug("Fetching descriptions for topics: {}", topicNames);
final Map<String, TopicDescription> topicMetadata =
getTopicMetadata(adminClient, topicNames);
getTopicMetadata(adminClient, topicNames, properties);

Set<TopicPartition> existingSubscribedPartitions = new HashSet<>();

@@ -32,6 +32,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;

import static org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriberUtils.getTopicMetadata;
@@ -50,10 +51,11 @@ class TopicListSubscriber implements KafkaSubscriber, KafkaDatasetIdentifierProv
}

@Override
public Set<TopicPartition> getSubscribedTopicPartitions(AdminClient adminClient) {
public Set<TopicPartition> getSubscribedTopicPartitions(
AdminClient adminClient, Properties properties) {
LOG.debug("Fetching descriptions for topics: {}", topics);
final Map<String, TopicDescription> topicMetadata =
getTopicMetadata(adminClient, new HashSet<>(topics));
getTopicMetadata(adminClient, new HashSet<>(topics), properties);

Set<TopicPartition> subscribedPartitions = new HashSet<>();
for (TopicDescription topic : topicMetadata.values()) {
@@ -31,6 +31,7 @@
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.regex.Pattern;

@@ -47,10 +48,11 @@ class TopicPatternSubscriber implements KafkaSubscriber, KafkaDatasetIdentifierP
}

@Override
public Set<TopicPartition> getSubscribedTopicPartitions(AdminClient adminClient) {
public Set<TopicPartition> getSubscribedTopicPartitions(
AdminClient adminClient, Properties properties) {
LOG.debug("Fetching descriptions for {} topics on Kafka cluster", topicPattern.pattern());
final Map<String, TopicDescription> matchedTopicMetadata =
getTopicMetadata(adminClient, topicPattern);
getTopicMetadata(adminClient, topicPattern, properties);

Set<TopicPartition> subscribedTopicPartitions = new HashSet<>();

@@ -38,6 +38,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Stream;
@@ -251,7 +252,8 @@ private KafkaSourceBuilder<String> getBasicBuilder() {
private static class ExampleCustomSubscriber implements KafkaSubscriber {

@Override
public Set<TopicPartition> getSubscribedTopicPartitions(AdminClient adminClient) {
public Set<TopicPartition> getSubscribedTopicPartitions(
AdminClient adminClient, Properties properties) {
return Collections.singleton(new TopicPartition("topic", 0));
}
}
@@ -61,7 +61,7 @@ public void testGetLineageVertexWhenSubscriberNotAnKafkaDatasetFacetProvider() {
new KafkaSubscriber() {
@Override
public Set<TopicPartition> getSubscribedTopicPartitions(
AdminClient adminClient) {
AdminClient adminClient, Properties properties) {
return null;
}
})
@@ -176,7 +176,8 @@ public Optional<DefaultKafkaDatasetIdentifier> getDatasetIdentifier() {
}

@Override
public Set<TopicPartition> getSubscribedTopicPartitions(AdminClient adminClient) {
public Set<TopicPartition> getSubscribedTopicPartitions(
AdminClient adminClient, Properties properties) {
return null;
}
}
@@ -33,6 +33,7 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.regex.Pattern;

@@ -46,13 +47,15 @@ public class KafkaSubscriberTest {
private static final String TOPIC2 = "pattern-topic";
private static final TopicPartition NON_EXISTING_TOPIC = new TopicPartition("removed", 0);
private static AdminClient adminClient;
private static Properties properties;

@BeforeClass
public static void setup() throws Throwable {
KafkaSourceTestEnv.setup();
KafkaSourceTestEnv.createTestTopic(TOPIC1);
KafkaSourceTestEnv.createTestTopic(TOPIC2);
adminClient = KafkaSourceTestEnv.getAdminClient();
properties = new Properties();
}

@AfterClass
@@ -67,7 +70,7 @@ public void testTopicListSubscriber() {
KafkaSubscriber subscriber =
KafkaSubscriber.getTopicListSubscriber(Arrays.asList(TOPIC1, TOPIC2));
final Set<TopicPartition> subscribedPartitions =
subscriber.getSubscribedTopicPartitions(adminClient);
subscriber.getSubscribedTopicPartitions(adminClient, properties);

final Set<TopicPartition> expectedSubscribedPartitions =
new HashSet<>(KafkaSourceTestEnv.getPartitionsForTopics(topics));
@@ -83,7 +86,7 @@ public void testNonExistingTopic() {
KafkaSubscriber.getTopicListSubscriber(
Collections.singletonList(NON_EXISTING_TOPIC.topic()));

assertThatThrownBy(() -> subscriber.getSubscribedTopicPartitions(adminClient))
assertThatThrownBy(() -> subscriber.getSubscribedTopicPartitions(adminClient, properties))
.isInstanceOf(RuntimeException.class)
.satisfies(anyCauseMatches(UnknownTopicOrPartitionException.class));
}
@@ -93,7 +96,7 @@ public void testTopicPatternSubscriber() {
Pattern pattern = Pattern.compile("pattern.*");
KafkaSubscriber subscriber = KafkaSubscriber.getTopicPatternSubscriber(pattern);
final Set<TopicPartition> subscribedPartitions =
subscriber.getSubscribedTopicPartitions(adminClient);
subscriber.getSubscribedTopicPartitions(adminClient, properties);

final Set<TopicPartition> expectedSubscribedPartitions =
new HashSet<>(
@@ -114,7 +117,7 @@ public void testPartitionSetSubscriber() {
KafkaSubscriber subscriber = KafkaSubscriber.getPartitionSetSubscriber(partitions);

final Set<TopicPartition> subscribedPartitions =
subscriber.getSubscribedTopicPartitions(adminClient);
subscriber.getSubscribedTopicPartitions(adminClient, properties);

assertThat(subscribedPartitions).isEqualTo(partitions);
assertThat(((KafkaDatasetIdentifierProvider) subscriber).getDatasetIdentifier().get())
@@ -128,7 +131,7 @@ public void testNonExistingPartition() {
KafkaSubscriber.getPartitionSetSubscriber(
Collections.singleton(nonExistingPartition));

assertThatThrownBy(() -> subscriber.getSubscribedTopicPartitions(adminClient))
assertThatThrownBy(() -> subscriber.getSubscribedTopicPartitions(adminClient, properties))
.isInstanceOf(RuntimeException.class)
.hasMessage(
String.format(