Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,8 @@ public class ConsumerConfig extends AbstractConfig {
"(e.g. because that data has been deleted): " +
"<ul><li>earliest: automatically reset the offset to the earliest offset" +
"<li>latest: automatically reset the offset to the latest offset</li>" +
"<li>by_duration:<duration>: automatically reset the offset to a configured <duration> from the current timestamp. <duration> must be specified in ISO8601 format (PnDTnHnMn.nS). " +
"Negative duration is not allowed.</li>" +
"<li>none: throw exception to the consumer if no previous offset is found for the consumer's group</li>" +
"<li>anything else: throw exception to the consumer.</li></ul>" +
"<p>Note that altering partition numbers while setting this config to latest may cause message delivery loss since " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
private final SubscriptionState subscriptions;
private final Map<TopicPartition, Long> beginningOffsets;
private final Map<TopicPartition, Long> endOffsets;
private final Map<TopicPartition, Long> durationResetOffsets;
private final Map<TopicPartition, OffsetAndMetadata> committed;
private final Queue<Runnable> pollTasks;
private final Set<TopicPartition> paused;
Expand Down Expand Up @@ -104,6 +105,7 @@ private MockConsumer(AutoOffsetResetStrategy offsetResetStrategy) {
this.closed = false;
this.beginningOffsets = new HashMap<>();
this.endOffsets = new HashMap<>();
this.durationResetOffsets = new HashMap<>();
this.pollTasks = new LinkedList<>();
this.pollException = null;
this.wakeup = new AtomicBoolean(false);
Expand Down Expand Up @@ -433,6 +435,10 @@ public synchronized void updateEndOffsets(final Map<TopicPartition, Long> newOff
endOffsets.putAll(newOffsets);
}

public synchronized void updateDurationOffsets(final Map<TopicPartition, Long> newOffsets) {
durationResetOffsets.putAll(newOffsets);
}

public void disableTelemetry() {
telemetryDisabled = true;
}
Expand Down Expand Up @@ -610,6 +616,10 @@ private void resetOffsetPosition(TopicPartition tp) {
offset = endOffsets.get(tp);
if (offset == null)
throw new IllegalStateException("MockConsumer didn't have end offset specified, but tried to seek to end");
} else if (strategy.type() == AutoOffsetResetStrategy.StrategyType.BY_DURATION) {
offset = durationResetOffsets.get(tp);
if (offset == null)
throw new IllegalStateException("MockConsumer didn't have duration offset specified, but tried to seek to timestamp");
} else {
throw new NoOffsetForPartitionException(tp);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,19 @@

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.requests.ListOffsetsRequest;
import org.apache.kafka.common.utils.Utils;

import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.Locale;
import java.util.Objects;
import java.util.Optional;

public class AutoOffsetResetStrategy {
private enum StrategyType {
LATEST, EARLIEST, NONE;
public enum StrategyType {
LATEST, EARLIEST, NONE, BY_DURATION;

@Override
public String toString() {
Expand All @@ -39,30 +43,65 @@ public String toString() {
public static final AutoOffsetResetStrategy NONE = new AutoOffsetResetStrategy(StrategyType.NONE);

private final StrategyType type;
private final Optional<Duration> duration;

private AutoOffsetResetStrategy(StrategyType type) {
this.type = type;
this.duration = Optional.empty();
}

public static boolean isValid(String offsetStrategy) {
return Arrays.asList(Utils.enumOptions(StrategyType.class)).contains(offsetStrategy);
private AutoOffsetResetStrategy(Duration duration) {
this.type = StrategyType.BY_DURATION;
this.duration = Optional.of(duration);
}

/**
* Returns the AutoOffsetResetStrategy from the given string.
*/
public static AutoOffsetResetStrategy fromString(String offsetStrategy) {
if (offsetStrategy == null || !isValid(offsetStrategy)) {
throw new IllegalArgumentException("Unknown auto offset reset strategy: " + offsetStrategy);
if (offsetStrategy == null) {
throw new IllegalArgumentException("Auto offset reset strategy is null");
}
StrategyType type = StrategyType.valueOf(offsetStrategy.toUpperCase(Locale.ROOT));
switch (type) {
case EARLIEST:
return EARLIEST;
case LATEST:
return LATEST;
case NONE:
return NONE;
default:
throw new IllegalArgumentException("Unknown auto offset reset strategy: " + offsetStrategy);

if (StrategyType.BY_DURATION.toString().equals(offsetStrategy)) {
throw new IllegalArgumentException("<:duration> part is missing in by_duration auto offset reset strategy.");
}

if (Arrays.asList(Utils.enumOptions(StrategyType.class)).contains(offsetStrategy)) {
StrategyType type = StrategyType.valueOf(offsetStrategy.toUpperCase(Locale.ROOT));
switch (type) {
case EARLIEST:
return EARLIEST;
case LATEST:
return LATEST;
case NONE:
return NONE;
default:
throw new IllegalArgumentException("Unknown auto offset reset strategy: " + offsetStrategy);
}
}

if (offsetStrategy.startsWith(StrategyType.BY_DURATION + ":")) {
String isoDuration = offsetStrategy.substring(StrategyType.BY_DURATION.toString().length() + 1);
try {
Duration duration = Duration.parse(isoDuration);
if (duration.isNegative()) {
throw new IllegalArgumentException("Negative duration is not supported in by_duration offset reset strategy.");
}
return new AutoOffsetResetStrategy(duration);
} catch (Exception e) {
throw new IllegalArgumentException("Unable to parse duration string in by_duration offset reset strategy.", e);
}
}

throw new IllegalArgumentException("Unknown auto offset reset strategy: " + offsetStrategy);
}

/**
* Returns the offset reset strategy type.
*/
public StrategyType type() {
return type;
}

/**
Expand All @@ -72,33 +111,54 @@ public String name() {
return type.toString();
}

/**
* Return the timestamp to be used for the ListOffsetsRequest.
* @return the timestamp for the OffsetResetStrategy,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hello @omkreddy , this doc should this be AutoOffsetResetStrategy? Please take a look—I'm glad to open a minor PR to fix it if needed.

* if the strategy is EARLIEST or LATEST or duration is provided
* else return Optional.empty()
*/
public Optional<Long> timestamp() {
if (type == StrategyType.EARLIEST)
return Optional.of(ListOffsetsRequest.EARLIEST_TIMESTAMP);
else if (type == StrategyType.LATEST)
return Optional.of(ListOffsetsRequest.LATEST_TIMESTAMP);
else if (type == StrategyType.BY_DURATION && duration.isPresent()) {
Instant now = Instant.now();
return Optional.of(now.minus(duration.get()).toEpochMilli());
} else
return Optional.empty();
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
AutoOffsetResetStrategy that = (AutoOffsetResetStrategy) o;
return Objects.equals(type, that.type);
return type == that.type && Objects.equals(duration, that.duration);
}

@Override
public int hashCode() {
return Objects.hashCode(type);
return Objects.hash(type, duration);
}

@Override
public String toString() {
return "AutoOffsetResetStrategy{" +
"type='" + type + '\'' +
"type=" + type +
(duration.map(value -> ", duration=" + value).orElse("")) +
'}';
}

public static class Validator implements ConfigDef.Validator {
@Override
public void ensureValid(String name, Object value) {
String strategy = (String) value;
if (!AutoOffsetResetStrategy.isValid(strategy)) {
throw new ConfigException(name, value, "Invalid value " + strategy + " for configuration " +
name + ": the value must be either 'earliest', 'latest', or 'none'.");
String offsetStrategy = (String) value;
try {
fromString(offsetStrategy);
} catch (Exception e) {
throw new ConfigException(name, value, "Invalid value `" + offsetStrategy + "` for configuration " +
name + ". The value must be either 'earliest', 'latest', 'none' or of the format 'by_duration:<PnDTnHnMn.nS.>'.");
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,12 +101,13 @@ public OffsetFetcher(LogContext logContext,
* and one or more partitions aren't awaiting a seekToBeginning() or seekToEnd().
*/
public void resetPositionsIfNeeded() {
Map<TopicPartition, Long> offsetResetTimestamps = offsetFetcherUtils.getOffsetResetTimestamp();
Map<TopicPartition, AutoOffsetResetStrategy> partitionAutoOffsetResetStrategyMap =
offsetFetcherUtils.getOffsetResetStrategyForPartitions();

if (offsetResetTimestamps.isEmpty())
if (partitionAutoOffsetResetStrategyMap.isEmpty())
return;

resetPositionsAsync(offsetResetTimestamps);
resetPositionsAsync(partitionAutoOffsetResetStrategyMap);
}

/**
Expand Down Expand Up @@ -209,7 +210,9 @@ private Map<TopicPartition, Long> beginningOrEndOffset(Collection<TopicPartition
}
}

private void resetPositionsAsync(Map<TopicPartition, Long> partitionResetTimestamps) {
private void resetPositionsAsync(Map<TopicPartition, AutoOffsetResetStrategy> partitionAutoOffsetResetStrategyMap) {
Map<TopicPartition, Long> partitionResetTimestamps = partitionAutoOffsetResetStrategyMap.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().timestamp().get()));
Map<Node, Map<TopicPartition, ListOffsetsPartition>> timestampsToSearchByNode =
groupListOffsetRequests(partitionResetTimestamps, new HashSet<>());
for (Map.Entry<Node, Map<TopicPartition, ListOffsetsPartition>> entry : timestampsToSearchByNode.entrySet()) {
Expand All @@ -221,7 +224,7 @@ private void resetPositionsAsync(Map<TopicPartition, Long> partitionResetTimesta
future.addListener(new RequestFutureListener<>() {
@Override
public void onSuccess(ListOffsetResult result) {
offsetFetcherUtils.onSuccessfulResponseForResettingPositions(resetTimestamps, result);
offsetFetcherUtils.onSuccessfulResponseForResettingPositions(result, partitionAutoOffsetResetStrategyMap);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.apache.kafka.common.message.ListOffsetsResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ListOffsetsRequest;
import org.apache.kafka.common.requests.ListOffsetsResponse;
import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest;
import org.apache.kafka.common.utils.LogContext;
Expand Down Expand Up @@ -224,19 +223,22 @@ void validatePositionsOnMetadataChange() {
}
}

Map<TopicPartition, Long> getOffsetResetTimestamp() {
/**
* get OffsetResetStrategy for all assigned partitions
*/
Map<TopicPartition, AutoOffsetResetStrategy> getOffsetResetStrategyForPartitions() {
// Raise exception from previous offset fetch if there is one
RuntimeException exception = cachedResetPositionsException.getAndSet(null);
if (exception != null)
throw exception;

Set<TopicPartition> partitions = subscriptionState.partitionsNeedingReset(time.milliseconds());
final Map<TopicPartition, Long> offsetResetTimestamps = new HashMap<>();
final Map<TopicPartition, AutoOffsetResetStrategy> partitionAutoOffsetResetStrategyMap = new HashMap<>();
for (final TopicPartition partition : partitions) {
offsetResetTimestamps.put(partition, offsetResetStrategyTimestamp(partition));
partitionAutoOffsetResetStrategyMap.put(partition, offsetResetStrategyWithValidTimestamp(partition));
}

return offsetResetTimestamps;
return partitionAutoOffsetResetStrategyMap;
}

static Map<TopicPartition, OffsetAndTimestamp> buildListOffsetsResult(
Expand Down Expand Up @@ -283,14 +285,13 @@ static Map<TopicPartition, OffsetAndTimestampInternal> buildOffsetsForTimeIntern
return offsetsResults;
}

private long offsetResetStrategyTimestamp(final TopicPartition partition) {
private AutoOffsetResetStrategy offsetResetStrategyWithValidTimestamp(final TopicPartition partition) {
AutoOffsetResetStrategy strategy = subscriptionState.resetStrategy(partition);
if (strategy == AutoOffsetResetStrategy.EARLIEST)
return ListOffsetsRequest.EARLIEST_TIMESTAMP;
else if (strategy == AutoOffsetResetStrategy.LATEST)
return ListOffsetsRequest.LATEST_TIMESTAMP;
else
if (strategy.timestamp().isPresent()) {
return strategy;
} else {
throw new NoOffsetForPartitionException(partition);
}
}

static Set<String> topicsForPartitions(Collection<TopicPartition> partitions) {
Expand Down Expand Up @@ -319,18 +320,9 @@ void updateSubscriptionState(Map<TopicPartition, OffsetFetcherUtils.ListOffsetDa
}
}

static AutoOffsetResetStrategy timestampToOffsetResetStrategy(long timestamp) {
if (timestamp == ListOffsetsRequest.EARLIEST_TIMESTAMP)
return AutoOffsetResetStrategy.EARLIEST;
else if (timestamp == ListOffsetsRequest.LATEST_TIMESTAMP)
return AutoOffsetResetStrategy.LATEST;
else
return null;
}

void onSuccessfulResponseForResettingPositions(
final Map<TopicPartition, ListOffsetsRequestData.ListOffsetsPartition> resetTimestamps,
final ListOffsetResult result) {
final ListOffsetResult result,
final Map<TopicPartition, AutoOffsetResetStrategy> partitionAutoOffsetResetStrategyMap) {
if (!result.partitionsToRetry.isEmpty()) {
subscriptionState.requestFailed(result.partitionsToRetry, time.milliseconds() + retryBackoffMs);
metadata.requestUpdate(false);
Expand All @@ -339,10 +331,9 @@ void onSuccessfulResponseForResettingPositions(
for (Map.Entry<TopicPartition, ListOffsetData> fetchedOffset : result.fetchedOffsets.entrySet()) {
TopicPartition partition = fetchedOffset.getKey();
ListOffsetData offsetData = fetchedOffset.getValue();
ListOffsetsRequestData.ListOffsetsPartition requestedReset = resetTimestamps.get(partition);
resetPositionIfNeeded(
partition,
timestampToOffsetResetStrategy(requestedReset.timestamp()),
partitionAutoOffsetResetStrategyMap.get(partition),
offsetData);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -472,20 +472,20 @@ private boolean canReusePendingOffsetFetchEvent(Set<TopicPartition> partitions)
* this function (ex. {@link org.apache.kafka.common.errors.TopicAuthorizationException})
*/
CompletableFuture<Void> resetPositionsIfNeeded() {
Map<TopicPartition, Long> offsetResetTimestamps;
Map<TopicPartition, AutoOffsetResetStrategy> partitionAutoOffsetResetStrategyMap;

try {
offsetResetTimestamps = offsetFetcherUtils.getOffsetResetTimestamp();
partitionAutoOffsetResetStrategyMap = offsetFetcherUtils.getOffsetResetStrategyForPartitions();
} catch (Exception e) {
CompletableFuture<Void> result = new CompletableFuture<>();
result.completeExceptionally(e);
return result;
}

if (offsetResetTimestamps.isEmpty())
if (partitionAutoOffsetResetStrategyMap.isEmpty())
return CompletableFuture.completedFuture(null);

return sendListOffsetsRequestsAndResetPositions(offsetResetTimestamps);
return sendListOffsetsRequestsAndResetPositions(partitionAutoOffsetResetStrategyMap);
}

/**
Expand Down Expand Up @@ -652,12 +652,14 @@ private CompletableFuture<ListOffsetResult> buildListOffsetRequestToNode(
* partitions. Use the retrieved offsets to reset positions in the subscription state.
* This also adds the request to the list of unsentRequests.
*
* @param timestampsToSearch the mapping between partitions and target time
* @param partitionAutoOffsetResetStrategyMap the mapping between partitions and AutoOffsetResetStrategy
* @return A {@link CompletableFuture} which completes when the requests are
* complete.
*/
private CompletableFuture<Void> sendListOffsetsRequestsAndResetPositions(
final Map<TopicPartition, Long> timestampsToSearch) {
final Map<TopicPartition, AutoOffsetResetStrategy> partitionAutoOffsetResetStrategyMap) {
Map<TopicPartition, Long> timestampsToSearch = partitionAutoOffsetResetStrategyMap.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().timestamp().get()));
Map<Node, Map<TopicPartition, ListOffsetsRequestData.ListOffsetsPartition>> timestampsToSearchByNode =
groupListOffsetRequests(timestampsToSearch, Optional.empty());

Expand All @@ -677,8 +679,8 @@ private CompletableFuture<Void> sendListOffsetsRequestsAndResetPositions(

partialResult.whenComplete((result, error) -> {
if (error == null) {
offsetFetcherUtils.onSuccessfulResponseForResettingPositions(resetTimestamps,
result);
offsetFetcherUtils.onSuccessfulResponseForResettingPositions(result,
partitionAutoOffsetResetStrategyMap);
} else {
RuntimeException e;
if (error instanceof RuntimeException) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,7 @@ synchronized void maybeSeekUnvalidated(TopicPartition tp, FetchPosition position
log.debug("Skipping reset of partition {} since it is no longer assigned", tp);
} else if (!state.awaitingReset()) {
log.debug("Skipping reset of partition {} since reset is no longer needed", tp);
} else if (requestedResetStrategy != state.resetStrategy) {
} else if (requestedResetStrategy != null && !requestedResetStrategy.equals(state.resetStrategy)) {
log.debug("Skipping reset of partition {} since an alternative reset has been requested", tp);
} else {
log.info("Resetting offset for partition {} to position {}.", tp, position);
Expand Down
Loading