KAFKA-17945, KAFKA-17944: Handling leader epoch error for offset reset APIs #17979
AndrewJSchofield merged 6 commits into apache:trunk
adixitconfluent left a comment
Thanks for the PR. Left a couple of minor comments.
sharePartition.updateCacheAndOffsets(offsetForEarliestTimestamp(topicIdPartition, replicaManager));
try {
    long startOffset = offsetForEarliestTimestamp(topicIdPartition, replicaManager, sharePartition.leaderEpoch());
    sharePartition.updateCacheAndOffsets(startOffset);
nit: line 90 and 91 can be merged into a single line
/**
 * The method is used to handle the share partition exception.
 * The method returns a BiConsumer that handles share partition exceptions. The BiConsumer accepts
 * a share partition key and a throwable and handles the exception.
MINOR: second "and" is extra
junrao left a comment
@apoorvmittal10 : Thanks for the PR. Just a minor comment.
SharePartition sharePartition = partitionCacheMap.remove(sharePartitionKey);
if (sharePartition != null) {
    sharePartition.markFenced();
public BiConsumer<SharePartitionKey, Throwable> handleFencedSharePartitionException() {
handleFencedSharePartitionException => fencedSharePartitionHandler ?
Thanks, agreed that the method name should be corrected now. Done.
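The handler discussed above can be illustrated with a minimal sketch. It is not the code from the PR: the nested `SharePartitionKey`, `SharePartition`, and `FencedLeaderEpochException` classes are simplified, hypothetical stand-ins for the Kafka classes of the same names, and the `instanceof` check is an assumption about which errors should evict a partition. Only the remove-then-`markFenced()` step comes from the diff.

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;

// Simplified, hypothetical stand-ins for the Kafka classes discussed in the review.
class FencedHandlerSketch {

    static final class SharePartitionKey {
        final String groupId;
        final String topicPartition;

        SharePartitionKey(String groupId, String topicPartition) {
            this.groupId = groupId;
            this.topicPartition = topicPartition;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof SharePartitionKey)) return false;
            SharePartitionKey other = (SharePartitionKey) o;
            return groupId.equals(other.groupId) && topicPartition.equals(other.topicPartition);
        }

        @Override
        public int hashCode() {
            return Objects.hash(groupId, topicPartition);
        }
    }

    static final class SharePartition {
        private volatile boolean fenced;
        void markFenced() { fenced = true; }
        boolean isFenced() { return fenced; }
    }

    // Stand-in for Kafka's exception of the same name.
    static final class FencedLeaderEpochException extends RuntimeException {
        FencedLeaderEpochException(String message) { super(message); }
    }

    final Map<SharePartitionKey, SharePartition> partitionCacheMap = new ConcurrentHashMap<>();

    // Named as a noun ("handler") because it returns a callback rather than handling anything itself.
    BiConsumer<SharePartitionKey, Throwable> fencedSharePartitionHandler() {
        return (key, throwable) -> {
            // Assumption: only fencing errors evict the partition; other errors leave the cache alone.
            if (throwable instanceof FencedLeaderEpochException) {
                SharePartition sharePartition = partitionCacheMap.remove(key);
                if (sharePartition != null) {
                    sharePartition.markFenced();
                }
            }
        };
    }

    public static void main(String[] args) {
        FencedHandlerSketch sketch = new FencedHandlerSketch();
        SharePartitionKey key = new SharePartitionKey("grp", "topic-0");
        SharePartition sp = new SharePartition();
        sketch.partitionCacheMap.put(key, sp);
        sketch.fencedSharePartitionHandler().accept(key, new FencedLeaderEpochException("Fenced exception"));
        System.out.println("fenced=" + sp.isFenced() + ", cached=" + sketch.partitionCacheMap.containsKey(key));
    }
}
```

Passing the callback around as a `BiConsumer` means a caller such as a delayed fetch only needs the function, not a reference to the object that owns the cache.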
adixitconfluent left a comment
Thanks for the PR. LGTM.
AndrewJSchofield left a comment
Thanks for the PR. Just one tiny nit. The removal of the SharePartitionManager reference from the DelayedShareFetch is nice.
Optional.empty(), true).timestampAndOffsetOpt();
Optional.of(leaderEpoch), true).timestampAndOffsetOpt();
if (timestampAndOffset.isEmpty()) {
    throw new OffsetNotAvailableException("offset for Earliest timestamp not found for topic partition: " + topicIdPartition);
nit: Capitalisation of the message is a bit odd. Why not "Offset for earliest timestamp"? Similar comment for the latest case too.
Yeah, correct, something this PR inherited. Done.
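As a rough illustration of the change under review, the sketch below passes the known leader epoch to the offset lookup and throws when no offset comes back, using the message capitalisation suggested above. The `OffsetLookup` interface, the `String` partition name, and the nested `OffsetNotAvailableException` are hypothetical stand-ins rather than the real `ReplicaManager` API.

```java
import java.util.Optional;

// Hypothetical, simplified model of the earliest-offset lookup discussed in the review.
class EarliestOffsetSketch {

    // Stand-in for the replica manager call; the real method takes more parameters.
    interface OffsetLookup {
        Optional<Long> fetchEarliestOffset(String topicPartition, Optional<Integer> currentLeaderEpoch);
    }

    // Stand-in for Kafka's exception of the same name.
    static final class OffsetNotAvailableException extends RuntimeException {
        OffsetNotAvailableException(String message) { super(message); }
    }

    static long offsetForEarliestTimestamp(String topicPartition, OffsetLookup lookup, int leaderEpoch) {
        // Send the known leader epoch instead of Optional.empty() so a stale leader can be detected.
        Optional<Long> offset = lookup.fetchEarliestOffset(topicPartition, Optional.of(leaderEpoch));
        if (offset.isEmpty()) {
            throw new OffsetNotAvailableException(
                "Offset for earliest timestamp not found for topic partition: " + topicPartition);
        }
        return offset.get();
    }

    public static void main(String[] args) {
        OffsetLookup lookup = (tp, epoch) -> Optional.of(0L);
        System.out.println(offsetForEarliestTimestamp("topic-0", lookup, 3));
    }
}
```

In this sketch any exception thrown by the lookup itself propagates to the caller, which is where the PR's exception handler would take over.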
junrao left a comment
@apoorvmittal10 : Thanks for the updated PR. LGTM. I will let @AndrewJSchofield merge the PR when he is satisfied.
Certainly. I don't have write access yet while the ASF wheels are turning, but I expect I will be able to merge in a day or two.
chia7712 left a comment
@apoorvmittal10 thanks for this patch. overall LGTM.
assertTrue(resultData.isEmpty());
Mockito.verify(shareFetch, times(1)).addErroneous(tp0, exception);
Mockito.verify(exceptionHandler, times(1)).accept(any(SharePartitionKey.class), any(Throwable.class));
Mockito.verify(sp0, times(0)).updateCacheAndOffsets(1L);
Using any(Long.class) instead of 1L is more appropriate since we expect that updateCacheAndOffsets should not be called, regardless of the input.
Good point, done.
Throwable exception = new FencedLeaderEpochException("Fenced exception");
doThrow(exception).when(replicaManager).fetchOffsetForTimestamp(any(TopicPartition.class), anyLong(), any(), any(), anyBoolean());
when(sp0.acquire(anyString(), anyInt(), any(FetchPartitionData.class))).thenReturn(ShareAcquiredRecords.empty());
doNothing().when(sp0).updateCacheAndOffsets(any(Long.class));
Since sp0 is a mock object, there's no need to stub updateCacheAndOffsets as it already does nothing.
Done. Removed from other places as well.
assertTrue(resultData.isEmpty());
Mockito.verify(shareFetch, times(1)).addErroneous(tp0, exception);
Mockito.verify(exceptionHandler, times(1)).accept(any(SharePartitionKey.class), any(Throwable.class));
Could you please replace any(SharePartitionKey.class), any(Throwable.class) with new SharePartitionKey("grp", tp0), exception?
log.error("Error while fetching offset for earliest timestamp for topicIdPartition: {}", topicIdPartition, e);
shareFetch.addErroneous(topicIdPartition, e);
exceptionHandler.accept(new SharePartitionKey(shareFetch.groupId(), topicIdPartition), e);
// Do not fill the response for this partition and continue.
Excuse me, could you explain the difference between returning empty records and skipping partition data in the response?
Here the topic partition has already been added to the erroneous set, so there is no need to fill it in the response to DelayedShareFetch. When the final response from SharePartitionManager is computed, it merges the erroneous partitions with the resultant ones.
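The merge described in this reply might look roughly like the sketch below. It is an illustration under assumptions, not the actual SharePartitionManager code: partitions are modelled as plain strings, responses as labelled strings, and `mergeResponse` is a hypothetical helper name.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of combining fetched and erroneous partitions into one response.
class ShareFetchResponseSketch {

    static Map<String, String> mergeResponse(Map<String, String> fetched, Map<String, Throwable> erroneous) {
        Map<String, String> response = new LinkedHashMap<>();
        // Partitions that were read successfully carry their data.
        fetched.forEach((partition, data) -> response.put(partition, "data:" + data));
        // Partitions recorded as erroneous were skipped during the fetch and are added here with their error.
        erroneous.forEach((partition, error) -> response.put(partition, "error:" + error.getMessage()));
        return response;
    }

    public static void main(String[] args) {
        Map<String, String> fetched = new LinkedHashMap<>();
        fetched.put("topic-0", "records");
        Map<String, Throwable> erroneous = new LinkedHashMap<>();
        erroneous.put("topic-1", new RuntimeException("Fenced exception"));
        System.out.println(mergeResponse(fetched, erroneous));
    }
}
```

Under this model, returning empty records for the failed partition would put it in both maps, whereas skipping it leaves a single entry that carries the error.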
@AndrewJSchofield @chia7712 Could you please re-check?
…t APIs (apache#17979)
The PR sends the known leader epoch when fetching offset information. If the API throws an exception, the PR adds a way to handle it and surfaces the exception so that a decision can be made about handling the share partition itself.
Reviewers: Abhinav Dixit, Andrew Schofield, Chia-Ping Tsai, Jun Rao
The PR sends the known leader epoch when fetching offset information. If the API throws an exception, the PR adds a way to handle it and surfaces the exception so that a decision can be made about handling the share partition itself.
The test for handling exceptions for a share partition already exists in SharePartitionTest. For the other changes, tests have been added in the PR.