Skip to content

Missing Sequence Number Assignment for Buffered Messages in Idempotent Producer #3354

@gangcheng1030

Description

@gangcheng1030
Description

Overview

This bug affects the idempotent producer in Sarama's AsyncProducer implementation. When messages are buffered due to the highWatermark mechanism during retries, they skip sequence number assignment, which breaks the idempotency guarantee.

The Bug Scenario

Step 1: Initial State

  • highWatermark = 0 (normal state)
  • Messages M1, M2, M3 are sent normally

Step 2: Retry Triggered

  • Message M1 fails (e.g., ErrNotLeaderForPartition)
  • M1's retry count increments: retries: 0 → 1
  • highWatermark increases to 1
  • A fin message with retries=0 is sent to synchronize state

Step 3: New Message Arrives During Retry

  • New message M2 arrives with retries=0
  • Since M2.retries (0) < highWatermark (1), M2 is buffered into retryState[0].buf
  • Critical Issue: M2 skips the sequence number assignment code (lines 682-690 in partitionProducer.dispatch())

Step 4: Retry Completes

  • M1 retry succeeds
  • The fin(retries=0) message returns
  • flushRetryBuffers() is called

Step 5: Bug Manifests

  • M2 is flushed from retryState[0].buf and sent directly to brokerProducer.input (lines 753-755 in flushRetryBuffers())
  • M2 is sent WITHOUT a sequence number (hasSequence=false, sequenceNumber=0)
  • This breaks the idempotency guarantee

Root Cause Analysis

Location 1: partitionProducer.dispatch() (lines 711-714)

if pp.parent.conf.Producer.Idempotent && msg.retries == 0 && msg.flags == 0 {
    msg.sequenceNumber, msg.producerEpoch = pp.parent.txnmgr.getAndIncrementSequenceNumber(msg.Topic, msg.Partition)
    msg.hasSequence = true
}
  • This code assigns sequence numbers to new messages
  • Messages buffered due to highWatermark bypass this code

Location 2: partitionProducer.flushRetryBuffers() (lines 753-755)

for _, msg := range pp.retryState[pp.highWatermark].buf {
    pp.brokerProducer.input <- msg  // Sent directly without sequence number check
}
  • Buffered messages are sent directly without verifying or assigning sequence numbers

Solution

Add sequence number assignment in flushRetryBuffers() before sending buffered messages:

for _, msg := range pp.retryState[pp.highWatermark].buf {
    // Assign sequence number for idempotent producers
    if pp.parent.conf.Producer.Idempotent && msg.retries == 0 && msg.flags == 0 && !msg.hasSequence {
        msg.sequenceNumber, msg.producerEpoch = pp.parent.txnmgr.getAndIncrementSequenceNumber(msg.Topic, msg.Partition)
        msg.hasSequence = true
    }
    pp.brokerProducer.input <- msg
}

Reproduce

This is a corner case. Before reproducing this bug, we need to make two small changes to the source code.

1: asyncProducer.returnSuccesses() (lines 1362-1370)

Comment out msg.clear() because we need to check the sequenceNumber.

// returnSuccesses marks every message in batch as completed. When
// Producer.Return.Successes is enabled the message is delivered on p.successes;
// p.inFlight.Done() is called once per message in either case.
//
// Reproduction-only change: the msg.clear() call is commented out so the test
// can still check the message's sequence number after delivery (presumably
// clear() resets that state — confirm against ProducerMessage.clear).
func (p *asyncProducer) returnSuccesses(batch []*ProducerMessage) {
	for _, msg := range batch {
		if p.conf.Producer.Return.Successes {
			// msg.clear()
			p.successes <- msg
		}
		p.inFlight.Done()
	}
}

2: brokerProducer.run() (lines 926-939)

Add time.Sleep(10 * time.Millisecond) to simulate processing delay.

func (bp *brokerProducer) run() {
	var output chan<- *produceSet
	var timerChan <-chan time.Time
	Logger.Printf("producer/broker/%d starting up\n", bp.broker.ID())

	for {
		select {
		case msg, ok := <-bp.input:
			time.Sleep(10 * time.Millisecond) // Add a sleep to simulate processing delay.
			if !ok {
				Logger.Printf("producer/broker/%d input chan closed\n", bp.broker.ID())
				bp.shutdown()
				return
			}

Here is the unit test case to reproduce the bug.

package sarama

import (
	"fmt"
	"sync"
	"testing"
	"time"
)

// TestIdempotentProducerSequenceNumberForBufferedMessages tests sequence number assignment
// for idempotent producers under the highWatermark buffering mechanism.
//
// Problem scenario:
// 1. Message M1 fails to send, triggering retry, highWatermark changes from 0 to 1
// 2. New message M2 (retries=0) arrives, gets buffered to retryState[0].buf because retries < highWatermark
// 3. M2 skips sequence number assignment logic when buffered (dispatch method lines 711-714)
// 4. M1 retry succeeds, flushRetryBuffers() is called
// 5. M2 is taken from retryState[0].buf and sent directly (lines 753-755)
// 6. M2 has no sequence number, breaking idempotency guarantee
//
// How the test drives the scenario: the mock broker rejects the first
// ProduceRequest with ErrNotLeaderForPartition and accepts every later one,
// while 1000 messages are pushed into the producer. The test fails if any
// message delivered on Successes() has hasSequence == false, or if a success
// does not arrive within 5 seconds.
//
// The test expects the two temporary source changes described alongside it:
// msg.clear() commented out in asyncProducer.returnSuccesses() and a 10ms
// sleep added in brokerProducer.run().
func TestIdempotentProducerSequenceNumberForBufferedMessages(t *testing.T) {
	t.Log("=== Test Description ===")
	t.Log("This test verifies that idempotent producers correctly assign sequence numbers")
	t.Log("under the highWatermark buffering mechanism by checking ProduceRequests received by broker")
	t.Log("")

	broker := NewMockBroker(t, 1)
	defer broker.Close()

	// Prepare metadata response
	metadataResponse := &MetadataResponse{
		Version:      4,
		ControllerID: 1,
	}
	metadataResponse.AddBroker(broker.Addr(), broker.BrokerID())
	metadataResponse.AddTopicPartition("test-topic", 0, broker.BrokerID(), nil, nil, nil, ErrNoError)
	broker.Returns(metadataResponse)

	// InitProducerID response
	initProducerID := &InitProducerIDResponse{
		ProducerID:    1000,
		ProducerEpoch: 1,
	}
	broker.Returns(initProducerID)

	// Configure producer
	config := NewTestConfig()
	config.Producer.Idempotent = true
	config.Producer.RequiredAcks = WaitForAll
	config.Producer.Retry.Max = 3
	config.Producer.Retry.Backoff = 200 * time.Millisecond
	config.Producer.Return.Successes = true
	config.Producer.Return.Errors = false
	config.Producer.Flush.Messages = 1
	config.Net.MaxOpenRequests = 1
	config.Version = V0_11_0_0

	producer, err := NewAsyncProducer([]string{broker.Addr()}, config)
	if err != nil {
		t.Fatal(err)
	}

	// Track ProduceRequests received by broker; requestsMu guards produceRequests
	// because the handler runs on the mock broker's goroutine.
	var requestsMu sync.Mutex
	var produceRequests []*ProduceRequest

	// Set custom handler to capture requests
	broker.setHandler(func(req *request) (res encoderWithHeader) {
		switch req.body.key() {
		case 3: // MetadataRequest
			return metadataResponse
		case 22: // InitProducerIDRequest
			return initProducerID
		case 0: // ProduceRequest
			preq := req.body.(*ProduceRequest)

			// Save request for later verification
			requestsMu.Lock()
			produceRequests = append(produceRequests, preq)
			reqNum := len(produceRequests)
			requestsMu.Unlock()

			// Check sequence number in RecordBatch
			for topic, partitions := range preq.records {
				for partition, records := range partitions {
					if records.RecordBatch != nil {
						batch := records.RecordBatch
						t.Logf("ProduceRequest #%d: topic=%s, partition=%d, FirstSequence=%d",
							reqNum, topic, partition, batch.FirstSequence)

						// Check if sequence number is present
						// FirstSequence should be >= 0 to indicate presence
						// NOTE(review): the buggy message is reported with
						// sequenceNumber=0, so a missing sequence may never show up
						// as a negative FirstSequence; the hasSequence check on the
						// success path below is the effective detector — confirm.
						if batch.FirstSequence < 0 {
							t.Errorf("BUG DETECTED: ProduceRequest #%d has no sequence number! FirstSequence=%d",
								reqNum, batch.FirstSequence)
						}
					}
				}
			}

			// First ProduceRequest returns error to trigger retry. This keys off
			// the ProduceRequest count rather than a count of all handled
			// requests, so an interleaved Metadata/InitProducerID request cannot
			// swallow the one injected failure.
			if reqNum == 1 {
				t.Log("  -> Returning ErrNotLeaderForPartition (trigger retry)")
				produceError := &ProduceResponse{Version: 3}
				produceError.AddTopicPartition("test-topic", 0, ErrNotLeaderForPartition)
				return produceError
			}

			// Subsequent requests return success
			t.Logf("  -> Returning success")
			produceSuccess := &ProduceResponse{Version: 3}
			produceSuccess.AddTopicPartition("test-topic", 0, ErrNoError)
			return produceSuccess
		}
		return nil
	})

	// stop is closed when the test function returns (including via t.Fatalf), so
	// the sender goroutine below cannot stay blocked on producer.Input() forever
	// after a failure.
	stop := make(chan struct{})
	defer close(stop)

	// send enqueues message number i and reports false if the test ended first.
	send := func(i int) bool {
		msg := &ProducerMessage{
			Topic: "test-topic",
			Key:   StringEncoder(fmt.Sprintf("M%d", i)),
			Value: StringEncoder(fmt.Sprintf("value%d", i)),
		}
		select {
		case producer.Input() <- msg:
			return true
		case <-stop:
			return false
		}
	}

	go func() {
		// Send first message
		t.Log("\nStep 1: Send first message M1")
		if !send(1) {
			return
		}

		// Send 999 more messages in a loop, pacing them so that some arrive
		// while the first message is still being retried.
		for i := 2; i <= 1000; i++ {
			if !send(i) {
				return
			}
			time.Sleep(1 * time.Millisecond)
		}
	}()

	// Wait for messages to complete
	t.Log("\nStep 3: Wait for messages to succeed")
	for i := 0; i < 1000; i++ {
		select {
		case msg := <-producer.Successes():
			key := string(msg.Key.(StringEncoder))
			t.Logf("Received success message: key=%s", key)
			if !msg.hasSequence {
				t.Fatalf("BUG DETECTED: Message %s has no sequence number! hasSequence=%t", key, msg.hasSequence)
			}
		case <-time.After(5 * time.Second):
			t.Fatalf("Timeout: received only %d success messages, expected 1000", i)
		}
	}

	closeProducer(t, producer)
}
Versions

| Sarama  | Kafka     | Go   |
|---------|-----------|------|
| v1.46.2 | mockKafka | 1.24 |
Configuration

See unit test case.

Logs
logs: CLICK ME

[DEBUG] 16:04:14.851203 producer/broker/1 detected epoch rollover, waiting for new buffer
    async_producer_sequence_test.go:152: ✓ 收到成功消息: key=M37
    async_producer_sequence_test.go:154: ❌ BUG检测: 消息 M37 没有序列号!hasSequence=false

Additional Context

Metadata

Metadata

Assignees

No one assigned

    Labels

    needs-investigation — Issues that require followup from maintainers

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions