Short metadata refresh time triggers a refetch of already fetched message despite no cluster changes #4249

@mensfeld

Description

Hello,

I'm one of the maintainers of the rdkafka-ruby bindings and the author of the Karafka framework (https://github.com/karafka/karafka). I wanted to upgrade librdkafka from 2.0.2 to 2.1.0 in the Ruby bindings, but I ran into an issue that prevents me from doing so.

In some scenarios, I use a very short topic.metadata.refresh.interval.ms of 5_000 ms. This is used mainly in dev, so that consumer and producer metadata picks up cluster changes quickly. I noticed that even with no cluster changes and only a few messages being produced, such a short interval causes poll to return duplicates almost every time a metadata refresh occurs.

The flow of the code is as follows:

  1. I create a topic with one partition
  2. I produce one message to it (the first message)
  3. I create a consumer that consumes a single message and, after consuming it, produces another message to be consumed
  4. I store all consumed offsets in an array, and I expect no duplicates

The code below works as expected with librdkafka 2.0.2, and none of my integration tests fail.

Consumer#each just runs poll in a loop and yields each message.

topic = 'test'
offsets = []

Thread.new do
  # start after the consumer starts since it starts from latest
  sleep(5)
  Karafka.producer.produce_sync(topic: topic, payload: '1')
end

config = {
  'bootstrap.servers': 'localhost:9092',
  'group.id': Time.now.to_f.to_s,
  'auto.offset.reset': 'latest',
  'enable.auto.offset.store': 'false',
  'debug': 'all',
  'topic.metadata.refresh.interval.ms': 5_000
}

consumer = Rdkafka::Config.new(config).consumer

consumer.subscribe(topic)

consumer.each do |message|
  raise if offsets.include?(message.offset)

  offsets << message.offset

  Karafka.producer.produce_sync(topic: topic, payload: '1')
end

Note: Karafka.producer is just a wrapper around a librdkafka producer that dispatches a message and waits for the delivery callback. The producer's metadata refresh interval is set much higher (100 000 ms).

Checklist

Please provide the following information:

  • librdkafka version (release number or git tag): 2.1.0
  • Apache Kafka version: 2.8.1 and 3.4.0 from bitnami
  • librdkafka client configuration: presented in the above code snippet
  • Operating system: Linux 5.4.0-146-generic #163-Ubuntu SMP Fri Mar 17 18:26:02 UTC 2023 x86_64 x86_64 x86_64 GNU/Linux
  • Provide logs (with debug=.. as necessary) from librdkafka
  • Provide broker log excerpts
  • Critical issue: I would consider it critical, because a metadata refresh on a cluster without any changes should not cause duplicates on poll when no rebalance has happened.

Logs

Bitnami Kafka logs:

kafka    | [2023-04-12 11:20:46,844] INFO [GroupCoordinator 1]: Dynamic member with unknown member id joins group 1681298446.83954 in Empty state. Created a new member id rdkafka-bcd9def1-b0e3-4ad2-ba33-8ff6f0f53810 and request the member to rejoin with this id. (kafka.coordinator.group.GroupCoordinator)
kafka    | [2023-04-12 11:20:46,844] INFO [GroupCoordinator 1]: Preparing to rebalance group 1681298446.83954 in state PreparingRebalance with old generation 0 (__consumer_offsets-14) (reason: Adding new member rdkafka-bcd9def1-b0e3-4ad2-ba33-8ff6f0f53810 with group instance id None; client reason: not provided) (kafka.coordinator.group.GroupCoordinator)
kafka    | [2023-04-12 11:20:46,845] INFO [GroupCoordinator 1]: Stabilized group 1681298446.83954 generation 1 (__consumer_offsets-14) with 1 members (kafka.coordinator.group.GroupCoordinator)
kafka    | [2023-04-12 11:20:46,846] INFO [GroupCoordinator 1]: Assignment received from leader rdkafka-bcd9def1-b0e3-4ad2-ba33-8ff6f0f53810 for group 1681298446.83954 for generation 1. The group has 1 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator)
kafka    | [2023-04-12 11:20:49,098] INFO [GroupMetadataManager brokerId=1] Group 1681298338.3458133 transitioned to Dead in generation 2 (kafka.coordinator.group.GroupMetadataManager)
kafka    | [2023-04-12 11:20:49,100] INFO [GroupMetadataManager brokerId=1] Group 1681298390.4270868 transitioned to Dead in generation 2 (kafka.coordinator.group.GroupMetadataManager)
kafka    | [2023-04-12 11:20:49,100] INFO [GroupMetadataManager brokerId=1] Group anything transitioned to Dead in generation 11 (kafka.coordinator.group.GroupMetadataManager)

librdkafka debug=all logs plus some basic Karafka logs:

The full log did not fit into the comment, so it is available here: https://gist.github.com/mensfeld/9782da56cffc5fd7293000522e4b6744

D, [2023-04-12T13:20:51.858815 #418730] DEBUG -- : rdkafka: [thrd:main]: Partition test2 [0] changed fetch state active -> validate-epoch-wait
D, [2023-04-12T13:20:51.858846 #418730] DEBUG -- : rdkafka: [thrd:main]: 127.0.0.1:9092/1: test2 [0]: querying broker for epoch validation of offset 480 (leader epoch 0): epoch updated from metadata
D, [2023-04-12T13:20:51.858860 #418730] DEBUG -- : rdkafka: [thrd:main]: 127.0.0.1:9092/1:   Broker #0/1: 127.0.0.1:9092 NodeId 1
D, [2023-04-12T13:20:51.858875 #418730] DEBUG -- : rdkafka: [thrd:main]: 127.0.0.1:9092/1: 1/1 requested topic(s) seen in metadata
D, [2023-04-12T13:20:51.858884 #418730] DEBUG -- : rdkafka: [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Topic test2 [0] in state validate-epoch-wait at offset 480 (leader epoch 0) (11/100000 msgs, 0/65536 kb queued, opv 4) is not fetchable: not in active fetch state
D, [2023-04-12T13:20:51.858895 #418730] DEBUG -- : rdkafka: [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Removed test2 [0] from fetch list (0 entries, opv 4): not in active fetch state
D, [2023-04-12T13:20:51.858903 #418730] DEBUG -- : rdkafka: [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Sent OffsetForLeaderEpochRequest (v2, 48 bytes @ 0, CorrId 15)
D, [2023-04-12T13:20:51.858912 #418730] DEBUG -- : rdkafka: [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Received FetchResponse (v11, 136 bytes, CorrId 14, rtt 5.93ms)
D, [2023-04-12T13:20:51.858920 #418730] DEBUG -- : rdkafka: [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Topic test2 [0] MessageSet size 69, error "Success", MaxOffset 481, LSO 481, Ver 4/4
D, [2023-04-12T13:20:51.858929 #418730] DEBUG -- : rdkafka: [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Enqueue 1 message(s) (1 bytes, 1 ops) on test2 [0] fetch queue (qlen 16, v4, last_offset 480, 0 ctrl msgs, 0 aborted msgsets, uncompressed)
D, [2023-04-12T13:20:51.858938 #418730] DEBUG -- : rdkafka: [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Received OffsetForLeaderEpochResponse (v2, 37 bytes, CorrId 15, rtt 5.77ms)
D, [2023-04-12T13:20:51.858947 #418730] DEBUG -- : rdkafka: [thrd:main]: 127.0.0.1:9092/1: test2 [0]: offset and epoch validation succeeded: broker end offset 481 (offset leader epoch 0)
D, [2023-04-12T13:20:51.858955 #418730] DEBUG -- : rdkafka: [thrd:main]: Partition test2 [0] changed fetch state validate-epoch-wait -> active
D, [2023-04-12T13:20:51.858964 #418730] DEBUG -- : rdkafka: [thrd:main]: Partition test2 [0] start fetching at offset 480 (leader epoch 0)
D, [2023-04-12T13:20:51.858972 #418730] DEBUG -- : rdkafka: [thrd:app]: test2 [0] is the new sticky partition
D, [2023-04-12T13:20:51.858981 #418730] DEBUG -- : rdkafka: [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/1: test2 [0] 1 message(s) in xmit queue (1 added from partition queue)
D, [2023-04-12T13:20:51.863871 #418730] DEBUG -- : rdkafka: [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/1: test2 [0] 1 message(s) in xmit queue (0 added from partition queue)
D, [2023-04-12T13:20:51.863906 #418730] DEBUG -- : rdkafka: [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/1: test2 [0]: Produce MessageSet with 1 message(s) (69 bytes, ApiVersion 7, MsgVersion 2, MsgId 3, BaseSeq 2, PID{Id:11,Epoch:0}, uncompressed)
D, [2023-04-12T13:20:51.863919 #418730] DEBUG -- : rdkafka: [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/1: Sent ProduceRequest (v7, 123 bytes @ 0, CorrId 6)
D, [2023-04-12T13:20:51.864340 #418730] DEBUG -- : rdkafka: [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/1: Received ProduceResponse (v7, 49 bytes, CorrId 6, rtt 0.43ms)
D, [2023-04-12T13:20:51.864366 #418730] DEBUG -- : rdkafka: [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/1: test2 [0]: MessageSet with 1 message(s) (MsgId 3, BaseSeq 2) delivered
I, [2023-04-12T13:20:51.868928 #418730]  INFO -- : [4e51e22256f1] Sync producing of a message to 'test2' topic took 10.200689999386668 ms
D, [2023-04-12T13:20:51.868963 #418730] DEBUG -- : [4e51e22256f1] {:topic=>"test2", :payload=>"1"}
D, [2023-04-12T13:20:52.850354 #418730] DEBUG -- : rdkafka: [thrd:127.0.0.1:9092/1]: Topic test2 [0]: fetch decide: updating to version 4 (was 4) at offset 480 (leader epoch 0) (was offset 481 (leader epoch 0))
D, [2023-04-12T13:20:52.850420 #418730] DEBUG -- : rdkafka: [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Topic test2 [0] in state active at offset 480 (leader epoch 0) (1/100000 msgs, 0/65536 kb queued, opv 4) is fetchable
D, [2023-04-12T13:20:52.850444 #418730] DEBUG -- : rdkafka: [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Added test2 [0] to fetch list (1 entries, opv 4, 0 messages queued): fetchable
D, [2023-04-12T13:20:52.850472 #418730] DEBUG -- : rdkafka: [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Fetch topic test2 [0] at offset 480 (leader epoch 0, current leader epoch 0, v4)
D, [2023-04-12T13:20:52.850492 #418730] DEBUG -- : rdkafka: [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Fetch 1/1/1 toppar(s)
D, [2023-04-12T13:20:52.850509 #418730] DEBUG -- : rdkafka: [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Sent FetchRequest (v11, 95 bytes @ 0, CorrId 16)
/mnt/software/Karafka/karafka/spec/integrations/consumption/loop_with_messages_with_headers_spec.rb:60:in `block in <main>': unhandled exception
  from /mnt/software/Karafka/karafka-rdkafka/lib/rdkafka/consumer.rb:460:in `block in each'
  from /mnt/software/Karafka/karafka-rdkafka/lib/rdkafka/consumer.rb:457:in `loop'
  from /mnt/software/Karafka/karafka-rdkafka/lib/rdkafka/consumer.rb:457:in `each'
  from /mnt/software/Karafka/karafka/spec/integrations/consumption/loop_with_messages_with_headers_spec.rb:59:in `<main>'
I, [2023-04-12T13:20:52.942391 #418730]  INFO -- : [4e51e22256f1] Closing producer took 0.03628700040280819 ms

Exactly the same code under 2.0.2 works as expected and can run for several minutes without any issues. I can reproduce it every single time.
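
To make the rewind easier to spot in a long debug log, a small helper along these lines could scan for "fetch decide" lines where the new fetch offset is lower than the previous one. This is only a sketch: the regular expression is an assumption derived from the single "fetch decide" line in the excerpt above, it only handles numeric offsets, and `fetch_rewinds` is a hypothetical name that is not part of librdkafka or rdkafka-ruby.

```ruby
# Assumed shape of a librdkafka "fetch decide" debug line, based on the excerpt:
#   Topic test2 [0]: fetch decide: updating to version 4 (was 4)
#   at offset 480 (leader epoch 0) (was offset 481 (leader epoch 0))
# Only numeric offsets are matched; logical offsets are ignored.
FETCH_DECIDE = /Topic (?<topic>\S+) \[(?<partition>\d+)\]: fetch decide: .*? at offset (?<new_offset>\d+) .*?\(was offset (?<old_offset>\d+) /

# Hypothetical helper: returns one hash per "fetch decide" line in which the
# fetch position moved backwards (new offset lower than the previous offset).
def fetch_rewinds(lines)
  lines.filter_map do |line|
    match = FETCH_DECIDE.match(line)
    next unless match

    new_offset = match[:new_offset].to_i
    old_offset = match[:old_offset].to_i
    next unless new_offset < old_offset

    {
      topic: match[:topic],
      partition: match[:partition].to_i,
      from: old_offset,
      to: new_offset
    }
  end
end

# Two lines taken from the log excerpt above; only the first is a "fetch decide" line.
sample = [
  'rdkafka: [thrd:127.0.0.1:9092/1]: Topic test2 [0]: fetch decide: updating to version 4 (was 4) at offset 480 (leader epoch 0) (was offset 481 (leader epoch 0))',
  'rdkafka: [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Fetch topic test2 [0] at offset 480 (leader epoch 0, current leader epoch 0, v4)'
]

p fetch_rewinds(sample)
```

For the excerpt above this reports a single rewind on test2 [0], from offset 481 back to 480, which matches the moment right before the duplicate is delivered. Running it over the full log with `fetch_rewinds(File.foreach(path))` (where `path` points at a saved copy of the log) should show whether each rewind lines up with a metadata refresh.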

I suspect that it may be related to the metadata cache eviction flow: v2.0.2...v2.1.0#diff-17c2af7f93fd5ac3f7afc7993bcfaa03bf3cb614e522a8dae82e2b077bcfd3beR163
