Skip to content

Commit 5285c68

Browse files
Start namespace service and schema registry service before start broker. (#6499)
### Motivation If the broker service is started, the client can connect to the broker and send requests depends on the namespace service, so we should create the namespace service before starting the broker. Otherwise, NPE occurs. ![image](https://user-images.githubusercontent.com/12592133/76090515-a9961400-5ff6-11ea-9077-cb8e79fa27c0.png) ![image](https://user-images.githubusercontent.com/12592133/76099838-b15db480-6006-11ea-8f39-31d820563c88.png) ### Modifications Move the namespace service creation and the schema registry service creation before start broker service.
1 parent ff01b10 commit 5285c68

File tree

3 files changed

+31
-8
lines changed

3 files changed

+31
-8
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -401,6 +401,10 @@ public void start() throws PulsarServerException {
401401
// Start load management service (even if load balancing is disabled)
402402
this.loadManager.set(LoadManager.create(this));
403403

404+
// needs load management service and before start broker service,
405+
this.startNamespaceService();
406+
schemaRegistryService = SchemaRegistryService.create(this);
407+
404408
this.defaultOffloader = createManagedLedgerOffloader(
405409
OffloadPolicies.create(this.getConfiguration().getProperties()));
406410

@@ -458,8 +462,6 @@ public Boolean get() {
458462
}
459463
this.webService.addStaticResources("/static", "/static");
460464

461-
schemaRegistryService = SchemaRegistryService.create(this);
462-
463465
webService.start();
464466

465467
// Refresh addresses, since the port might have been dynamically assigned
@@ -474,8 +476,8 @@ public Boolean get() {
474476
this.webSocketService.setLocalCluster(clusterData);
475477
}
476478

477-
// needs load management service
478-
this.startNamespaceService();
479+
// Initialize namespace service, after service url assigned. Should init zk and refresh self owner info.
480+
this.nsService.initialize();
479481

480482
// Start the leader election service
481483
startLeaderElectionService();

pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,6 @@
7373

7474
import java.net.URI;
7575
import java.net.URL;
76-
import java.util.ArrayList;
7776
import java.util.Collections;
7877
import java.util.HashMap;
7978
import java.util.List;
@@ -158,13 +157,19 @@ public NamespaceService(PulsarService pulsar) {
158157
host = pulsar.getAdvertisedAddress();
159158
this.config = pulsar.getConfiguration();
160159
this.loadManager = pulsar.getLoadManager();
161-
ServiceUnitZkUtils.initZK(pulsar.getLocalZkCache().getZooKeeper(), pulsar.getSafeBrokerServiceUrl());
162160
this.bundleFactory = new NamespaceBundleFactory(pulsar, Hashing.crc32());
163161
this.ownershipCache = new OwnershipCache(pulsar, bundleFactory, this);
164162
this.namespaceClients = new ConcurrentOpenHashMap<>();
165163
this.bundleOwnershipListeners = new CopyOnWriteArrayList<>();
166164
}
167165

166+
public void initialize() {
167+
ServiceUnitZkUtils.initZK(pulsar.getLocalZkCache().getZooKeeper(), pulsar.getSafeBrokerServiceUrl());
168+
if (!getOwnershipCache().refreshSelfOwnerInfo()) {
169+
throw new RuntimeException("Failed to refresh self owner info.");
170+
}
171+
}
172+
168173
public CompletableFuture<Optional<LookupResult>> getBrokerServiceUrlAsync(TopicName topic,
169174
boolean authoritative) {
170175
return getBundleAsync(topic)
@@ -424,7 +429,7 @@ private void searchForCandidateBroker(NamespaceBundle bundle,
424429
try {
425430
checkNotNull(candidateBroker);
426431

427-
if (pulsar.getSafeWebServiceAddress().equals(candidateBroker)) {
432+
if (candidateBroker.equals(pulsar.getSafeWebServiceAddress())) {
428433
// invalidate namespace policies and try to load latest policies to avoid data-discrepancy if broker
429434
// doesn't receive watch on policies changes
430435
final String policyPath = AdminResource.path(POLICIES, bundle.getNamespaceObject().toString());

pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ public class OwnershipCache {
7373
/**
7474
* The NamespaceEphemeralData objects that can be associated with the current owner
7575
*/
76-
private final NamespaceEphemeralData selfOwnerInfo;
76+
private NamespaceEphemeralData selfOwnerInfo;
7777

7878
/**
7979
* The NamespaceEphemeralData objects that can be associated with the current owner, when the broker is disabled.
@@ -111,6 +111,8 @@ public class OwnershipCache {
111111
*/
112112
private NamespaceService namespaceService;
113113

114+
private final PulsarService pulsar;
115+
114116
private class OwnedServiceUnitCacheLoader implements AsyncCacheLoader<String, OwnedBundle> {
115117

116118
@SuppressWarnings("deprecation")
@@ -156,6 +158,7 @@ public CompletableFuture<OwnedBundle> asyncLoad(String namespaceBundleZNode, Exe
156158
*/
157159
public OwnershipCache(PulsarService pulsar, NamespaceBundleFactory bundleFactory, NamespaceService namespaceService) {
158160
this.namespaceService = namespaceService;
161+
this.pulsar = pulsar;
159162
this.ownerBrokerUrl = pulsar.getSafeBrokerServiceUrl();
160163
this.ownerBrokerUrlTls = pulsar.getBrokerServiceUrlTls();
161164
this.selfOwnerInfo = new NamespaceEphemeralData(ownerBrokerUrl, ownerBrokerUrlTls,
@@ -211,6 +214,11 @@ public CompletableFuture<NamespaceEphemeralData> tryAcquiringOwnership(Namespace
211214

212215
CompletableFuture<NamespaceEphemeralData> future = new CompletableFuture<>();
213216

217+
if (!refreshSelfOwnerInfo()) {
218+
future.completeExceptionally(new RuntimeException("Namespace service does not ready for acquiring ownership"));
219+
return future;
220+
}
221+
214222
LOG.info("Trying to acquire ownership of {}", bundle);
215223

216224
// Doing a get() on the ownedBundlesCache will trigger an async ZK write to acquire the lock over the
@@ -367,4 +375,12 @@ public void updateBundleState(NamespaceBundle bundle, boolean isActive) throws E
367375
public NamespaceEphemeralData getSelfOwnerInfo() {
368376
return selfOwnerInfo;
369377
}
378+
379+
public synchronized boolean refreshSelfOwnerInfo() {
380+
if (selfOwnerInfo.getNativeUrl() == null) {
381+
this.selfOwnerInfo = new NamespaceEphemeralData(pulsar.getSafeBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls(),
382+
pulsar.getSafeWebServiceAddress(), pulsar.getWebServiceAddressTls(), false);
383+
}
384+
return selfOwnerInfo.getNativeUrl() != null;
385+
}
370386
}

0 commit comments

Comments
 (0)