|
28 | 28 | import java.util.Objects; |
29 | 29 | import java.util.Optional; |
30 | 30 |
|
| 31 | +/** |
| 32 | + * Represents the strategy for resetting offsets in share consumer groups when no previous offset is found |
| 33 | + * for a partition or when an offset is out of range. |
| 34 | + * <p> |
| 35 | + * Supports three strategies: |
| 36 | + * <ul> |
| 37 | + * <li>{@code EARLIEST} - Reset the offset to the earliest available offset |
| 38 | + * <li>{@code LATEST} - Reset the offset to the latest available offset |
| 39 | + * <li>{@code BY_DURATION} - Reset the offset to a timestamp that is the specified duration before the current time |
| 40 | + * </ul> |
| 41 | + * <p> |
| 42 | + * The strategy can be configured using string values: |
| 43 | + * <ul> |
| 44 | + * <li>"earliest" for {@code EARLIEST} |
| 45 | + * <li>"latest" for {@code LATEST} |
| 46 | + * <li>"by_duration:<duration>" for {@code BY_DURATION}, where duration is in ISO-8601 format (e.g., "PT1H" for 1 hour) |
| 47 | + * </ul> |
| 48 | + */ |
31 | 49 | public class ShareGroupAutoOffsetResetStrategy { |
32 | 50 | public enum StrategyType { |
33 | 51 | LATEST, EARLIEST, BY_DURATION; |
@@ -115,15 +133,14 @@ public String name() { |
115 | 133 | * else return Optional.empty() |
116 | 134 | */ |
117 | 135 | public Optional<Long> timestamp() { |
118 | | - if (type == StrategyType.EARLIEST) |
119 | | - return Optional.of(ListOffsetsRequest.EARLIEST_TIMESTAMP); |
120 | | - else if (type == StrategyType.LATEST) |
121 | | - return Optional.of(ListOffsetsRequest.LATEST_TIMESTAMP); |
122 | | - else if (type == StrategyType.BY_DURATION && duration.isPresent()) { |
123 | | - Instant now = Instant.now(); |
124 | | - return Optional.of(now.minus(duration.get()).toEpochMilli()); |
125 | | - } else |
126 | | - return Optional.empty(); |
| 136 | + return switch (type) { |
| 137 | + case EARLIEST -> Optional.of(ListOffsetsRequest.EARLIEST_TIMESTAMP); |
| 138 | + case LATEST -> Optional.of(ListOffsetsRequest.LATEST_TIMESTAMP); |
| 139 | + case BY_DURATION -> duration.isPresent() |
| 140 | + ? Optional.of(Instant.now().minus(duration.get()).toEpochMilli()) |
| 141 | + : Optional.empty(); |
| 142 | + default -> Optional.empty(); |
| 143 | + }; |
127 | 144 | } |
128 | 145 |
|
129 | 146 | @Override |
|
0 commit comments