Skip to content

Commit 3d40b6e

Browse files
committed
Address Andrew's and Apoorv's comments
1 parent ebf9cac commit 3d40b6e

3 files changed

Lines changed: 32 additions & 17 deletions

File tree

core/src/main/java/kafka/server/share/ShareFetchUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ static long offsetForLatestTimestamp(TopicIdPartition topicIdPartition, ReplicaM
168168
*/
169169
static long offsetForTimestamp(TopicIdPartition topicIdPartition, ReplicaManager replicaManager, long timestampToSearch, int leaderEpoch) {
170170
Option<FileRecords.TimestampAndOffset> timestampAndOffset = replicaManager.fetchOffsetForTimestamp(
171-
topicIdPartition.topicPartition(), timestampToSearch, Option.empty(), Optional.of(leaderEpoch), true).timestampAndOffsetOpt();
171+
topicIdPartition.topicPartition(), timestampToSearch, new Some<>(IsolationLevel.READ_UNCOMMITTED), Optional.of(leaderEpoch), true).timestampAndOffsetOpt();
172172
if (timestampAndOffset.isEmpty()) {
173173
throw new OffsetNotAvailableException("offset for timestamp to search: " + timestampToSearch + " not found for topic partition: " + topicIdPartition);
174174
}

core/src/main/java/kafka/server/share/SharePartition.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import kafka.server.ReplicaManager;
2020
import kafka.server.share.SharePartitionManager.SharePartitionListener;
2121

22+
import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
2223
import org.apache.kafka.common.KafkaException;
2324
import org.apache.kafka.common.TopicIdPartition;
2425
import org.apache.kafka.common.Uuid;
@@ -34,7 +35,6 @@
3435
import org.apache.kafka.common.message.ShareFetchResponseData.AcquiredRecords;
3536
import org.apache.kafka.common.protocol.Errors;
3637
import org.apache.kafka.common.record.RecordBatch;
37-
import org.apache.kafka.common.requests.ListOffsetsRequest;
3838
import org.apache.kafka.common.utils.Time;
3939
import org.apache.kafka.coordinator.group.GroupConfig;
4040
import org.apache.kafka.coordinator.group.GroupConfigManager;
@@ -2135,14 +2135,12 @@ private long startOffsetDuringInitialization(long partitionDataStartOffset) thro
21352135
offsetResetStrategy = GroupConfig.defaultShareAutoOffsetReset();
21362136
}
21372137

2138-
if (offsetResetStrategy.timestamp().isEmpty()) {
2139-
throw new Exception("The timestamp is not available for the share partition: " + topicIdPartition);
2140-
}
2141-
final long timestamp = offsetResetStrategy.timestamp().get();
2138+
final long timestamp = offsetResetStrategy.timestamp()
2139+
.orElseThrow(() -> new NoOffsetForPartitionException(topicIdPartition.topicPartition()));
21422140

2143-
if (timestamp == ListOffsetsRequest.LATEST_TIMESTAMP) {
2141+
if (offsetResetStrategy.type() == ShareGroupAutoOffsetResetStrategy.StrategyType.LATEST) {
21442142
return offsetForLatestTimestamp(topicIdPartition, replicaManager, leaderEpoch);
2145-
} else if (timestamp == ListOffsetsRequest.EARLIEST_TIMESTAMP) {
2143+
} else if (offsetResetStrategy.type() == ShareGroupAutoOffsetResetStrategy.StrategyType.EARLIEST) {
21462144
return offsetForEarliestTimestamp(topicIdPartition, replicaManager, leaderEpoch);
21472145
} else {
21482146
return offsetForTimestamp(topicIdPartition, replicaManager, timestamp, leaderEpoch);

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/ShareGroupAutoOffsetResetStrategy.java

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,24 @@
2828
import java.util.Objects;
2929
import java.util.Optional;
3030

31+
/**
32+
* Represents the strategy for resetting offsets in share consumer groups when no previous offset is found
33+
* for a partition or when an offset is out of range.
34+
* <p>
35+
* Supports three strategies:
36+
* <ul>
37+
* <li>{@code EARLIEST} - Reset the offset to the earliest available offset
38+
* <li>{@code LATEST} - Reset the offset to the latest available offset
39+
* <li>{@code BY_DURATION} - Reset the offset to a timestamp that is the specified duration before the current time
40+
* </ul>
41+
* <p>
42+
* The strategy can be configured using string values:
43+
* <ul>
44+
* <li>"earliest" for {@code EARLIEST}
45+
* <li>"latest" for {@code LATEST}
46+
* <li>"by_duration:&lt;duration&gt;" for {@code BY_DURATION}, where duration is in ISO-8601 format (e.g., "PT1H" for 1 hour)
47+
* </ul>
48+
*/
3149
public class ShareGroupAutoOffsetResetStrategy {
3250
public enum StrategyType {
3351
LATEST, EARLIEST, BY_DURATION;
@@ -115,15 +133,14 @@ public String name() {
115133
* else return Optional.empty()
116134
*/
117135
public Optional<Long> timestamp() {
118-
if (type == StrategyType.EARLIEST)
119-
return Optional.of(ListOffsetsRequest.EARLIEST_TIMESTAMP);
120-
else if (type == StrategyType.LATEST)
121-
return Optional.of(ListOffsetsRequest.LATEST_TIMESTAMP);
122-
else if (type == StrategyType.BY_DURATION && duration.isPresent()) {
123-
Instant now = Instant.now();
124-
return Optional.of(now.minus(duration.get()).toEpochMilli());
125-
} else
126-
return Optional.empty();
136+
return switch (type) {
137+
case EARLIEST -> Optional.of(ListOffsetsRequest.EARLIEST_TIMESTAMP);
138+
case LATEST -> Optional.of(ListOffsetsRequest.LATEST_TIMESTAMP);
139+
case BY_DURATION -> duration.isPresent()
140+
? Optional.of(Instant.now().minus(duration.get()).toEpochMilli())
141+
: Optional.empty();
142+
default -> Optional.empty();
143+
};
127144
}
128145

129146
@Override

0 commit comments

Comments
 (0)