Skip to content

[KIP-848] Mock handler and Integration tests passing#4662

Merged
Emanuele Sabellico (emasab) merged 14 commits intomasterfrom
dev_kip848_mock_handler_and_integration_tests
Dec 13, 2024
Merged

[KIP-848] Mock handler and Integration tests passing#4662
Emanuele Sabellico (emasab) merged 14 commits intomasterfrom
dev_kip848_mock_handler_and_integration_tests

Conversation

@emasab
Copy link
Copy Markdown
Contributor

@emasab Emanuele Sabellico (emasab) commented Mar 26, 2024

  • Mock handler implementation
  • Rename current consumer protocol from generic to classic
  • Mock handler with automatic or manual assignment
  • More consumer group metadata getters
  • Test helpers
  • Expedite next HB after FindCoordinator
    doing it with an exponential backoff to avoid tight loops
  • Configurable session timeout and HB interval
  • Fix mock handler ListOffsets response
    LeaderEpoch instead of CurrentLeaderEpoch
  • Integration tests passing with AK trunk
  • Improve documentation and KIP 848 specific mock tests

@emasab Emanuele Sabellico (emasab) force-pushed the dev_kip848_use_metadata_cache_and_fixes branch from 6667956 to ca1eb2d Compare March 26, 2024 17:48
@emasab Emanuele Sabellico (emasab) force-pushed the dev_kip848_mock_handler_and_integration_tests branch 2 times, most recently from 1d9962c to ca5e6d0 Compare March 27, 2024 10:43
@emasab Emanuele Sabellico (emasab) force-pushed the dev_kip848_use_metadata_cache_and_fixes branch from ca1eb2d to 953f668 Compare March 28, 2024 14:41
@emasab Emanuele Sabellico (emasab) force-pushed the dev_kip848_mock_handler_and_integration_tests branch 6 times, most recently from fd13d77 to f519c11 Compare March 28, 2024 22:35
@emasab Emanuele Sabellico (emasab) force-pushed the dev_kip848_use_metadata_cache_and_fixes branch from 953f668 to e4614be Compare April 1, 2024 14:40
@emasab Emanuele Sabellico (emasab) force-pushed the dev_kip848_mock_handler_and_integration_tests branch from f519c11 to 029b5ab Compare April 1, 2024 14:41
@emasab Emanuele Sabellico (emasab) force-pushed the dev_kip848_use_metadata_cache_and_fixes branch from e4614be to 10e7cc1 Compare April 1, 2024 15:07
@emasab Emanuele Sabellico (emasab) force-pushed the dev_kip848_mock_handler_and_integration_tests branch from 029b5ab to d7428d0 Compare April 1, 2024 15:07
@emasab Emanuele Sabellico (emasab) force-pushed the dev_kip848_use_metadata_cache_and_fixes branch from 10e7cc1 to 5789307 Compare April 1, 2024 16:48
@emasab Emanuele Sabellico (emasab) force-pushed the dev_kip848_mock_handler_and_integration_tests branch from d7428d0 to 0379129 Compare April 1, 2024 16:50
@emasab Emanuele Sabellico (emasab) force-pushed the dev_kip848_use_metadata_cache_and_fixes branch 2 times, most recently from 434da75 to e5f3101 Compare April 3, 2024 13:40
@emasab Emanuele Sabellico (emasab) force-pushed the dev_kip848_mock_handler_and_integration_tests branch from 0379129 to c6c9d2e Compare April 3, 2024 13:41
@emasab Emanuele Sabellico (emasab) force-pushed the dev_kip848_use_metadata_cache_and_fixes branch from e5f3101 to 109d348 Compare April 3, 2024 16:54
@emasab Emanuele Sabellico (emasab) force-pushed the dev_kip848_mock_handler_and_integration_tests branch from c6c9d2e to cf1ad35 Compare April 3, 2024 16:54
@emasab Emanuele Sabellico (emasab) force-pushed the dev_kip848_use_metadata_cache_and_fixes branch from 109d348 to 3dd0e45 Compare April 5, 2024 08:28
@emasab Emanuele Sabellico (emasab) changed the base branch from dev_kip848_use_metadata_cache_and_fixes to master April 5, 2024 08:38
@emasab Emanuele Sabellico (emasab) changed the base branch from master to dev_kip848_use_metadata_cache_and_fixes April 5, 2024 08:39
@emasab Emanuele Sabellico (emasab) force-pushed the dev_kip848_use_metadata_cache_and_fixes branch from 3dd0e45 to 3bba91f Compare April 5, 2024 19:19
@emasab Emanuele Sabellico (emasab) force-pushed the dev_kip848_mock_handler_and_integration_tests branch from 0396b1f to 27817d0 Compare April 5, 2024 19:21
@emasab Emanuele Sabellico (emasab) force-pushed the dev_kip848_use_metadata_cache_and_fixes branch from 3bba91f to f4fb7a1 Compare April 5, 2024 19:52
@emasab Emanuele Sabellico (emasab) force-pushed the dev_kip848_mock_handler_and_integration_tests branch 3 times, most recently from 91df086 to 82774f9 Compare April 6, 2024 12:14
Base automatically changed from dev_kip848 to master April 18, 2024 11:03
@emasab Emanuele Sabellico (emasab) force-pushed the dev_kip848_mock_handler_and_integration_tests branch 2 times, most recently from f04dc32 to 8d9ed36 Compare April 24, 2024 15:48
Comment thread src/rdkafka_mock_handlers.c Outdated
Copy link
Copy Markdown
Member

@pranavrth Pranav Rathi (pranavrth) left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For src files.

Comment thread .semaphore/semaphore.yml Outdated
Comment thread .semaphore/semaphore.yml Outdated
Comment thread src/rdkafka.h Outdated
Comment thread src/rdkafka.h
Comment on lines +4627 to +4646
/**
* @brief Get member epoch of a group metadata.
* Corresponds to the generation id in consumer protocol classic;
*
* @param group_metadata The group metadata
*
* @returns The member epoch id contained in the passed \p group_metadata.
*/
RD_EXPORT
int32_t rd_kafka_consumer_group_metadata_member_epoch(
const rd_kafka_consumer_group_metadata_t *group_metadata);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have method for retrieving geenration id? If not, do we need one for member epoch? If we really need this for member epoch, let's create one for generation id separately.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In Java GroupMetadata they haven't added a new one for member epoch, maybe naming can be changed in a future major release.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was saying that shall we add a new method rd_kafka_consumer_group_metadata_generation_id as well.

Copy link
Copy Markdown
Contributor Author

@emasab Emanuele Sabellico (emasab) Nov 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I think I'll leave only rd_kafka_consumer_group_metadata_generation_id as they didn't add the memberEpoch method in Java

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

/**
 * @brief Get the generation id (classic protocol)
 *        or member epoch (consumer protocol) of a group 

Is this fine description?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, correct

Comment thread src/rdkafka_mock.h Outdated
Comment thread src/rdkafka_mock_cgrp.c Outdated
Comment thread src/rdkafka_mock_cgrp.c
Comment thread src/rdkafka_mock_cgrp.c
Comment thread src/rdkafka_mock_cgrp.c Outdated
Comment thread src/rdkafka_mock_cgrp.c Outdated
Comment thread src/rdkafka_mock_handlers.c
Comment thread src/rdkafka_mock_handlers.c Outdated
rd_kafka_buf_read_str(rkbuf, &ServerAssignor);

/* #TopicPartitions */
rd_kafka_buf_read_arraycnt(rkbuf, &TopicPartitionsCnt,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should use rd_kafka_buf_read_topic_partitions here.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you do that, then you need to scan the topic partition list, and for each topic partition you need to compare topic id to see if it has changed to call rd_kafka_mock_topic_find_by_id. If it returned like a Map<Topic, List<Partition>> or we didn't have to do an operation at each topic change I'd be more inclined to do than. What's your view on this?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think its better to use 2 loops here. First to read the data from the rpc and second for our business logic. This ensures readability, reusability and separation of concern.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, done

Comment thread src/rdkafka_mock_handlers.c
Copy link
Copy Markdown
Member

@pranavrth Pranav Rathi (pranavrth) left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some 14 files more remaining but the main one is tests/0147-consumer_group_consumer_mock.c.

Comment thread tests/0045-subscribe_update.c
Comment thread tests/0081-admin.c
do_test_CreateTopics("temp queue, op timeout 0", rk, NULL, 0, 0);
/* FIXME: KRaft async CreateTopics is working differently than
* wth Zookeeper
* do_test_CreateTopics("temp queue, op timeout 0", rk, NULL, 0, 0); */
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does AK team know about it?

Let's have another function to determine whether its a zookeeper or kraft cluster and disable only when it kraft.

Copy link
Copy Markdown
Contributor Author

@emasab Emanuele Sabellico (emasab) Oct 28, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I opened this issue.
Then talked about it internally. I think it was a bug and not a feature given they didn't acknowledge it was a feature to replicate with KRaft, we could remove the test for both when that is closed.

Comment thread tests/0081-admin.c Outdated
Comment thread tests/0112-assign_unknown_part.c
Comment on lines +935 to 937
if (test_consumer_group_protocol_classic() &&
!(rebalance_cb1.assign_call_cnt == expected_cb1_assign_call_cnt &&
rebalance_cb2.assign_call_cnt == expected_cb2_assign_call_cnt))
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should have similar calculations for the new protocol as well.

Same for others as well below.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With KIP-848 the callback count isn't fixed as in classic protocol so we can only verify the expected final assignment but not the number of callbacks.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can do that deterministically but might be more complex.

Comment thread tests/0113-cooperative_rebalance.cpp
Comment on lines +3381 to +3384
} else if (0) {
/* FIXME: enable this once new errors are handled in OffsetCommit. */
t_consumer_group_consumer_retry_commit_on_fenced_member_epoch();
}
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will enable this in my other opened PR.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I checked it and I think we should remove this test, as now OffsetCommit returns STALE_MEMBER_EPOCH that is not retried automatically and FENCED _MEMBER_EPOCH is returned by the Heartbeat only

Comment thread tests/0120-asymmetric_subscription.c
Comment thread tests/0120-asymmetric_subscription.c
Comment on lines +317 to +321
if (test_consumer_group_protocol_classic()) {
ApiKey = RD_KAFKAP_Heartbeat;
} else {
ApiKey = RD_KAFKAP_ConsumerGroupHeartbeat;
}
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be opposite condition. if condition should be for consumer as that is the new one.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've always used always test_consumer_group_protocol_classic(), even if implementing the other function (that I can remove) so it's consistent for search, also it isolates the parts that we want to remove in a distant future.
Also if there was a third protocol, we hope not, that should start from the assumptions and tests of the consumer protocol, not with those of the classic protocol.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should be using test_consumer_group_protocol_consumer at all the places instead of test_consumer_group_protocol_classic. Reason behind this is:

  1. Let's say we add a new protocol in future. We want all the tests to run. We are making sure of that by using !test_consumer_group_protocol_consumer() here instead.
  2. In !test_consumer_group_protocol_classic() condition, we will skip those tests with the new protocol added.

Copy link
Copy Markdown
Contributor Author

@emasab Emanuele Sabellico (emasab) Nov 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, in this case we're running the RD_KAFKAP_ConsumerGroupHeartbeat test that is more appropriate than running the classic one, if we use test_consumer_group_protocol_consumer ... else in case of a new protocol we run the classic tests, while running the newer ones would be more appropriate.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am talking about all the places. Not just here. Basically the idea is to make sure that we run all the test cases when we add new protocols. Let's ensure that when there is a new protocol, all the tests are executed.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, all tests about the new protocol, the tests about the classic protocol don't have to be executed as they won't be applicable. Probably the tests about the 848 protocol won't be applicable either but it's more probable it'll be similar to 848 in case it happens.

Copy link
Copy Markdown
Member

@pranavrth Pranav Rathi (pranavrth) left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some more comments

Comment thread tests/0016-client_swname.c Outdated
Comment thread tests/0029-assign_offset.c Outdated
Comment thread tests/java/Makefile
Comment thread tests/test.c Outdated
Comment thread tests/test.c
Comment thread tests/trivup/trivup-0.12.6.tar.gz Outdated
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should release new trivup version. Lets check with Magnus.

Comment thread tests/testcpp.cpp Outdated
Comment thread tests/0118-commit_rebalance.c
Comment thread tests/0118-commit_rebalance.c
@confluent-cla-assistant
Copy link
Copy Markdown

🎉 All Contributor License Agreements have been signed. Ready to merge.
Please push an empty commit if you would like to re-run the checks to verify CLA status for all contributors.

Comment thread src/rdkafka_mock_int.h
Comment thread src/rdkafka_mock_cgrp.c Outdated
Comment on lines +2869 to +2876
} else {
rd_kafka_mock_cgrp_consumer_member_leave(
mcgrp, member);
member = NULL;
}
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the static membership case (-2), the member can rejoin. We shouldn't destroy the member. You can add a FIXME for now here.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've implemented it and added some tests for static group membership with the mock cluster

Comment thread src/rdkafka_mock_cgrp.c
Comment thread tests/0120-asymmetric_subscription.c
Comment thread tests/0147-consumer_group_consumer_mock.c Outdated
Comment thread .semaphore/semaphore.yml
- make -j -C tests build
- make -C tests run_local_quick
- DESTDIR="$PWD/dest" make install
- export TEST_KAFKA_VERSION=4.0.0
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where is this being used? 4.0.0 is not released yet.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is used in tests to check if a test can run or not, if for example it's >= a specific version, given we're using master I used the next planned release here, but could have used 9.9.9 too

Comment thread src/rdkafka.h
- Mock handler implementation
- Rename current consumer protocol from generic to classic
- Mock handler with automatic or manual assignment
- More consumer group metadata getters
- Test helpers
- Expedite next HB after FindCoordinator
  doing it with an exponential backoff to avoid tight loops
- Configurable session timeout and HB interval
- Fix mock handler ListOffsets response
  LeaderEpoch instead of CurrentLeaderEpoch
- Integration tests passing with AK trunk
- Improve documentation and KIP 848 specific mock tests
- Add mock tests for unknown topic id
  in metadata request and partial reconciliation
- Make test 0147 more reliable
- Fix test 0106 after HB timeout change
- Exclude test case with AK trunk
- Rename rd_kafka_buf_write_tags to
  rd_kafka_buf_write_tags_empty
- Trivup 0.12.5 can run a KafkaCluster
   directly with KRaft and AK trunk
- Trivup 0.12.6 build with a specific commit
Address comment about using rd_kafka_buf_read_topic_partitions
Asserts on non-nullable fields
Copy link
Copy Markdown
Member

@pranavrth Pranav Rathi (pranavrth) left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!.

Copy link
Copy Markdown
Member

@pranavrth Pranav Rathi (pranavrth) left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants