KAFKA-15003: When partitions are partition assignments change we do n… #13735
…ot update TopicIdReplicaAssignment during migration (DUAL_WRITE). Fixed the metadata change events in the Migration component to correctly check for differences in existing topics and replicate the metadata to ZooKeeper. Also made the diff check exhaustive enough to handle partial writes in ZooKeeper when we're trying to replicate changes using a snapshot in the event of a Controller failover. Added migration client and integration tests to verify the change.
mumrah
left a comment
This is a great catch @akhileshchg! Looks like we had some gaps in topic/partition metadata even beyond the snapshot handling.
Left some comments inline
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java
    newPartitions.put(topicId, newTopicPartitions);
    changedTopics.add(topicId);
I think it's possible to see a difference between topicPartitionsInZk (computed from the partition znodes) and the topic assignment in KRaft while the assignment in ZK (stored in the topic znode) is actually correct. That said, I don't think there's any harm in rewriting the KRaft assignment to ZK.
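The comparison discussed in this comment can be sketched roughly as follows. This is only an illustration, not the code from the PR: `AssignmentDiff`, `changedTopics`, and the map-of-maps layout (topic name to partition id to replica list) are assumptions made for the example. Any topic whose KRaft assignment differs from what was read back from ZK is marked for a rewrite; because the rewrite is idempotent, a false positive only costs one extra write.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical helper, not part of Kafka: compares per-topic partition
// assignments (partition id -> replica broker ids) taken from KRaft and ZK.
final class AssignmentDiff {
    private AssignmentDiff() { }

    // Returns the topics whose assignment in ZK is missing or differs from KRaft.
    static Set<String> changedTopics(
            Map<String, Map<Integer, List<Integer>>> kraft,
            Map<String, Map<Integer, List<Integer>>> zk) {
        Set<String> changed = new HashSet<>();
        for (Map.Entry<String, Map<Integer, List<Integer>>> entry : kraft.entrySet()) {
            Map<Integer, List<Integer>> inZk = zk.get(entry.getKey());
            // Map.equals and List.equals compare contents, so a partial write
            // (missing partition or stale replica list) shows up as a difference.
            if (!entry.getValue().equals(inZk)) {
                changed.add(entry.getKey());
            }
        }
        return changed;
    }
}
```

A topic that is absent from the ZK view is also reported, since comparing against `null` is never equal.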
    val (migrationZkVersion, responses) = zkClient.retryMigrationRequestsUntilConnected(requests, state)
    val resultCodes = responses.map { response => response.path -> response.resultCode }.toMap
    if (resultCodes(TopicZNode.path(topicName)).equals(Code.NODEEXISTS)) {
      // topic already created, just return
Should this read "partition already created"?
Do we handle the case where the number of partitions decreased? How do we handle changes to the partition znodes? It looks like we catch EEXIST in a bunch of cases; do we follow up to check that the znode has the right contents?
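One way to answer the EEXIST question is to follow a failed create with a content check. The sketch below illustrates that pattern with an in-memory map standing in for ZooKeeper; `FakeZk`, `createOrRepair`, and the `Result` enum are hypothetical names, and no real ZooKeeper client call is shown. It is not the approach taken in the PR.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a ZK client: an in-memory path -> data map.
final class FakeZk {
    enum Result { CREATED, ALREADY_CORRECT, REPAIRED }

    private final Map<String, byte[]> nodes = new HashMap<>();

    // Create the node; if it already exists, verify its contents and
    // overwrite them when they do not match the expected data.
    Result createOrRepair(String path, byte[] expected) {
        byte[] existing = nodes.putIfAbsent(path, expected.clone());
        if (existing == null) {
            return Result.CREATED;
        }
        if (Arrays.equals(existing, expected)) {
            return Result.ALREADY_CORRECT;
        }
        nodes.put(path, expected.clone());
        return Result.REPAIRED;
    }

    // Returns a copy of the stored data, or null if the node does not exist.
    byte[] read(String path) {
        byte[] data = nodes.get(path);
        return data == null ? null : data.clone();
    }
}
```

Treating "already exists" as success without the follow-up comparison would leave a stale znode in place after a partial write, which is the gap the question points at.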
You can't shrink the number of partitions, right? Except when the topic is completely deleted? I think we handle that case.
On Wed, May 31, 2023 at 11:46 AM Colin Patrick McCabe wrote:
> Do we handle the case where # of partitions decreased?
> How do we handle changes to the partition znodes? Looks like we catch
> EEXIST in a bunch of cases, do we follow up to see that it has the right
> stuff in there?
@akhileshchg there's a case where a topic is deleted and re-created with fewer partitions. Since ZK indexes topics by name, we can't readily see if the deletion happened when processing a KRaft snapshot. I added logic to handle the case of unexpected partitions in ZK.
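The "unexpected partitions" case comes down to a set difference between the partition ids found in ZK and those present in KRaft. The sketch below is a minimal illustration under that assumption; `PartitionCleanup` and `unexpectedPartitions` are hypothetical names and this is not the logic added in the PR.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper, not part of Kafka.
final class PartitionCleanup {
    private PartitionCleanup() { }

    // Partition ids that exist in ZK but not in KRaft, e.g. left over from a
    // topic that was deleted and re-created with fewer partitions.
    static Set<Integer> unexpectedPartitions(Set<Integer> inZk, Set<Integer> inKRaft) {
        Set<Integer> extra = new TreeSet<>(inZk);
        extra.removeAll(inKRaft);
        return extra;
    }
}
```

For a topic re-created with two partitions where ZK still holds four, the result contains partitions 2 and 3, which are the znodes a sync step would need to delete.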
cmccabe
left a comment
LGTM
        return builder.toString();
    }

    public boolean hasSameAssignment(PartitionRegistration registration) {
This should be equals. I'll fix it
Fixed the metadata change events in the Migration component to correctly check for differences in existing topics and replicate the metadata to ZooKeeper. Also made the diff check exhaustive enough to handle partial writes in ZooKeeper when we're trying to replicate changes using a snapshot in the event of a Controller failover. Added migration client and integration tests to verify the change. Co-authored-by: Akhilesh Chaganti <[email protected]>
…tream-3.5
* commit 'c2f6f29ca6e1306ac77ec726bac4cd09bd1aa80b': (76 commits)
  KAFKA-15019: Improve handling of broker heartbeat timeouts (apache#13759)
  KAFKA-15003: Fix ZK sync logic for partition assignments (apache#13735)
  MINOR: Add 3.5 upgrade steps for ZK and KRaft (apache#13792)
  KAFKA-15010 ZK migration failover support (apache#13758)
  KAFKA-15017 Fix snapshot load in dual write mode for ClientQuotas and SCRAM (apache#13757)
  MINOR: Update LICENSE-binary following snappy upgrade (apache#13791)
  Upgrade to snappy v1.1.10.0 (apache#13786)
  KAFKA-15004: Fix configuration dual-write during migration (apache#13767)
  KAFKA-8713: JsonConverter replace.null.with.default should prevent emitting default for Struct fields (apache#13781)
  KAFKA-14996: Handle overly large user operations on the kcontroller (apache#13742)
  ...