
Manual offset store/auto-commit stopped working on Azure Event Hubs since version 2.1.0 #4384

@pbizon

Description

The change introduced in this commit:
a0871a1#diff-14f07c2626b2863a69e4b9b625720d25a860298da6d8f3dd81289b617ad7a9b5
means that, before setting a new offset in rd_kafka_topic_partition_list_set_offsets, we now compare not only the offsets but also the leader epochs:

    /* Call site in rd_kafka_topic_partition_list_set_offsets(): */
    if (rd_kafka_fetch_pos_cmp(&rktp->rktp_stored_pos,
                               &rktp->rktp_committed_pos) > 0) {
            ...
    }

    /* Orders positions by leader epoch first, then by offset. */
    int
    rd_kafka_fetch_pos_cmp(const rd_kafka_fetch_pos_t *a,
                           const rd_kafka_fetch_pos_t *b) {
            if (a->leader_epoch < b->leader_epoch)
                    return -1;
            else if (a->leader_epoch > b->leader_epoch)
                    return 1;
            else if (a->offset < b->offset)
                    return -1;
            else if (a->offset > b->offset)
                    return 1;
            else
                    return 0;
    }

Everything works fine with Apache Kafka, which sends -1 as the leader epoch in the OffsetFetchResponse:

{"severity":7,"fac":"OFFSETFETCH","message":"[thrd:main]: GroupCoordinator/1: OffsetFetchResponse: test-topic [0] offset 1, leader epoch -1, metadata 0 byte(s): NO_ERROR"}

Unfortunately, Azure Event Hubs sends 0:

{"severity":7,"fac":"OFFSETFETCH","message":"[thrd:main]: GroupCoordinator/0: OffsetFetchResponse: test-topic [0] offset -1, leader epoch 0, metadata 0 byte(s): NO_ERROR"}
[KAFKA] {"severity":7,"fac":"OFFSET","message":"[thrd:main]: Topic test-topic [2]: stored offset 316172 (leader epoch -1), committed offset INVALID (leader epoch 0): not including in commit"}
[KAFKA] {"severity":7,"fac":"OFFSET","message":"[thrd:main]: Topic test-topic [3]: stored offset 318224 (leader epoch -1), committed offset 318180 (leader epoch 0): not including in commit"}

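As a result, when the auto-commit timer expires, the stored offset is newer than the committed one (318224 > 318180 for partition [3]), but the stored leader epoch is lower than the committed one (-1 < 0). rd_kafka_fetch_pos_cmp() therefore returns -1, and the offset is not committed.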

I couldn't find anything about a default value for the leader epoch in KIP-320 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-320%3A+Allow+fetchers+to+detect+and+handle+log+truncation), so I'm assuming that 0 is a valid value.

How to reproduce

Connect a consumer to an Azure Event Hubs topic with these configuration parameters:
'enable.auto.commit': true,
'auto.commit.interval.ms': 5000,
'enable.auto.offset.store': false,
offset_commit_cb: [Function: offset_commit_cb]

Store offsets manually in your consumer (automatic offset storing is disabled).

Produce a message to the test topic and check whether offset_commit_cb is called.

Checklist

  • librdkafka version (release number or git tag): since v2.1.0
  • Apache Kafka version: Azure Event Hubs
  • librdkafka client configuration: 'client.id': '$ConnectionString', 'group.id': 'kafka-consumer-1', 'metadata.broker.list': '[redacted].windows.net:9093', 'topic.metadata.refresh.interval.ms': 60000, 'metadata.max.age.ms': 180000, 'session.timeout.ms': 30000, 'sasl.mechanisms': 'PLAIN', 'sasl.username': '$ConnectionString', 'sasl.password': [redacted], 'security.protocol': 'SASL_SSL', 'ssl.ca.location': '/etc/ssl/certs', 'socket.keepalive.enable': true, 'enable.auto.commit': true, 'auto.commit.interval.ms': 5000, 'enable.auto.offset.store': false, debug: 'broker,consumer,msg,topic', offset_commit_cb: [Function: offset_commit_cb]
  • Operating system: Linux
  • Provide logs (with debug=.. as necessary) from librdkafka
  • Provide broker log excerpts
  • Critical issue
