KAFKA-18014: Add duration based offset reset option for ShareConsumer #18096

Merged: AndrewJSchofield merged 16 commits into apache:trunk from peterxcli:k18014 on Dec 16, 2024

@peterxcli peterxcli commented Dec 7, 2024

The Kafka consumer supports the auto.offset.reset config option, which is used when there is no initial offset in Kafka or when the current offset no longer exists on the server. This config currently supports the earliest/latest/none options. Currently, consumer resets might force applications to reprocess large amounts of data from earlier offsets. With infinite storage, it's beneficial to have a duration-based offset reset strategy. This will allow applications to consume/initialise from a fixed duration when there is no initial offset in Kafka.

As part of KIP-932, we are adding support for share consumer groups. Share consumer groups support the dynamic group configuration property share.auto.offset.reset, which is used to set the initial Share-Partition Start Offset (SPSO). Currently share.auto.offset.reset supports the earliest and latest options to automatically reset the offset.

Similar to the Kafka Consumer, we will add support for by_duration: config value for share.auto.offset.reset.

from KIP-1106

This has some conflicts with #17979.
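
For readers new to the option, here is a minimal sketch of how a by_duration: value could be turned into the timestamp that the offset lookup searches for. The class and method names are hypothetical and are not the ones added by this PR; the sketch only assumes what the description states, namely an ISO8601 duration after the prefix, with negative durations rejected.

```java
import java.time.Duration;
import java.time.format.DateTimeParseException;

/** Hypothetical helper for illustration; not the class introduced by this PR. */
final class ByDurationExample {
    static final String PREFIX = "by_duration:";

    /** Parses e.g. "by_duration:PT1H" into a non-negative Duration. */
    static Duration parse(String value) {
        if (value == null || !value.startsWith(PREFIX)) {
            throw new IllegalArgumentException("Expected by_duration:<duration>, got: " + value);
        }
        Duration duration;
        try {
            // Duration.parse accepts the ISO8601 form PnDTnHnMn.nS
            duration = Duration.parse(value.substring(PREFIX.length()));
        } catch (DateTimeParseException e) {
            throw new IllegalArgumentException("Invalid ISO8601 duration in: " + value, e);
        }
        if (duration.isNegative()) {
            throw new IllegalArgumentException("Negative duration is not allowed: " + value);
        }
        return duration;
    }

    /** Timestamp (epoch millis) to search for: the given "now" minus the configured duration. */
    static long timestampToSearch(String value, long nowMs) {
        return nowMs - parse(value).toMillis();
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        System.out.println(timestampToSearch("by_duration:PT1H", now));
    }
}
```

Passing the current time in as a parameter keeps the calculation deterministic in tests; that is a choice made for this sketch only.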

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

github-actions bot added the "core" (Kafka Broker) and "KIP-932" (Queues for Kafka) labels on Dec 7, 2024
import java.util.Objects;
import java.util.Optional;

public class ShareGroupAutoOffsetResetStrategy {
Member Author

@peterxcli peterxcli Dec 8, 2024

As mentioned in #17573 (comment), let each AutoOffsetResetStrategy evolve on its own.

Contributor

Can we please have Javadoc on the class?

Contributor

@apoorvmittal10 apoorvmittal10 Dec 9, 2024

Moreover, the purpose of having a separate enum, ShareGroupAutoOffsetReset, was just to indicate that share groups do not support every offset reset strategy, only the ones explicitly listed in ShareGroupAutoOffsetReset, which are a subset of the strategies in AutoOffsetResetStrategy.

This class seems the same as AutoOffsetResetStrategy, so why do we need a different one? If we do need one, do we need to copy the methods as they are, or could we just use ShareGroupAutoOffsetResetStrategy as a wrapper over AutoOffsetResetStrategy to distinguish the self-evolving strategies of share groups?

Member Author

I think we still need a separate ShareGroupAutoOffsetResetStrategy, because this allows us to separate the enum and validation logic. As for the wrapper part you mentioned, perhaps we could use the Delegation pattern to reuse AutoOffsetResetStrategy's logic. What do you think?

Contributor

"I think we still need a separate ShareGroupAutoOffsetResetStrategy"

Yes, that's fine.

"perhaps we could use the Delegation pattern"

That could be a good option.
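
To make the delegation idea in this thread concrete, here is a minimal sketch. Both classes are hypothetical stand-ins written for illustration, not Kafka's actual AutoOffsetResetStrategy or ShareGroupAutoOffsetResetStrategy: the wrapper reuses the general parsing logic and then narrows the accepted set for share groups.

```java
import java.util.Locale;

/** Hypothetical stand-in for the general-purpose consumer strategy. */
final class GeneralStrategy {
    enum Type { EARLIEST, LATEST, NONE }

    private final Type type;

    private GeneralStrategy(Type type) {
        this.type = type;
    }

    /** Parses a strategy name; unknown names raise IllegalArgumentException via valueOf. */
    static GeneralStrategy fromString(String s) {
        return new GeneralStrategy(Type.valueOf(s.toUpperCase(Locale.ROOT)));
    }

    Type type() {
        return type;
    }
}

/** Hypothetical share-group wrapper: delegates parsing, then rejects what share groups do not support. */
final class ShareStrategy {
    private final GeneralStrategy delegate;

    private ShareStrategy(GeneralStrategy delegate) {
        this.delegate = delegate;
    }

    static ShareStrategy fromString(String s) {
        GeneralStrategy parsed = GeneralStrategy.fromString(s);
        if (parsed.type() == GeneralStrategy.Type.NONE) {
            throw new IllegalArgumentException("'none' is not supported for share groups");
        }
        return new ShareStrategy(parsed);
    }

    String name() {
        return delegate.type().name().toLowerCase(Locale.ROOT);
    }

    public static void main(String[] args) {
        System.out.println(ShareStrategy.fromString("latest").name());
    }
}
```

The trade-off raised in the thread still applies: for a class this small, the wrapper saves little code, so the separation is mainly about keeping the share-group validation in one place.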

Member

I wouldn't over-engineer this. It's a pretty trivial class and there's not much saved. Up to you in this case from my point of view.

Member Author

@apoorvmittal10, I would like to know your opinion on this.

peterxcli commented Dec 8, 2024

Hi @omkreddy, PR is ready, PTAL. Thanks!
Sorry for mentioning you in both places; I just want to make sure you receive it 😁

Member

@AndrewJSchofield AndrewJSchofield left a comment

Thanks for the PR. A few comments to address.

Option<FileRecords.TimestampAndOffset> timestampAndOffset = replicaManager.fetchOffsetForTimestamp(
topicIdPartition.topicPartition(), timestampToSearch, Option.empty(), Optional.of(leaderEpoch), true).timestampAndOffsetOpt();
if (timestampAndOffset.isEmpty()) {
throw new OffsetNotAvailableException("offset for timestamp to search: " + timestampToSearch + " not found for topic partition: " + topicIdPartition);
Member

For consistency, I would adjust the capitalisation. I suggest "Offset for timestamp " + timestampToSearch + " not found for topic partition" instead.

Member

Looking at the latest changes, this was not done as requested. Please can you change the wording.

fromString(offsetStrategy);
} catch (Exception e) {
throw new ConfigException(name, value, "Invalid value `" + offsetStrategy + "` for configuration " +
name + ". The value must be either 'earliest', 'latest', 'none' or of the format 'by_duration:<PnDTnHnMn.nS.>'.");
Member

none is not permitted for a share group.

Member

You still need to remove 'none' from this exception message because it is not a valid option here.

Contributor

@apoorvmittal10 apoorvmittal10 left a comment

Thanks for the PR. I'll take a second pass on the PR later today.

Comment on lines +169 to +177
static long offsetForTimestamp(TopicIdPartition topicIdPartition, ReplicaManager replicaManager, long timestampToSearch, int leaderEpoch) {
    Option<FileRecords.TimestampAndOffset> timestampAndOffset = replicaManager.fetchOffsetForTimestamp(
        topicIdPartition.topicPartition(), timestampToSearch, Option.empty(), Optional.of(leaderEpoch), true).timestampAndOffsetOpt();
    if (timestampAndOffset.isEmpty()) {
        throw new OffsetNotAvailableException("Offset for timestamp: " + timestampToSearch + " not found for topic partition: " + topicIdPartition);
    }
    return timestampAndOffset.get().offset;
}

Contributor

Should we send the isolation level as well, instead of Option.empty()? Also, we could have a common method and pass the respective timestamps from the offsetForEarliestTimestamp, offsetForLatestTimestamp and offsetForTimestamp methods.

Member

I suggest READ_UNCOMMITTED isolation level is the best option here. When we add support for isolation levels in share groups, we can revisit. So far, share groups always use READ_UNCOMMITTED.

Member

@AndrewJSchofield AndrewJSchofield left a comment

I think this PR is almost ready to merge. Just a couple of outstanding issues (isolation level, javadoc).


if (offsetResetStrategy == GroupConfig.ShareGroupAutoOffsetReset.EARLIEST)
if (offsetResetStrategy.timestamp().isEmpty()) {
throw new Exception("The timestamp is not available for the share partition: " + topicIdPartition);
Contributor

This will translate to an UnknownServerError on the client. Do we want to return that, or a more specific exception?

Member Author

@peterxcli peterxcli Dec 11, 2024

Contributor

So, currently, this can only occur when BY_DURATION is used and the duration is somehow not set. Would IllegalStateException be better to throw, since offsetResetStrategy.timestamp() should never be empty in practice?

Member Author

I think we can remove this exception here, because ShareGroupAutoOffsetResetStrategy already performs the validation itself, so .timestamp() must have a value if the type is BY_DURATION. We can add a comment to indicate this. Thoughts?

Contributor

I would say it would be good to have a check as the method returns Optional. If it can never be empty then the method should never return Optional.
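
The design point here (if a value can never be absent, do not return Optional) can be sketched as follows. This is a hypothetical class written for illustration, not the refactored code from the PR, and the sentinel values for the non-duration cases are assumptions of the sketch: the invariant is enforced once at construction, so the accessor can return a plain long.

```java
import java.time.Duration;
import java.util.Objects;

/** Hypothetical sketch: validation at construction makes an Optional return type unnecessary. */
final class ResetStrategySketch {
    enum Type { EARLIEST, LATEST, BY_DURATION }

    // Illustrative sentinels for the non-duration cases (assumption, not taken from the PR).
    static final long EARLIEST_SENTINEL = -2L;
    static final long LATEST_SENTINEL = -1L;

    private final Type type;
    private final Duration duration; // non-null only when type is BY_DURATION

    private ResetStrategySketch(Type type, Duration duration) {
        this.type = type;
        this.duration = duration;
    }

    static ResetStrategySketch earliest() {
        return new ResetStrategySketch(Type.EARLIEST, null);
    }

    static ResetStrategySketch latest() {
        return new ResetStrategySketch(Type.LATEST, null);
    }

    static ResetStrategySketch byDuration(Duration duration) {
        Objects.requireNonNull(duration, "duration");
        if (duration.isNegative()) {
            throw new IllegalArgumentException("Negative duration is not allowed");
        }
        return new ResetStrategySketch(Type.BY_DURATION, duration);
    }

    /** Always yields a value: a sentinel, or the given "now" minus the validated duration. */
    long timestamp(long nowMs) {
        switch (type) {
            case EARLIEST:
                return EARLIEST_SENTINEL;
            case LATEST:
                return LATEST_SENTINEL;
            default:
                return nowMs - duration.toMillis();
        }
    }
}
```

Because the only way to obtain a BY_DURATION instance is through byDuration, callers never need a defensive emptiness check.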

Comment thread core/src/main/java/kafka/server/share/SharePartition.java Outdated
Comment thread core/src/main/java/kafka/server/share/SharePartition.java Outdated

peterxcli commented Dec 11, 2024

Thanks for your review! I addressed some of the comments in 3d40b6e. PTAL, thanks!

@peterxcli peterxcli force-pushed the k18014 branch 2 times, most recently from ebf9cac to 8f2ac44 Compare December 11, 2024 07:40
Contributor

@apoorvmittal10 apoorvmittal10 left a comment

Thanks for addressing the comments. I have replied to the question and taken another pass.

Comment thread core/src/main/java/kafka/server/share/SharePartition.java Outdated
@peterxcli
Member Author

Refactored ShareGroupAutoOffsetResetStrategy and removed the Optional return type from ShareGroupAutoOffsetResetStrategy#timestamp in cffdcf1.

PTAL. Thanks!

Member

@AndrewJSchofield AndrewJSchofield left a comment

A few additional comments. I think this is getting close to being ready to merge.

Comment thread core/src/test/scala/unit/kafka/server/KafkaApisTest.scala Outdated

Option<FileRecords.TimestampAndOffset> timestampAndOffset = replicaManager.fetchOffsetForTimestamp(
topicIdPartition.topicPartition(), timestampToSearch, new Some<>(IsolationLevel.READ_UNCOMMITTED), Optional.of(leaderEpoch), true).timestampAndOffsetOpt();
if (timestampAndOffset.isEmpty()) {
throw new OffsetNotAvailableException("offset for timestamp " + timestampToSearch + " not found for topic partition: " + topicIdPartition);
Member

nit: Please capitalise the "Offset" for consistency with the offsetForEarliestTimestamp and offsetForLatestTimestamp methods. I know this is a trivial point but we have high standards for these things.

Member Author

Thank you very much for your careful review! 😁

Member

@AndrewJSchofield AndrewJSchofield left a comment

Thanks for the PR. I've run the code on my local environment and it works nicely. A couple of final points since the generated HTML documentation is not valid as the code stands. But apart from that, ready to merge.

- public static final String SHARE_AUTO_OFFSET_RESET_DEFAULT = ShareGroupAutoOffsetReset.LATEST.toString();
- public static final String SHARE_AUTO_OFFSET_RESET_DOC = "The strategy to initialize the share-partition start offset.";
+ public static final String SHARE_AUTO_OFFSET_RESET_DEFAULT = ShareGroupAutoOffsetResetStrategy.LATEST.name();
+ public static final String SHARE_AUTO_OFFSET_RESET_DOC = "The strategy to initialize the share-partition start offset. " +
Member

I was looking into what actually happens with these documentation strings and they need to be valid for HTML generation. So, I've slightly tweaked the text to replace the < and > so they're not interpreted as tags. Please can you use this string exactly.

    public static final String SHARE_AUTO_OFFSET_RESET_DOC = "The strategy to initialize the share-partition start offset. " +
        "<ul><li>earliest: automatically reset the offset to the earliest offset</li>" +
        "<li>latest: automatically reset the offset to the latest offset</li>" +
        "<li>by_duration:&lt;duration&gt;: automatically reset the offset to a configured duration from the current timestamp. " +
        "&lt;duration&gt; must be specified in ISO8601 format (PnDTnHnMn.nS). " +
        "Negative duration is not allowed.</li>" +
        "<li>anything else: throw exception to the share consumer.</li></ul>";
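
The reason for the replacement: the doc string ends up in generated HTML, so a literal <duration> would be read as a tag. A minimal sketch of the escaping involved, using a hypothetical helper name (the string above simply writes the entities by hand):

```java
/** Hypothetical helper for illustration; the PR hard-codes the escaped text instead. */
final class DocHtmlEscape {
    /**
     * Escapes characters that HTML would treat as markup.
     * '&' is handled first so the ampersands introduced by the later
     * replacements are not escaped a second time.
     */
    static String escape(String text) {
        return text.replace("&", "&amp;")
                   .replace("<", "&lt;")
                   .replace(">", "&gt;");
    }

    public static void main(String[] args) {
        // The list markup stays literal; only the placeholder text is escaped.
        System.out.println("<li>" + escape("by_duration:<duration>") + "</li>");
    }
}
```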

}
}
}
}
Member

This validator needs a toString() method. Otherwise, the documentation will include the validator's object reference instead of a string for the valid options. I suggest:

    public String toString() {
        return "[earliest, latest, by_duration:PnDTnHnMn.nS]";
    }

That worked nicely in the generated HTML.
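
A self-contained sketch of why the override matters. The Validator interface below is a local stand-in defined only so the example compiles without Kafka on the classpath, and the class names are hypothetical. Any code that builds documentation by concatenating the validator into a string will call toString(), and without an override the default Object implementation prints a class name and hash code.

```java
import java.time.Duration;
import java.time.format.DateTimeParseException;

final class ValidatorToStringDemo {
    /** Local stand-in for a config validator interface (assumption; keeps the sketch dependency-free). */
    interface Validator {
        void ensureValid(String name, Object value);
    }

    /** Hypothetical validator accepting earliest, latest or by_duration:<ISO8601 duration>. */
    static final class ShareResetValidator implements Validator {
        private static final String PREFIX = "by_duration:";

        @Override
        public void ensureValid(String name, Object value) {
            String s = String.valueOf(value);
            if (s.equals("earliest") || s.equals("latest")) {
                return;
            }
            if (s.startsWith(PREFIX)) {
                try {
                    if (!Duration.parse(s.substring(PREFIX.length())).isNegative()) {
                        return;
                    }
                } catch (DateTimeParseException e) {
                    // Not a valid duration: fall through to the error below.
                }
            }
            throw new IllegalArgumentException("Invalid value `" + s + "` for configuration " + name
                + ". The value must be either 'earliest', 'latest' or of the format 'by_duration:<PnDTnHnMn.nS>'.");
        }

        @Override
        public String toString() {
            return "[earliest, latest, by_duration:PnDTnHnMn.nS]";
        }
    }

    public static void main(String[] args) {
        Validator validator = new ShareResetValidator();
        // String concatenation calls toString(), which is what a doc generator would see.
        System.out.println("Valid values: " + validator);
    }
}
```

Note that the error message in this sketch omits 'none', in line with the earlier review comment that it is not a valid option for share groups.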

@AndrewJSchofield
Member

There should also be an integration test but I've created a separate task https://issues.apache.org/jira/browse/KAFKA-18260. Feel free to pick this up @peterxcli if you're interested.

Member

@AndrewJSchofield AndrewJSchofield left a comment

Thanks for the PR. Looks good to me. Waiting for a green build before merging.

@AndrewJSchofield AndrewJSchofield merged commit 220c578 into apache:trunk Dec 16, 2024
tedyu pushed a commit to tedyu/kafka that referenced this pull request Jan 6, 2025
…apache#18096)

Reviewers: Andrew Schofield <[email protected]>, Apoorv Mittal <[email protected]>

Labels

ci-approved core Kafka Broker KIP-932 Queues for Kafka

4 participants