Hello,
I'm one of the maintainers of the rdkafka-ruby bindings and the author of the Karafka framework (https://github.com/karafka/karafka). I wanted to upgrade librdkafka from 2.0.2 to 2.1.0 in the Ruby bindings, but I noticed an issue that prevents me from doing so.
For some scenarios I use a really short topic.metadata.refresh.interval.ms of 5_000 ms. It is used mainly in development to quickly refresh consumer and producer metadata after cluster changes. I noticed that, despite no cluster changes and only a few messages being produced, such a short interval causes duplicates during polling almost every time a metadata refresh occurs.
The flow of the code is as follows:
- I create a topic with one partition
- I produce one message to it (the first message)
- I create a consumer that consumes a single message and, after consuming it, produces another message to be consumed
- I store all the offsets in an array, and I expect no duplicates
The code below works as expected with librdkafka 2.0.2, and none of my integration tests fail.
Consumer #each just runs a poll and yields the message.
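For reference, #each in the bindings boils down to a blocking poll loop along these lines (a simplified sketch matching the description above, not copied verbatim from any rdkafka-ruby release):
# Simplified sketch of Consumer#each: keep polling and yield every received
# message; a nil result (poll timeout) simply loops again.
def each
  loop do
    message = poll(250)
    yield(message) if message
  end
end
The reproduction code itself: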
topic = 'test'
offsets = []

Thread.new do
  # start after the consumer starts since it starts from latest
  sleep(5)
  Karafka.producer.produce_sync(topic: topic, payload: '1')
end

config = {
  'bootstrap.servers': 'localhost:9092',
  'group.id': Time.now.to_f.to_s,
  'auto.offset.reset': 'latest',
  'enable.auto.offset.store': 'false',
  'debug': 'all',
  'topic.metadata.refresh.interval.ms': 5_000
}

consumer = Rdkafka::Config.new(config).consumer
consumer.subscribe(topic)

consumer.each do |message|
  # fail on the first duplicated offset
  raise if offsets.include?(message.offset)

  offsets << message.offset

  # produce the next message to keep the loop going
  Karafka.producer.produce_sync(topic: topic, payload: '1')
end
Note: Karafka.producer is just a wrapped librdkafka producer that dispatches a message and waits for the delivery callback. The producer's metadata refresh interval is set much higher (100 000 ms).
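Conceptually, produce_sync amounts to something like the following (a rough sketch, not the actual WaterDrop implementation; rdkafka_producer here stands for the wrapped Rdkafka::Producer instance):
# Rough sketch of what Karafka.producer.produce_sync does: dispatch the message
# through the underlying rdkafka producer and block on the delivery handle until
# the delivery report (callback) arrives.
handle = rdkafka_producer.produce(topic: topic, payload: payload)
handle.wait # returns the delivery report, or raises if delivery failed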
Checklist
Please provide the following information:
- librdkafka version: 2.1.0
- Apache Kafka version: 2.8.1 and 3.4.0 from bitnami
- Operating system: Linux 5.4.0-146-generic #163-Ubuntu SMP Fri Mar 17 18:26:02 UTC 2023 x86_64 x86_64 x86_64 GNU/Linux
- Logs (with debug=all) from librdkafka: provided below
Logs
Bitnami Kafka logs:
kafka | [2023-04-12 11:20:46,844] INFO [GroupCoordinator 1]: Dynamic member with unknown member id joins group 1681298446.83954 in Empty state. Created a new member id rdkafka-bcd9def1-b0e3-4ad2-ba33-8ff6f0f53810 and request the member to rejoin with this id. (kafka.coordinator.group.GroupCoordinator)
kafka | [2023-04-12 11:20:46,844] INFO [GroupCoordinator 1]: Preparing to rebalance group 1681298446.83954 in state PreparingRebalance with old generation 0 (__consumer_offsets-14) (reason: Adding new member rdkafka-bcd9def1-b0e3-4ad2-ba33-8ff6f0f53810 with group instance id None; client reason: not provided) (kafka.coordinator.group.GroupCoordinator)
kafka | [2023-04-12 11:20:46,845] INFO [GroupCoordinator 1]: Stabilized group 1681298446.83954 generation 1 (__consumer_offsets-14) with 1 members (kafka.coordinator.group.GroupCoordinator)
kafka | [2023-04-12 11:20:46,846] INFO [GroupCoordinator 1]: Assignment received from leader rdkafka-bcd9def1-b0e3-4ad2-ba33-8ff6f0f53810 for group 1681298446.83954 for generation 1. The group has 1 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator)
kafka | [2023-04-12 11:20:49,098] INFO [GroupMetadataManager brokerId=1] Group 1681298338.3458133 transitioned to Dead in generation 2 (kafka.coordinator.group.GroupMetadataManager)
kafka | [2023-04-12 11:20:49,100] INFO [GroupMetadataManager brokerId=1] Group 1681298390.4270868 transitioned to Dead in generation 2 (kafka.coordinator.group.GroupMetadataManager)
kafka | [2023-04-12 11:20:49,100] INFO [GroupMetadataManager brokerId=1] Group anything transitioned to Dead in generation 11 (kafka.coordinator.group.GroupMetadataManager)
librdkafka debug=all logs plus some basic Karafka logs:
Full issue log here: https://gist.github.com/mensfeld/9782da56cffc5fd7293000522e4b6744, as it did not fit into this comment. The relevant excerpt:
D, [2023-04-12T13:20:51.858815 #418730] DEBUG -- : rdkafka: [thrd:main]: Partition test2 [0] changed fetch state active -> validate-epoch-wait
D, [2023-04-12T13:20:51.858846 #418730] DEBUG -- : rdkafka: [thrd:main]: 127.0.0.1:9092/1: test2 [0]: querying broker for epoch validation of offset 480 (leader epoch 0): epoch updated from metadata
D, [2023-04-12T13:20:51.858860 #418730] DEBUG -- : rdkafka: [thrd:main]: 127.0.0.1:9092/1: Broker #0/1: 127.0.0.1:9092 NodeId 1
D, [2023-04-12T13:20:51.858875 #418730] DEBUG -- : rdkafka: [thrd:main]: 127.0.0.1:9092/1: 1/1 requested topic(s) seen in metadata
D, [2023-04-12T13:20:51.858884 #418730] DEBUG -- : rdkafka: [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Topic test2 [0] in state validate-epoch-wait at offset 480 (leader epoch 0) (11/100000 msgs, 0/65536 kb queued, opv 4) is not fetchable: not in active fetch state
D, [2023-04-12T13:20:51.858895 #418730] DEBUG -- : rdkafka: [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Removed test2 [0] from fetch list (0 entries, opv 4): not in active fetch state
D, [2023-04-12T13:20:51.858903 #418730] DEBUG -- : rdkafka: [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Sent OffsetForLeaderEpochRequest (v2, 48 bytes @ 0, CorrId 15)
D, [2023-04-12T13:20:51.858912 #418730] DEBUG -- : rdkafka: [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Received FetchResponse (v11, 136 bytes, CorrId 14, rtt 5.93ms)
D, [2023-04-12T13:20:51.858920 #418730] DEBUG -- : rdkafka: [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Topic test2 [0] MessageSet size 69, error "Success", MaxOffset 481, LSO 481, Ver 4/4
D, [2023-04-12T13:20:51.858929 #418730] DEBUG -- : rdkafka: [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Enqueue 1 message(s) (1 bytes, 1 ops) on test2 [0] fetch queue (qlen 16, v4, last_offset 480, 0 ctrl msgs, 0 aborted msgsets, uncompressed)
D, [2023-04-12T13:20:51.858938 #418730] DEBUG -- : rdkafka: [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Received OffsetForLeaderEpochResponse (v2, 37 bytes, CorrId 15, rtt 5.77ms)
D, [2023-04-12T13:20:51.858947 #418730] DEBUG -- : rdkafka: [thrd:main]: 127.0.0.1:9092/1: test2 [0]: offset and epoch validation succeeded: broker end offset 481 (offset leader epoch 0)
D, [2023-04-12T13:20:51.858955 #418730] DEBUG -- : rdkafka: [thrd:main]: Partition test2 [0] changed fetch state validate-epoch-wait -> active
D, [2023-04-12T13:20:51.858964 #418730] DEBUG -- : rdkafka: [thrd:main]: Partition test2 [0] start fetching at offset 480 (leader epoch 0)
D, [2023-04-12T13:20:51.858972 #418730] DEBUG -- : rdkafka: [thrd:app]: test2 [0] is the new sticky partition
D, [2023-04-12T13:20:51.858981 #418730] DEBUG -- : rdkafka: [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/1: test2 [0] 1 message(s) in xmit queue (1 added from partition queue)
D, [2023-04-12T13:20:51.863871 #418730] DEBUG -- : rdkafka: [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/1: test2 [0] 1 message(s) in xmit queue (0 added from partition queue)
D, [2023-04-12T13:20:51.863906 #418730] DEBUG -- : rdkafka: [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/1: test2 [0]: Produce MessageSet with 1 message(s) (69 bytes, ApiVersion 7, MsgVersion 2, MsgId 3, BaseSeq 2, PID{Id:11,Epoch:0}, uncompressed)
D, [2023-04-12T13:20:51.863919 #418730] DEBUG -- : rdkafka: [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/1: Sent ProduceRequest (v7, 123 bytes @ 0, CorrId 6)
D, [2023-04-12T13:20:51.864340 #418730] DEBUG -- : rdkafka: [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/1: Received ProduceResponse (v7, 49 bytes, CorrId 6, rtt 0.43ms)
D, [2023-04-12T13:20:51.864366 #418730] DEBUG -- : rdkafka: [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/1: test2 [0]: MessageSet with 1 message(s) (MsgId 3, BaseSeq 2) delivered
I, [2023-04-12T13:20:51.868928 #418730] INFO -- : [4e51e22256f1] Sync producing of a message to 'test2' topic took 10.200689999386668 ms
D, [2023-04-12T13:20:51.868963 #418730] DEBUG -- : [4e51e22256f1] {:topic=>"test2", :payload=>"1"}
D, [2023-04-12T13:20:52.850354 #418730] DEBUG -- : rdkafka: [thrd:127.0.0.1:9092/1]: Topic test2 [0]: fetch decide: updating to version 4 (was 4) at offset 480 (leader epoch 0) (was offset 481 (leader epoch 0))
D, [2023-04-12T13:20:52.850420 #418730] DEBUG -- : rdkafka: [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Topic test2 [0] in state active at offset 480 (leader epoch 0) (1/100000 msgs, 0/65536 kb queued, opv 4) is fetchable
D, [2023-04-12T13:20:52.850444 #418730] DEBUG -- : rdkafka: [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Added test2 [0] to fetch list (1 entries, opv 4, 0 messages queued): fetchable
D, [2023-04-12T13:20:52.850472 #418730] DEBUG -- : rdkafka: [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Fetch topic test2 [0] at offset 480 (leader epoch 0, current leader epoch 0, v4)
D, [2023-04-12T13:20:52.850492 #418730] DEBUG -- : rdkafka: [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Fetch 1/1/1 toppar(s)
D, [2023-04-12T13:20:52.850509 #418730] DEBUG -- : rdkafka: [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Sent FetchRequest (v11, 95 bytes @ 0, CorrId 16)
/mnt/software/Karafka/karafka/spec/integrations/consumption/loop_with_messages_with_headers_spec.rb:60:in `block in <main>': unhandled exception
from /mnt/software/Karafka/karafka-rdkafka/lib/rdkafka/consumer.rb:460:in `block in each'
from /mnt/software/Karafka/karafka-rdkafka/lib/rdkafka/consumer.rb:457:in `loop'
from /mnt/software/Karafka/karafka-rdkafka/lib/rdkafka/consumer.rb:457:in `each'
from /mnt/software/Karafka/karafka/spec/integrations/consumption/loop_with_messages_with_headers_spec.rb:59:in `<main>'
I, [2023-04-12T13:20:52.942391 #418730] INFO -- : [4e51e22256f1] Closing producer took 0.03628700040280819 ms
Exactly the same code under 2.0.2 works as expected and can run for several minutes without any issues; under 2.1.0 I can reproduce the failure every single time.
I suspect it may be related to the metadata cache eviction flow: v2.0.2...v2.1.0#diff-17c2af7f93fd5ac3f7afc7993bcfaa03bf3cb614e522a8dae82e2b077bcfd3beR163