KAFKA-15010 ZK migration failover support #13758
…into KAKFA-15010-migration-failover
Map<String, Double> quotaMap = clientQuotasImage.entities().get(entity).quotaMap();
Map<String, Double> quotaMap = getClientQuotaMapForEntity(clientQuotasImage, entity);
There was an NPE here prior to this patch. If the SCRAM credentials changed, but ClientQuotas did not, the get(entity) call would return a null.
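The null-safe lookup that the new helper's name suggests might look roughly like the sketch below. This is an illustration, not the patch's actual code: `QuotaImage` and `QuotaLookupSketch` are hypothetical stand-ins for the real image classes, entities are plain strings, and returning an empty map for a missing entity is an assumption.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a per-entity quota image; not the real Kafka class.
final class QuotaImage {
    private final Map<String, Double> quotas;

    QuotaImage(Map<String, Double> quotas) {
        this.quotas = quotas;
    }

    Map<String, Double> quotaMap() {
        return quotas;
    }
}

final class QuotaLookupSketch {
    // Returns the quota map for the entity, or an empty map (assumed behavior)
    // when the entity has no quota entry, instead of dereferencing null.
    static Map<String, Double> getClientQuotaMapForEntity(
            Map<String, QuotaImage> entities, String entity) {
        QuotaImage image = entities.get(entity);
        return image == null ? Collections.emptyMap() : image.quotaMap();
    }

    public static void main(String[] args) {
        Map<String, QuotaImage> entities = new HashMap<>();
        entities.put("user-a", new QuotaImage(Map.of("producer_byte_rate", 1024.0)));
        // Entity with quotas: its map is returned unchanged.
        System.out.println(getClientQuotaMapForEntity(entities, "user-a"));
        // Entity whose SCRAM credentials changed but which has no quotas: no NPE.
        System.out.println(getClientQuotaMapForEntity(entities, "user-b"));
    }
}
```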
new util.HashSet[Integer](zkClient.getSortedBrokerList.map(Integer.valueOf).toSet.asJava)
}

override def readProducerId(): util.Optional[java.lang.Long] = {
This doesn't seem to be returning nextProducerId, it's returning the current producer id.
Why not just have writeProducerId take a ProducerIdsBlock object so we don't have to do a bunch of dubious translation?
Oh right, in KRaft when we give out a new block, we persist the next ID in the log.
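To make the "current" versus "next" distinction in this thread concrete, here is a small sketch of a producer ID block. `IdBlock` is a hypothetical simplification written for this example, not Kafka's real `ProducerIdsBlock`, and the block size of 1000 is only an illustrative value.

```java
// Hypothetical, simplified producer ID block: a contiguous range of IDs.
final class IdBlock {
    final long firstId;
    final int size;

    IdBlock(long firstId, int size) {
        this.firstId = firstId;
        this.size = size;
    }

    // Last ID contained in this block.
    long lastId() {
        return firstId + size - 1;
    }

    // First ID of the block that would be handed out after this one.
    // This "next ID" is the kind of value the thread says KRaft persists.
    long nextBlockFirstId() {
        return firstId + size;
    }
}

final class ProducerIdSketch {
    public static void main(String[] args) {
        IdBlock current = new IdBlock(1000L, 1000);
        System.out.println("current block: " + current.firstId + ".." + current.lastId());
        System.out.println("next block starts at: " + current.nextBlockFirstId());
    }
}
```

Passing a whole block object between the two systems, as the reviewer suggests, would avoid converting by hand between the start of the current block and the start of the next one.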
applyMigrationOperation("Recovering migration state from ZK", zkMigrationClient::getOrCreateMigrationRecoveryState);
String maybeDone = migrationLeadershipState.zkMigrationComplete() ? "done" : "not done";
log.info("Recovered migration state {}. ZK migration is {}.", migrationLeadershipState, maybeDone);
log.info("ZK migration is {}.", maybeDone);
I'd really prefer to say something like "Initial ZK load is done" / "Initial ZK load is not done"
We should also change ZkMigrationLeadershipState.zkMigrationComplete -> ZkMigrationLeadershipState.initialZkLoadComplete
After all, the whole process here is technically "ZK migration" not just the initial load
offsetAndEpochAfterMigration.epoch());
applyMigrationOperation("Finished migrating ZK data", state -> zkMigrationClient.setMigrationRecoveryState(newState));
transitionTo(MigrationDriverState.KRAFT_CONTROLLER_TO_BROKER_COMM);
applyMigrationOperation("Finished migrating ZK data to KRaft", state -> zkMigrationClient.setMigrationRecoveryState(newState));
Maybe add a comment here about how we always go through the sync kraft -> zk state here, even immediately after we've loaded from zk.
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
cmccabe
left a comment
LGTM once comments are addressed (the producer ID comment is the most urgent)
This patch adds snapshot reconciliation during ZK to KRaft migration. This reconciliation happens whenever a snapshot is loaded by KRaft, or during a controller failover. Prior to this patch, it was possible to miss metadata updates coming from KRaft when dual-writing to ZK. Internally this adds a new state SYNC_KRAFT_TO_ZK to the KRaftMigrationDriver state machine. The controller passes through this state after the initial ZK migration and each time a controller becomes active. Logging during dual-write was enhanced to include a count of write operations happening. Reviewers: Colin P. McCabe
…tream-3.5 * commit 'c2f6f29ca6e1306ac77ec726bac4cd09bd1aa80b': (76 commits) KAFKA-15019: Improve handling of broker heartbeat timeouts (apache#13759) KAFKA-15003: Fix ZK sync logic for partition assignments (apache#13735) MINOR: Add 3.5 upgrade steps for ZK and KRaft (apache#13792) KAFKA-15010 ZK migration failover support (apache#13758) KAFKA-15017 Fix snapshot load in dual write mode for ClientQuotas and SCRAM (apache#13757) MINOR: Update LICENSE-binary following snappy upgrade (apache#13791) Upgrade to snappy v1.1.10.0 (apache#13786) KAFKA-15004: Fix configuration dual-write during migration (apache#13767) KAFKA-8713: JsonConverter replace.null.with.default should prevent emitting default for Struct fields (apache#13781) KAFKA-14996: Handle overly large user operations on the kcontroller (apache#13742) ...
Previously, if the KRaft controller failed over while metadata changes were pending in the KRaftMigrationDriver queue, the writes to ZK would be lost. This patch adds snapshot reconciliation so that the controller can ensure a consistent state with ZK after a failover.
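The idea of snapshot reconciliation can be sketched as a diff between what KRaft holds and what ZK holds, writing only the differences. The sketch below is a simplified assumption, not the patch's implementation: `ReconcileSketch` is a hypothetical name, metadata is modeled as plain string maps with non-null values, and the returned count only mirrors the idea of counting write operations.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

final class ReconcileSketch {
    // Makes `zk` match `kraft` and returns the number of write operations
    // (creates, updates, and deletes) that were needed.
    static int reconcile(Map<String, String> kraft, Map<String, String> zk) {
        int writes = 0;
        // Create or update entries that are missing or stale in ZK.
        for (Map.Entry<String, String> entry : kraft.entrySet()) {
            if (!entry.getValue().equals(zk.get(entry.getKey()))) {
                zk.put(entry.getKey(), entry.getValue());
                writes++;
            }
        }
        // Delete entries that exist in ZK but no longer exist in KRaft.
        Iterator<String> keys = zk.keySet().iterator();
        while (keys.hasNext()) {
            if (!kraft.containsKey(keys.next())) {
                keys.remove();
                writes++;
            }
        }
        return writes;
    }

    public static void main(String[] args) {
        Map<String, String> kraft = new HashMap<>(Map.of("topic-a", "v2", "topic-b", "v1"));
        // ZK missed the update to topic-a and still holds a deleted topic-c.
        Map<String, String> zk = new HashMap<>(Map.of("topic-a", "v1", "topic-b", "v1", "topic-c", "v1"));
        System.out.println("writes: " + reconcile(kraft, zk));
        System.out.println("in sync: " + kraft.equals(zk));
    }
}
```

Running the same reconciliation a second time performs no writes, which is why repeating it on every failover is safe in this model.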
Internally this adds a new state SYNC_KRAFT_TO_ZK to the KRaftMigrationDriver state machine. The controller passes through this state after the initial migration and each time a controller becomes active thereafter.
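The two entry points into the new state can be illustrated with a reduced state machine. This is a sketch under stated assumptions: `DriverState` lists only a few states rather than the driver's full set, `MigrationStateSketch` and its method names are hypothetical, and the transition out of SYNC_KRAFT_TO_ZK is inferred from the diff excerpt in this conversation rather than confirmed.

```java
// Reduced, illustrative set of states; the real driver has more.
enum DriverState {
    INACTIVE,
    ZK_MIGRATION,
    SYNC_KRAFT_TO_ZK,
    KRAFT_CONTROLLER_TO_BROKER_COMM
}

final class MigrationStateSketch {
    // A controller that becomes active either starts the initial ZK load or,
    // if that load already finished, goes straight to reconciliation.
    static DriverState onBecomeActive(boolean initialZkLoadComplete) {
        return initialZkLoadComplete ? DriverState.SYNC_KRAFT_TO_ZK : DriverState.ZK_MIGRATION;
    }

    // After the initial load, the driver also passes through reconciliation.
    static DriverState onInitialLoadFinished() {
        return DriverState.SYNC_KRAFT_TO_ZK;
    }

    // Assumed follow-up state once KRaft and ZK agree.
    static DriverState onSyncFinished() {
        return DriverState.KRAFT_CONTROLLER_TO_BROKER_COMM;
    }

    public static void main(String[] args) {
        System.out.println(onBecomeActive(false));
        System.out.println(onInitialLoadFinished());
        System.out.println(onBecomeActive(true));
        System.out.println(onSyncFinished());
    }
}
```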