Skip to content

Commit 220c578

Browse files
authored
KAFKA-18014: Add duration based offset reset option for ShareConsumer (#18096)
The Kafka consumer supports the auto.offset.reset config option, which is used when there is no initial offset in Kafka or when the current offset no longer exists on the server. This config currently supports the earliest/latest/none options. Currently, a consumer reset might force applications to reprocess large amounts of data from earlier offsets. With infinite storage, it is beneficial to have a duration-based offset reset strategy. This will allow applications to consume/initialise from a fixed duration when there is no initial offset in Kafka. As part of KIP-932, we are adding support for share consumer groups. Share consumer groups support the dynamic group configuration property share.auto.offset.reset, which is used to set the initial Share-Partition Start Offset (SPSO). Currently, share.auto.offset.reset supports the earliest and latest options to automatically reset the offset. Similar to the Kafka consumer, we will add support for the by_duration:<duration> config value for share.auto.offset.reset. Reviewers: Andrew Schofield <[email protected]>, Apoorv Mittal <[email protected]>
1 parent ca7b1dc commit 220c578

8 files changed

Lines changed: 440 additions & 26 deletions

File tree

core/src/main/java/kafka/server/share/ShareFetchUtils.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,20 @@ static long offsetForLatestTimestamp(TopicIdPartition topicIdPartition, ReplicaM
161161
return timestampAndOffset.get().offset;
162162
}
163163

164+
/**
165+
* The method is used to get the offset for the given timestamp for the topic-partition.
166+
*
167+
* @return The offset for the given timestamp.
168+
*/
169+
static long offsetForTimestamp(TopicIdPartition topicIdPartition, ReplicaManager replicaManager, long timestampToSearch, int leaderEpoch) {
170+
Option<FileRecords.TimestampAndOffset> timestampAndOffset = replicaManager.fetchOffsetForTimestamp(
171+
topicIdPartition.topicPartition(), timestampToSearch, new Some<>(IsolationLevel.READ_UNCOMMITTED), Optional.of(leaderEpoch), true).timestampAndOffsetOpt();
172+
if (timestampAndOffset.isEmpty()) {
173+
throw new OffsetNotAvailableException("Offset for timestamp " + timestampToSearch + " not found for topic partition: " + topicIdPartition);
174+
}
175+
return timestampAndOffset.get().offset;
176+
}
177+
164178
static int leaderEpoch(ReplicaManager replicaManager, TopicPartition tp) {
165179
return partition(replicaManager, tp).getLeaderEpoch();
166180
}

core/src/main/java/kafka/server/share/SharePartition.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.apache.kafka.common.utils.Time;
3838
import org.apache.kafka.coordinator.group.GroupConfig;
3939
import org.apache.kafka.coordinator.group.GroupConfigManager;
40+
import org.apache.kafka.coordinator.group.ShareGroupAutoOffsetResetStrategy;
4041
import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;
4142
import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
4243
import org.apache.kafka.server.share.fetch.DelayedShareFetchKey;
@@ -76,6 +77,7 @@
7677

7778
import static kafka.server.share.ShareFetchUtils.offsetForEarliestTimestamp;
7879
import static kafka.server.share.ShareFetchUtils.offsetForLatestTimestamp;
80+
import static kafka.server.share.ShareFetchUtils.offsetForTimestamp;
7981

8082
/**
8183
* The SharePartition is used to track the state of a partition that is shared between multiple
@@ -2093,16 +2095,21 @@ private long startOffsetDuringInitialization(long partitionDataStartOffset) thro
20932095
if (partitionDataStartOffset != PartitionFactory.UNINITIALIZED_START_OFFSET) {
20942096
return partitionDataStartOffset;
20952097
}
2096-
GroupConfig.ShareGroupAutoOffsetReset offsetResetStrategy;
2098+
ShareGroupAutoOffsetResetStrategy offsetResetStrategy;
20972099
if (groupConfigManager.groupConfig(groupId).isPresent()) {
20982100
offsetResetStrategy = groupConfigManager.groupConfig(groupId).get().shareAutoOffsetReset();
20992101
} else {
21002102
offsetResetStrategy = GroupConfig.defaultShareAutoOffsetReset();
21012103
}
21022104

2103-
if (offsetResetStrategy == GroupConfig.ShareGroupAutoOffsetReset.EARLIEST)
2105+
if (offsetResetStrategy.type() == ShareGroupAutoOffsetResetStrategy.StrategyType.LATEST) {
2106+
return offsetForLatestTimestamp(topicIdPartition, replicaManager, leaderEpoch);
2107+
} else if (offsetResetStrategy.type() == ShareGroupAutoOffsetResetStrategy.StrategyType.EARLIEST) {
21042108
return offsetForEarliestTimestamp(topicIdPartition, replicaManager, leaderEpoch);
2105-
return offsetForLatestTimestamp(topicIdPartition, replicaManager, leaderEpoch);
2109+
} else {
2110+
// offsetResetStrategy type is BY_DURATION
2111+
return offsetForTimestamp(topicIdPartition, replicaManager, offsetResetStrategy.timestamp(), leaderEpoch);
2112+
}
21062113
}
21072114

21082115
// Visible for testing. Should only be used for testing purposes.

core/src/test/java/kafka/server/share/SharePartitionTest.java

Lines changed: 115 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.apache.kafka.common.utils.Time;
4747
import org.apache.kafka.coordinator.group.GroupConfig;
4848
import org.apache.kafka.coordinator.group.GroupConfigManager;
49+
import org.apache.kafka.coordinator.group.ShareGroupAutoOffsetResetStrategy;
4950
import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;
5051
import org.apache.kafka.server.share.fetch.ShareAcquiredRecords;
5152
import org.apache.kafka.server.share.persister.NoOpShareStatePersister;
@@ -215,7 +216,7 @@ public void testMaybeInitializeDefaultStartEpochGroupConfigReturnsEarliest() {
215216
GroupConfigManager groupConfigManager = Mockito.mock(GroupConfigManager.class);
216217
GroupConfig groupConfig = Mockito.mock(GroupConfig.class);
217218
Mockito.when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));
218-
Mockito.when(groupConfig.shareAutoOffsetReset()).thenReturn(GroupConfig.ShareGroupAutoOffsetReset.EARLIEST);
219+
Mockito.when(groupConfig.shareAutoOffsetReset()).thenReturn(ShareGroupAutoOffsetResetStrategy.EARLIEST);
219220

220221
ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
221222

@@ -265,7 +266,7 @@ public void testMaybeInitializeDefaultStartEpochGroupConfigReturnsLatest() {
265266
GroupConfigManager groupConfigManager = Mockito.mock(GroupConfigManager.class);
266267
GroupConfig groupConfig = Mockito.mock(GroupConfig.class);
267268
Mockito.when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));
268-
Mockito.when(groupConfig.shareAutoOffsetReset()).thenReturn(GroupConfig.ShareGroupAutoOffsetReset.LATEST);
269+
Mockito.when(groupConfig.shareAutoOffsetReset()).thenReturn(ShareGroupAutoOffsetResetStrategy.LATEST);
269270

270271
ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
271272

@@ -298,6 +299,64 @@ public void testMaybeInitializeDefaultStartEpochGroupConfigReturnsLatest() {
298299
assertEquals(PartitionFactory.DEFAULT_STATE_EPOCH, sharePartition.stateEpoch());
299300
}
300301

302+
@Test
303+
public void testMaybeInitializeDefaultStartEpochGroupConfigReturnsByDuration() {
304+
Persister persister = Mockito.mock(Persister.class);
305+
ReadShareGroupStateResult readShareGroupStateResult = Mockito.mock(ReadShareGroupStateResult.class);
306+
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
307+
new TopicData<>(TOPIC_ID_PARTITION.topicId(), Collections.singletonList(
308+
PartitionFactory.newPartitionAllData(
309+
0, PartitionFactory.DEFAULT_STATE_EPOCH,
310+
PartitionFactory.UNINITIALIZED_START_OFFSET,
311+
PartitionFactory.DEFAULT_ERROR_CODE,
312+
PartitionFactory.DEFAULT_ERR_MESSAGE,
313+
Collections.emptyList())))));
314+
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
315+
316+
GroupConfigManager groupConfigManager = Mockito.mock(GroupConfigManager.class);
317+
GroupConfig groupConfig = Mockito.mock(GroupConfig.class);
318+
Mockito.when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));
319+
320+
// Since the timestamp() of duration based strategy is not deterministic, we need to mock the ShareGroupAutoOffsetResetStrategy.
321+
// mock: final ShareGroupAutoOffsetResetStrategy resetStrategy = ShareGroupAutoOffsetResetStrategy.fromString("by_duration:PT1H");
322+
final ShareGroupAutoOffsetResetStrategy resetStrategy = Mockito.mock(ShareGroupAutoOffsetResetStrategy.class);
323+
final long expectedTimestamp = MOCK_TIME.milliseconds() - TimeUnit.HOURS.toMillis(1);
324+
Mockito.when(resetStrategy.type()).thenReturn(ShareGroupAutoOffsetResetStrategy.StrategyType.BY_DURATION);
325+
Mockito.when(resetStrategy.timestamp()).thenReturn(expectedTimestamp);
326+
327+
Mockito.when(groupConfig.shareAutoOffsetReset()).thenReturn(resetStrategy);
328+
329+
ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
330+
331+
FileRecords.TimestampAndOffset timestampAndOffset = new FileRecords.TimestampAndOffset(MOCK_TIME.milliseconds() - TimeUnit.HOURS.toMillis(1), 15L, Optional.empty());
332+
Mockito.doReturn(new OffsetResultHolder(Option.apply(timestampAndOffset), Option.empty())).
333+
when(replicaManager).fetchOffsetForTimestamp(Mockito.any(TopicPartition.class), Mockito.anyLong(), Mockito.any(), Mockito.any(), Mockito.anyBoolean());
334+
335+
SharePartition sharePartition = SharePartitionBuilder.builder()
336+
.withPersister(persister)
337+
.withGroupConfigManager(groupConfigManager)
338+
.withReplicaManager(replicaManager)
339+
.build();
340+
341+
CompletableFuture<Void> result = sharePartition.maybeInitialize();
342+
assertTrue(result.isDone());
343+
assertFalse(result.isCompletedExceptionally());
344+
345+
// replicaManager.fetchOffsetForTimestamp should be called with the (current time - 1 hour)
346+
Mockito.verify(replicaManager).fetchOffsetForTimestamp(
347+
Mockito.any(TopicPartition.class),
348+
Mockito.eq(expectedTimestamp),
349+
Mockito.any(),
350+
Mockito.any(),
351+
Mockito.anyBoolean()
352+
);
353+
354+
assertEquals(SharePartitionState.ACTIVE, sharePartition.partitionState());
355+
assertEquals(15, sharePartition.startOffset());
356+
assertEquals(15, sharePartition.endOffset());
357+
assertEquals(PartitionFactory.DEFAULT_STATE_EPOCH, sharePartition.stateEpoch());
358+
}
359+
301360
@Test
302361
public void testMaybeInitializeDefaultStartEpochGroupConfigNotPresent() {
303362
Persister persister = Mockito.mock(Persister.class);
@@ -407,7 +466,7 @@ public void testMaybeInitializeFetchOffsetForEarliestTimestampThrowsError() {
407466
GroupConfigManager groupConfigManager = Mockito.mock(GroupConfigManager.class);
408467
GroupConfig groupConfig = Mockito.mock(GroupConfig.class);
409468
Mockito.when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));
410-
Mockito.when(groupConfig.shareAutoOffsetReset()).thenReturn(GroupConfig.ShareGroupAutoOffsetReset.EARLIEST);
469+
Mockito.when(groupConfig.shareAutoOffsetReset()).thenReturn(ShareGroupAutoOffsetResetStrategy.EARLIEST);
411470

412471
ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
413472

@@ -436,6 +495,59 @@ public void testMaybeInitializeFetchOffsetForEarliestTimestampThrowsError() {
436495
assertEquals(SharePartitionState.FAILED, sharePartition.partitionState());
437496
}
438497

498+
@Test
499+
public void testMaybeInitializeFetchOffsetForByDurationThrowsError() {
500+
Persister persister = Mockito.mock(Persister.class);
501+
ReadShareGroupStateResult readShareGroupStateResult = Mockito.mock(ReadShareGroupStateResult.class);
502+
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
503+
new TopicData<>(TOPIC_ID_PARTITION.topicId(), Collections.singletonList(
504+
PartitionFactory.newPartitionAllData(
505+
0, PartitionFactory.DEFAULT_STATE_EPOCH,
506+
PartitionFactory.UNINITIALIZED_START_OFFSET,
507+
PartitionFactory.DEFAULT_ERROR_CODE,
508+
PartitionFactory.DEFAULT_ERR_MESSAGE,
509+
Collections.emptyList())))));
510+
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
511+
512+
GroupConfigManager groupConfigManager = Mockito.mock(GroupConfigManager.class);
513+
GroupConfig groupConfig = Mockito.mock(GroupConfig.class);
514+
Mockito.when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));
515+
516+
// We need to mock the ShareGroupAutoOffsetResetStrategy as the timestamp() of duration based strategy is not deterministic.
517+
// final ShareGroupAutoOffsetResetStrategy resetStrategy = ShareGroupAutoOffsetResetStrategy.fromString("by_duration:PT1H");
518+
final ShareGroupAutoOffsetResetStrategy resetStrategy = Mockito.mock(ShareGroupAutoOffsetResetStrategy.class);
519+
final long expectedTimestamp = MOCK_TIME.milliseconds() - TimeUnit.HOURS.toMillis(1);
520+
Mockito.when(groupConfig.shareAutoOffsetReset()).thenReturn(resetStrategy);
521+
522+
Mockito.when(resetStrategy.type()).thenReturn(ShareGroupAutoOffsetResetStrategy.StrategyType.BY_DURATION);
523+
Mockito.when(resetStrategy.timestamp()).thenReturn(expectedTimestamp);
524+
525+
ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
526+
527+
Mockito.when(replicaManager.fetchOffsetForTimestamp(Mockito.any(TopicPartition.class), Mockito.anyLong(), Mockito.any(), Mockito.any(), Mockito.anyBoolean()))
528+
.thenThrow(new RuntimeException("fetch offsets exception"));
529+
530+
SharePartition sharePartition = SharePartitionBuilder.builder()
531+
.withPersister(persister)
532+
.withGroupConfigManager(groupConfigManager)
533+
.withReplicaManager(replicaManager)
534+
.build();
535+
536+
CompletableFuture<Void> result = sharePartition.maybeInitialize();
537+
assertTrue(result.isDone());
538+
assertTrue(result.isCompletedExceptionally());
539+
540+
Mockito.verify(replicaManager).fetchOffsetForTimestamp(
541+
Mockito.any(TopicPartition.class),
542+
Mockito.eq(expectedTimestamp),
543+
Mockito.any(),
544+
Mockito.any(),
545+
Mockito.anyBoolean()
546+
);
547+
548+
assertEquals(SharePartitionState.FAILED, sharePartition.partitionState());
549+
}
550+
439551
@Test
440552
public void testMaybeInitializeSharePartitionAgain() {
441553
Persister persister = Mockito.mock(Persister.class);

core/src/test/scala/unit/kafka/server/KafkaApisTest.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -585,7 +585,7 @@ class KafkaApisTest extends Logging {
585585
cgConfigs.put(SHARE_SESSION_TIMEOUT_MS_CONFIG, GroupCoordinatorConfig.SHARE_GROUP_SESSION_TIMEOUT_MS_DEFAULT.toString)
586586
cgConfigs.put(SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, GroupCoordinatorConfig.SHARE_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT.toString)
587587
cgConfigs.put(SHARE_RECORD_LOCK_DURATION_MS_CONFIG, ShareGroupConfig.SHARE_GROUP_RECORD_LOCK_DURATION_MS_DEFAULT.toString)
588-
cgConfigs.put(SHARE_AUTO_OFFSET_RESET_CONFIG, GroupConfig.defaultShareAutoOffsetReset.toString)
588+
cgConfigs.put(SHARE_AUTO_OFFSET_RESET_CONFIG, GroupConfig.SHARE_AUTO_OFFSET_RESET_DEFAULT)
589589
when(configRepository.groupConfig(consumerGroupId)).thenReturn(cgConfigs)
590590

591591
val describeConfigsRequest = new DescribeConfigsRequest.Builder(new DescribeConfigsRequestData()

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java

Lines changed: 13 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,8 @@
2121
import org.apache.kafka.common.config.ConfigDef;
2222
import org.apache.kafka.common.config.ConfigDef.Type;
2323
import org.apache.kafka.common.errors.InvalidConfigurationException;
24-
import org.apache.kafka.common.utils.Utils;
2524
import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig;
2625

27-
import java.util.Locale;
2826
import java.util.Map;
2927
import java.util.Optional;
3028
import java.util.Properties;
@@ -34,7 +32,6 @@
3432
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
3533
import static org.apache.kafka.common.config.ConfigDef.Type.INT;
3634
import static org.apache.kafka.common.config.ConfigDef.Type.STRING;
37-
import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
3835

3936
/**
4037
* Group configuration related parameters and supporting methods like validation, etc. are
@@ -53,8 +50,14 @@ public final class GroupConfig extends AbstractConfig {
5350
public static final String SHARE_RECORD_LOCK_DURATION_MS_CONFIG = "share.record.lock.duration.ms";
5451

5552
public static final String SHARE_AUTO_OFFSET_RESET_CONFIG = "share.auto.offset.reset";
56-
public static final String SHARE_AUTO_OFFSET_RESET_DEFAULT = ShareGroupAutoOffsetReset.LATEST.toString();
57-
public static final String SHARE_AUTO_OFFSET_RESET_DOC = "The strategy to initialize the share-partition start offset.";
53+
public static final String SHARE_AUTO_OFFSET_RESET_DEFAULT = ShareGroupAutoOffsetResetStrategy.LATEST.name();
54+
public static final String SHARE_AUTO_OFFSET_RESET_DOC = "The strategy to initialize the share-partition start offset. " +
55+
"<ul><li>earliest: automatically reset the offset to the earliest offset</li>" +
56+
"<li>latest: automatically reset the offset to the latest offset</li>" +
57+
"<li>by_duration:&lt;duration&gt;: automatically reset the offset to a configured duration from the current timestamp. " +
58+
"&lt;duration&gt; must be specified in ISO8601 format (PnDTnHnMn.nS). " +
59+
"Negative duration is not allowed.</li>" +
60+
"<li>anything else: throw exception to the share consumer.</li></ul>";
5861

5962
public final int consumerSessionTimeoutMs;
6063

@@ -102,7 +105,7 @@ public final class GroupConfig extends AbstractConfig {
102105
.define(SHARE_AUTO_OFFSET_RESET_CONFIG,
103106
STRING,
104107
SHARE_AUTO_OFFSET_RESET_DEFAULT,
105-
in(Utils.enumOptions(ShareGroupAutoOffsetReset.class)),
108+
new ShareGroupAutoOffsetResetStrategy.Validator(),
106109
MEDIUM,
107110
SHARE_AUTO_OFFSET_RESET_DOC);
108111

@@ -223,8 +226,8 @@ public static GroupConfig fromProps(Map<?, ?> defaults, Properties overrides) {
223226
/**
224227
* The default share group auto offset reset strategy.
225228
*/
226-
public static ShareGroupAutoOffsetReset defaultShareAutoOffsetReset() {
227-
return ShareGroupAutoOffsetReset.valueOf(SHARE_AUTO_OFFSET_RESET_DEFAULT.toUpperCase(Locale.ROOT));
229+
public static ShareGroupAutoOffsetResetStrategy defaultShareAutoOffsetReset() {
230+
return ShareGroupAutoOffsetResetStrategy.fromString(SHARE_AUTO_OFFSET_RESET_DEFAULT);
228231
}
229232

230233
/**
@@ -265,16 +268,7 @@ public int shareRecordLockDurationMs() {
265268
/**
266269
* The share group auto offset reset strategy.
267270
*/
268-
public ShareGroupAutoOffsetReset shareAutoOffsetReset() {
269-
return ShareGroupAutoOffsetReset.valueOf(shareAutoOffsetReset.toUpperCase(Locale.ROOT));
270-
}
271-
272-
public enum ShareGroupAutoOffsetReset {
273-
LATEST, EARLIEST;
274-
275-
@Override
276-
public String toString() {
277-
return super.toString().toLowerCase(Locale.ROOT);
278-
}
271+
public ShareGroupAutoOffsetResetStrategy shareAutoOffsetReset() {
272+
return ShareGroupAutoOffsetResetStrategy.fromString(shareAutoOffsetReset);
279273
}
280274
}

0 commit comments

Comments
 (0)