Skip to content

KAFKA-17945, KAFKA-17944: Handling leader epoch error for offset reset APIs#17979

Merged
AndrewJSchofield merged 6 commits intoapache:trunkfrom
apoorvmittal10:KAFKA-17945
Dec 6, 2024
Merged

KAFKA-17945, KAFKA-17944: Handling leader epoch error for offset reset APIs#17979
AndrewJSchofield merged 6 commits intoapache:trunkfrom
apoorvmittal10:KAFKA-17945

Conversation

@apoorvmittal10
Copy link
Copy Markdown
Contributor

The PR sends known leader epoch while fetch offset information. If API throws exception then PR adds way to handle the exceptions and surface same to make decision for handling share partition itself.

The test for handling exception for share partition already exists in SharePartitionTest. For other changes, tests have been added in the PR.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@github-actions github-actions Bot added core Kafka Broker KIP-932 Queues for Kafka labels Nov 28, 2024
Copy link
Copy Markdown
Contributor

@adixitconfluent adixitconfluent left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR. Left a couple of minor comments.

sharePartition.updateCacheAndOffsets(offsetForEarliestTimestamp(topicIdPartition, replicaManager));
try {
long startOffset = offsetForEarliestTimestamp(topicIdPartition, replicaManager, sharePartition.leaderEpoch());
sharePartition.updateCacheAndOffsets(startOffset);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: line 90 and 91 can be merged into a single line

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

/**
* The method is used to handle the share partition exception.
* The method returns a BiConsumer that handles share partition exceptions. The BiConsumer accepts
* a share partition key and a throwable and handles the exception.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MINOR: second "and" is extra

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

Copy link
Copy Markdown
Contributor

@junrao junrao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@apoorvmittal10 : Thanks for the PR. Just a minor comment.

SharePartition sharePartition = partitionCacheMap.remove(sharePartitionKey);
if (sharePartition != null) {
sharePartition.markFenced();
public BiConsumer<SharePartitionKey, Throwable> handleFencedSharePartitionException() {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

handleFencedSharePartitionException => fencedSharePartitionHandler ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, agree that methof name should be corrected now. Done.

Copy link
Copy Markdown
Contributor

@adixitconfluent adixitconfluent left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR. LGTM.

Copy link
Copy Markdown
Member

@AndrewJSchofield AndrewJSchofield left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR. Just one tiny nit. The removal of the SharePartitionManager reference from the DelayedShareFetch is nice.

Optional.empty(), true).timestampAndOffsetOpt();
Optional.of(leaderEpoch), true).timestampAndOffsetOpt();
if (timestampAndOffset.isEmpty()) {
throw new OffsetNotAvailableException("offset for Earliest timestamp not found for topic partition: " + topicIdPartition);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Capitalisation of the message is a bit odd. Why not "Offset for earliest timestamp"? Similar command for the latest case too.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah correct, something which this Pr inherited. Done.

Copy link
Copy Markdown
Contributor

@junrao junrao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@apoorvmittal10 : Thanks for the updated PR. LGTM. I will let @AndrewJSchofield merge the PR when he is satisfied.

@AndrewJSchofield
Copy link
Copy Markdown
Member

@apoorvmittal10 : Thanks for the updated PR. LGTM. I will let @AndrewJSchofield merge the PR when he is satisfied.

Certainly. I don't have write access yet while the ASF wheels are turning, but I expect I will be able to merge in a day or two.

Copy link
Copy Markdown
Member

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@apoorvmittal10 thanks for this patch. overall LGTM.

assertTrue(resultData.isEmpty());
Mockito.verify(shareFetch, times(1)).addErroneous(tp0, exception);
Mockito.verify(exceptionHandler, times(1)).accept(any(SharePartitionKey.class), any(Throwable.class));
Mockito.verify(sp0, times(0)).updateCacheAndOffsets(1L);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using any(Long.class) instead of 1L is more appropriate since we expect that updateCacheAndOffsets should not be called, regardless of the input.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, done.

Throwable exception = new FencedLeaderEpochException("Fenced exception");
doThrow(exception).when(replicaManager).fetchOffsetForTimestamp(any(TopicPartition.class), anyLong(), any(), any(), anyBoolean());
when(sp0.acquire(anyString(), anyInt(), any(FetchPartitionData.class))).thenReturn(ShareAcquiredRecords.empty());
doNothing().when(sp0).updateCacheAndOffsets(any(Long.class));
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since sp0 is a mock object, there's no need to stub updateCacheAndOffsets as it already does nothing.

Copy link
Copy Markdown
Contributor Author

@apoorvmittal10 apoorvmittal10 Dec 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Removed from other places as well.


assertTrue(resultData.isEmpty());
Mockito.verify(shareFetch, times(1)).addErroneous(tp0, exception);
Mockito.verify(exceptionHandler, times(1)).accept(any(SharePartitionKey.class), any(Throwable.class));
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please replace any(SharePartitionKey.class), any(Throwable.class) by new SharePartitionKey("grp", tp0), exception

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

log.error("Error while fetching offset for earliest timestamp for topicIdPartition: {}", topicIdPartition, e);
shareFetch.addErroneous(topicIdPartition, e);
exceptionHandler.accept(new SharePartitionKey(shareFetch.groupId(), topicIdPartition), e);
// Do not fill the response for this partition and continue.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Excuse me, could you explain the difference between returning empty records and skipping partition data in the response?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So here as the topicPartition is added in errorneous so no need to fill it in response to DelayedShareFetch. When final response from SharePartitionManager will be computed then it merge the erroneous and resultant ones.

@apoorvmittal10
Copy link
Copy Markdown
Contributor Author

@AndrewJSchofield @chia7712 Please if you could re-check.

@AndrewJSchofield AndrewJSchofield merged commit ccca9f1 into apache:trunk Dec 6, 2024
tedyu pushed a commit to tedyu/kafka that referenced this pull request Jan 6, 2025
…t APIs (apache#17979)

The PR sends known leader epoch while fetch offset information. If API throws exception then PR adds way to handle the exceptions and surface same to make decision for handling share partition itself.

Reviewers: Abhinav Dixit <[email protected]>, Andrew Schofield <[email protected]>, Chia-Ping Tsai <[email protected]>, Jun Rao <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

core Kafka Broker KIP-932 Queues for Kafka

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants