Skip to content

Commit 3604c67

Browse files
authored
make namespaces policy update take effect on time (#8976)
### Motivation The change of namespaces isolation policy takes effect only when load-manager re-assign the bundles to brokers again. This change tries to make the isolation policy takes effect on time. ### Modifications - change setNamespaceIsolationPolicy method into async. - add parameter to enable this feature: enableNamespaceIsolationUpdateOnTime. - add test to cover this feature. ### Verifying this change tests passed
1 parent a292b0a commit 3604c67

File tree

8 files changed

+199
-10
lines changed

8 files changed

+199
-10
lines changed

pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -882,6 +882,13 @@ public class ServiceConfiguration implements PulsarConfiguration {
882882
doc = "List of interceptors for entry metadata.")
883883
private Set<String> brokerEntryMetadataInterceptors = new HashSet<>();
884884

885+
@FieldContext(
886+
category = CATEGORY_SERVER,
887+
doc = "Enable namespaceIsolation policy update take effect ontime or not," +
888+
" if set to ture, then the related namespaces will be unloaded after reset policy to make it take effect."
889+
)
890+
private boolean enableNamespaceIsolationUpdateOnTime = false;
891+
885892
/***** --- TLS --- ****/
886893
@FieldContext(
887894
category = CATEGORY_TLS,

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java

Lines changed: 106 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -37,19 +37,25 @@
3737
import java.util.Map;
3838
import java.util.Optional;
3939
import java.util.Set;
40+
import java.util.concurrent.CompletableFuture;
41+
import java.util.concurrent.atomic.AtomicInteger;
4042
import java.util.stream.Collectors;
4143
import javax.ws.rs.DELETE;
4244
import javax.ws.rs.GET;
4345
import javax.ws.rs.POST;
4446
import javax.ws.rs.PUT;
4547
import javax.ws.rs.Path;
4648
import javax.ws.rs.PathParam;
49+
import javax.ws.rs.container.AsyncResponse;
50+
import javax.ws.rs.container.Suspended;
4751
import javax.ws.rs.core.MediaType;
52+
import javax.ws.rs.core.Response;
4853
import javax.ws.rs.core.Response.Status;
4954
import org.apache.bookkeeper.util.ZkUtils;
5055
import org.apache.pulsar.broker.admin.AdminResource;
5156
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
5257
import org.apache.pulsar.broker.web.RestException;
58+
import org.apache.pulsar.client.admin.Namespaces;
5359
import org.apache.pulsar.common.naming.Constants;
5460
import org.apache.pulsar.common.naming.NamedEntity;
5561
import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationData;
@@ -58,6 +64,7 @@
5864
import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
5965
import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
6066
import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicyImpl;
67+
import org.apache.pulsar.common.util.FutureUtil;
6168
import org.apache.pulsar.common.util.ObjectMapperFactory;
6269
import org.apache.zookeeper.CreateMode;
6370
import org.apache.zookeeper.KeeperException;
@@ -675,6 +682,7 @@ public BrokerNamespaceIsolationData getBrokerWithNamespaceIsolationPolicy(
675682
@ApiResponse(code = 500, message = "Internal server error.")
676683
})
677684
public void setNamespaceIsolationPolicy(
685+
@Suspended final AsyncResponse asyncResponse,
678686
@ApiParam(
679687
value = "The cluster name",
680688
required = true
@@ -690,14 +698,16 @@ public void setNamespaceIsolationPolicy(
690698
required = true
691699
)
692700
NamespaceIsolationData policyData
693-
) throws Exception {
701+
) {
694702
validateSuperUserAccess();
695703
validateClusterExists(cluster);
696704
validatePoliciesReadOnlyAccess();
697705

706+
String jsonInput = null;
698707
try {
699708
// validate the policy data before creating the node
700709
policyData.validate();
710+
jsonInput = ObjectMapperFactory.create().writeValueAsString(policyData);
701711

702712
String nsIsolationPolicyPath = path("clusters", cluster, NAMESPACE_ISOLATION_POLICIES);
703713
NamespaceIsolationPolicies nsIsolationPolicies = namespaceIsolationPoliciesCache()
@@ -715,24 +725,113 @@ public void setNamespaceIsolationPolicy(
715725
-1);
716726
// make sure that the cache content will be refreshed for the next read access
717727
namespaceIsolationPoliciesCache().invalidate(nsIsolationPolicyPath);
728+
729+
// whether or not make the isolation update on time.
730+
if (pulsar().getConfiguration().isEnableNamespaceIsolationUpdateOnTime()) {
731+
filterAndUnloadMatchedNameSpaces(asyncResponse, policyData);
732+
} else {
733+
asyncResponse.resume(Response.noContent().build());
734+
return;
735+
}
718736
} catch (IllegalArgumentException iae) {
719737
log.info("[{}] Failed to update clusters/{}/namespaceIsolationPolicies/{}. Input data is invalid",
720738
clientAppId(), cluster, policyName, iae);
721-
String jsonInput = ObjectMapperFactory.create().writeValueAsString(policyData);
722-
throw new RestException(Status.BAD_REQUEST,
723-
"Invalid format of input policy data. policy: " + policyName + "; data: " + jsonInput);
739+
asyncResponse.resume(new RestException(Status.BAD_REQUEST,
740+
"Invalid format of input policy data. policy: " + policyName + "; data: " + jsonInput));
724741
} catch (KeeperException.NoNodeException nne) {
725742
log.warn("[{}] Failed to update clusters/{}/namespaceIsolationPolicies: Does not exist", clientAppId(),
726743
cluster);
727-
throw new RestException(Status.NOT_FOUND,
728-
"NamespaceIsolationPolicies for cluster " + cluster + " does not exist");
744+
asyncResponse.resume(new RestException(Status.NOT_FOUND,
745+
"NamespaceIsolationPolicies for cluster " + cluster + " does not exist"));
729746
} catch (Exception e) {
730747
log.error("[{}] Failed to update clusters/{}/namespaceIsolationPolicies/{}", clientAppId(), cluster,
731748
policyName, e);
732-
throw new RestException(e);
749+
asyncResponse.resume(new RestException(e));
733750
}
734751
}
735752

753+
// get matched namespaces; call unload for each namespaces;
754+
private void filterAndUnloadMatchedNameSpaces(AsyncResponse asyncResponse,
755+
NamespaceIsolationData policyData) throws Exception {
756+
Namespaces namespaces = pulsar().getAdminClient().namespaces();
757+
758+
List<String> nssToUnload = Lists.newArrayList();
759+
760+
pulsar().getAdminClient().tenants().getTenantsAsync()
761+
.whenComplete((tenants, ex) -> {
762+
if (ex != null) {
763+
log.error("[{}] Failed to get tenants when setNamespaceIsolationPolicy.", clientAppId(), ex);
764+
return;
765+
}
766+
AtomicInteger tenantsNumber = new AtomicInteger(tenants.size());
767+
// get all tenants now, for each tenants, get its namespaces
768+
tenants.forEach(tenant -> namespaces.getNamespacesAsync(tenant)
769+
.whenComplete((nss, e) -> {
770+
int leftTenantsToHandle = tenantsNumber.decrementAndGet();
771+
if (ex != null) {
772+
log.error("[{}] Failed to get namespaces for tenant {} when setNamespaceIsolationPolicy.",
773+
clientAppId(), tenant, ex);
774+
775+
if (leftTenantsToHandle == 0) {
776+
unloadMatchedNamespacesList(asyncResponse, nssToUnload, namespaces);
777+
}
778+
779+
return;
780+
}
781+
782+
AtomicInteger nssNumber = new AtomicInteger(nss.size());
783+
784+
// get all namespaces for this tenant now.
785+
nss.forEach(namespaceName -> {
786+
int leftNssToHandle = nssNumber.decrementAndGet();
787+
788+
// if namespace match any policy regex, add it to ns list to be unload.
789+
if (policyData.namespaces.stream()
790+
.anyMatch(nsnameRegex -> namespaceName.matches(nsnameRegex))) {
791+
nssToUnload.add(namespaceName);
792+
}
793+
794+
// all the tenants & namespaces get filtered.
795+
if (leftNssToHandle == 0 && leftTenantsToHandle == 0) {
796+
unloadMatchedNamespacesList(asyncResponse, nssToUnload, namespaces);
797+
}
798+
});
799+
}));
800+
});
801+
}
802+
803+
private void unloadMatchedNamespacesList(AsyncResponse asyncResponse,
804+
List<String> nssToUnload,
805+
Namespaces namespaces) {
806+
if (nssToUnload.size() == 0) {
807+
asyncResponse.resume(Response.noContent().build());
808+
return;
809+
}
810+
811+
List<CompletableFuture<Void>> futures = nssToUnload.stream()
812+
.map(namespaceName -> namespaces.unloadAsync(namespaceName))
813+
.collect(Collectors.toList());
814+
815+
FutureUtil.waitForAll(futures).whenComplete((result, exception) -> {
816+
if (exception != null) {
817+
log.error("[{}] Failed to unload namespace while setNamespaceIsolationPolicy.",
818+
clientAppId(), exception);
819+
asyncResponse.resume(new RestException(exception));
820+
return;
821+
}
822+
823+
try {
824+
// write load info to load manager to make the load happens fast
825+
pulsar().getLoadManager().get().writeLoadReportOnZookeeper(true);
826+
} catch (Exception e) {
827+
log.warn("[{}] Failed to writeLoadReportOnZookeeper.", clientAppId(), e);
828+
}
829+
830+
asyncResponse.resume(Response.noContent().build());
831+
return;
832+
});
833+
}
834+
736835
private boolean createZnodeIfNotExist(String path, Optional<Object> value)
737836
throws KeeperException, InterruptedException {
738837
// create persistent node on ZooKeeper

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,14 @@ public interface LoadManager {
8080
*/
8181
void writeLoadReportOnZookeeper() throws Exception;
8282

83+
/**
84+
* Publish the current load report on ZK, forced or not.
85+
* By default rely on method writeLoadReportOnZookeeper().
86+
*/
87+
default void writeLoadReportOnZookeeper(boolean force) throws Exception {
88+
writeLoadReportOnZookeeper();
89+
}
90+
8391
/**
8492
* Update namespace bundle resource quota on ZK.
8593
*/

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ModularLoadManager.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,13 @@ public interface ModularLoadManager {
9494
*/
9595
void writeBrokerDataOnZooKeeper();
9696

97+
/**
98+
* As any broker, write the local broker data to ZooKeeper, forced or not.
99+
*/
100+
default void writeBrokerDataOnZooKeeper(boolean force) {
101+
writeBrokerDataOnZooKeeper();
102+
}
103+
97104
/**
98105
* As the leader broker, write bundle data aggregated from all brokers to ZooKeeper.
99106
*/

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -992,16 +992,21 @@ private void updateLoadBalancingMetrics(final SystemResourceUsage systemResource
992992
*/
993993
@Override
994994
public void writeBrokerDataOnZooKeeper() {
995+
writeBrokerDataOnZooKeeper(false);
996+
}
997+
998+
@Override
999+
public void writeBrokerDataOnZooKeeper(boolean force) {
9951000
try {
9961001
updateLocalBrokerData();
997-
if (needBrokerDataUpdate()) {
1002+
if (needBrokerDataUpdate() || force) {
9981003
localData.setLastUpdate(System.currentTimeMillis());
9991004

10001005
try {
10011006
zkClient.setData(brokerZnodePath, localData.getJsonBytes(), -1);
10021007
} catch (KeeperException.NoNodeException e) {
10031008
ZkUtils.createFullPathOptimistic(zkClient, brokerZnodePath, localData.getJsonBytes(),
1004-
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
1009+
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
10051010
}
10061011

10071012
// Clear deltas.

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,11 @@ public void writeLoadReportOnZookeeper() {
114114
loadManager.writeBrokerDataOnZooKeeper();
115115
}
116116

117+
@Override
118+
public void writeLoadReportOnZookeeper(boolean force) {
119+
loadManager.writeBrokerDataOnZooKeeper(force);
120+
}
121+
117122
@Override
118123
public void writeResourceQuotasToZooKeeper() {
119124
loadManager.writeBundleDataOnZooKeeper();

pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ public class AdminApiTest2 extends MockedPulsarServiceBaseTest {
100100
@Override
101101
public void setup() throws Exception {
102102
conf.setLoadBalancerEnabled(true);
103+
conf.setEnableNamespaceIsolationUpdateOnTime(true);
103104
super.internalSetup();
104105

105106
// create otherbroker to test redirect on calls that need
@@ -908,6 +909,62 @@ public void brokerNamespaceIsolationPolicies() throws Exception {
908909
assertFalse(isolationData.isPrimary);
909910
}
910911

912+
// create 1 namespace:
913+
// 0. without isolation policy configured, lookup will success.
914+
// 1. with matched isolation broker configured and matched, lookup will success.
915+
// 2. update isolation policy, without broker matched, lookup will fail.
916+
@Test
917+
public void brokerNamespaceIsolationPoliciesUpdateOnTime() throws Exception {
918+
String brokerName = pulsar.getAdvertisedAddress();
919+
String ns1Name = "prop-xyz/test_ns1_iso_" + System.currentTimeMillis();
920+
admin.namespaces().createNamespace(ns1Name, Sets.newHashSet("test"));
921+
922+
// 0. without isolation policy configured, lookup will success.
923+
String brokerUrl = admin.lookups().lookupTopic(ns1Name + "/topic1");
924+
assertTrue(brokerUrl.contains(brokerName));
925+
log.info("0 get lookup url {}", brokerUrl);
926+
927+
// create
928+
String policyName1 = "policy-1";
929+
String cluster = pulsar.getConfiguration().getClusterName();
930+
String namespaceRegex = ns1Name;
931+
NamespaceIsolationData nsPolicyData1 = new NamespaceIsolationData();
932+
nsPolicyData1.namespaces = new ArrayList<String>();
933+
nsPolicyData1.namespaces.add(ns1Name);
934+
nsPolicyData1.primary = new ArrayList<String>();
935+
nsPolicyData1.primary.add(brokerName + ".*");
936+
nsPolicyData1.auto_failover_policy = new AutoFailoverPolicyData();
937+
nsPolicyData1.auto_failover_policy.policy_type = AutoFailoverPolicyType.min_available;
938+
nsPolicyData1.auto_failover_policy.parameters = new HashMap<String, String>();
939+
nsPolicyData1.auto_failover_policy.parameters.put("min_limit", "1");
940+
nsPolicyData1.auto_failover_policy.parameters.put("usage_threshold", "100");
941+
admin.clusters().createNamespaceIsolationPolicyAsync(cluster, policyName1, nsPolicyData1).get();
942+
943+
// 1. with matched isolation broker configured and matched, lookup will success.
944+
brokerUrl = admin.lookups().lookupTopic(ns1Name + "/topic2");
945+
assertTrue(brokerUrl.contains(brokerName));
946+
log.info(" 1 get lookup url {}", brokerUrl);
947+
948+
// 2. update isolation policy, without broker matched, lookup will fail.
949+
nsPolicyData1.primary = new ArrayList<String>();
950+
nsPolicyData1.primary.add(brokerName + "not_match");
951+
admin.clusters().updateNamespaceIsolationPolicyAsync(cluster, policyName1, nsPolicyData1).get();
952+
953+
try {
954+
admin.lookups().lookupTopic(ns1Name + "/topic3");
955+
} catch (Exception e) {
956+
// expected lookup fail, because no brokers matched the policy.
957+
log.info(" 2 expected fail lookup");
958+
}
959+
960+
try {
961+
admin.lookups().lookupTopic(ns1Name + "/topic1");
962+
} catch (Exception e) {
963+
// expected lookup fail, because no brokers matched the policy.
964+
log.info(" 22 expected fail lookup");
965+
}
966+
}
967+
911968
@Test
912969
public void clustersList() throws PulsarAdminException {
913970
final String cluster = pulsar.getConfiguration().getClusterName();

pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -284,7 +284,8 @@ public void clusters() throws Exception {
284284
policyData.auto_failover_policy.parameters = new HashMap<String, String>();
285285
policyData.auto_failover_policy.parameters.put("min_limit", "1");
286286
policyData.auto_failover_policy.parameters.put("usage_threshold", "90");
287-
clusters.setNamespaceIsolationPolicy("use", "policy1", policyData);
287+
AsyncResponse response = mock(AsyncResponse.class);
288+
clusters.setNamespaceIsolationPolicy(response,"use", "policy1", policyData);
288289
clusters.getNamespaceIsolationPolicies("use");
289290

290291
try {

0 commit comments

Comments
 (0)