Description
Overview
This bug affects the idempotent producer in Sarama's AsyncProducer implementation. When messages are buffered due to the highWatermark mechanism during retries, they skip sequence number assignment, which breaks the idempotency guarantee.
The Bug Scenario
Step 1: Initial State
- `highWatermark` = 0 (normal state)
- Messages M1, M2, M3 are sent normally
Step 2: Retry Triggered
- Message M1 fails (e.g., `ErrNotLeaderForPartition`)
- M1's retry count increments: `retries: 0 → 1`
- `highWatermark` increases to 1
- A `fin` message with `retries=0` is sent to synchronize state
Step 3: New Message Arrives During Retry
- New message M2 arrives with `retries=0`
- Since `M2.retries` (0) < `highWatermark` (1), M2 is buffered into `retryState[0].buf`
- Critical issue: M2 skips the sequence number assignment code (lines 682-690 in `partitionProducer.dispatch()`)
Step 4: Retry Completes
- M1 retry succeeds
- The `fin(retries=0)` message returns
- `flushRetryBuffers()` is called
Step 5: Bug Manifests
- M2 is flushed from `retryState[0].buf` and sent directly to `brokerProducer.input` (lines 753-755 in `flushRetryBuffers()`)
- M2 is sent WITHOUT a sequence number (`hasSequence=false`, `sequenceNumber=0`)
- This breaks the idempotency guarantee
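To see why a missing sequence number matters, here is a minimal broker-side sketch. It is an illustration of the idempotency guarantee, not Kafka's actual dedup logic: the broker tracks the next expected sequence per partition, and a batch whose first sequence does not match is treated as out of order (or a duplicate). Because M2 is sent with the zero-value `sequenceNumber=0` after M1 already consumed sequence 0, the broker cannot handle it correctly.

```go
package main

import "fmt"

// partitionState is a reduced model of the per-partition bookkeeping a
// broker performs for an idempotent producer: it tracks the next expected
// sequence number and flags anything else as out of order.
type partitionState struct {
	nextSeq int32
}

func (p *partitionState) accept(firstSeq int32) error {
	if firstSeq != p.nextSeq {
		return fmt.Errorf("out of order: expected %d, got %d", p.nextSeq, firstSeq)
	}
	p.nextSeq++
	return nil
}

func main() {
	var ps partitionState
	// M1 was assigned sequence 0 and is eventually delivered via retry.
	fmt.Println(ps.accept(0)) // <nil>
	// M2 skipped assignment, so it arrives with the default sequenceNumber=0.
	fmt.Println(ps.accept(0)) // out of order: expected 1, got 0
}
```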
Root Cause Analysis
Location 1: `partitionProducer.dispatch()` (lines 711-714)

```go
if pp.parent.conf.Producer.Idempotent && msg.retries == 0 && msg.flags == 0 {
	msg.sequenceNumber, msg.producerEpoch = pp.parent.txnmgr.getAndIncrementSequenceNumber(msg.Topic, msg.Partition)
	msg.hasSequence = true
}
```
- This code assigns sequence numbers to new messages
- Messages buffered due to `highWatermark` bypass this code
Location 2: `partitionProducer.flushRetryBuffers()` (lines 753-755)

```go
for _, msg := range pp.retryState[pp.highWatermark].buf {
	pp.brokerProducer.input <- msg // Sent directly without sequence number check
}
```
- Buffered messages are sent directly without verifying or assigning sequence numbers
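The two locations combine into the control flow below. This is a simplified model of the ordering described above, not Sarama's actual `dispatch()` code: the `highWatermark` buffering branch returns before the sequence-assignment branch ever runs, so a buffered message never gets a sequence number.

```go
package main

import "fmt"

type msg struct {
	retries     int
	hasSequence bool
}

// dispatchOne models the buggy ordering: a message whose retries count is
// below the highWatermark is buffered and returns early, skipping the
// assignment step entirely.
func dispatchOne(m *msg, highWatermark int, buf map[int][]*msg) {
	if m.retries < highWatermark {
		buf[m.retries] = append(buf[m.retries], m)
		return // buffered: the assignment below is skipped
	}
	m.hasSequence = true // stands in for getAndIncrementSequenceNumber
}

func main() {
	buf := map[int][]*msg{}
	m2 := &msg{retries: 0}
	dispatchOne(m2, 1, buf) // a retry is in flight, so highWatermark is 1
	fmt.Printf("buffered=%d hasSequence=%v\n", len(buf[0]), m2.hasSequence)
	// prints: buffered=1 hasSequence=false
}
```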
Solution
Add sequence number assignment in flushRetryBuffers() before sending buffered messages:
```go
for _, msg := range pp.retryState[pp.highWatermark].buf {
	// Assign sequence number for idempotent producers
	if pp.parent.conf.Producer.Idempotent && msg.retries == 0 && msg.flags == 0 && !msg.hasSequence {
		msg.sequenceNumber, msg.producerEpoch = pp.parent.txnmgr.getAndIncrementSequenceNumber(msg.Topic, msg.Partition)
		msg.hasSequence = true
	}
	pp.brokerProducer.input <- msg
}
```
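The invariant this fix enforces is that every message leaving the retry buffer carries a sequence number exactly once. The standalone sketch below demonstrates it with simplified stand-in types (not Sarama's internals); `seqSource` plays the role of `txnmgr.getAndIncrementSequenceNumber`.

```go
package main

import "fmt"

type msg struct {
	retries, flags int
	hasSequence    bool
	sequenceNumber int32
}

// seqSource hands out monotonically increasing sequence numbers.
type seqSource struct{ next int32 }

func (s *seqSource) getAndIncrement() int32 {
	n := s.next
	s.next++
	return n
}

// flush mirrors the guarded loop proposed above: assign a sequence only to
// new (retries==0, flags==0) messages that do not already have one.
func flush(buf []*msg, seq *seqSource, idempotent bool) {
	for _, m := range buf {
		if idempotent && m.retries == 0 && m.flags == 0 && !m.hasSequence {
			m.sequenceNumber = seq.getAndIncrement()
			m.hasSequence = true
		}
	}
}

func main() {
	seq := &seqSource{next: 1} // sequence 0 was already consumed by M1
	buffered := []*msg{{}, {}} // two buffered messages with retries=0, flags=0
	flush(buffered, seq, true)
	for i, m := range buffered {
		fmt.Printf("M%d hasSequence=%v seq=%d\n", i+2, m.hasSequence, m.sequenceNumber)
	}
	// prints: M2 hasSequence=true seq=1
	//         M3 hasSequence=true seq=2
}
```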
Reproduce
This is a corner case. Before reproducing this bug, we need to make two small changes to the source code.
1: `asyncProducer.returnSuccesses()` (lines 1362-1370)
Comment out `msg.clear()` because we need to check the `sequenceNumber`.
```go
func (p *asyncProducer) returnSuccesses(batch []*ProducerMessage) {
	for _, msg := range batch {
		if p.conf.Producer.Return.Successes {
			// msg.clear()
			p.successes <- msg
		}
		p.inFlight.Done()
	}
}
```
2: `brokerProducer.run()` (lines 926-939)
Add `time.Sleep(10 * time.Millisecond)` to simulate processing delay.
```go
func (bp *brokerProducer) run() {
	var output chan<- *produceSet
	var timerChan <-chan time.Time

	Logger.Printf("producer/broker/%d starting up\n", bp.broker.ID())

	for {
		select {
		case msg, ok := <-bp.input:
			time.Sleep(10 * time.Millisecond) // Add a sleep to simulate processing delay.
			if !ok {
				Logger.Printf("producer/broker/%d input chan closed\n", bp.broker.ID())
				bp.shutdown()
				return
			}
			// ... rest of run() unchanged
```
Here is the unit test case to reproduce the bug.
```go
package sarama

import (
	"fmt"
	"sync"
	"testing"
	"time"
)

// TestIdempotentProducerSequenceNumberForBufferedMessages tests sequence number assignment
// for idempotent producers under the highWatermark buffering mechanism.
//
// Problem scenario:
//  1. Message M1 fails to send, triggering retry, highWatermark changes from 0 to 1
//  2. New message M2 (retries=0) arrives, gets buffered to retryState[0].buf because retries < highWatermark
//  3. M2 skips sequence number assignment logic when buffered (dispatch method lines 711-714)
//  4. M1 retry succeeds, flushRetryBuffers() is called
//  5. M2 is taken from retryState[0].buf and sent directly (lines 753-755)
//  6. M2 has no sequence number, breaking idempotency guarantee
func TestIdempotentProducerSequenceNumberForBufferedMessages(t *testing.T) {
	t.Log("=== Test Description ===")
	t.Log("This test verifies that idempotent producers correctly assign sequence numbers")
	t.Log("under the highWatermark buffering mechanism by checking ProduceRequests received by broker")
	t.Log("")

	broker := NewMockBroker(t, 1)
	defer broker.Close()

	// Prepare metadata response
	metadataResponse := &MetadataResponse{
		Version:      4,
		ControllerID: 1,
	}
	metadataResponse.AddBroker(broker.Addr(), broker.BrokerID())
	metadataResponse.AddTopicPartition("test-topic", 0, broker.BrokerID(), nil, nil, nil, ErrNoError)
	broker.Returns(metadataResponse)

	// InitProducerID response
	initProducerID := &InitProducerIDResponse{
		ProducerID:    1000,
		ProducerEpoch: 1,
	}
	broker.Returns(initProducerID)

	// Configure producer
	config := NewTestConfig()
	config.Producer.Idempotent = true
	config.Producer.RequiredAcks = WaitForAll
	config.Producer.Retry.Max = 3
	config.Producer.Retry.Backoff = 200 * time.Millisecond
	config.Producer.Return.Successes = true
	config.Producer.Return.Errors = false
	config.Producer.Flush.Messages = 1
	config.Net.MaxOpenRequests = 1
	config.Version = V0_11_0_0

	producer, err := NewAsyncProducer([]string{broker.Addr()}, config)
	if err != nil {
		t.Fatal(err)
	}

	// Track requests received by broker
	var requestsMu sync.Mutex
	var produceRequests []*ProduceRequest

	// Set custom handler to capture requests
	handlerCount := 0
	broker.setHandler(func(req *request) (res encoderWithHeader) {
		requestsMu.Lock()
		handlerCount++
		count := handlerCount
		requestsMu.Unlock()

		switch req.body.key() {
		case 3: // MetadataRequest
			return metadataResponse
		case 22: // InitProducerIDRequest
			return initProducerID
		case 0: // ProduceRequest
			preq := req.body.(*ProduceRequest)

			// Save request for later verification
			requestsMu.Lock()
			produceRequests = append(produceRequests, preq)
			reqNum := len(produceRequests)
			requestsMu.Unlock()

			// Check sequence number in RecordBatch
			for topic, partitions := range preq.records {
				for partition, records := range partitions {
					if records.RecordBatch != nil {
						batch := records.RecordBatch
						t.Logf("ProduceRequest #%d: topic=%s, partition=%d, FirstSequence=%d",
							reqNum, topic, partition, batch.FirstSequence)

						// Check if sequence number is present
						// FirstSequence should be >= 0 to indicate presence
						if batch.FirstSequence < 0 {
							t.Errorf("BUG DETECTED: ProduceRequest #%d has no sequence number! FirstSequence=%d",
								reqNum, batch.FirstSequence)
						}
					}
				}
			}

			// First request returns error to trigger retry
			if count == 1 {
				t.Log(" -> Returning ErrNotLeaderForPartition (trigger retry)")
				produceError := &ProduceResponse{Version: 3}
				produceError.AddTopicPartition("test-topic", 0, ErrNotLeaderForPartition)
				return produceError
			}

			// Subsequent requests return success
			t.Logf(" -> Returning success")
			produceSuccess := &ProduceResponse{Version: 3}
			produceSuccess.AddTopicPartition("test-topic", 0, ErrNoError)
			return produceSuccess
		}
		return nil
	})

	go func() {
		// Send first message
		t.Log("\nStep 1: Send first message M1")
		producer.Input() <- &ProducerMessage{
			Topic: "test-topic",
			Key:   StringEncoder("M1"),
			Value: StringEncoder("value1"),
		}

		// Send 999 more messages in a loop
		for i := 2; i <= 1000; i++ {
			producer.Input() <- &ProducerMessage{
				Topic: "test-topic",
				Key:   StringEncoder(fmt.Sprintf("M%d", i)),
				Value: StringEncoder(fmt.Sprintf("value%d", i)),
			}
			time.Sleep(1 * time.Millisecond)
		}
	}()

	// Wait for messages to complete
	t.Log("\nStep 3: Wait for messages to succeed")
	for i := 0; i < 1000; i++ {
		select {
		case msg := <-producer.Successes():
			key := string(msg.Key.(StringEncoder))
			t.Logf("Received success message: key=%s", key)
			if !msg.hasSequence {
				t.Fatalf("BUG DETECTED: Message %s has no sequence number! hasSequence=%t", key, msg.hasSequence)
			}
		case <-time.After(5 * time.Second):
			t.Fatalf("Timeout: received only %d success messages, expected 1000", i)
		}
	}

	closeProducer(t, producer)
}
```
Versions
| Sarama  | Kafka     | Go   |
| ------- | --------- | ---- |
| v1.46.2 | mockKafka | 1.24 |
Configuration
See unit test case.
Logs
logs (excerpt):

```
[DEBUG] 16:04:14.851203 producer/broker/1 detected epoch rollover, waiting for new buffer
async_producer_sequence_test.go:152: ✓ Received success message: key=M37
async_producer_sequence_test.go:154: ❌ BUG DETECTED: Message M37 has no sequence number! hasSequence=false
```
Additional Context