Re-ordering of offset commit requests can cause committed offset to move "backwards". #2940
Description
Offsets committed by multiple consumer group sessions can be re-ordered by the time they arrive at Kafka, potentially resulting in duplicate message delivery, should the offset be used to resume consumption.
If a single sarama.ConsumerGroup is consuming from more than one partition, then reordering of offset commit requests can occur.
This is because of a race condition between the goroutines used to run the ConsumeClaim method for different topic partitions.
When ConsumerGroupSession.Commit() is called, this ends up calling into offsetManager.flushToBroker().
This in turn:
- Builds an offset commit request, using any so-far uncommitted offsets across all of the partitions belonging to the group.
- Finds the group's coordinator (if not already known).
- Sends the offset commit request to the broker that is the group coordinator.
However, there is no locking to prevent two (or more) goroutines from interleaving between building an offset commit request and sending that request to Kafka.
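To make the missing critical section concrete, here is a minimal sketch of a simplified offset tracker that holds one lock across both the build and the send. The `broker`, `tracker`, `mark`, `flush`, and `run` names are hypothetical and exist only for this illustration; this is not Sarama's code and not a statement of how the library does or should address the problem.

```go
package main

import (
	"fmt"
	"sync"
)

// broker is a hypothetical stand-in for the group coordinator. It keeps the
// last offset received per partition and counts backwards moves.
type broker struct {
	mu        sync.Mutex
	committed map[int32]int64
	regressed int
}

func (b *broker) commit(req map[int32]int64) {
	b.mu.Lock()
	defer b.mu.Unlock()
	for p, off := range req {
		if off < b.committed[p] {
			b.regressed++
		}
		b.committed[p] = off
	}
}

// tracker is a hypothetical, heavily simplified offset manager.
type tracker struct {
	stateMu  sync.Mutex // guards marked
	marked   map[int32]int64
	commitMu sync.Mutex // held across build AND send
	b        *broker
}

func (t *tracker) mark(p int32, next int64) {
	t.stateMu.Lock()
	t.marked[p] = next
	t.stateMu.Unlock()
}

// flush builds a snapshot and sends it while holding commitMu, so an older
// snapshot can never be sent after a newer one.
func (t *tracker) flush() {
	t.commitMu.Lock()
	defer t.commitMu.Unlock()

	t.stateMu.Lock()
	req := make(map[int32]int64, len(t.marked))
	for p, off := range t.marked {
		req[p] = off
	}
	t.stateMu.Unlock()

	t.b.commit(req)
}

// run starts one goroutine per partition, each committing after every
// message, and returns how many times a committed offset moved backwards.
func run(partitions, messages int) int {
	b := &broker{committed: map[int32]int64{}}
	t := &tracker{marked: map[int32]int64{}, b: b}
	var wg sync.WaitGroup
	for p := 0; p < partitions; p++ {
		wg.Add(1)
		go func(p int32) {
			defer wg.Done()
			for off := int64(0); off < int64(messages); off++ {
				t.mark(p, off+1)
				t.flush()
			}
		}(int32(p))
	}
	wg.Wait()
	return b.regressed
}

func main() {
	fmt.Println("backwards moves:", run(4, 1000))
}
```

In this model, requests reach the broker in the order they were built, and each partition's marked offset only increases, so no committed offset can go backwards. Removing `commitMu` reopens the window between building and sending that is described above.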
For example, consider the following interactions between two goroutines P0 and P1 - each belonging to the same consumer group, but consuming from different partitions of a topic:
1. P0 marks offset 10 and calls commit (the uncommitted offsets tracked by the session are: {P0 -> 11}).
2. P0 builds the offset commit request (containing {P0 -> 11}).
3. The P0 goroutine yields, and the P1 goroutine is scheduled.
4. P1 marks offset 20, calls commit (the uncommitted offsets tracked by the session are: {P0 -> 11, P1 -> 21}), and builds its offset commit request (containing {P0 -> 11, P1 -> 21}).
5. The P1 goroutine yields, and the P0 goroutine is scheduled.
6. P0 sends its offset commit request ({P0 -> 11}), gets the response, and resumes running code in the ConsumeClaim method.
7. P0 marks offset 11 and calls commit (the uncommitted offsets tracked by the session are: {P0 -> 12, P1 -> 21}).
8. P0 builds the offset commit request (containing {P0 -> 12, P1 -> 21}).
9. P0 sends its offset commit request (containing {P0 -> 12, P1 -> 21}).
10. The P0 goroutine yields, and the P1 goroutine is scheduled.
11. P1 sends its offset commit request (containing {P0 -> 11, P1 -> 21}).
From the Kafka broker's perspective, it receives:
- Offset commit request (step 6): {P0 -> 11}
- Offset commit request (step 9): {P0 -> 12, P1 -> 21}
- Offset commit request (step 11): {P0 -> 11, P1 -> 21}
This causes the offset committed for P0 to move "backwards" from 12 to 11.
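The interleaving above can be replayed deterministically. The following is a minimal sketch under simplifying assumptions: a single map stands in for the session's uncommitted offsets, entries are never cleared after a successful commit, and the `offsets`, `snapshot`, `scenario`, and `history` names are hypothetical. It only models the order of build and send events, not Sarama's internals.

```go
package main

import "fmt"

// offsets maps a partition number to the next offset to commit.
type offsets map[int32]int64

// snapshot copies the currently marked offsets, which is what building an
// offset commit request amounts to in this model.
func snapshot(marked offsets) offsets {
	req := offsets{}
	for p, off := range marked {
		req[p] = off
	}
	return req
}

// scenario replays the interleaving from the example and returns the
// requests in the order the broker receives them.
func scenario() []offsets {
	marked := offsets{}
	var arrived []offsets

	// P0 marks offset 10 and builds its request: {0: 11}.
	marked[0] = 11
	p0req := snapshot(marked)

	// P1 marks offset 20 and builds its request: {0: 11, 1: 21}.
	marked[1] = 21
	p1req := snapshot(marked)

	// P0 sends its first request.
	arrived = append(arrived, p0req)

	// P0 marks offset 11, then builds and sends {0: 12, 1: 21}.
	marked[0] = 12
	arrived = append(arrived, snapshot(marked))

	// P1 finally sends the request it built earlier, which is now stale.
	arrived = append(arrived, p1req)
	return arrived
}

// history lists, in arrival order, the offsets committed for one partition.
func history(reqs []offsets, partition int32) []int64 {
	var out []int64
	for _, r := range reqs {
		if off, ok := r[partition]; ok {
			out = append(out, off)
		}
	}
	return out
}

func main() {
	reqs := scenario()
	fmt.Println("P0 commits:", history(reqs, 0)) // P0 commits: [11 12 11]
	fmt.Println("P1 commits:", history(reqs, 1)) // P1 commits: [21 21]
}
```

The last value in the P0 history is lower than the one before it, which is the backwards move the broker observes.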
Due to the nature of this race condition, it is more likely to occur if:
- A single Sarama ConsumerGroup is consuming from many partitions (at least 2 are required to trigger this bug)
- The application frequently commits offsets manually (e.g. on each message that it consumes)
- Enough messages are present on the partitions for multiple messages to be delivered to a single call of the ConsumeClaim method
- Non-zero latency between client and broker (e.g. not both running on the same laptop).
Versions
| Sarama | Kafka | Go |
|---|---|---|
| main (d2246cc) | 3.6 and 3.8 | 1.21.10 |
Configuration
config := sarama.NewConfig()
config.Version = sarama.V3_3_0_0
config.Consumer.Return.Errors = true
config.Consumer.Offsets.Initial = sarama.OffsetOldest
config.Consumer.Offsets.AutoCommit.Enable = false
Logs
Not applicable. Sarama does not emit any logging relating to this problem.