Description
Offset commit metadata can be used to send additional data to the broker when making an offset commit. See this doc from the Java consumer.
librdkafka currently sends the offset commit metadata when a commit is requested explicitly by calling rd_kafka_commit_queue, but not when offsets are stored with rd_kafka_offsets_store and then committed by the auto-commit.
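The asymmetry can be pictured with a small Go model of the two paths. This is a hypothetical sketch, not librdkafka or confluent-kafka-go code: `consumerModel`, `partitionState`, `storeOffsets`, `autoCommit` and `commitOffsets` are illustrative names, and `partitionState` only loosely stands in for the per-partition state librdkafka keeps. It assumes, as observed in this issue, that the store path records the offset but not the metadata.

```go
package main

import "fmt"

// topicPartition identifies one partition of a topic.
type topicPartition struct {
	Topic     string
	Partition int32
}

// commitEntry is one per-partition element of an offset commit.
type commitEntry struct {
	Topic     string
	Partition int32
	Offset    int64
	Metadata  string
}

// partitionState is a hypothetical stand-in for librdkafka's per-partition
// (toppar) state. Per the issue, only the stored offset is kept.
type partitionState struct {
	storedOffset int64
}

// consumerModel is a toy model of the consumer, not a real client.
type consumerModel struct {
	partitions map[topicPartition]*partitionState
}

// storeOffsets models rd_kafka_offsets_store as described in the issue:
// the offset is recorded, the metadata is silently dropped.
func (c *consumerModel) storeOffsets(entries []commitEntry) {
	for _, e := range entries {
		tp := topicPartition{e.Topic, e.Partition}
		c.partitions[tp] = &partitionState{storedOffset: e.Offset}
	}
}

// autoCommit models the auto-commit rebuilding the commit list from stored
// state; Metadata is left at its zero value (""). Map iteration order is
// unspecified, so the order of entries is too.
func (c *consumerModel) autoCommit() []commitEntry {
	out := make([]commitEntry, 0, len(c.partitions))
	for tp, st := range c.partitions {
		out = append(out, commitEntry{Topic: tp.Topic, Partition: tp.Partition, Offset: st.storedOffset})
	}
	return out
}

// commitOffsets models an explicit commit (rd_kafka_commit_queue): the
// caller's list, metadata included, is sent unchanged.
func (c *consumerModel) commitOffsets(entries []commitEntry) []commitEntry {
	return append([]commitEntry(nil), entries...)
}

func main() {
	req := []commitEntry{{Topic: "test", Partition: 0, Offset: 2, Metadata: "test metadata"}}
	c := &consumerModel{partitions: map[topicPartition]*partitionState{}}
	c.storeOffsets(req)
	fmt.Printf("auto-commit: %+v\n", c.autoCommit())
	fmt.Printf("explicit:    %+v\n", c.commitOffsets(req))
}
```

Running it shows the auto-committed entry with an empty `Metadata` field and the explicit one with `test metadata`, the same difference the kafka-python output exposes in the reproduction.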
How to reproduce
- Using confluent-kafka-go, which supports offset commit metadata, store an offset with metadata to be auto-committed:
package main

import (
	"fmt"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
	// Offsets are stored manually and committed by the auto-commit.
	consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers":        "localhost:9092",
		"group.id":                 "test",
		"enable.auto.offset.store": false,
		"enable.auto.commit":       true,
	})
	if err != nil {
		panic(err)
	}

	_, version := kafka.LibraryVersion()
	fmt.Printf("Started consumer version:%s\n", version)

	// Log rebalance events.
	err = consumer.Subscribe("test", func(consumer *kafka.Consumer, event kafka.Event) error {
		fmt.Printf("%v\n", event.String())
		return nil
	})
	if err != nil {
		panic(err)
	}

	for {
		event := consumer.Poll(1000)
		if event == nil {
			fmt.Println("No event")
			continue
		}
		switch e := event.(type) {
		case kafka.Error:
			fmt.Printf("%s\n", e.Error())
		case *kafka.Message:
			fmt.Printf("%s: %s\n", e.TopicPartition, e.Value)
			metadata := "test metadata"
			// Store the next offset to consume, together with its metadata.
			_, err = consumer.StoreOffsets([]kafka.TopicPartition{{
				Topic:     e.TopicPartition.Topic,
				Partition: e.TopicPartition.Partition,
				Offset:    e.TopicPartition.Offset + 1,
				Metadata:  &metadata,
			}})
			if err != nil {
				panic(err)
			}
		}
		// Stop after the first event.
		break
	}

	err = consumer.Close()
	if err != nil {
		panic(err)
	}
}
Started consumer version:1.9.1
AssignedPartitions: [test[0]@unset]
No event
test[0]@1: hello
RevokedPartitions: [test[0]@unset]
- Using kafka-python, print the committed offsets for that consumer group:
from kafka import KafkaAdminClient
config = {"bootstrap_servers": "localhost:9092"}
client = KafkaAdminClient(**config)
print(client.list_consumer_group_offsets("test"))
{TopicPartition(topic='test', partition=0): OffsetAndMetadata(offset=1, metadata='')}
- Now replace StoreOffsets with CommitOffsets in the Go program above to make the commit synchronously, and run the same Python script again:
Started consumer version:1.9.1
AssignedPartitions: [test[0]@unset]
No event
test[0]@2: hello
RevokedPartitions: [test[0]@unset]
{TopicPartition(topic='test', partition=0): OffsetAndMetadata(offset=2, metadata='test metadata')}
Investigation
rd_kafka_offsets_store should pass the metadata to rd_kafka_offset_store0, so that the most recently stored metadata is kept in the partition's toppar alongside the stored offset.
It can then be recovered when the offsets list is re-created from stored data here.
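A minimal Go sketch of the proposed change, under stated assumptions: `toppar`, `offsetStore0` and `commitListFromStored` are hypothetical simplifications of librdkafka's toppar, `rd_kafka_offset_store0` and the code that rebuilds the offsets list, and metadata is modeled as `[]byte` because librdkafka carries it as a pointer plus size. The store copies the metadata, assuming the caller keeps ownership of the list it passed and may free or reuse it afterwards.

```go
package main

import (
	"bytes"
	"fmt"
)

// toppar is a hypothetical, simplified analog of librdkafka's per-partition
// state. The proposed change adds storedMetadata next to the stored offset.
type toppar struct {
	topic          string
	partition      int32
	storedOffset   int64
	storedMetadata []byte
}

// partitionCommit is one element of the offsets list sent on commit.
type partitionCommit struct {
	Topic     string
	Partition int32
	Offset    int64
	Metadata  []byte
}

// offsetStore0 sketches the store step once the metadata is passed down:
// both values are recorded. The metadata is copied because the caller owns
// (and may free or reuse) the buffer it passed in.
func offsetStore0(tp *toppar, offset int64, metadata []byte) {
	tp.storedOffset = offset
	tp.storedMetadata = append([]byte(nil), metadata...)
}

// commitListFromStored rebuilds the commit list from stored state, as the
// auto-commit path does, now carrying a copy of the stored metadata along.
func commitListFromStored(toppars []*toppar) []partitionCommit {
	out := make([]partitionCommit, 0, len(toppars))
	for _, tp := range toppars {
		out = append(out, partitionCommit{
			Topic:     tp.topic,
			Partition: tp.partition,
			Offset:    tp.storedOffset,
			Metadata:  append([]byte(nil), tp.storedMetadata...),
		})
	}
	return out
}

func main() {
	tp := &toppar{topic: "test", partition: 0}
	userMetadata := []byte("test metadata")
	offsetStore0(tp, 2, userMetadata)
	userMetadata[0] = 'X' // the caller reuses its buffer after storing

	list := commitListFromStored([]*toppar{tp})
	fmt.Printf("%s[%d]@%d metadata=%q\n", list[0].Topic, list[0].Partition, list[0].Offset, list[0].Metadata)
	fmt.Println(bytes.Equal(list[0].Metadata, []byte("test metadata")))
}
```

Because the bytes are copied on store, changing the caller's buffer afterwards does not affect what is committed: the program prints `test[0]@2 metadata="test metadata"` followed by `true`.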
master...mathispesch:librdkafka:store-offsets-metadata is a proposed patch; I can continue working on it to factorise the changes and add tests.
Checklist
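- librdkafka version (release number or git tag): 1.9.1
- Apache Kafka version: 2.7.2
- librdkafka client configuration: enable.auto.commit=true, enable.auto.offset.store=false
- Operating system: Ubuntu 20.04
- Provide logs (with debug=.. as necessary) from librdkafka: https://gist.github.com/mathispesch/1df4d26873fa483b202497a198936c76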