KAFKA-15370: Support Participation in 2PC (KIP-939) (1/N)#17687
KAFKA-15370: Support Participation in 2PC (KIP-939) (1/N)#17687jolshan merged 7 commits intoapache:trunkfrom
Conversation
This is just a mechanical change to make prepareTransitionTo method use named parameters instead of positional parameters.
| updateTimestamp: Long, | ||
| clientTransactionVersion: TransactionVersion): TxnTransitMetadata = { | ||
| private def prepareTransitionTo(state: TransactionState, | ||
| producerId: Long = this.producerId, |
There was a problem hiding this comment.
There was a previous effort to remove default arguments in order to not accidentally set something incorrect. I think the defaults make sense (keeping the same) so I think it makes sense, but wanted to mention this change.
There was a problem hiding this comment.
The defaults just mean that there is no change, so if the caller doesn't use an argument, then it remains unchanged for the transition. I think it makes code easier to read than to require every parameter to be specified: the changed stuff is easy to see from just glancing at the function call, no need to mentally parse which arguments are updated and which stay the same.
There was a problem hiding this comment.
I agree with these points. Was just sharing some historical context on this (and other methods). I think it is valid to use here.
| topicPartitions: immutable.Set[TopicPartition] = this.topicPartitions.toSet, | ||
| txnStartTimestamp: Long = this.txnStartTimestamp, | ||
| txnLastUpdateTimestamp: Long = this.txnLastUpdateTimestamp, | ||
| clientTransactionVersion: TransactionVersion = this.clientTransactionVersion): TxnTransitMetadata = { |
There was a problem hiding this comment.
Is the idea to keep clientTransactionVersion the same until each PrepareCommit/PrepareAbortCall? Or will KIP-939 introduce changes to set this in another transactional state.
There was a problem hiding this comment.
Good question. I don't think KIP-939 would rely on persisted value of client version, we should be able to just look at the values of the fields (nextProducerId, nextProducerEpoch) and figure out what logic needs to be done.
But it's a good point that this change actually makes the clientVersion "sticky" in the sense that once it's set it'll keep the value during the transitions. Maybe to keep this change completely mechanical, transition to ongoing or empty should reset the version to TV_0.
There was a problem hiding this comment.
Yeah, Things may also change with the change to add partitions implicitly (as we will have the client version there)
There was a problem hiding this comment.
Adding a summary of offline conversations for visibility:
- we'll keep the existing client version in the state unless it needs to be changed
- we'll update the
completeTransictionToto always copy all fields to eliminate a second decision point for which fields are toing to be update; having a second decision point is confusing and can lead to bugs
|
Thanks for the PR, makes things much more clear and readable! |
Update completeTransitionTo to fully take the state from transitMetadata, this eliminates a redundant decision point.
Fix unit tests to reflect changed logic (always copy transition data)
| assertEquals(producerEpoch + 1, txnMetadata.producerEpoch) | ||
| assertEquals(producerEpoch, txnMetadata.lastProducerEpoch) | ||
| assertEquals(1L, txnMetadata.txnStartTimestamp) | ||
| assertEquals(-1L, txnMetadata.txnStartTimestamp) |
There was a problem hiding this comment.
This logic is changed (now we unconditionally copy data from the transitMetadata when we complete transition, so original value of 1 is not preserved), so the unit test is changed to codify the new logic. Looking at the history, it doesn't seem like there was a significance in the logic, it's just the UT codified some behavior, and now it codifies a different behavior.
| assertEquals(producerEpoch, txnMetadata.lastProducerEpoch) | ||
| assertEquals(0, txnMetadata.producerEpoch) | ||
| assertEquals(1L, txnMetadata.txnStartTimestamp) | ||
| assertEquals(-1L, txnMetadata.txnStartTimestamp) |
There was a problem hiding this comment.
Unconditionally copy data from the transitMetadata when we complete transition.
| state = Ongoing, | ||
| topicPartitions = (topicPartitions ++ addedTopicPartitions), | ||
| txnStartTimestamp = newTxnStartTimestamp, | ||
| txnLastUpdateTimestamp = updateTimestamp |
There was a problem hiding this comment.
should we include clientTransactionVersion here since it is now passed to this method too?
Address review feedback.
…e-old-protocol-versions * apache-github/trunk: KAFKA-18334: Produce v4-v6 should be undeprecated (apache#18288) KAFKA-13722: code cleanup after deprecated StateStore.init() was removed (apache#18249) KAFKA-15370: Support Participation in 2PC (KIP-939) (1/N) (apache#17687)
This is just a mechanical change to make prepareTransitionTo method use named parameters instead of positional parameters. Reviewers: Justine Olshan <[email protected]>, Ritika Reddy <[email protected]>
This is just a mechanical change to make prepareTransitionTo method use named parameters instead of positional parameters.