KAFKA-16726: Added share.auto.offset.reset dynamic config for share groups #17573

omkreddy merged 18 commits into apache:trunk from
Conversation
apoorvmittal10 left a comment:

Thanks for the PR, left some comments.
        return ShareGroupAutoOffsetReset.forStrategy(shareAutoOffsetReset);
    }

    public enum ShareGroupAutoOffsetReset {
Why not use the existing OffsetResetStrategy?
I would keep them separate so we can evolve them separately.
Thanks for the review. It's better to use OffsetResetStrategy, so I'll remove the newly added enum.
> It's better to use OffsetResetStrategy, so I'll remove the newly added enum.

Did you miss it, or did you change your mind? What's the final decision? Are we keeping it? I would prefer to use OffsetResetStrategy, as I don't see any difference from the newly defined one. If we need an independent one in future because of additional states, we can create a new one then. Otherwise it's unnecessary.
I expect that OffsetResetStrategy will evolve independently and I prefer to separate them now.
Had a word with @AndrewJSchofield. Sure, let's have a separate one.
@chirag-wadhwa5 When we define just earliest and latest in this enum, do we still need the explicit validations specified for this config?
I don't think we would require any explicit validations on top of it. I have removed the unnecessary validations now.
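The discussion above suggests the enum itself can carry the validation once it only has the two values from the KIP. A minimal sketch of that idea (the enum and `forStrategy` names come from the diff; the body shown here is an assumption about how it could look, not the PR's actual implementation):

```java
import java.util.Locale;

// Sketch: with only EARLIEST and LATEST, forStrategy itself rejects any
// other string, so no separate config validator is needed.
enum ShareGroupAutoOffsetReset {
    LATEST, EARLIEST;

    static ShareGroupAutoOffsetReset forStrategy(String strategy) {
        try {
            // Accept the lowercase config values "earliest"/"latest".
            return ShareGroupAutoOffsetReset.valueOf(strategy.toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException(
                "Invalid value " + strategy + " for share.auto.offset.reset, must be 'earliest' or 'latest'");
        }
    }
}
```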
apoorvmittal10 left a comment:

Some comments to address.
    Option<FileRecords.TimestampAndOffset> timestampAndOffset = replicaManager.fetchOffsetForTimestamp(
        topicIdPartition.topicPartition(), ListOffsetsRequest.LATEST_TIMESTAMP, new Some<>(IsolationLevel.READ_UNCOMMITTED),
        Optional.empty(), true).timestampAndOffsetOpt();
    return timestampAndOffset.isEmpty() ? (long) 0 : timestampAndOffset.get().offset;
Is it safe to return 0 from this method when timestampAndOffset is empty?
Thanks for the review. We could instead throw an error saying the latest offset was not found, or maybe initialize to the earliest offset in case the latest is not found. What would you suggest here @apoorvmittal10?
If we send 0 when the timestamp is empty, but the first fetch offset in the log is at 10, then will the fetch throw an exception? If yes, then what will happen next? And in what cases can the timestamp be empty from the replica manager?
I think there is the possibility of an exception in the slightly unusual situation of a new leader which is lagging behind the current epoch. While I'm sure we're not going to bump into that right now, it is a real situation in the replica manager I think and the interface here needs to be flexible enough for an exception and we need to have a plan for what to do if the exception occurs. @mumrah can you help here please? I guess we'd want to do some kind of retry, which is perhaps best achieved by aborting the share-partition initialisation and letting the next attempt to initialize the share-partition have another try. The edges of the replication behaviour are a bit challenging I think.
Hi @apoorvmittal10, when replicaManager.fetchOffsetForTimestamp is called with LATEST_TIMESTAMP, as per the current code only two things are possible: either it throws an error or it returns a populated response (not a None object, which is possible when the same method is called with EARLIEST_TIMESTAMP). Error handling is in place for when an error is thrown, but beyond that I don't think we need any other checks on whether the offset value is populated. If we do want such a check, I don't think we can assume any other value in its place; the only correct way would be to fail the share-partition initialization. Another point to note: when offsetForEarliestTimestamp is called with EARLIEST_TIMESTAMP, the current code returns 0 if the value from replicaManager.fetchOffsetForTimestamp is None, which again would be a wrong assumption if the start offset of the log is beyond 0. I believe the correct way is to fail the share-partition initialization there as well.
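The behaviour both reviewers converge on, failing the initialization rather than defaulting to offset 0, could look roughly like this. This is a hypothetical, self-contained sketch: `latestOffsetOrFail` stands in for the real code path around `replicaManager.fetchOffsetForTimestamp`, and the exception type is an assumption.

```java
import java.util.Optional;

// Sketch: if the replica manager yields no timestamp/offset, fail the
// share-partition initialization so the next attempt can retry, instead
// of silently assuming offset 0.
final class OffsetResolution {
    static long latestOffsetOrFail(Optional<Long> fetchedOffset) {
        return fetchedOffset.orElseThrow(() ->
            new IllegalStateException(
                "Latest offset not found; failing share-partition initialization"));
    }
}
```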
adixitconfluent left a comment:

Did a partial review. Left some comments.
apoorvmittal10 left a comment:

Thanks for the changes. Some comments for clarification and changes. Mostly LGTM!
    @@ -128,4 +130,17 @@ static long offsetForEarliestTimestamp(TopicIdPartition topicIdPartition, Replic
        Optional.empty(), true).timestampAndOffsetOpt();

Unfortunately, I think this is more complicated than it seems. If KIP-405 is enabled, then returning EARLIEST_TIMESTAMP is asynchronous. It may be necessary to wait before there's an answer to the earliest offset. If we put that concern aside for this PR, we do need an issue to track this.
apoorvmittal10 left a comment:

LGTM, though I don't understand the change with the re-ordered tests, and there's an open query on returning 0 from the offsetForTimestamp method.
    val send: Seq[TopicIdPartition] = Seq(topicIdPartition)

    // Send the share fetch request to fetch the records produced above
    val metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.INITIAL_EPOCH)
    val acknowledgementsMap: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
    val shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMap)
    val shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)

    // Send the second share fetch request to fetch the records produced above
    metadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH))
    acknowledgementsMap = Map.empty
    shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMap)
    shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
Can't we define the offset reset strategy while creating KafkaShareConsumer?
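For context on this question: share.auto.offset.reset is a dynamic group config rather than a consumer constructor option, so it would be set on the group itself. A sketch of how that might be done with the kafka-configs tool (the `--entity-type groups` flag and group name here are assumptions, not taken from this PR):

```shell
# Sketch: set the dynamic group-level config for a share group,
# instead of configuring it on an individual KafkaShareConsumer.
bin/kafka-configs.sh --bootstrap-server localhost:9092 \
  --alter --entity-type groups --entity-name my-share-group \
  --add-config share.auto.offset.reset=earliest
```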
    @ParameterizedTest(name = "{displayName}.persister={0}")
    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
    public void testShareAutoOffsetResetMultipleGroupsWithDifferentValue(String persister) {
        KafkaShareConsumer<byte[], byte[]> shareConsumer1 = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
How about shareConsumerEarliest and shareConsumerLatest? Much more descriptive than 1 or 2.
    }

    public enum ShareGroupAutoOffsetReset {
        LATEST, EARLIEST, NONE;
This doesn't really have the value "NONE" does it? The KIP mentions only LATEST and EARLIEST I think.
apoorvmittal10 left a comment:

LGTM, agree with Andrew's comments.
adixitconfluent left a comment:

Thanks for the PR. LGTM.
    val send: Seq[TopicIdPartition] = Seq(topicIdPartition)

    // Send the first share fetch request to initialize the share partition
    var metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.INITIAL_EPOCH)
It looks like some of these lines are repeating; is it possible to extract a function and reuse it?
omkreddy left a comment:

@chirag-wadhwa5 Thanks for the PR. LGTM
…roups (apache#17573)

This PR adds another dynamic config, share.auto.offset.reset, for share groups.

Reviewers: Andrew Schofield <[email protected]>, Apoorv Mittal <[email protected]>, Abhinav Dixit <[email protected]>, Manikumar Reddy <[email protected]>
Reference: KAFKA-16726