Skip to content

Commit e30a07d

Browse files
committed
fix: prevent schema refresh deadlock when newInstance() or agreement check fails
When SchemaQueries.newInstance() throws or schema agreement check fails, the cleanup code (resetting currentSchemaRefresh, completing firstSchemaRefreshFuture, draining queuedSchemaRefresh) was never executed, permanently blocking all future schema refreshes for the session. Extract cleanup into onSchemaRefreshComplete() and call it from all three completion paths: agreement error, newInstance() exception, and normal completion. Fixes: #841
1 parent 3b9f989 commit e30a07d

2 files changed

Lines changed: 107 additions & 11 deletions

File tree

core/src/main/java/com/datastax/oss/driver/internal/core/metadata/MetadataManager.java

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -446,6 +446,7 @@ private void startSchemaRequest(CompletableFuture<RefreshSchemaResult> refreshFu
446446
(schemaInAgreement, agreementError) -> {
447447
if (agreementError != null) {
448448
refreshFuture.completeExceptionally(agreementError);
449+
onSchemaRefreshComplete();
449450
} else {
450451
try {
451452
schemaQueriesFactory
@@ -460,21 +461,12 @@ private void startSchemaRequest(CompletableFuture<RefreshSchemaResult> refreshFu
460461
refreshFuture.complete(
461462
new RefreshSchemaResult(newMetadata, schemaInAgreement));
462463
}
463-
464-
firstSchemaRefreshFuture.complete(null);
465-
466-
currentSchemaRefresh = null;
467-
// If another refresh was enqueued during this one, run it now
468-
if (queuedSchemaRefresh != null) {
469-
CompletableFuture<RefreshSchemaResult> tmp =
470-
this.queuedSchemaRefresh;
471-
this.queuedSchemaRefresh = null;
472-
startSchemaRequest(tmp);
473-
}
464+
onSchemaRefreshComplete();
474465
});
475466
} catch (Throwable t) {
476467
LOG.debug("[{}] Exception getting new metadata", logPrefix, t);
477468
refreshFuture.completeExceptionally(t);
469+
onSchemaRefreshComplete();
478470
}
479471
}
480472
});
@@ -486,6 +478,18 @@ private void startSchemaRequest(CompletableFuture<RefreshSchemaResult> refreshFu
486478
}
487479
}
488480

481+
private void onSchemaRefreshComplete() {
482+
assert adminExecutor.inEventLoop();
483+
firstSchemaRefreshFuture.complete(null);
484+
currentSchemaRefresh = null;
485+
// If another refresh was enqueued during this one, run it now
486+
if (queuedSchemaRefresh != null) {
487+
CompletableFuture<RefreshSchemaResult> tmp = this.queuedSchemaRefresh;
488+
this.queuedSchemaRefresh = null;
489+
startSchemaRequest(tmp);
490+
}
491+
}
492+
489493
// To query schema tables, we need the control connection.
490494
// Normally that the topology monitor has already initialized it to query node tables. But if a
491495
// custom topology monitor is in place, it might not use the control connection at all.

core/src/test/java/com/datastax/oss/driver/internal/core/metadata/MetadataManagerTest.java

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import com.datastax.oss.driver.internal.core.metadata.schema.parsing.SchemaParserFactory;
3939
import com.datastax.oss.driver.internal.core.metadata.schema.queries.SchemaQueriesFactory;
4040
import com.datastax.oss.driver.internal.core.metrics.MetricsFactory;
41+
import com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures;
4142
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableList;
4243
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableSet;
4344
import io.netty.channel.DefaultEventLoopGroup;
@@ -313,6 +314,97 @@ public void refreshSchema_should_work() {
313314
assertThatStage(result).isFailed(t -> assertThat(t).isEqualTo(expectedException));
314315
}
315316

317+
@Test
318+
public void refreshSchema_should_recover_after_newInstance_failure() {
319+
// Given
320+
IllegalStateException expectedException = new IllegalStateException("Error we're testing");
321+
when(schemaQueriesFactory.newInstance()).thenThrow(expectedException);
322+
when(topologyMonitor.refreshNodeList())
323+
.thenReturn(CompletableFuture.completedFuture(ImmutableList.of(mock(NodeInfo.class))));
324+
when(topologyMonitor.checkSchemaAgreement())
325+
.thenReturn(CompletableFuture.completedFuture(Boolean.TRUE));
326+
when(controlConnection.init(anyBoolean(), anyBoolean(), anyBoolean()))
327+
.thenReturn(CompletableFuture.completedFuture(null));
328+
metadataManager.refreshNodes();
329+
waitForPendingAdminTasks(() -> metadataManager.refreshes.size() == 1);
330+
331+
// First refresh fails
332+
CompletionStage<MetadataManager.RefreshSchemaResult> result1 =
333+
metadataManager.refreshSchema("foo", true, true);
334+
waitForPendingAdminTasks(() -> result1.toCompletableFuture().isDone());
335+
assertThatStage(result1).isFailed(t -> assertThat(t).isEqualTo(expectedException));
336+
337+
// When - second refresh should not be stuck (uses same throwing mock)
338+
CompletionStage<MetadataManager.RefreshSchemaResult> result2 =
339+
metadataManager.refreshSchema("bar", true, true);
340+
341+
// Then - should complete (not hang forever), proving currentSchemaRefresh was cleared
342+
waitForPendingAdminTasks(() -> result2.toCompletableFuture().isDone());
343+
assertThatStage(result2).isFailed(t -> assertThat(t).isEqualTo(expectedException));
344+
}
345+
346+
@Test
347+
public void refreshSchema_should_recover_after_agreement_error() {
348+
// Given
349+
RuntimeException agreementException = new RuntimeException("Agreement check failed");
350+
when(topologyMonitor.refreshNodeList())
351+
.thenReturn(CompletableFuture.completedFuture(ImmutableList.of(mock(NodeInfo.class))));
352+
when(topologyMonitor.checkSchemaAgreement())
353+
.thenReturn(CompletableFutures.failedFuture(agreementException));
354+
when(controlConnection.init(anyBoolean(), anyBoolean(), anyBoolean()))
355+
.thenReturn(CompletableFuture.completedFuture(null));
356+
metadataManager.refreshNodes();
357+
waitForPendingAdminTasks(() -> metadataManager.refreshes.size() == 1);
358+
359+
// First refresh fails due to agreement error
360+
CompletionStage<MetadataManager.RefreshSchemaResult> result1 =
361+
metadataManager.refreshSchema("foo", true, true);
362+
waitForPendingAdminTasks(() -> result1.toCompletableFuture().isDone());
363+
assertThatStage(result1).isFailed();
364+
365+
// When - second refresh should not be stuck (uses same failing mock)
366+
CompletionStage<MetadataManager.RefreshSchemaResult> result2 =
367+
metadataManager.refreshSchema("bar", true, true);
368+
369+
// Then - should complete (not hang forever), proving currentSchemaRefresh was cleared
370+
waitForPendingAdminTasks(() -> result2.toCompletableFuture().isDone());
371+
assertThatStage(result2).isFailed();
372+
}
373+
374+
@Test
375+
public void refreshSchema_should_drain_queued_refresh_after_newInstance_failure() {
376+
// Given
377+
IllegalStateException expectedException = new IllegalStateException("Error we're testing");
378+
when(schemaQueriesFactory.newInstance()).thenThrow(expectedException);
379+
when(topologyMonitor.refreshNodeList())
380+
.thenReturn(CompletableFuture.completedFuture(ImmutableList.of(mock(NodeInfo.class))));
381+
// Use a future we control to introduce a delay in schema agreement check
382+
CompletableFuture<Boolean> agreementFuture = new CompletableFuture<>();
383+
when(topologyMonitor.checkSchemaAgreement()).thenReturn(agreementFuture);
384+
when(controlConnection.init(anyBoolean(), anyBoolean(), anyBoolean()))
385+
.thenReturn(CompletableFuture.completedFuture(null));
386+
metadataManager.refreshNodes();
387+
waitForPendingAdminTasks(() -> metadataManager.refreshes.size() == 1);
388+
389+
// Start first refresh (it will block on agreement check)
390+
CompletionStage<MetadataManager.RefreshSchemaResult> result1 =
391+
metadataManager.refreshSchema("foo", true, true);
392+
393+
// Queue a second refresh while the first is in progress
394+
CompletionStage<MetadataManager.RefreshSchemaResult> result2 =
395+
metadataManager.refreshSchema("bar", true, true);
396+
397+
// Now complete the agreement check - first refresh will fail at newInstance(),
398+
// and onSchemaRefreshComplete should drain the queued second refresh
399+
agreementFuture.complete(true);
400+
401+
// Then both requests should complete (not hang forever)
402+
waitForPendingAdminTasks(
403+
() -> result1.toCompletableFuture().isDone() && result2.toCompletableFuture().isDone());
404+
assertThatStage(result1).isFailed(t -> assertThat(t).isEqualTo(expectedException));
405+
assertThatStage(result2).isFailed(t -> assertThat(t).isEqualTo(expectedException));
406+
}
407+
316408
private static class TestMetadataManager extends MetadataManager {
317409

318410
private List<MetadataRefresh> refreshes = new CopyOnWriteArrayList<>();

0 commit comments

Comments
 (0)