Skip to content

Commit 0969869

Browse files
[improve][broker] Support cleanup replication cluster and allowed cluster when cluster metadata teardown (apache#23561)
1 parent d7433d0 commit 0969869

File tree

2 files changed

+112
-5
lines changed

2 files changed

+112
-5
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,10 @@
1919
package org.apache.pulsar;
2020

2121
import com.google.protobuf.InvalidProtocolBufferException;
22+
import java.util.List;
2223
import java.util.Optional;
2324
import java.util.concurrent.CompletableFuture;
25+
import java.util.concurrent.ExecutionException;
2426
import java.util.stream.Collectors;
2527
import lombok.Cleanup;
2628
import org.apache.bookkeeper.client.BKException;
@@ -29,12 +31,18 @@
2931
import org.apache.bookkeeper.mledger.ManagedLedgerException;
3032
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
3133
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
34+
import org.apache.pulsar.broker.resources.NamespaceResources;
35+
import org.apache.pulsar.broker.resources.PulsarResources;
36+
import org.apache.pulsar.broker.resources.TenantResources;
3237
import org.apache.pulsar.broker.service.schema.SchemaStorageFormat.SchemaLocator;
38+
import org.apache.pulsar.common.naming.NamespaceName;
3339
import org.apache.pulsar.common.naming.TopicName;
40+
import org.apache.pulsar.common.policies.data.TenantInfo;
3441
import org.apache.pulsar.common.util.FutureUtil;
3542
import org.apache.pulsar.docs.tools.CmdGenerateDocs;
3643
import org.apache.pulsar.metadata.api.MetadataStore;
3744
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
45+
import org.apache.pulsar.metadata.api.MetadataStoreException;
3846
import org.apache.pulsar.metadata.api.MetadataStoreFactory;
3947
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
4048
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
@@ -153,12 +161,45 @@ public static void main(String[] args) throws Exception {
153161
MetadataStoreConfig.builder().sessionTimeoutMillis(arguments.zkSessionTimeoutMillis)
154162
.configFilePath(arguments.configurationStoreConfigPath)
155163
.metadataStoreName(MetadataStoreConfig.CONFIGURATION_METADATA_STORE).build());
156-
deleteRecursively(configMetadataStore, "/admin/clusters/" + arguments.cluster).join();
164+
PulsarResources resources = new PulsarResources(metadataStore, configMetadataStore);
165+
// Cleanup replication cluster from all tenants and namespaces
166+
TenantResources tenantResources = resources.getTenantResources();
167+
NamespaceResources namespaceResources = resources.getNamespaceResources();
168+
List<String> tenants = tenantResources.listTenants();
169+
for (String tenant : tenants) {
170+
List<String> namespaces = namespaceResources.listNamespacesAsync(tenant).get();
171+
for (String namespace : namespaces) {
172+
namespaceResources.setPolicies(NamespaceName.get(tenant, namespace), policies -> {
173+
policies.replication_clusters.remove(arguments.cluster);
174+
return policies;
175+
});
176+
}
177+
removeCurrentClusterFromAllowedClusters(tenantResources, tenant, arguments.cluster);
178+
}
179+
try {
180+
resources.getClusterResources().deleteCluster(arguments.cluster);
181+
} catch (MetadataStoreException.NotFoundException ex) {
182+
// Ignore if the cluster does not exist
183+
log.info("Cluster metadata for '{}' does not exist.", arguments.cluster);
184+
}
157185
}
158186

159187
log.info("Cluster metadata for '{}' teardown.", arguments.cluster);
160188
}
161189

190+
private static void removeCurrentClusterFromAllowedClusters(
191+
TenantResources tenantResources, String tenant, String curCluster)
192+
throws MetadataStoreException, InterruptedException, ExecutionException {
193+
Optional<TenantInfo> tenantInfoOptional = tenantResources.getTenant(tenant);
194+
if (tenantInfoOptional.isEmpty()) {
195+
return;
196+
}
197+
tenantResources.updateTenantAsync(tenant, ti -> {
198+
ti.getAllowedClusters().remove(curCluster);
199+
return ti;
200+
}).get();
201+
}
202+
162203
private static CompletableFuture<Void> deleteRecursively(MetadataStore metadataStore, String path) {
163204
return metadataStore.getChildren(path)
164205
.thenCompose(children -> FutureUtil.waitForAll(

pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ClusterMetadataTeardownTest.java

Lines changed: 70 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import static org.testng.Assert.assertEquals;
2222
import static org.testng.Assert.assertFalse;
2323
import static org.testng.Assert.assertNotNull;
24+
import static org.testng.Assert.assertTrue;
2425
import java.util.SortedMap;
2526
import org.apache.pulsar.PulsarClusterMetadataSetup;
2627
import org.apache.pulsar.PulsarClusterMetadataTeardown;
@@ -54,7 +55,7 @@ void cleanup() {
5455
@Test
5556
public void testSetupClusterMetadataAndTeardown() throws Exception {
5657
String[] args1 = {
57-
"--cluster", "testReSetupClusterMetadata-cluster",
58+
"--cluster", "cluster1",
5859
"--zookeeper", "127.0.0.1:" + localZkS.getZookeeperPort(),
5960
"--configuration-store", "127.0.0.1:" + localZkS.getZookeeperPort(),
6061
"--configuration-metadata-store-config-path", "src/test/resources/conf/zk_client_enable_sasl.conf",
@@ -65,7 +66,7 @@ public void testSetupClusterMetadataAndTeardown() throws Exception {
6566
};
6667
PulsarClusterMetadataSetup.main(args1);
6768
SortedMap<String, String> data1 = localZkS.dumpData();
68-
String clusterDataJson = data1.get("/admin/clusters/testReSetupClusterMetadata-cluster");
69+
String clusterDataJson = data1.get("/admin/clusters/cluster1");
6970
assertNotNull(clusterDataJson);
7071
ClusterData clusterData = ObjectMapperFactory
7172
.getMapper()
@@ -78,13 +79,78 @@ public void testSetupClusterMetadataAndTeardown() throws Exception {
7879
assertFalse(clusterData.isBrokerClientTlsEnabled());
7980

8081
String[] args2 = {
81-
"--cluster", "testReSetupClusterMetadata-cluster",
82+
"--cluster", "cluster1",
8283
"--zookeeper", "127.0.0.1:" + localZkS.getZookeeperPort(),
8384
"--configuration-store", "127.0.0.1:" + localZkS.getZookeeperPort(),
8485
"--configuration-metadata-store-config-path", "src/test/resources/conf/zk_client_enable_sasl.conf",
8586
};
8687
PulsarClusterMetadataTeardown.main(args2);
8788
SortedMap<String, String> data2 = localZkS.dumpData();
88-
assertFalse(data2.containsKey("/admin/clusters/testReSetupClusterMetadata-cluster"));
89+
assertFalse(data2.containsKey("/admin/clusters/cluster1"));
90+
}
91+
92+
@Test
93+
public void testSetupMultipleClusterMetadataAndTeardown() throws Exception {
94+
String[] cluster1Args = {
95+
"--cluster", "cluster1",
96+
"--zookeeper", "127.0.0.1:" + localZkS.getZookeeperPort(),
97+
"--configuration-store", "127.0.0.1:" + localZkS.getZookeeperPort(),
98+
"--configuration-metadata-store-config-path", "src/test/resources/conf/zk_client_enable_sasl.conf",
99+
"--web-service-url", "http://127.0.0.1:8080",
100+
"--web-service-url-tls", "https://127.0.0.1:8443",
101+
"--broker-service-url", "pulsar://127.0.0.1:6650",
102+
"--broker-service-url-tls", "pulsar+ssl://127.0.0.1:6651"
103+
};
104+
PulsarClusterMetadataSetup.main(cluster1Args);
105+
String[] cluster2Args = {
106+
"--cluster", "cluster2",
107+
"--zookeeper", "127.0.0.1:" + localZkS.getZookeeperPort(),
108+
"--configuration-store", "127.0.0.1:" + localZkS.getZookeeperPort(),
109+
"--configuration-metadata-store-config-path", "src/test/resources/conf/zk_client_enable_sasl.conf",
110+
"--web-service-url", "http://127.0.0.1:8081",
111+
"--web-service-url-tls", "https://127.0.0.1:8445",
112+
"--broker-service-url", "pulsar://127.0.0.1:6651",
113+
"--broker-service-url-tls", "pulsar+ssl://127.0.0.1:6652"
114+
};
115+
PulsarClusterMetadataSetup.main(cluster2Args);
116+
SortedMap<String, String> data1 = localZkS.dumpData();
117+
String clusterDataJson = data1.get("/admin/clusters/cluster1");
118+
assertNotNull(clusterDataJson);
119+
ClusterData clusterData = ObjectMapperFactory
120+
.getMapper()
121+
.reader()
122+
.readValue(clusterDataJson, ClusterData.class);
123+
assertEquals(clusterData.getServiceUrl(), "http://127.0.0.1:8080");
124+
assertEquals(clusterData.getServiceUrlTls(), "https://127.0.0.1:8443");
125+
assertEquals(clusterData.getBrokerServiceUrl(), "pulsar://127.0.0.1:6650");
126+
assertEquals(clusterData.getBrokerServiceUrlTls(), "pulsar+ssl://127.0.0.1:6651");
127+
assertFalse(clusterData.isBrokerClientTlsEnabled());
128+
129+
String[] args2 = {
130+
"--cluster", "cluster1",
131+
"--zookeeper", "127.0.0.1:" + localZkS.getZookeeperPort(),
132+
"--configuration-store", "127.0.0.1:" + localZkS.getZookeeperPort(),
133+
"--configuration-metadata-store-config-path", "src/test/resources/conf/zk_client_enable_sasl.conf",
134+
};
135+
PulsarClusterMetadataTeardown.main(args2);
136+
SortedMap<String, String> data2 = localZkS.dumpData();
137+
assertFalse(data2.containsKey("/admin/clusters/cluster1"));
138+
assertTrue(data2.containsKey("/admin/clusters/cluster2"));
139+
140+
assertTrue(data2.containsKey("/admin/policies/public"));
141+
assertFalse(data2.get("/admin/policies/public").contains("cluster1"));
142+
assertTrue(data2.get("/admin/policies/public").contains("cluster2"));
143+
144+
assertTrue(data2.containsKey("/admin/policies/pulsar"));
145+
assertFalse(data2.get("/admin/policies/pulsar").contains("cluster1"));
146+
assertTrue(data2.get("/admin/policies/pulsar").contains("cluster2"));
147+
148+
assertTrue(data2.containsKey("/admin/policies/public/default"));
149+
assertFalse(data2.get("/admin/policies/public/default").contains("cluster1"));
150+
assertTrue(data2.get("/admin/policies/public/default").contains("cluster2"));
151+
152+
assertTrue(data2.containsKey("/admin/policies/pulsar/system"));
153+
assertFalse(data2.get("/admin/policies/pulsar/system").contains("cluster1"));
154+
assertTrue(data2.get("/admin/policies/pulsar/system").contains("cluster2"));
89155
}
90156
}

0 commit comments

Comments
 (0)