Member

@mumrah mumrah commented May 24, 2023

Previously, if the KRaft controller failed over while metadata changes were pending in the KRaftMigrationDriver queue, the writes to ZK would be lost. This patch adds snapshot reconciliation so that the controller can ensure a consistent state with ZK after a failover.

Internally this adds a new state SYNC_KRAFT_TO_ZK to the KRaftMigrationDriver state machine. The controller passes through this state after the initial migration and each time a controller becomes active thereafter.
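As a rough illustration of the transition described above, here is a minimal sketch of a simplified state machine. Only `SYNC_KRAFT_TO_ZK` and `KRAFT_CONTROLLER_TO_BROKER_COMM` are names that appear in this PR; the other states, the `next` and `path` helpers, and the exact ordering of transitions are assumptions made for illustration and are not the actual KRaftMigrationDriver code.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: a simplified stand-in for the migration driver's state machine.
final class MigrationStateSketch {
    enum State {
        BECOME_CONTROLLER,              // assumed: this controller has just become active
        ZK_MIGRATION,                   // assumed: initial load of ZK data into KRaft
        SYNC_KRAFT_TO_ZK,               // from the PR: reconcile the KRaft snapshot with ZK
        KRAFT_CONTROLLER_TO_BROKER_COMM // from the PR diff: assumed to follow the sync
    }

    // Returns the state that follows `current`, or `current` itself when the sketch ends.
    static State next(State current, boolean initialZkLoadComplete) {
        switch (current) {
            case BECOME_CONTROLLER:
                // A failover after the initial load skips the load but not the sync.
                return initialZkLoadComplete ? State.SYNC_KRAFT_TO_ZK : State.ZK_MIGRATION;
            case ZK_MIGRATION:
                return State.SYNC_KRAFT_TO_ZK;
            case SYNC_KRAFT_TO_ZK:
                return State.KRAFT_CONTROLLER_TO_BROKER_COMM;
            default:
                return current;
        }
    }

    // Walks the state machine from BECOME_CONTROLLER until no further transition applies.
    static List<State> path(boolean initialZkLoadComplete) {
        List<State> visited = new ArrayList<>();
        State current = State.BECOME_CONTROLLER;
        visited.add(current);
        while (true) {
            State following = next(current, initialZkLoadComplete);
            if (following == current) {
                break;
            }
            visited.add(following);
            current = following;
        }
        return visited;
    }

    public static void main(String[] args) {
        System.out.println("first activation: " + path(false));
        System.out.println("after failover:   " + path(true));
    }
}
```

In this sketch both paths pass through `SYNC_KRAFT_TO_ZK`; only the first activation also passes through the initial load.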

Comment on lines 270 to 303
- Map<String, Double> quotaMap = clientQuotasImage.entities().get(entity).quotaMap();
+ Map<String, Double> quotaMap = getClientQuotaMapForEntity(clientQuotasImage, entity);
Member Author

There was an NPE here prior to this patch. If the SCRAM credentials changed but ClientQuotas did not, the get(entity) call would return null.
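A minimal sketch of the kind of null-safe lookup that avoids this NPE. It models the quota image as a plain nested `Map` rather than the real `ClientQuotasImage`, so `quotaMapForEntity` and its signature are hypothetical and not the helper added in this patch.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Illustrative only: the real helper takes a ClientQuotasImage, not a nested Map.
final class QuotaLookupSketch {
    // Returns the quotas recorded for `entity`, or an empty map when the entity
    // has no quota entry (for example, when only its SCRAM credentials changed).
    static Map<String, Double> quotaMapForEntity(
            Map<String, Map<String, Double>> entities, String entity) {
        Map<String, Double> quotas = entities.get(entity);
        return quotas == null ? Collections.emptyMap() : quotas;
    }

    public static void main(String[] args) {
        Map<String, Map<String, Double>> entities = new HashMap<>();
        entities.put("user-a", Collections.singletonMap("producer_byte_rate", 1024.0));

        System.out.println(quotaMapForEntity(entities, "user-a"));
        // No entry for user-b: the lookup yields an empty map instead of throwing.
        System.out.println(quotaMapForEntity(entities, "user-b"));
    }
}
```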

@mumrah mumrah added the kraft label May 26, 2023
new util.HashSet[Integer](zkClient.getSortedBrokerList.map(Integer.valueOf).toSet.asJava)
}

override def readProducerId(): util.Optional[java.lang.Long] = {
Contributor

This doesn't seem to be returning nextProducerId; it's returning the current producer ID.

Why not just have writeProducerId take a ProducerIdsBlock object so we don't have to do a bunch of dubious translation?

Member Author

Oh right, in KRaft when we give out a new block, we persist the next ID in the log.
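To make the "current" versus "next" distinction concrete, here is a small sketch that assumes a block is described by its first ID and its size, so that the next producer ID is the first ID of the following block. `IdBlock` and `blockEndingBefore` are hypothetical names for illustration; they are not Kafka's `ProducerIdsBlock` API, and how ZK actually stores the block is not shown in this conversation.

```java
// Illustrative only: a hypothetical block type, not Kafka's ProducerIdsBlock.
final class ProducerIdSketch {
    static final class IdBlock {
        final long firstId;
        final int size;

        IdBlock(long firstId, int size) {
            if (firstId < 0 || size <= 0) {
                throw new IllegalArgumentException("firstId must be >= 0 and size > 0");
            }
            this.firstId = firstId;
            this.size = size;
        }

        // Last ID that belongs to this block.
        long lastId() {
            return Math.addExact(firstId, size - 1);
        }

        // First ID of the following block, i.e. the "next producer ID".
        long nextBlockFirstId() {
            return Math.addExact(firstId, size);
        }
    }

    // Inverse translation: the block that ends immediately before `nextProducerId`.
    static IdBlock blockEndingBefore(long nextProducerId, int size) {
        return new IdBlock(Math.subtractExact(nextProducerId, size), size);
    }

    public static void main(String[] args) {
        IdBlock current = new IdBlock(1000L, 1000);
        System.out.println("last ID in block: " + current.lastId());
        System.out.println("next producer ID: " + current.nextBlockFirstId());
    }
}
```

Passing the block itself around, as the reviewer suggests, would keep this arithmetic in one place instead of repeating it at each read and write.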

applyMigrationOperation("Recovering migration state from ZK", zkMigrationClient::getOrCreateMigrationRecoveryState);
String maybeDone = migrationLeadershipState.zkMigrationComplete() ? "done" : "not done";
log.info("Recovered migration state {}. ZK migration is {}.", migrationLeadershipState, maybeDone);
log.info("ZK migration is {}.", maybeDone);
Contributor

I'd really prefer to say something like "Initial ZK load is done" / "Initial ZK load is not done"

We should also change ZkMigrationLeadershipState.zkMigrationComplete -> ZkMigrationLeadershipState.initialZkLoadComplete

After all, the whole process here is technically "ZK migration", not just the initial load.

offsetAndEpochAfterMigration.epoch());
applyMigrationOperation("Finished migrating ZK data", state -> zkMigrationClient.setMigrationRecoveryState(newState));
transitionTo(MigrationDriverState.KRAFT_CONTROLLER_TO_BROKER_COMM);
applyMigrationOperation("Finished migrating ZK data to KRaft", state -> zkMigrationClient.setMigrationRecoveryState(newState));
Contributor

Maybe add a comment here about how we always go through the SYNC_KRAFT_TO_ZK state, even immediately after we've loaded from ZK.

Contributor

@cmccabe cmccabe left a comment

LGTM once comments are addressed (the producer ID comment is the most urgent).

@mumrah mumrah merged commit d27ba5b into apache:trunk Jun 1, 2023
mumrah added a commit that referenced this pull request Jun 1, 2023
This patch adds snapshot reconciliation during ZK to KRaft migration. This reconciliation happens whenever a snapshot is loaded by KRaft, or during a controller failover. Prior to this patch, it was possible to miss metadata updates coming from KRaft when dual-writing to ZK.
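The reconciliation idea can be sketched as a diff between two key/value views, treating the KRaft snapshot as the source of truth and producing the writes needed to bring ZK in line. This is a simplified illustration under that assumption: the flat `Map<String, String>` model, the `reconcile` method, and the operation strings are all hypothetical and do not reflect how the patch represents metadata or ZK writes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative only: metadata is modeled as flat key/value maps with non-null values.
final class ReconcileSketch {
    // Lists the operations needed to make `zkState` match `kraftSnapshot`.
    static List<String> reconcile(Map<String, String> kraftSnapshot,
                                  Map<String, String> zkState) {
        List<String> ops = new ArrayList<>();
        // Sorted iteration keeps the output deterministic.
        for (Map.Entry<String, String> entry : new TreeMap<>(kraftSnapshot).entrySet()) {
            String zkValue = zkState.get(entry.getKey());
            if (zkValue == null) {
                ops.add("CREATE " + entry.getKey());   // missing in ZK
            } else if (!zkValue.equals(entry.getValue())) {
                ops.add("UPDATE " + entry.getKey());   // stale in ZK
            }
        }
        for (String key : new TreeMap<>(zkState).keySet()) {
            if (!kraftSnapshot.containsKey(key)) {
                ops.add("DELETE " + key);              // removed in KRaft
            }
        }
        return ops;
    }

    public static void main(String[] args) {
        Map<String, String> kraft = new HashMap<>();
        kraft.put("topic-a", "v1");
        kraft.put("topic-b", "v2");
        kraft.put("topic-c", "v1");

        Map<String, String> zk = new HashMap<>();
        zk.put("topic-a", "v1");
        zk.put("topic-b", "v1"); // an update that was lost before the failover
        zk.put("topic-d", "v1"); // a deletion that was lost before the failover

        List<String> ops = reconcile(kraft, zk);
        System.out.println(ops.size() + " write operation(s): " + ops);
    }
}
```

Running the reconciliation on every snapshot load and on every failover means a write that was lost from the in-memory queue is picked up again the next time a controller becomes active.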

Internally this adds a new state SYNC_KRAFT_TO_ZK to the KRaftMigrationDriver state machine. The controller passes through this state after the initial ZK migration and each time a controller becomes active.

Logging during dual-write was enhanced to include a count of write operations happening.

Reviewers: Colin P. McCabe <[email protected]>
emissionnebula added a commit to confluentinc/kafka that referenced this pull request Jun 16, 2023
…tream-3.5

* commit 'c2f6f29ca6e1306ac77ec726bac4cd09bd1aa80b': (76 commits)
  KAFKA-15019: Improve handling of broker heartbeat timeouts (apache#13759)
  KAFKA-15003: Fix ZK sync logic for partition assignments (apache#13735)
  MINOR: Add 3.5 upgrade steps for ZK and KRaft (apache#13792)
  KAFKA-15010 ZK migration failover support (apache#13758)
  KAFKA-15017 Fix snapshot load in dual write mode for ClientQuotas and SCRAM (apache#13757)
  MINOR: Update LICENSE-binary following snappy upgrade (apache#13791)
  Upgrade to snappy v1.1.10.0 (apache#13786)
  KAFKA-15004: Fix configuration dual-write during migration (apache#13767)
  KAFKA-8713: JsonConverter replace.null.with.default should prevent emitting default for Struct fields (apache#13781)
  KAFKA-14996: Handle overly large user operations on the kcontroller (apache#13742)
  ...