Add protocol/parsing changes [KIP-881] #4189
Merged
Milind L (milindl) merged 6 commits intomasterfrom May 16, 2023
Merged
Conversation
fcb8a73 to
cdb5340
Compare
aad6e2f to
b13bade
Compare
bf236c3 to
c4cb7c9
Compare
Contributor
Author
|
Rebased onto master. |
* Metadata and leader epoch refactor. store private metadata into a struct that contains the public one. * Remove rd_kafka_broker_id_rack_pair. Replaced by rd_kafka_metadata_internal_t * Style fix, documentation, remove internal accessors. * Remove internal struct from assignor cb. Add internal accessor function for that. * Use a define instead of a function call for rd_kafka_metadata_get_internal * Make macro parenthesized --------- Co-authored-by: Milind L <[email protected]>
Emanuele Sabellico (emasab)
approved these changes
May 16, 2023
Contributor
Emanuele Sabellico (emasab)
left a comment
There was a problem hiding this comment.
Looks good! Just check that documentation param.
| * | ||
| * @param rkbp if non-NULL, will be set to the broker object with | ||
| * refcount increased, or NULL on error. | ||
| * @param rack if non-NULL, it will set the rack of the broker object. |
Contributor
There was a problem hiding this comment.
Is this parameter documentation stale?
Contributor
Author
There was a problem hiding this comment.
Yes, it was, I've fixed it now. Merging after CI run.
Thanks!
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
There are two commits in this change:
Change the embedded MemberMetadata protocol, ref to ConsumerProtocolSubscription.json.
Add RackId (and GenerationId) to the message we are sending, and on receive, parse RackId (and GenerationId), storing it inside
rd_kafka_group_member_tso it can be accessed by the assignors.Adds a unit test for it additional to the code changes for checking serialization/deserialization.
Change the Metadata response parsing code - currently, the Rack for each broker is simply skipped over. This change stores it inside the METADATA_OP, and then extracts it to pass it to assignors.
This needs to happen through a new field in the op union, we can't change the publicly exposed metadata type without significant effort.
Another rejected approach: add a rack field to the rd_kafka_broker_t, and use the rd_kafka_broker_update, which dispatches an op on the broker thread, to update a broker's rack. However, this was rejected for two reasons, first, that the actual update happens on the broker thread, and the assignor runs on the main thread, so the latest broker rack might not be updated. Second, because broker id may change, but in the assignor, we use the latest broker id right from the metadata, so to get the rack, find_broker_by_nodeid() call would fail.
This can be merged at any time, this doesn't cause any breakages in the existing functionality.
This is based off KIP-320, because it is a prereq for KIP-881, and contains significant changes in similar areas of the code, so this prevents merge conflicts at a later stage.