|
46 | 46 | import org.apache.kafka.common.utils.Time; |
47 | 47 | import org.apache.kafka.coordinator.group.GroupConfig; |
48 | 48 | import org.apache.kafka.coordinator.group.GroupConfigManager; |
| 49 | +import org.apache.kafka.coordinator.group.ShareGroupAutoOffsetResetStrategy; |
49 | 50 | import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch; |
50 | 51 | import org.apache.kafka.server.share.fetch.ShareAcquiredRecords; |
51 | 52 | import org.apache.kafka.server.share.persister.NoOpShareStatePersister; |
@@ -215,7 +216,7 @@ public void testMaybeInitializeDefaultStartEpochGroupConfigReturnsEarliest() { |
215 | 216 | GroupConfigManager groupConfigManager = Mockito.mock(GroupConfigManager.class); |
216 | 217 | GroupConfig groupConfig = Mockito.mock(GroupConfig.class); |
217 | 218 | Mockito.when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig)); |
218 | | - Mockito.when(groupConfig.shareAutoOffsetReset()).thenReturn(GroupConfig.ShareGroupAutoOffsetReset.EARLIEST); |
| 219 | + Mockito.when(groupConfig.shareAutoOffsetReset()).thenReturn(ShareGroupAutoOffsetResetStrategy.EARLIEST); |
219 | 220 |
|
220 | 221 | ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class); |
221 | 222 |
|
@@ -265,7 +266,7 @@ public void testMaybeInitializeDefaultStartEpochGroupConfigReturnsLatest() { |
265 | 266 | GroupConfigManager groupConfigManager = Mockito.mock(GroupConfigManager.class); |
266 | 267 | GroupConfig groupConfig = Mockito.mock(GroupConfig.class); |
267 | 268 | Mockito.when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig)); |
268 | | - Mockito.when(groupConfig.shareAutoOffsetReset()).thenReturn(GroupConfig.ShareGroupAutoOffsetReset.LATEST); |
| 269 | + Mockito.when(groupConfig.shareAutoOffsetReset()).thenReturn(ShareGroupAutoOffsetResetStrategy.LATEST); |
269 | 270 |
|
270 | 271 | ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class); |
271 | 272 |
|
@@ -298,6 +299,64 @@ public void testMaybeInitializeDefaultStartEpochGroupConfigReturnsLatest() { |
298 | 299 | assertEquals(PartitionFactory.DEFAULT_STATE_EPOCH, sharePartition.stateEpoch()); |
299 | 300 | } |
300 | 301 |
|
| 302 | + @Test |
| 303 | + public void testMaybeInitializeDefaultStartEpochGroupConfigReturnsByDuration() { |
| 304 | + Persister persister = Mockito.mock(Persister.class); |
| 305 | + ReadShareGroupStateResult readShareGroupStateResult = Mockito.mock(ReadShareGroupStateResult.class); |
| 306 | + Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList( |
| 307 | + new TopicData<>(TOPIC_ID_PARTITION.topicId(), Collections.singletonList( |
| 308 | + PartitionFactory.newPartitionAllData( |
| 309 | + 0, PartitionFactory.DEFAULT_STATE_EPOCH, |
| 310 | + PartitionFactory.UNINITIALIZED_START_OFFSET, |
| 311 | + PartitionFactory.DEFAULT_ERROR_CODE, |
| 312 | + PartitionFactory.DEFAULT_ERR_MESSAGE, |
| 313 | + Collections.emptyList()))))); |
| 314 | + Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult)); |
| 315 | + |
| 316 | + GroupConfigManager groupConfigManager = Mockito.mock(GroupConfigManager.class); |
| 317 | + GroupConfig groupConfig = Mockito.mock(GroupConfig.class); |
| 318 | + Mockito.when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig)); |
| 319 | + |
| 320 | + // Since the timestamp() of duration based strategy is not deterministic, we need to mock the ShareGroupAutoOffsetResetStrategy. |
| 321 | + // mock: final ShareGroupAutoOffsetResetStrategy resetStrategy = ShareGroupAutoOffsetResetStrategy.fromString("by_duration:PT1H"); |
| 322 | + final ShareGroupAutoOffsetResetStrategy resetStrategy = Mockito.mock(ShareGroupAutoOffsetResetStrategy.class); |
| 323 | + final long expectedTimestamp = MOCK_TIME.milliseconds() - TimeUnit.HOURS.toMillis(1); |
| 324 | + Mockito.when(resetStrategy.type()).thenReturn(ShareGroupAutoOffsetResetStrategy.StrategyType.BY_DURATION); |
| 325 | + Mockito.when(resetStrategy.timestamp()).thenReturn(expectedTimestamp); |
| 326 | + |
| 327 | + Mockito.when(groupConfig.shareAutoOffsetReset()).thenReturn(resetStrategy); |
| 328 | + |
| 329 | + ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class); |
| 330 | + |
| 331 | + FileRecords.TimestampAndOffset timestampAndOffset = new FileRecords.TimestampAndOffset(MOCK_TIME.milliseconds() - TimeUnit.HOURS.toMillis(1), 15L, Optional.empty()); |
| 332 | + Mockito.doReturn(new OffsetResultHolder(Option.apply(timestampAndOffset), Option.empty())). |
| 333 | + when(replicaManager).fetchOffsetForTimestamp(Mockito.any(TopicPartition.class), Mockito.anyLong(), Mockito.any(), Mockito.any(), Mockito.anyBoolean()); |
| 334 | + |
| 335 | + SharePartition sharePartition = SharePartitionBuilder.builder() |
| 336 | + .withPersister(persister) |
| 337 | + .withGroupConfigManager(groupConfigManager) |
| 338 | + .withReplicaManager(replicaManager) |
| 339 | + .build(); |
| 340 | + |
| 341 | + CompletableFuture<Void> result = sharePartition.maybeInitialize(); |
| 342 | + assertTrue(result.isDone()); |
| 343 | + assertFalse(result.isCompletedExceptionally()); |
| 344 | + |
| 345 | + // replicaManager.fetchOffsetForTimestamp should be called with the (current time - 1 hour) |
| 346 | + Mockito.verify(replicaManager).fetchOffsetForTimestamp( |
| 347 | + Mockito.any(TopicPartition.class), |
| 348 | + Mockito.eq(expectedTimestamp), |
| 349 | + Mockito.any(), |
| 350 | + Mockito.any(), |
| 351 | + Mockito.anyBoolean() |
| 352 | + ); |
| 353 | + |
| 354 | + assertEquals(SharePartitionState.ACTIVE, sharePartition.partitionState()); |
| 355 | + assertEquals(15, sharePartition.startOffset()); |
| 356 | + assertEquals(15, sharePartition.endOffset()); |
| 357 | + assertEquals(PartitionFactory.DEFAULT_STATE_EPOCH, sharePartition.stateEpoch()); |
| 358 | + } |
| 359 | + |
301 | 360 | @Test |
302 | 361 | public void testMaybeInitializeDefaultStartEpochGroupConfigNotPresent() { |
303 | 362 | Persister persister = Mockito.mock(Persister.class); |
@@ -407,7 +466,7 @@ public void testMaybeInitializeFetchOffsetForEarliestTimestampThrowsError() { |
407 | 466 | GroupConfigManager groupConfigManager = Mockito.mock(GroupConfigManager.class); |
408 | 467 | GroupConfig groupConfig = Mockito.mock(GroupConfig.class); |
409 | 468 | Mockito.when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig)); |
410 | | - Mockito.when(groupConfig.shareAutoOffsetReset()).thenReturn(GroupConfig.ShareGroupAutoOffsetReset.EARLIEST); |
| 469 | + Mockito.when(groupConfig.shareAutoOffsetReset()).thenReturn(ShareGroupAutoOffsetResetStrategy.EARLIEST); |
411 | 470 |
|
412 | 471 | ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class); |
413 | 472 |
|
@@ -436,6 +495,59 @@ public void testMaybeInitializeFetchOffsetForEarliestTimestampThrowsError() { |
436 | 495 | assertEquals(SharePartitionState.FAILED, sharePartition.partitionState()); |
437 | 496 | } |
438 | 497 |
|
| 498 | + @Test |
| 499 | + public void testMaybeInitializeFetchOffsetForByDurationThrowsError() { |
| 500 | + Persister persister = Mockito.mock(Persister.class); |
| 501 | + ReadShareGroupStateResult readShareGroupStateResult = Mockito.mock(ReadShareGroupStateResult.class); |
| 502 | + Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList( |
| 503 | + new TopicData<>(TOPIC_ID_PARTITION.topicId(), Collections.singletonList( |
| 504 | + PartitionFactory.newPartitionAllData( |
| 505 | + 0, PartitionFactory.DEFAULT_STATE_EPOCH, |
| 506 | + PartitionFactory.UNINITIALIZED_START_OFFSET, |
| 507 | + PartitionFactory.DEFAULT_ERROR_CODE, |
| 508 | + PartitionFactory.DEFAULT_ERR_MESSAGE, |
| 509 | + Collections.emptyList()))))); |
| 510 | + Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult)); |
| 511 | + |
| 512 | + GroupConfigManager groupConfigManager = Mockito.mock(GroupConfigManager.class); |
| 513 | + GroupConfig groupConfig = Mockito.mock(GroupConfig.class); |
| 514 | + Mockito.when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig)); |
| 515 | + |
| 516 | + // We need to mock the ShareGroupAutoOffsetResetStrategy as the timestamp() of duration based strategy is not deterministic. |
| 517 | + // final ShareGroupAutoOffsetResetStrategy resetStrategy = ShareGroupAutoOffsetResetStrategy.fromString("by_duration:PT1H"); |
| 518 | + final ShareGroupAutoOffsetResetStrategy resetStrategy = Mockito.mock(ShareGroupAutoOffsetResetStrategy.class); |
| 519 | + final long expectedTimestamp = MOCK_TIME.milliseconds() - TimeUnit.HOURS.toMillis(1); |
| 520 | + Mockito.when(groupConfig.shareAutoOffsetReset()).thenReturn(resetStrategy); |
| 521 | + |
| 522 | + Mockito.when(resetStrategy.type()).thenReturn(ShareGroupAutoOffsetResetStrategy.StrategyType.BY_DURATION); |
| 523 | + Mockito.when(resetStrategy.timestamp()).thenReturn(expectedTimestamp); |
| 524 | + |
| 525 | + ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class); |
| 526 | + |
| 527 | + Mockito.when(replicaManager.fetchOffsetForTimestamp(Mockito.any(TopicPartition.class), Mockito.anyLong(), Mockito.any(), Mockito.any(), Mockito.anyBoolean())) |
| 528 | + .thenThrow(new RuntimeException("fetch offsets exception")); |
| 529 | + |
| 530 | + SharePartition sharePartition = SharePartitionBuilder.builder() |
| 531 | + .withPersister(persister) |
| 532 | + .withGroupConfigManager(groupConfigManager) |
| 533 | + .withReplicaManager(replicaManager) |
| 534 | + .build(); |
| 535 | + |
| 536 | + CompletableFuture<Void> result = sharePartition.maybeInitialize(); |
| 537 | + assertTrue(result.isDone()); |
| 538 | + assertTrue(result.isCompletedExceptionally()); |
| 539 | + |
| 540 | + Mockito.verify(replicaManager).fetchOffsetForTimestamp( |
| 541 | + Mockito.any(TopicPartition.class), |
| 542 | + Mockito.eq(expectedTimestamp), |
| 543 | + Mockito.any(), |
| 544 | + Mockito.any(), |
| 545 | + Mockito.anyBoolean() |
| 546 | + ); |
| 547 | + |
| 548 | + assertEquals(SharePartitionState.FAILED, sharePartition.partitionState()); |
| 549 | + } |
| 550 | + |
439 | 551 | @Test |
440 | 552 | public void testMaybeInitializeSharePartitionAgain() { |
441 | 553 | Persister persister = Mockito.mock(Persister.class); |
|
0 commit comments