-
Notifications
You must be signed in to change notification settings - Fork 629
Topic operation retry mechanism #2861
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Topic operation retry mechanism #2861
Conversation
…is enabled to retry.
…is enabled to retry.
…is enabled to retry.
| } | ||
| // do a sanity check on the partition set | ||
| int partitionSize = CollectionUtils.isEmpty(partitions) ? 0 : partitions.size(); | ||
| int partitionSize = partitions.size(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If partittions is null that will cause an NPE, but that is ok, since that is caught by the catch block and retried. Is that the thinking you had?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When Partitions returns null, it does not retry in this method. Because the tolerateLowerPartitionsOnBroker value for consumer is true.
It is retried after 30 seconds with a schedule. However, the service has already started. This situation is then retried asynchronously. But I think it would be better to retry it here.
The retrying code blocks are below.
BindingService.class
public <T> Binding<T> doBindConsumer(T input, String inputName,
Binder<T, ConsumerProperties, ?> binder,
ConsumerProperties consumerProperties, String target) {
if (this.taskScheduler == null
|| this.bindingServiceProperties.getBindingRetryInterval() <= 0) {
return binder.bindConsumer(target,
this.bindingServiceProperties.getGroup(inputName), input,
consumerProperties);
}
else {
try {
return binder.bindConsumer(target,
this.bindingServiceProperties.getGroup(inputName), input,
consumerProperties);
}
catch (RuntimeException e) {
LateBinding<T> late = new LateBinding<T>(target,
e.getCause() == null ? e.toString() : e.getCause().getMessage(), consumerProperties, true, this.objectMapper);
rescheduleConsumerBinding(input, inputName, binder, consumerProperties,
target, late, e);
this.consumerBindings.put(inputName, Collections.singletonList(late));
return late;
}
}
}
public <T> void rescheduleConsumerBinding(final T input, final String inputName,
final Binder<T, ConsumerProperties, ?> binder,
final ConsumerProperties consumerProperties, final String target,
final LateBinding<T> late, RuntimeException exception) {
assertNotIllegalException(exception);
this.log.error("Failed to create consumer binding; retrying in "
+ this.bindingServiceProperties.getBindingRetryInterval() + " seconds",
exception);
this.scheduleTask(() -> {
try {
late.setDelegate(binder.bindConsumer(target,
this.bindingServiceProperties.getGroup(inputName), input,
consumerProperties));
}
catch (RuntimeException e) {
rescheduleConsumerBinding(input, inputName, binder, consumerProperties,
target, late, e);
}
});
}
|
Majority of the code changes in this PR are not in |
The flow that caused me to write these codes is as follows. The relevant methods are called in order. Then the getPartitionInfo method is called. But here the tolerateLowerPartitionsOnBroker value is sent as true. That's why that block of code doesn't retry. AbstractMessageChannelBinder.class -> doBindConsumer() -> createConsumerEndpoint() -> processTopic() -> getPartitionInfo() |
| // we can determine | ||
| // that the exception was due to an unknown topic on the broker, just | ||
| // simply rethrow that. | ||
| if (ex instanceof UnknownTopicOrPartitionException) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I couldn't comment on the block above. That's why I'm writing here.
// This call may return null or throw an exception.
partitions = callable.call();
This block returns null. However, if tolerateLowerPartitionsOnBroker is true in the block below, it is not retried. As I understand it, the tolerateLowerPartitionsOnBroker parameter does the following: If the actual number of partitions is less than the expected number of partitions, should this situation be tolerated?
However, when the partition comes to null, I do not have the actual number of partitions. Actually, it came back as null for another reason, so I need to retry this immediately. For this reason, I wrote a code that throws an exception if partitions are null.
|
@omercelikceng Thanks for the explanations. I am taking it locally for a spin, and if all goes well, I will merge shortly. |
@sobychacko Can you merge the first other pull request? Otherwise, 1 test throws an error in this code. Since the test threw an error, I was able to find the other error. |
|
@omercelikceng Thank you for the PR! Merged upstream. |
Hello, I have approximately 70 microservices. I also run each service at scale (scale number 3). If I start the services at the same time, I see some errors. I will put the error information below.
Then I reviewed the code. Actually, the code block that I think causes the error is the getPartitionsForTopic method in the KafkaTopicProvisioner class.
When I examine the code, it is deliberately ensured that it is not retried. But when I edit the code this way, it retries and does not throw an error. I think we need to retry here. Can you review?
Error :