[KIP-848] Mock handler and Integration tests passing#4662
[KIP-848] Mock handler and Integration tests passing#4662Emanuele Sabellico (emasab) merged 14 commits intomasterfrom
Conversation
6667956 to
ca1eb2d
Compare
1d9962c to
ca5e6d0
Compare
ca1eb2d to
953f668
Compare
fd13d77 to
f519c11
Compare
953f668 to
e4614be
Compare
f519c11 to
029b5ab
Compare
e4614be to
10e7cc1
Compare
029b5ab to
d7428d0
Compare
10e7cc1 to
5789307
Compare
d7428d0 to
0379129
Compare
434da75 to
e5f3101
Compare
0379129 to
c6c9d2e
Compare
e5f3101 to
109d348
Compare
c6c9d2e to
cf1ad35
Compare
109d348 to
3dd0e45
Compare
3dd0e45 to
3bba91f
Compare
0396b1f to
27817d0
Compare
3bba91f to
f4fb7a1
Compare
91df086 to
82774f9
Compare
f04dc32 to
8d9ed36
Compare
Pranav Rathi (pranavrth)
left a comment
There was a problem hiding this comment.
For src files.
| /** | ||
| * @brief Get member epoch of a group metadata. | ||
| * Corresponds to the generation id in consumer protocol classic; | ||
| * | ||
| * @param group_metadata The group metadata | ||
| * | ||
| * @returns The member epoch id contained in the passed \p group_metadata. | ||
| */ | ||
| RD_EXPORT | ||
| int32_t rd_kafka_consumer_group_metadata_member_epoch( | ||
| const rd_kafka_consumer_group_metadata_t *group_metadata); |
There was a problem hiding this comment.
Do we have method for retrieving geenration id? If not, do we need one for member epoch? If we really need this for member epoch, let's create one for generation id separately.
There was a problem hiding this comment.
In Java GroupMetadata they haven't added a new one for member epoch, maybe naming can be changed in a future major release.
There was a problem hiding this comment.
I was saying that shall we add a new method rd_kafka_consumer_group_metadata_generation_id as well.
There was a problem hiding this comment.
OK, I think I'll leave only rd_kafka_consumer_group_metadata_generation_id as they didn't add the memberEpoch method in Java
There was a problem hiding this comment.
/**
* @brief Get the generation id (classic protocol)
* or member epoch (consumer protocol) of a group
Is this fine description?
There was a problem hiding this comment.
Yes, correct
| rd_kafka_buf_read_str(rkbuf, &ServerAssignor); | ||
|
|
||
| /* #TopicPartitions */ | ||
| rd_kafka_buf_read_arraycnt(rkbuf, &TopicPartitionsCnt, |
There was a problem hiding this comment.
I think we should use rd_kafka_buf_read_topic_partitions here.
There was a problem hiding this comment.
If you do that, then you need to scan the topic partition list, and for each topic partition you need to compare topic id to see if it has changed to call rd_kafka_mock_topic_find_by_id. If it returned like a Map<Topic, List<Partition>> or we didn't have to do an operation at each topic change I'd be more inclined to do than. What's your view on this?
There was a problem hiding this comment.
I think its better to use 2 loops here. First to read the data from the rpc and second for our business logic. This ensures readability, reusability and separation of concern.
There was a problem hiding this comment.
ok, done
Pranav Rathi (pranavrth)
left a comment
There was a problem hiding this comment.
Some 14 files more remaining but the main one is tests/0147-consumer_group_consumer_mock.c.
| do_test_CreateTopics("temp queue, op timeout 0", rk, NULL, 0, 0); | ||
| /* FIXME: KRaft async CreateTopics is working differently than | ||
| * wth Zookeeper | ||
| * do_test_CreateTopics("temp queue, op timeout 0", rk, NULL, 0, 0); */ |
There was a problem hiding this comment.
Does AK team know about it?
Let's have another function to determine whether its a zookeeper or kraft cluster and disable only when it kraft.
There was a problem hiding this comment.
Sure, I opened this issue.
Then talked about it internally. I think it was a bug and not a feature given they didn't acknowledge it was a feature to replicate with KRaft, we could remove the test for both when that is closed.
| if (test_consumer_group_protocol_classic() && | ||
| !(rebalance_cb1.assign_call_cnt == expected_cb1_assign_call_cnt && | ||
| rebalance_cb2.assign_call_cnt == expected_cb2_assign_call_cnt)) |
There was a problem hiding this comment.
We should have similar calculations for the new protocol as well.
Same for others as well below.
There was a problem hiding this comment.
With KIP-848 the callback count isn't fixed as in classic protocol so we can only verify the expected final assignment but not the number of callbacks.
There was a problem hiding this comment.
I think we can do that deterministically but might be more complex.
| } else if (0) { | ||
| /* FIXME: enable this once new errors are handled in OffsetCommit. */ | ||
| t_consumer_group_consumer_retry_commit_on_fenced_member_epoch(); | ||
| } |
There was a problem hiding this comment.
I will enable this in my other opened PR.
There was a problem hiding this comment.
I checked it and I think we should remove this test, as now OffsetCommit returns STALE_MEMBER_EPOCH that is not retried automatically and FENCED _MEMBER_EPOCH is returned by the Heartbeat only
| if (test_consumer_group_protocol_classic()) { | ||
| ApiKey = RD_KAFKAP_Heartbeat; | ||
| } else { | ||
| ApiKey = RD_KAFKAP_ConsumerGroupHeartbeat; | ||
| } |
There was a problem hiding this comment.
This should be opposite condition. if condition should be for consumer as that is the new one.
There was a problem hiding this comment.
I've always used always test_consumer_group_protocol_classic(), even if implementing the other function (that I can remove) so it's consistent for search, also it isolates the parts that we want to remove in a distant future.
Also if there was a third protocol, we hope not, that should start from the assumptions and tests of the consumer protocol, not with those of the classic protocol.
There was a problem hiding this comment.
We should be using test_consumer_group_protocol_consumer at all the places instead of test_consumer_group_protocol_classic. Reason behind this is:
- Let's say we add a new protocol in future. We want all the tests to run. We are making sure of that by using
!test_consumer_group_protocol_consumer()here instead. - In
!test_consumer_group_protocol_classic()condition, we will skip those tests with the new protocol added.
There was a problem hiding this comment.
No, in this case we're running the RD_KAFKAP_ConsumerGroupHeartbeat test that is more appropriate than running the classic one, if we use test_consumer_group_protocol_consumer ... else in case of a new protocol we run the classic tests, while running the newer ones would be more appropriate.
There was a problem hiding this comment.
I am talking about all the places. Not just here. Basically the idea is to make sure that we run all the test cases when we add new protocols. Let's ensure that when there is a new protocol, all the tests are executed.
There was a problem hiding this comment.
yes, all tests about the new protocol, the tests about the classic protocol don't have to be executed as they won't be applicable. Probably the tests about the 848 protocol won't be applicable either but it's more probable it'll be similar to 848 in case it happens.
Pranav Rathi (pranavrth)
left a comment
There was a problem hiding this comment.
Some more comments
There was a problem hiding this comment.
We should release new trivup version. Lets check with Magnus.
|
🎉 All Contributor License Agreements have been signed. Ready to merge. |
| } else { | ||
| rd_kafka_mock_cgrp_consumer_member_leave( | ||
| mcgrp, member); | ||
| member = NULL; | ||
| } |
There was a problem hiding this comment.
In the static membership case (-2), the member can rejoin. We shouldn't destroy the member. You can add a FIXME for now here.
There was a problem hiding this comment.
I've implemented it and added some tests for static group membership with the mock cluster
| - make -j -C tests build | ||
| - make -C tests run_local_quick | ||
| - DESTDIR="$PWD/dest" make install | ||
| - export TEST_KAFKA_VERSION=4.0.0 |
There was a problem hiding this comment.
Where is this being used? 4.0.0 is not released yet.
There was a problem hiding this comment.
This is used in tests to check if a test can run or not, if for example it's >= a specific version, given we're using master I used the next planned release here, but could have used 9.9.9 too
- Mock handler implementation - Rename current consumer protocol from generic to classic - Mock handler with automatic or manual assignment - More consumer group metadata getters - Test helpers - Expedite next HB after FindCoordinator doing it with an exponential backoff to avoid tight loops - Configurable session timeout and HB interval - Fix mock handler ListOffsets response LeaderEpoch instead of CurrentLeaderEpoch - Integration tests passing with AK trunk - Improve documentation and KIP 848 specific mock tests - Add mock tests for unknown topic id in metadata request and partial reconciliation - Make test 0147 more reliable - Fix test 0106 after HB timeout change - Exclude test case with AK trunk - Rename rd_kafka_buf_write_tags to rd_kafka_buf_write_tags_empty - Trivup 0.12.5 can run a KafkaCluster directly with KRaft and AK trunk - Trivup 0.12.6 build with a specific commit
and Py 3.12
an issue with apache/kafka#16464 on AK > 3.8.0
Address comment about using rd_kafka_buf_read_topic_partitions Asserts on non-nullable fields
doing it with an exponential backoff to avoid tight loops
LeaderEpoch instead of CurrentLeaderEpoch