|
38 | 38 | import com.datastax.oss.driver.internal.core.metadata.schema.parsing.SchemaParserFactory; |
39 | 39 | import com.datastax.oss.driver.internal.core.metadata.schema.queries.SchemaQueriesFactory; |
40 | 40 | import com.datastax.oss.driver.internal.core.metrics.MetricsFactory; |
| 41 | +import com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures; |
41 | 42 | import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableList; |
42 | 43 | import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableSet; |
43 | 44 | import io.netty.channel.DefaultEventLoopGroup; |
@@ -313,6 +314,97 @@ public void refreshSchema_should_work() { |
313 | 314 | assertThatStage(result).isFailed(t -> assertThat(t).isEqualTo(expectedException)); |
314 | 315 | } |
315 | 316 |
|
| 317 | + @Test |
| 318 | + public void refreshSchema_should_recover_after_newInstance_failure() { |
| 319 | + // Given |
| 320 | + IllegalStateException expectedException = new IllegalStateException("Error we're testing"); |
| 321 | + when(schemaQueriesFactory.newInstance()).thenThrow(expectedException); |
| 322 | + when(topologyMonitor.refreshNodeList()) |
| 323 | + .thenReturn(CompletableFuture.completedFuture(ImmutableList.of(mock(NodeInfo.class)))); |
| 324 | + when(topologyMonitor.checkSchemaAgreement()) |
| 325 | + .thenReturn(CompletableFuture.completedFuture(Boolean.TRUE)); |
| 326 | + when(controlConnection.init(anyBoolean(), anyBoolean(), anyBoolean())) |
| 327 | + .thenReturn(CompletableFuture.completedFuture(null)); |
| 328 | + metadataManager.refreshNodes(); |
| 329 | + waitForPendingAdminTasks(() -> metadataManager.refreshes.size() == 1); |
| 330 | + |
| 331 | + // First refresh fails |
| 332 | + CompletionStage<MetadataManager.RefreshSchemaResult> result1 = |
| 333 | + metadataManager.refreshSchema("foo", true, true); |
| 334 | + waitForPendingAdminTasks(() -> result1.toCompletableFuture().isDone()); |
| 335 | + assertThatStage(result1).isFailed(t -> assertThat(t).isEqualTo(expectedException)); |
| 336 | + |
| 337 | + // When - second refresh should not be stuck (uses same throwing mock) |
| 338 | + CompletionStage<MetadataManager.RefreshSchemaResult> result2 = |
| 339 | + metadataManager.refreshSchema("bar", true, true); |
| 340 | + |
| 341 | + // Then - should complete (not hang forever), proving currentSchemaRefresh was cleared |
| 342 | + waitForPendingAdminTasks(() -> result2.toCompletableFuture().isDone()); |
| 343 | + assertThatStage(result2).isFailed(t -> assertThat(t).isEqualTo(expectedException)); |
| 344 | + } |
| 345 | + |
| 346 | + @Test |
| 347 | + public void refreshSchema_should_recover_after_agreement_error() { |
| 348 | + // Given |
| 349 | + RuntimeException agreementException = new RuntimeException("Agreement check failed"); |
| 350 | + when(topologyMonitor.refreshNodeList()) |
| 351 | + .thenReturn(CompletableFuture.completedFuture(ImmutableList.of(mock(NodeInfo.class)))); |
| 352 | + when(topologyMonitor.checkSchemaAgreement()) |
| 353 | + .thenReturn(CompletableFutures.failedFuture(agreementException)); |
| 354 | + when(controlConnection.init(anyBoolean(), anyBoolean(), anyBoolean())) |
| 355 | + .thenReturn(CompletableFuture.completedFuture(null)); |
| 356 | + metadataManager.refreshNodes(); |
| 357 | + waitForPendingAdminTasks(() -> metadataManager.refreshes.size() == 1); |
| 358 | + |
| 359 | + // First refresh fails due to agreement error |
| 360 | + CompletionStage<MetadataManager.RefreshSchemaResult> result1 = |
| 361 | + metadataManager.refreshSchema("foo", true, true); |
| 362 | + waitForPendingAdminTasks(() -> result1.toCompletableFuture().isDone()); |
| 363 | + assertThatStage(result1).isFailed(); |
| 364 | + |
| 365 | + // When - second refresh should not be stuck (uses same failing mock) |
| 366 | + CompletionStage<MetadataManager.RefreshSchemaResult> result2 = |
| 367 | + metadataManager.refreshSchema("bar", true, true); |
| 368 | + |
| 369 | + // Then - should complete (not hang forever), proving currentSchemaRefresh was cleared |
| 370 | + waitForPendingAdminTasks(() -> result2.toCompletableFuture().isDone()); |
| 371 | + assertThatStage(result2).isFailed(); |
| 372 | + } |
| 373 | + |
| 374 | + @Test |
| 375 | + public void refreshSchema_should_drain_queued_refresh_after_newInstance_failure() { |
| 376 | + // Given |
| 377 | + IllegalStateException expectedException = new IllegalStateException("Error we're testing"); |
| 378 | + when(schemaQueriesFactory.newInstance()).thenThrow(expectedException); |
| 379 | + when(topologyMonitor.refreshNodeList()) |
| 380 | + .thenReturn(CompletableFuture.completedFuture(ImmutableList.of(mock(NodeInfo.class)))); |
| 381 | + // Use a future we control to introduce a delay in schema agreement check |
| 382 | + CompletableFuture<Boolean> agreementFuture = new CompletableFuture<>(); |
| 383 | + when(topologyMonitor.checkSchemaAgreement()).thenReturn(agreementFuture); |
| 384 | + when(controlConnection.init(anyBoolean(), anyBoolean(), anyBoolean())) |
| 385 | + .thenReturn(CompletableFuture.completedFuture(null)); |
| 386 | + metadataManager.refreshNodes(); |
| 387 | + waitForPendingAdminTasks(() -> metadataManager.refreshes.size() == 1); |
| 388 | + |
| 389 | + // Start first refresh (it will block on agreement check) |
| 390 | + CompletionStage<MetadataManager.RefreshSchemaResult> result1 = |
| 391 | + metadataManager.refreshSchema("foo", true, true); |
| 392 | + |
| 393 | + // Queue a second refresh while the first is in progress |
| 394 | + CompletionStage<MetadataManager.RefreshSchemaResult> result2 = |
| 395 | + metadataManager.refreshSchema("bar", true, true); |
| 396 | + |
| 397 | + // Now complete the agreement check - first refresh will fail at newInstance(), |
| 398 | + // and onSchemaRefreshComplete should drain the queued second refresh |
| 399 | + agreementFuture.complete(true); |
| 400 | + |
| 401 | + // Then both requests should complete (not hang forever) |
| 402 | + waitForPendingAdminTasks( |
| 403 | + () -> result1.toCompletableFuture().isDone() && result2.toCompletableFuture().isDone()); |
| 404 | + assertThatStage(result1).isFailed(t -> assertThat(t).isEqualTo(expectedException)); |
| 405 | + assertThatStage(result2).isFailed(t -> assertThat(t).isEqualTo(expectedException)); |
| 406 | + } |
| 407 | + |
316 | 408 | private static class TestMetadataManager extends MetadataManager { |
317 | 409 |
|
318 | 410 | private List<MetadataRefresh> refreshes = new CopyOnWriteArrayList<>(); |
|
0 commit comments