KAFKA-18014: Add duration based offset reset option for ShareConsumer#18096
KAFKA-18014: Add duration based offset reset option for ShareConsumer#18096AndrewJSchofield merged 16 commits intoapache:trunkfrom
Conversation
…set offset options
…group config on auto reset offset cases
| import java.util.Objects; | ||
| import java.util.Optional; | ||
|
|
||
| public class ShareGroupAutoOffsetResetStrategy { |
There was a problem hiding this comment.
As mentioned in #17573 (comment), let each AutoOffsetResetStrategy evolve themselves
There was a problem hiding this comment.
Can we have javadocs please on the class.
There was a problem hiding this comment.
Moreover the purpose of having a separate enum as ShareGroupAutoOffsetReset was to just mention that ShareGroup do no support all offset reset strategy but the ones explicitly mentioned in the ShareGroupAutoOffsetReset. Which is the sub strategies from AutoOffsetResetStrategy.
This class seems same as AutoOffsetResetStartegy, why do we need a different one? If we do need one then do we need to copy the methods as it is or could just use ShareGroupAutoOffsetResetStrategy as a wrapper over AutoOffsetResetStrategy to differentiate with self evolving startegies of ShareGroup?
There was a problem hiding this comment.
I think we still need a separate ShareGroupAutoOffsetResetStrategy, because this allows us to separate the enum and validation logic. As for the wrapper part you mentioned, perhaps we could use the Delegation pattern to reuse AutoOffsetResetStrategy's logic. What do you think?
There was a problem hiding this comment.
I think we still need a separate ShareGroupAutoOffsetResetStrategy
Yes, that's fine.
perhaps we could use the Delegation pattern
That could be a good option.
There was a problem hiding this comment.
I wouldn't over-engineer this. It's a pretty trivial class and there's not much saved. Up to you in this case from my point of view.
There was a problem hiding this comment.
@apoorvmittal10 Would like to know your opinion on this.
|
Hi @omkreddy, PR is ready, PTAL. Thanks! |
AndrewJSchofield
left a comment
There was a problem hiding this comment.
Thanks for the PR. A few comments to address.
| Option<FileRecords.TimestampAndOffset> timestampAndOffset = replicaManager.fetchOffsetForTimestamp( | ||
| topicIdPartition.topicPartition(), timestampToSearch, Option.empty(), Optional.of(leaderEpoch), true).timestampAndOffsetOpt(); | ||
| if (timestampAndOffset.isEmpty()) { | ||
| throw new OffsetNotAvailableException("offset for timestamp to search: " + timestampToSearch + " not found for topic partition: " + topicIdPartition); |
There was a problem hiding this comment.
For consistency, I would adjust the capitalisation. I suggest "Offset for timestamp " + timestamptoSearch + " not found for topic partition" instead.
There was a problem hiding this comment.
Looking at the latest changes, this was not done as requested. Please can you change the wording.
| fromString(offsetStrategy); | ||
| } catch (Exception e) { | ||
| throw new ConfigException(name, value, "Invalid value `" + offsetStrategy + "` for configuration " + | ||
| name + ". The value must be either 'earliest', 'latest', 'none' or of the format 'by_duration:<PnDTnHnMn.nS.>'."); |
There was a problem hiding this comment.
none is not permitted for a share group.
There was a problem hiding this comment.
You still need to remove 'none' from this exception message because it is not a valid option here.
apoorvmittal10
left a comment
There was a problem hiding this comment.
Thanks for the PR. I ll have a second pass later today on PR.
| static long offsetForTimestamp(TopicIdPartition topicIdPartition, ReplicaManager replicaManager, long timestampToSearch, int leaderEpoch) { | ||
| Option<FileRecords.TimestampAndOffset> timestampAndOffset = replicaManager.fetchOffsetForTimestamp( | ||
| topicIdPartition.topicPartition(), timestampToSearch, Option.empty(), Optional.of(leaderEpoch), true).timestampAndOffsetOpt(); | ||
| if (timestampAndOffset.isEmpty()) { | ||
| throw new OffsetNotAvailableException("Offset for timestamp: " + timestampToSearch + " not found for topic partition: " + topicIdPartition); | ||
| } | ||
| return timestampAndOffset.get().offset; | ||
| } | ||
|
|
There was a problem hiding this comment.
Should we send the the isolation level as well instead of Option.empty()? Also we can have common method and send respective timestamps from: offsetForEarliestTimestamp, offsetForLatestTimestamp and offsetForTimestamp methods.
There was a problem hiding this comment.
I suggest READ_UNCOMMITTED isolation level is the best option here. When we add support for isolation levels in share groups, we can revisit. So far, share groups always use READ_UNCOMMITTED.
| import java.util.Objects; | ||
| import java.util.Optional; | ||
|
|
||
| public class ShareGroupAutoOffsetResetStrategy { |
There was a problem hiding this comment.
Can we have javadocs please on the class.
AndrewJSchofield
left a comment
There was a problem hiding this comment.
I think this PR is almost ready to merge. Just a couple of outstanding issues (isolation level, javadoc).
|
|
||
| if (offsetResetStrategy == GroupConfig.ShareGroupAutoOffsetReset.EARLIEST) | ||
| if (offsetResetStrategy.timestamp().isEmpty()) { | ||
| throw new Exception("The timestamp is not available for the share partition: " + topicIdPartition); |
There was a problem hiding this comment.
This will translate to UnknownServerError to the client. Do we want to return that or specific exception?
There was a problem hiding this comment.
I think I would change this to return NoOffsetForPartitionException(topicIdPartition.topicPartition()):
https://github.com/apache/kafka/pull/17972/files#diff-8a7549a9127f9c2eac443cfb35f0d7c1ee8e2884576badd3670a7f19460c59d2R293
There was a problem hiding this comment.
So, currently, this could only occur when BY_DURATION is used and duration somehow is not set. Would IllegalStateException be better to throw as there shouldn't be a case when offsetResetStrategy.timestamp() should ideally be empty?
There was a problem hiding this comment.
I think we can remove this exception here, because ShareGroupAutoOffsetResetStrategy already has the validation itself, so the .timestamp() must have value if its type is BY_DURATION. And we can add some comment to indicate this. Thoughts?
There was a problem hiding this comment.
I would say it would be good to have a check as the method returns Optional. If it can never be empty then the method should never return Optional.
|
Thanks for your review! address some of the comments in 3d40b6e, PTAL. Thanks! |
…pAutoOffsetResetStrategyTest
ebf9cac to
8f2ac44
Compare
apoorvmittal10
left a comment
There was a problem hiding this comment.
Thanks for addressing the comments. I have replied to the question and took another pass.
|
Refactored PTAL. Thanks! |
AndrewJSchofield
left a comment
There was a problem hiding this comment.
A few additional comments. I think is getting close to being ready to merge.
| Option<FileRecords.TimestampAndOffset> timestampAndOffset = replicaManager.fetchOffsetForTimestamp( | ||
| topicIdPartition.topicPartition(), timestampToSearch, Option.empty(), Optional.of(leaderEpoch), true).timestampAndOffsetOpt(); | ||
| if (timestampAndOffset.isEmpty()) { | ||
| throw new OffsetNotAvailableException("offset for timestamp to search: " + timestampToSearch + " not found for topic partition: " + topicIdPartition); |
There was a problem hiding this comment.
Looking at the latest changes, this was not done as requested. Please can you change the wording.
| import java.util.Objects; | ||
| import java.util.Optional; | ||
|
|
||
| public class ShareGroupAutoOffsetResetStrategy { |
There was a problem hiding this comment.
I wouldn't over-engineer this. It's a pretty trivial class and there's not much saved. Up to you in this case from my point of view.
| fromString(offsetStrategy); | ||
| } catch (Exception e) { | ||
| throw new ConfigException(name, value, "Invalid value `" + offsetStrategy + "` for configuration " + | ||
| name + ". The value must be either 'earliest', 'latest', 'none' or of the format 'by_duration:<PnDTnHnMn.nS.>'."); |
There was a problem hiding this comment.
You still need to remove 'none' from this exception message because it is not a valid option here.
| Option<FileRecords.TimestampAndOffset> timestampAndOffset = replicaManager.fetchOffsetForTimestamp( | ||
| topicIdPartition.topicPartition(), timestampToSearch, new Some<>(IsolationLevel.READ_UNCOMMITTED), Optional.of(leaderEpoch), true).timestampAndOffsetOpt(); | ||
| if (timestampAndOffset.isEmpty()) { | ||
| throw new OffsetNotAvailableException("offset for timestamp " + timestampToSearch + " not found for topic partition: " + topicIdPartition); |
There was a problem hiding this comment.
nit: Please capitalise the "Offset" for consistency with the offsetForEarliestTimestamp and offsetForLatestTimestamp methods. I know this is a trivial point but we have high standards for these things.
There was a problem hiding this comment.
Thank you very much for your careful review! 😁
AndrewJSchofield
left a comment
There was a problem hiding this comment.
Thanks for the PR. I've run the code on my local environment and it works nicely. A couple of final points since the generated HTML documentation is not valid as the code stands. But apart from that, ready to merge.
| public static final String SHARE_AUTO_OFFSET_RESET_DEFAULT = ShareGroupAutoOffsetReset.LATEST.toString(); | ||
| public static final String SHARE_AUTO_OFFSET_RESET_DOC = "The strategy to initialize the share-partition start offset."; | ||
| public static final String SHARE_AUTO_OFFSET_RESET_DEFAULT = ShareGroupAutoOffsetResetStrategy.LATEST.name(); | ||
| public static final String SHARE_AUTO_OFFSET_RESET_DOC = "The strategy to initialize the share-partition start offset. " + |
There was a problem hiding this comment.
I was looking into what actually happens with these documentation strings and they need to be valid for HTML generation. So, I've slightly tweaked the text to replace the < and > so they're not interpreted as tags. Please can you use this string exactly.
public static final String SHARE_AUTO_OFFSET_RESET_DOC = "The strategy to initialize the share-partition start offset. " +
"<ul><li>earliest: automatically reset the offset to the earliest offset</li>" +
"<li>latest: automatically reset the offset to the latest offset</li>" +
"<li>by_duration:<duration>: automatically reset the offset to a configured duration from the current timestamp. " +
"<duration> must be specified in ISO8601 format (PnDTnHnMn.nS). " +
"Negative duration is not allowed.</li>" +
"<li>anything else: throw exception to the share consumer.</li></ul>";
| } | ||
| } | ||
| } | ||
| } |
There was a problem hiding this comment.
This validator needs a toString() method. Otherwise, the documentation will include the validator's object reference instead of a string for the valid options. I suggest:
public String toString() {
return "[earliest, latest, by_duration:PnDTnHnMn.nS]";
}
That worked nicely in the generated HTML.
|
There should also be an integration test but I've created a separate task https://issues.apache.org/jira/browse/KAFKA-18260. Feel free to pick this up @peterxcli if you're interested. |
AndrewJSchofield
left a comment
There was a problem hiding this comment.
Thanks for the PR. Looks good to me. Waiting for a green build before merging.
…apache#18096) Kafka consumer supports auto.offset.reset config option, which is used when there is no initial offset in Kafka (or) if the current offset does not exist any more on the server. This config currently supports earliest/latest/none options. Currently consumer resets might force applications to reprocess large amounts of data from earlier offsets. With infinite storage, its beneficial to have a duration based offset reset strategy. This will allow applications to consume/initialise from a fixed duration when there is no initial offset in Kafka. As part of KIP-932, we are adding support for share consumer groups. Share consumer groups supports dynamic group configuration property share.auto.offset.reset. This is used to set the initial Share-Partition Start Offset (SPSO) based on the share.auto.offset.reset configuration. Currently share.auto.offset.reset supports earliest and latest options to automatically reset the offset Similar to the Kafka Consumer, we will add support for by_duration: config value for share.auto.offset.reset. Reviewers: Andrew Schofield <[email protected]>, Apoorv Mittal <[email protected]>
Kafka consumer supports auto.offset.reset config option, which is used when there is no initial offset in Kafka (or) if the current offset does not exist any more on the server. This config currently supports earliest/latest/none options. Currently consumer resets might force applications to reprocess large amounts of data from earlier offsets. With infinite storage, its beneficial to have a duration based offset reset strategy. This will allow applications to consume/initialise from a fixed duration when there is no initial offset in Kafka.
As part of KIP-932, we are adding support for share consumer groups. Share consumer groups supports dynamic group configuration property share.auto.offset.reset. This is used to set the initial Share-Partition Start Offset (SPSO) based on the share.auto.offset.reset configuration. Currently share.auto.offset.reset supports earliest and latest options to automatically reset the offset
Similar to the Kafka Consumer, we will add support for by_duration: config value for
share.auto.offset.reset.Some conflict with: #17979
Committer Checklist (excluded from commit message)