feat: exponential backoff for clients (KIP-580) #3099
Conversation
Force-pushed ac48413 to 2210250
```go
//
// backoffFunc := sarama.NewExponentialBackoff(config.Producer.Retry.Backoff, 2*time.Second)
// config.Producer.Retry.BackoffFunc = backoffFunc
func NewExponentialBackoff(backoff time.Duration, maxBackoff time.Duration) func(retries, maxRetries int) time.Duration {
```
I suppose, I would like to see a version that can take sarama.NewExponentialBackoff(config) so that the appropriate fields from the config will be extracted and used without me needing to dig them out.
We probably also need a config.Producer.Retry.BackoffMax.
PS: I guess we cannot actually provide a simple config option, because each of the backoffs has its own sub-struct. 🤦♀️
That makes sense! I also considered making sarama.NewExponentialBackoff(config), but as you pointed out in the PS section, each backoff belongs to its own sub-struct, which makes a simple config-based approach impractical.
This is exactly why I made NewExponentialBackoff a standalone function—neither a method of Config nor a function that takes Config as an input parameter. This approach avoids ambiguity and ensures flexibility across different contexts (producer, metadata, transaction manager) without forcing unnecessary dependencies.
Regarding config.Producer.Retry.BackoffMax, I chose not to introduce it because it would require adding a similar field to every retry-related sub-struct (Metadata, TransactionManager), making the API more tedious to maintain. Keeping the backoff logic encapsulated in a separate function simplifies the design while still allowing explicit configuration per use case.
Let me know your thoughts!
I don’t know that there is a significantly greater maintenance burden in adding the BackoffMax. Is it a lot of “toil”? Yes, but as you note, there is already a retry struct in a lot of the other config structs. So, the choice has kind of already been made.
utils.go (Outdated)

```go
// NewExponentialBackoff returns a function that implements an exponential backoff strategy with jitter.
// It follows KIP-580, implementing the formula:
// MIN(retry.backoff.max.ms, (retry.backoff.ms * 2**(retries - 1)) * random(0.8, 1.2))
```
It’s extremely important that when copy-pasting from a standard, you copy-paste it exactly. My point: the correct formula is:
```
MIN(retry.backoff.max.ms, (retry.backoff.ms * 2**(failures - 1)) * random(0.8, 1.2))
```
That word failures rather than retries is important here, because—bad news—the func(retries, maxRetries int) time.Duration function that we return? It’s being called with a decrementing retries, and not an incrementing failures. 😭
I’m not sure exactly how we can work around this generally… since the number of starting retries is potentially quite flexible (in particular: async_producer sets it to an initial pp.highwatermark -1).
Thanks for pointing this out! Here’s what I found:
Formula Correction
Updated the copied formula to exactly match KIP-580. Thanks for catching that!
Verification of retries Behavior in Sarama
Although pp.flushRetryBuffers flushes messages with decrementing retries, I ran tests and confirmed with additional logs that `retries`:
- starts with 1 when `pp.backoff(msg.retries)` is called for the first time, and
- is incrementing,

at least in the producer case. Given this, the concern about decrementing retries shouldn’t apply in this context.
Line 1374 in 9ae475a
Flexible retries in Special Cases
The scenario where retries is more dynamic (e.g., pp.highwatermark - 1) appears to only occur for flagged (fin) messages being sent back to the input channel.
Line 732 in 9ae475a
Since these messages serve a special purpose, precise backoff duration is less critical for them. That said, if keeping a consistent backoff behavior for all cases is desirable, I’m open to discussing potential adjustments.
Given these findings, I believe our current implementation correctly follows KIP-580 in most cases. Let me know if you think any further checks are needed!
🤦♀️ I appear to have also missed where the function is receiving the maxRetries anyways. So, even if the retries were decrementing, we would still be able to determine the correct incrementing failures.
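Since the returned `func(retries, maxRetries int)` receives both values, an incrementing failure count can always be derived, even if `retries` were counting down. A minimal self-contained illustration (the helper name and the exact mapping are hypothetical, not Sarama's actual code):

```go
package main

import "fmt"

// failuresFromDecrementing maps a decrementing retries counter
// (maxRetries, maxRetries-1, ..., 1) onto the incrementing failures
// count (1, 2, ..., maxRetries) that the KIP-580 formula expects.
func failuresFromDecrementing(retries, maxRetries int) int {
	return maxRetries - retries + 1
}

func main() {
	for retries := 3; retries >= 1; retries-- {
		fmt.Printf("retries=%d -> failures=%d\n", retries, failuresFromDecrementing(retries, 3))
	}
}
```

Either way, the function has enough information to apply the formula with the correct exponent.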
Force-pushed 2cf05e9 to d8df714
puellanivis left a comment
Good improvements. I have some additional thoughts.
utils.go (Outdated)
```go
backoff = max(backoff, 0)
maxBackoff = max(maxBackoff, 0)
```
As noted from how KIP-580 says defaults should be set, this should probably be:

```go
if backoff <= 0 {
	backoff = defaultBackoff // 100 ms
}
if maxBackoff <= 0 {
	maxBackoff = defaultMaxBackoff // 1000 ms
}
```

As a one-liner this looks like:

```go
backoff = cmp.Or(max(backoff, 0), defaultBackoff)
```

Where the max(backoff, 0) clamps negatives to the zero value, which cmp.Or then skips over. Unfortunately, cmp.Or was introduced in go1.22.0, so it's not something we can use without bumping the go.mod go version.
Makes sense. I chose the if statements.
Force-pushed 50d5176 to 756051e
```go
	}
}

func TestExponentialBackoffDefaults(t *testing.T) {
```
Force-pushed 6ddfd8b to ff2dc5f
puellanivis left a comment
👍 I can no longer think of anything worth commenting about.
Thank you, @puellanivis. Can @dnwe review this PR?
Thanks to @puellanivis for already doing a thorough review against the KIP-580 spec here, that was really helpful! I've looked over the changes myself and they seem good. Really my only qualm is that we don't immediately have a clean way to match the Java client and make exponential backoff the default behaviour in config.go. I've also asked @prestona to review the changes prior to merging.
Force-pushed 3de95be to f9a677c
Hi @prestona, I rebased onto the latest master branch yesterday. Can you please review this PR? Thank you very much.
puellanivis left a comment
Continues to look good from my side.
Signed-off-by: Wenli Wan <[email protected]>
Force-pushed f9a677c to cdeaf9b
dnwe left a comment
I want to cut a release in order to ship some other fixes and features, so I'm going to go ahead and approve and merge this PR as well without waiting for further reviews. It is currently opt-in and we can always add further PRs on top in a follow-up release if further refinements are needed anyway.
Configurable Exponential Backoff for Clients
This PR introduces a configurable exponential backoff function for the producer, admin client, and transaction manager, aligning with KIP-580 specifications.
Key Changes
- Added `NewExponentialBackoff(initialBackoff, maxBackoff)` as a static function, rather than a method of `Config`. Since `Config` contains multiple backoff settings (e.g., for the admin client, transaction manager, and producer), attaching the function to `Config` would introduce ambiguity.
- … `AsyncProducer`.

Inspiration & References
Testing
✅ Verified in `TestAsyncProducerWithExponentialBackoffDurations`, ensuring:
- … backoff durations are capped at `maxBackoff`.

Would love feedback! 🚀