
rd_kafka_offsets_store does not store and send offset commit metadata #3927

@mathispesch

Description

Offset commit metadata lets a consumer attach an arbitrary string to each offset it commits; the broker stores it alongside the committed offset. See this doc from the Java consumer.

librdkafka currently sends the offset commit metadata when a commit is explicitly requested with rd_kafka_commit_queue, but not when offsets are stored with rd_kafka_offsets_store and then committed by the auto-commit.
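
To make the two code paths concrete, here is a small in-memory Go model of the behaviour reported above. It is not librdkafka code: TopicPartition, partitionKey, storedState, commitNow, store and autoCommit are hypothetical names, and the model only assumes what this issue observes, namely that an explicit commit sends the caller's metadata while stored offsets are later auto-committed without it.

```go
package main

import "fmt"

// TopicPartition is a simplified, hypothetical stand-in for one entry of an
// offset commit: the next offset to consume plus optional commit metadata.
type TopicPartition struct {
	Topic     string
	Partition int32
	Offset    int64
	Metadata  string
}

// partitionKey identifies a topic partition in the model (hypothetical).
type partitionKey struct {
	Topic     string
	Partition int32
}

// storedState models the per-partition state kept for auto-commit. As
// observed in this issue, only the offset is remembered, not the metadata.
type storedState map[partitionKey]int64

// commitNow models an explicit commit (rd_kafka_commit_queue): the caller's
// list, metadata included, is exactly what is sent to the broker.
func commitNow(offsets []TopicPartition) []TopicPartition {
	sent := make([]TopicPartition, len(offsets))
	copy(sent, offsets)
	return sent
}

// store models rd_kafka_offsets_store as observed: each offset is kept for
// the next auto-commit, but the Metadata field is discarded.
func (s storedState) store(offsets []TopicPartition) {
	for _, tp := range offsets {
		s[partitionKey{tp.Topic, tp.Partition}] = tp.Offset
	}
}

// autoCommit models the periodic auto-commit: the entry is rebuilt from the
// stored state alone, so it always goes out with empty metadata.
func (s storedState) autoCommit(topic string, partition int32) TopicPartition {
	return TopicPartition{
		Topic:     topic,
		Partition: partition,
		Offset:    s[partitionKey{topic, partition}],
	}
}

func main() {
	tp := TopicPartition{Topic: "test", Partition: 0, Offset: 1, Metadata: "test metadata"}

	sent := commitNow([]TopicPartition{tp})
	fmt.Printf("explicit commit:   offset=%d metadata=%q\n", sent[0].Offset, sent[0].Metadata)

	state := storedState{}
	state.store([]TopicPartition{tp})
	auto := state.autoCommit("test", 0)
	fmt.Printf("store+auto-commit: offset=%d metadata=%q\n", auto.Offset, auto.Metadata)
}
```

Running it prints the metadata for the explicit commit and an empty string for the store-then-auto-commit path, which mirrors the metadata='' that kafka-python reports in the reproduction below.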

How to reproduce

  1. Using confluent-kafka-go, which supports offset commit metadata, store an offset with metadata to be auto-committed:
package main

import (
	"fmt"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
	consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers":        "localhost:9092",
		"group.id":                 "test",
		"enable.auto.offset.store": false,
		"enable.auto.commit":       true,
	})
	if err != nil {
		panic(err)
	}
	_, version := kafka.LibraryVersion()
	fmt.Printf("Started consumer version:%s\n", version)

	err = consumer.Subscribe("test", func(consumer *kafka.Consumer, event kafka.Event) error {
		fmt.Printf("%v\n", event.String())
		return nil
	})
	if err != nil {
		panic(err)
	}

	for {
		event := consumer.Poll(1000)
		if event == nil {
			fmt.Println("No event")
			continue
		}
		switch e := event.(type) {
		case kafka.Error:
			fmt.Printf("%s\n", e.Error())
		case *kafka.Message:
			fmt.Printf("%s: %s\n", e.TopicPartition, e.Value)
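			// Store (not commit) the next offset to consume, with metadata,
			// so that the auto-commit sends it to the broker later.
			metadata := "test metadata"
			_, err = consumer.StoreOffsets([]kafka.TopicPartition{{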
				Topic:     e.TopicPartition.Topic,
				Partition: e.TopicPartition.Partition,
				Offset:    e.TopicPartition.Offset + 1,
				Metadata:  &metadata,
			}})
			if err != nil {
				panic(err)
			}
		}
		// Stop after the first non-nil event, then close the consumer.
		break
	}
	err = consumer.Close()
	if err != nil {
		panic(err)
	}
}
Output:

Started consumer version:1.9.1
AssignedPartitions: [test[0]@unset]
No event
test[0]@1: hello
RevokedPartitions: [test[0]@unset]
  2. Using kafka-python, print the offsets for that consumer group:
from kafka import KafkaAdminClient

config = {"bootstrap_servers": "localhost:9092"}
client = KafkaAdminClient(**config)
print(client.list_consumer_group_offsets("test"))
Output (the stored metadata was not committed):

{TopicPartition(topic='test', partition=0): OffsetAndMetadata(offset=1, metadata='')}
  3. Now replace StoreOffsets with CommitOffsets in the Go program above so that the commit is made synchronously, then run the same Python script:
Go program output:

Started consumer version:1.9.1
AssignedPartitions: [test[0]@unset]
No event
test[0]@2: hello
RevokedPartitions: [test[0]@unset]

kafka-python output (the metadata is now committed):

{TopicPartition(topic='test', partition=0): OffsetAndMetadata(offset=2, metadata='test metadata')}

Investigation

rd_kafka_offsets_store should pass each partition's metadata on to rd_kafka_offset_store0, so that the most recently stored metadata is kept in the topic-partition object (toppar).

It can then be recovered when re-creating the offsets list from stored data here.
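
A minimal Go model of the proposed change, assuming the behaviour described above: the store step keeps the metadata next to the offset on the per-partition object, and the auto-commit entry is rebuilt from both. The names toppar, key, offsetStore0, offsetsStore, storedMetadata and commitEntry only loosely mirror librdkafka's internals and are hypothetical; this is a sketch of the idea, not the patch itself.

```go
package main

import "fmt"

// TopicPartition is a simplified, hypothetical stand-in for one commit entry.
type TopicPartition struct {
	Topic     string
	Partition int32
	Offset    int64
	Metadata  string
}

// key identifies a topic partition in the model (hypothetical).
type key struct {
	topic     string
	partition int32
}

// toppar models the per-partition object. storedMetadata is the hypothetical
// new field holding the last metadata passed along with a stored offset.
type toppar struct {
	topic          string
	partition      int32
	storedOffset   int64
	storedMetadata string
}

// offsetStore0 models the internal store step with the proposed change:
// the metadata is saved next to the offset instead of being dropped.
func (rktp *toppar) offsetStore0(offset int64, metadata string) {
	rktp.storedOffset = offset
	rktp.storedMetadata = metadata
}

// offsetsStore models the public store call: each entry's metadata is now
// forwarded to offsetStore0. Unknown partitions are ignored in this model.
func offsetsStore(toppars map[key]*toppar, offsets []TopicPartition) {
	for _, tp := range offsets {
		if rktp, ok := toppars[key{tp.Topic, tp.Partition}]; ok {
			rktp.offsetStore0(tp.Offset, tp.Metadata)
		}
	}
}

// commitEntry models re-creating the auto-commit entry from stored data:
// with the change, Metadata is copied along with Offset.
func (rktp *toppar) commitEntry() TopicPartition {
	return TopicPartition{
		Topic:     rktp.topic,
		Partition: rktp.partition,
		Offset:    rktp.storedOffset,
		Metadata:  rktp.storedMetadata,
	}
}

func main() {
	rktp := &toppar{topic: "test", partition: 0}
	toppars := map[key]*toppar{key{"test", 0}: rktp}

	// Two stores before the next auto-commit: only the latest one is kept.
	offsetsStore(toppars, []TopicPartition{{Topic: "test", Partition: 0, Offset: 1, Metadata: "first"}})
	offsetsStore(toppars, []TopicPartition{{Topic: "test", Partition: 0, Offset: 2, Metadata: "test metadata"}})

	entry := rktp.commitEntry()
	fmt.Printf("auto-commit: offset=%d metadata=%q\n", entry.Offset, entry.Metadata)
	// prints: auto-commit: offset=2 metadata="test metadata"
}
```

Because each store overwrites both fields together, the entry that is eventually auto-committed always pairs the last stored offset with the metadata that was stored with it.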

master...mathispesch:librdkafka:store-offsets-metadata is a proposed patch, which I can continue working on to refactor it and add tests.
