@@ -303,8 +303,12 @@ protected void cleanupPulsarResources() throws Exception {
303303 // delete namespaces.
304304 waitChangeEventsInit (replicatedNamespace );
305305 admin1 .namespaces ().setNamespaceReplicationClusters (replicatedNamespace , Sets .newHashSet (cluster1 ), true );
306+ admin1 .namespaces ().setNamespaceReplicationClusters (
307+ sourceClusterAlwaysSchemaCompatibleNamespace , Sets .newHashSet (cluster1 ), true );
306308 if (!usingGlobalZK ) {
307309 admin2 .namespaces ().setNamespaceReplicationClusters (replicatedNamespace , Sets .newHashSet (cluster2 ), true );
310+ admin2 .namespaces ().setNamespaceReplicationClusters (
311+ sourceClusterAlwaysSchemaCompatibleNamespace , Sets .newHashSet (cluster2 ), true );
308312 }
309313 // When using global ZK, reducing replication clusters triggers async topic cleanup on removed clusters.
310314 // Retry namespace deletion to handle topics that may be in a transitional state.
@@ -314,16 +318,33 @@ protected void cleanupPulsarResources() throws Exception {
314318 Awaitility .await ().atMost (Duration .ofSeconds (30 )).ignoreExceptions ().untilAsserted (() -> {
315319 admin1 .namespaces ().deleteNamespace (nonReplicatedNamespace , true );
316320 });
321+ Awaitility .await ().atMost (Duration .ofSeconds (30 )).ignoreExceptions ().untilAsserted (() -> {
322+ admin1 .namespaces ().deleteNamespace (sourceClusterAlwaysSchemaCompatibleNamespace , true );
323+ });
317324 if (!usingGlobalZK ) {
318- admin2 .namespaces ().deleteNamespace (replicatedNamespace , true );
319- admin2 .namespaces ().deleteNamespace (nonReplicatedNamespace , true );
325+ Awaitility .await ().atMost (Duration .ofSeconds (30 )).ignoreExceptions ().untilAsserted (() -> {
326+ admin2 .namespaces ().deleteNamespace (replicatedNamespace , true );
327+ });
328+ Awaitility .await ().atMost (Duration .ofSeconds (30 )).ignoreExceptions ().untilAsserted (() -> {
329+ admin2 .namespaces ().deleteNamespace (nonReplicatedNamespace , true );
330+ });
331+ Awaitility .await ().atMost (Duration .ofSeconds (30 )).ignoreExceptions ().untilAsserted (() -> {
332+ admin2 .namespaces ().deleteNamespace (sourceClusterAlwaysSchemaCompatibleNamespace , true );
333+ });
320334 }
321335 }
322336
323337 @ Override
324338 protected void cleanup () throws Exception {
325339 // cleanup pulsar resources.
326- cleanupPulsarResources ();
340+ // Wrap in try-catch to ensure brokers, ZK, and BK are always shut down even if
341+ // namespace deletion fails (e.g., topics in transitional state during async replication cleanup).
342+ try {
343+ cleanupPulsarResources ();
344+ } catch (Exception e ) {
345+ log .warn ("Failed to cleanup Pulsar resources during shutdown, "
346+ + "continuing with broker/ZK/BK shutdown" , e );
347+ }
327348
328349 // shutdown.
329350 markCurrentSetupNumberCleaned ();
0 commit comments