Skip to content

Commit ce3429c

Browse files
lhotariclaude
andauthored
[fix][test] Fix flaky OneWayReplicatorUsingGlobalZKTest.cleanup (#25389)
Co-authored-by: Claude Opus 4.6 (1M context) <[email protected]>
1 parent ba851d3 commit ce3429c

1 file changed

Lines changed: 24 additions & 3 deletions

File tree

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -303,8 +303,12 @@ protected void cleanupPulsarResources() throws Exception {
303303
// delete namespaces.
304304
waitChangeEventsInit(replicatedNamespace);
305305
admin1.namespaces().setNamespaceReplicationClusters(replicatedNamespace, Sets.newHashSet(cluster1), true);
306+
admin1.namespaces().setNamespaceReplicationClusters(
307+
sourceClusterAlwaysSchemaCompatibleNamespace, Sets.newHashSet(cluster1), true);
306308
if (!usingGlobalZK) {
307309
admin2.namespaces().setNamespaceReplicationClusters(replicatedNamespace, Sets.newHashSet(cluster2), true);
310+
admin2.namespaces().setNamespaceReplicationClusters(
311+
sourceClusterAlwaysSchemaCompatibleNamespace, Sets.newHashSet(cluster2), true);
308312
}
309313
// When using global ZK, reducing replication clusters triggers async topic cleanup on removed clusters.
310314
// Retry namespace deletion to handle topics that may be in a transitional state.
@@ -314,16 +318,33 @@ protected void cleanupPulsarResources() throws Exception {
314318
Awaitility.await().atMost(Duration.ofSeconds(30)).ignoreExceptions().untilAsserted(() -> {
315319
admin1.namespaces().deleteNamespace(nonReplicatedNamespace, true);
316320
});
321+
Awaitility.await().atMost(Duration.ofSeconds(30)).ignoreExceptions().untilAsserted(() -> {
322+
admin1.namespaces().deleteNamespace(sourceClusterAlwaysSchemaCompatibleNamespace, true);
323+
});
317324
if (!usingGlobalZK) {
318-
admin2.namespaces().deleteNamespace(replicatedNamespace, true);
319-
admin2.namespaces().deleteNamespace(nonReplicatedNamespace, true);
325+
Awaitility.await().atMost(Duration.ofSeconds(30)).ignoreExceptions().untilAsserted(() -> {
326+
admin2.namespaces().deleteNamespace(replicatedNamespace, true);
327+
});
328+
Awaitility.await().atMost(Duration.ofSeconds(30)).ignoreExceptions().untilAsserted(() -> {
329+
admin2.namespaces().deleteNamespace(nonReplicatedNamespace, true);
330+
});
331+
Awaitility.await().atMost(Duration.ofSeconds(30)).ignoreExceptions().untilAsserted(() -> {
332+
admin2.namespaces().deleteNamespace(sourceClusterAlwaysSchemaCompatibleNamespace, true);
333+
});
320334
}
321335
}
322336

323337
@Override
324338
protected void cleanup() throws Exception {
325339
// cleanup pulsar resources.
326-
cleanupPulsarResources();
340+
// Wrap in try-catch to ensure brokers, ZK, and BK are always shut down even if
341+
// namespace deletion fails (e.g., topics in transitional state during async replication cleanup).
342+
try {
343+
cleanupPulsarResources();
344+
} catch (Exception e) {
345+
log.warn("Failed to cleanup Pulsar resources during shutdown, "
346+
+ "continuing with broker/ZK/BK shutdown", e);
347+
}
327348

328349
// shutdown.
329350
markCurrentSetupNumberCleaned();

0 commit comments

Comments
 (0)