
[FLINK-37366] Allow configurable retry for Kafka topic metadata fetch #155

Status: Open · wants to merge 1 commit into base: main
Conversation

@suez1224 commented Feb 26, 2025

  • Add topic.metadata.max.retry and topic.metadata.retry.interval.ms options to configure Kafka topic metadata fetch retries (see the usage sketch below).
  • Implement the Kafka topic metadata fetch failure retry logic in KafkaSubscriberUtils.getTopicMetadata().
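For context, a minimal usage sketch of the proposed options (the values, topic, and bootstrap address are illustrative; it assumes the options are read from the regular source properties, matching the KafkaSourceOptions.getOption calls in the diff below):

    import java.util.Properties;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.kafka.source.KafkaSource;

    Properties props = new Properties();
    // Retry the metadata fetch up to 5 times, waiting 1 s between attempts.
    props.setProperty("topic.metadata.max.retry", "5");
    props.setProperty("topic.metadata.retry.interval.ms", "1000");

    KafkaSource<String> source =
            KafkaSource.<String>builder()
                    .setBootstrapServers("localhost:9092")
                    .setTopics("input-topic")
                    .setValueOnlyDeserializer(new SimpleStringSchema())
                    .setProperties(props)
                    .build();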


boring-cyborg bot commented Feb 26, 2025

Thanks for opening this pull request! Please check out our contributing guidelines. (https://flink.apache.org/contributing/how-to-contribute.html)

@suez1224 force-pushed the FLINK-37366 branch 3 times, most recently from 926456f to 308cc0d, on February 26, 2025 at 08:24
@AHeise (Contributor) left a comment

Before doing a detailed review, I have a fundamental question on the approach, which I inlined. PTAL.

Comment on lines +68 to +99
    static Map<String, TopicDescription> getTopicMetadata(
            AdminClient adminClient, Set<String> topicNames, Properties properties) {
        // Read the retry settings from the user-provided source properties.
        int maxRetries =
                KafkaSourceOptions.getOption(
                        properties,
                        KafkaSourceOptions.TOPIC_METADATA_REQUEST_MAX_RETRY,
                        Integer::parseInt);
        long retryDelay =
                KafkaSourceOptions.getOption(
                        properties,
                        KafkaSourceOptions.TOPIC_METADATA_REQUEST_RETRY_INTERVAL_MS,
                        Long::parseLong);
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return adminClient.describeTopics(topicNames).allTopicNames().get();
            } catch (Exception e) {
                if (attempt == maxRetries) {
                    throw new RuntimeException(
                            String.format("Failed to get metadata for topics %s.", topicNames),
                            e);
                } else {
                    LOG.warn(
                            "Attempt {} to get metadata for topics {} failed. Retrying in {} ms...",
                            attempt,
                            topicNames,
                            retryDelay);
                    try {
                        TimeUnit.MILLISECONDS.sleep(retryDelay);
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt(); // Restore interrupted state
                        LOG.error("Thread was interrupted during metadata fetch retry delay.", ie);
                    }
                }
            }
        }
        // Unreachable: the final attempt either returns or throws above.
        throw new IllegalStateException("Unexpected fall-through in metadata fetch retry loop.");
    }
@AHeise (Contributor) commented:

Instead of introducing our own retry logic, can't we reuse what's already implemented in the KafkaAdmin?

    public static final String RETRIES_CONFIG = "retries";
    public static final String RETRIES_DOC = "Setting a value greater than zero will cause the client to resend any request that fails with a potentially transient error." +
        " It is recommended to set the value to either zero or `MAX_VALUE` and use corresponding timeout parameters to control how long a client should retry a request.";

    public static final String RETRY_BACKOFF_MS_CONFIG = "retry.backoff.ms";
    public static final String RETRY_BACKOFF_MS_DOC = "The amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios.";

You should be able to set it even without your PR, like this:

  properties.retries = '10',
  properties.retry.backoff.ms = '30000',

But that also influences the consumer retry behavior.

We could think about supporting properties.admin.retry = 10. WDYT?
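For illustration, one shape that idea could take inside the connector: strip an admin. prefix off the user properties when building the AdminClient configuration, so prefixed keys override the shared client settings for the admin client only. This is a purely hypothetical sketch; the helper name and the prefix convention are assumptions, not existing behavior.

    import java.util.Properties;

    // Hypothetical helper: "admin.<key>" overrides "<key>" for the AdminClient
    // only (e.g. "admin.retries" -> "retries"), while unprefixed keys keep
    // applying to both the consumer and the admin client.
    static Properties adminClientProperties(Properties userProps) {
        Properties adminProps = new Properties();
        adminProps.putAll(userProps);
        for (String name : userProps.stringPropertyNames()) {
            if (name.startsWith("admin.")) {
                adminProps.setProperty(
                        name.substring("admin.".length()), userProps.getProperty(name));
                adminProps.remove(name);
            }
        }
        return adminProps;
    }

That way properties.retries would keep steering the consumer, while properties.admin.retries would steer only the metadata requests.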

@suez1224 (Author) commented:

Thanks for the suggestion, @AHeise. However, the default value for the retries config is already set to MAX_INT (see the Kafka code and the Confluent docs), and I believe Flink does not overwrite that value. Yet without my PR, the Flink job fails as soon as the metadata request fails, so I don't think this config controls the retry behavior for failed metadata requests from the AdminClient.

@AHeise (Contributor) commented:

Could you please share the failure? If it's something that the AdminClient doesn't deem retriable, then it's obvious that we need your solution.

It may actually be worth drafting a test that fails without your fix and succeeds with it. We have some tests that remove the Kafka broker (search for stopBroker).
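For illustration, a rough shape such a test could take. Everything here is hypothetical: stopBroker/startBroker stand in for the existing broker-control test utilities, adminClient is assumed to come from the test base, and assertions use AssertJ.

    import static org.assertj.core.api.Assertions.assertThat;

    import java.util.Map;
    import java.util.Properties;
    import java.util.Set;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.TimeUnit;
    import org.apache.kafka.clients.admin.TopicDescription;
    import org.junit.jupiter.api.Test;

    @Test
    void topicMetadataFetchSurvivesBrokerOutage() throws Exception {
        Properties props = new Properties();
        props.setProperty("topic.metadata.max.retry", "10");
        props.setProperty("topic.metadata.retry.interval.ms", "500");

        // Take the broker down, then bring it back while the fetch is retrying.
        stopBroker();
        CompletableFuture.delayedExecutor(2, TimeUnit.SECONDS).execute(this::startBroker);

        // Without the fix this throws on the first failed attempt; with it,
        // the call keeps retrying until the broker is back.
        Map<String, TopicDescription> metadata =
                KafkaSubscriberUtils.getTopicMetadata(adminClient, Set.of("test-topic"), props);
        assertThat(metadata).containsKey("test-topic");
    }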
