Skip to content

Commit db20035

Browse files
authored
[fix] [bk] Correct the bookie info after ZK client is reconnected (#21035)
Motivation: After [PIP-118: reconnect broker when ZooKeeper session expires](#13341), the Broker no longer shuts down after losing the connection to the local metadata store in the default configuration. However, while the ZK client is disconnected, the bookie online and offline events are lost, which leaves incorrect BK info in memory. You can reproduce the issue with the test `BkEnsemblesChaosTest.testBookieInfoIsCorrectEvenIfLostNotificationDueToZKClientReconnect` (the issue reproduces with about 90% probability; run the test again if it does not occur). Modifications: Refresh the BK info in memory after the ZK client is reconnected.
1 parent 07eef59 commit db20035

File tree

5 files changed

+320
-2
lines changed

5 files changed

+320
-2
lines changed
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.broker.service;
20+
21+
import org.apache.pulsar.broker.BrokerTestUtil;
22+
import org.apache.pulsar.client.api.Producer;
23+
import org.testng.annotations.AfterClass;
24+
import org.testng.annotations.BeforeClass;
25+
import org.testng.annotations.Test;
26+
27+
@Test(groups = "broker")
28+
public class BkEnsemblesChaosTest extends CanReconnectZKClientPulsarServiceBaseTest {
29+
30+
@Override
31+
@BeforeClass(alwaysRun = true, timeOut = 300000)
32+
public void setup() throws Exception {
33+
super.setup();
34+
}
35+
36+
@Override
37+
@AfterClass(alwaysRun = true, timeOut = 300000)
38+
public void cleanup() throws Exception {
39+
super.cleanup();
40+
}
41+
42+
@Test
43+
public void testBookieInfoIsCorrectEvenIfLostNotificationDueToZKClientReconnect() throws Exception {
44+
final String topicName = BrokerTestUtil.newUniqueName("persistent://" + defaultNamespace + "/tp_");
45+
final byte[] msgValue = "test".getBytes();
46+
admin.topics().createNonPartitionedTopic(topicName);
47+
// Ensure broker works.
48+
Producer<byte[]> producer1 = client.newProducer().topic(topicName).create();
49+
producer1.send(msgValue);
50+
producer1.close();
51+
admin.topics().unload(topicName);
52+
53+
// Restart some bookies, which causes their ZK nodes to be deleted and re-created.
54+
// Meanwhile, keep the local metadata store reconnecting so that some ZK node change notifications are lost.
55+
for (int i = 0; i < numberOfBookies - 1; i++){
56+
bkEnsemble.stopBK(i);
57+
}
58+
makeLocalMetadataStoreKeepReconnect();
59+
for (int i = 0; i < numberOfBookies - 1; i++){
60+
bkEnsemble.startBK(i);
61+
}
62+
// Sleep 100ms so that the notifications of the ZK node creation are lost.
63+
Thread.sleep(100);
64+
stopLocalMetadataStoreAlwaysReconnect();
65+
66+
// Ensure broker still works.
67+
admin.topics().unload(topicName);
68+
Producer<byte[]> producer2 = client.newProducer().topic(topicName).create();
69+
producer2.send(msgValue);
70+
}
71+
}
Lines changed: 215 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,215 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.broker.service;
20+
21+
import com.google.common.collect.Sets;
22+
import io.netty.channel.Channel;
23+
import java.net.URL;
24+
import java.nio.channels.SelectionKey;
25+
import java.util.Collections;
26+
import java.util.Optional;
27+
import java.util.concurrent.atomic.AtomicBoolean;
28+
import lombok.extern.slf4j.Slf4j;
29+
import org.apache.pulsar.broker.PulsarService;
30+
import org.apache.pulsar.broker.ServiceConfiguration;
31+
import org.apache.pulsar.client.admin.PulsarAdmin;
32+
import org.apache.pulsar.client.api.PulsarClient;
33+
import org.apache.pulsar.common.policies.data.ClusterData;
34+
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
35+
import org.apache.pulsar.common.policies.data.TopicType;
36+
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
37+
import org.apache.pulsar.tests.TestRetrySupport;
38+
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
39+
import org.apache.pulsar.zookeeper.ZookeeperServerTest;
40+
import org.apache.zookeeper.ClientCnxn;
41+
import org.apache.zookeeper.ZooKeeper;
42+
import org.awaitility.reflect.WhiteboxImpl;
43+
44+
@Slf4j
45+
public abstract class CanReconnectZKClientPulsarServiceBaseTest extends TestRetrySupport {
46+
47+
protected final String defaultTenant = "public";
48+
protected final String defaultNamespace = defaultTenant + "/default";
49+
protected int numberOfBookies = 3;
50+
protected final String clusterName = "r1";
51+
protected URL url;
52+
protected URL urlTls;
53+
protected ServiceConfiguration config = new ServiceConfiguration();
54+
protected ZookeeperServerTest brokerConfigZk;
55+
protected LocalBookkeeperEnsemble bkEnsemble;
56+
protected PulsarService pulsar;
57+
protected BrokerService broker;
58+
protected PulsarAdmin admin;
59+
protected PulsarClient client;
60+
protected ZooKeeper localZkOfBroker;
61+
protected Object localMetaDataStoreClientCnx;
62+
protected final AtomicBoolean LocalMetadataStoreInReconnectFinishSignal = new AtomicBoolean();
63+
protected void startZKAndBK() throws Exception {
64+
// Start ZK.
65+
brokerConfigZk = new ZookeeperServerTest(0);
66+
brokerConfigZk.start();
67+
68+
// Start BK.
69+
bkEnsemble = new LocalBookkeeperEnsemble(numberOfBookies, 0, () -> 0);
70+
bkEnsemble.start();
71+
}
72+
73+
protected void startBrokers() throws Exception {
74+
// Start brokers.
75+
setConfigDefaults(config, clusterName, bkEnsemble, brokerConfigZk);
76+
pulsar = new PulsarService(config);
77+
pulsar.start();
78+
broker = pulsar.getBrokerService();
79+
ZKMetadataStore zkMetadataStore = (ZKMetadataStore) pulsar.getLocalMetadataStore();
80+
localZkOfBroker = zkMetadataStore.getZkClient();
81+
ClientCnxn cnxn = WhiteboxImpl.getInternalState(localZkOfBroker, "cnxn");
82+
Object sendThread = WhiteboxImpl.getInternalState(cnxn, "sendThread");
83+
localMetaDataStoreClientCnx = WhiteboxImpl.getInternalState(sendThread, "clientCnxnSocket");
84+
85+
url = new URL(pulsar.getWebServiceAddress());
86+
urlTls = new URL(pulsar.getWebServiceAddressTls());
87+
admin = PulsarAdmin.builder().serviceHttpUrl(url.toString()).build();
88+
client = PulsarClient.builder().serviceUrl(url.toString()).build();
89+
}
90+
91+
protected void makeLocalMetadataStoreKeepReconnect() throws Exception {
92+
if (!LocalMetadataStoreInReconnectFinishSignal.compareAndSet(false, true)) {
93+
throw new RuntimeException("Local metadata store is already keeping reconnect");
94+
}
95+
if (localMetaDataStoreClientCnx.getClass().getSimpleName().equals("ClientCnxnSocketNIO")) {
96+
makeLocalMetadataStoreKeepReconnectNIO();
97+
} else {
98+
// ClientCnxnSocketNetty.
99+
makeLocalMetadataStoreKeepReconnectNetty();
100+
}
101+
}
102+
103+
protected void makeLocalMetadataStoreKeepReconnectNIO() {
104+
new Thread(() -> {
105+
while (LocalMetadataStoreInReconnectFinishSignal.get()) {
106+
try {
107+
SelectionKey sockKey = WhiteboxImpl.getInternalState(localMetaDataStoreClientCnx, "sockKey");
108+
if (sockKey != null) {
109+
sockKey.channel().close();
110+
}
111+
// Prevents high cpu usage.
112+
Thread.sleep(5);
113+
} catch (Exception e) {
114+
log.error("Try close the ZK connection of local metadata store failed: {}", e.toString());
115+
}
116+
}
117+
}).start();
118+
}
119+
120+
protected void makeLocalMetadataStoreKeepReconnectNetty() {
121+
new Thread(() -> {
122+
while (LocalMetadataStoreInReconnectFinishSignal.get()) {
123+
try {
124+
Channel channel = WhiteboxImpl.getInternalState(localMetaDataStoreClientCnx, "channel");
125+
if (channel != null) {
126+
channel.close();
127+
}
128+
// Prevents high cpu usage.
129+
Thread.sleep(5);
130+
} catch (Exception e) {
131+
log.error("Try close the ZK connection of local metadata store failed: {}", e.toString());
132+
}
133+
}
134+
}).start();
135+
}
136+
137+
protected void stopLocalMetadataStoreAlwaysReconnect() {
138+
LocalMetadataStoreInReconnectFinishSignal.set(false);
139+
}
140+
141+
protected void createDefaultTenantsAndClustersAndNamespace() throws Exception {
142+
admin.clusters().createCluster(clusterName, ClusterData.builder()
143+
.serviceUrl(url.toString())
144+
.serviceUrlTls(urlTls.toString())
145+
.brokerServiceUrl(pulsar.getBrokerServiceUrl())
146+
.brokerServiceUrlTls(pulsar.getBrokerServiceUrlTls())
147+
.brokerClientTlsEnabled(false)
148+
.build());
149+
150+
admin.tenants().createTenant(defaultTenant, new TenantInfoImpl(Collections.emptySet(),
151+
Sets.newHashSet(clusterName)));
152+
153+
admin.namespaces().createNamespace(defaultNamespace, Sets.newHashSet(clusterName));
154+
}
155+
156+
@Override
157+
protected void setup() throws Exception {
158+
incrementSetupNumber();
159+
160+
log.info("--- Starting OneWayReplicatorTestBase::setup ---");
161+
162+
startZKAndBK();
163+
164+
startBrokers();
165+
166+
createDefaultTenantsAndClustersAndNamespace();
167+
168+
Thread.sleep(100);
169+
log.info("--- OneWayReplicatorTestBase::setup completed ---");
170+
}
171+
172+
private void setConfigDefaults(ServiceConfiguration config, String clusterName,
173+
LocalBookkeeperEnsemble bookkeeperEnsemble, ZookeeperServerTest brokerConfigZk) {
174+
config.setClusterName(clusterName);
175+
config.setAdvertisedAddress("localhost");
176+
config.setWebServicePort(Optional.of(0));
177+
config.setWebServicePortTls(Optional.of(0));
178+
config.setMetadataStoreUrl("zk:127.0.0.1:" + bookkeeperEnsemble.getZookeeperPort());
179+
config.setConfigurationMetadataStoreUrl("zk:127.0.0.1:" + brokerConfigZk.getZookeeperPort() + "/foo");
180+
config.setBrokerDeleteInactiveTopicsEnabled(false);
181+
config.setBrokerDeleteInactiveTopicsFrequencySeconds(60);
182+
config.setBrokerShutdownTimeoutMs(0L);
183+
config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
184+
config.setBrokerServicePort(Optional.of(0));
185+
config.setBrokerServicePortTls(Optional.of(0));
186+
config.setBacklogQuotaCheckIntervalInSeconds(5);
187+
config.setDefaultNumberOfNamespaceBundles(1);
188+
config.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
189+
config.setEnableReplicatedSubscriptions(true);
190+
config.setReplicatedSubscriptionsSnapshotFrequencyMillis(1000);
191+
}
192+
193+
@Override
194+
protected void cleanup() throws Exception {
195+
markCurrentSetupNumberCleaned();
196+
log.info("--- Shutting down ---");
197+
198+
stopLocalMetadataStoreAlwaysReconnect();
199+
200+
// Stop brokers.
201+
client.close();
202+
admin.close();
203+
if (pulsar != null) {
204+
pulsar.close();
205+
}
206+
207+
// Stop ZK and BK.
208+
bkEnsemble.stop();
209+
brokerConfigZk.stop();
210+
211+
// Reset configs.
212+
config = new ServiceConfiguration();
213+
setConfigDefaults(config, clusterName, bkEnsemble, brokerConfigZk);
214+
}
215+
}

pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataCache.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,11 @@ public interface MetadataCache<T> {
148148
*/
149149
void invalidate(String path);
150150

151+
/**
152+
* Force the invalidation of all objects in the metadata cache.
153+
*/
154+
void invalidateAll();
155+
151156
/**
152157
* Invalidate and reload an object in the metadata cache.
153158
*

pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,11 +51,13 @@
5151
import org.apache.pulsar.metadata.api.MetadataCache;
5252
import org.apache.pulsar.metadata.api.MetadataStore;
5353
import org.apache.pulsar.metadata.api.Notification;
54+
import org.apache.pulsar.metadata.api.extended.SessionEvent;
55+
import org.apache.pulsar.metadata.impl.AbstractMetadataStore;
5456

5557
@Slf4j
5658
public class PulsarRegistrationClient implements RegistrationClient {
5759

58-
private final MetadataStore store;
60+
private final AbstractMetadataStore store;
5961
private final String ledgersRootPath;
6062
// registration paths
6163
private final String bookieRegistrationPath;
@@ -68,10 +70,11 @@ public class PulsarRegistrationClient implements RegistrationClient {
6870
private final Map<BookieId, Versioned<BookieServiceInfo>> writableBookieInfo;
6971
private final Map<BookieId, Versioned<BookieServiceInfo>> readOnlyBookieInfo;
7072
private final FutureUtil.Sequencer<Void> sequencer;
73+
private SessionEvent lastMetadataSessionEvent;
7174

7275
public PulsarRegistrationClient(MetadataStore store,
7376
String ledgersRootPath) {
74-
this.store = store;
77+
this.store = (AbstractMetadataStore) store;
7578
this.ledgersRootPath = ledgersRootPath;
7679
this.bookieServiceInfoMetadataCache = store.getMetadataCache(BookieServiceInfoSerde.INSTANCE);
7780
this.sequencer = Sequencer.create();
@@ -88,13 +91,29 @@ public PulsarRegistrationClient(MetadataStore store,
8891
.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-registration-client"));
8992

9093
store.registerListener(this::updatedBookies);
94+
this.store.registerSessionListener(this::refreshBookies);
9195
}
9296

9397
@Override
9498
public void close() {
9599
executor.shutdownNow();
96100
}
97101

102+
private void refreshBookies(SessionEvent sessionEvent) {
103+
lastMetadataSessionEvent = sessionEvent;
104+
if (!SessionEvent.Reconnected.equals(sessionEvent) && !SessionEvent.SessionReestablished.equals(sessionEvent)){
105+
return;
106+
}
107+
// Clean caches.
108+
store.invalidateCaches(bookieRegistrationPath, bookieAllRegistrationPath, bookieReadonlyRegistrationPath);
109+
bookieServiceInfoMetadataCache.invalidateAll();
110+
// Refresh caches of the listeners.
111+
getReadOnlyBookies().thenAccept(bookies ->
112+
readOnlyBookiesWatchers.forEach(w -> executor.execute(() -> w.onBookiesChanged(bookies))));
113+
getWritableBookies().thenAccept(bookies ->
114+
writableBookiesWatchers.forEach(w -> executor.execute(() -> w.onBookiesChanged(bookies))));
115+
}
116+
98117
@Override
99118
public CompletableFuture<Versioned<Set<BookieId>>> getWritableBookies() {
100119
return getBookiesThenFreshCache(bookieRegistrationPath);

pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.github.benmanes.caffeine.cache.AsyncCacheLoader;
2424
import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
2525
import com.github.benmanes.caffeine.cache.Caffeine;
26+
import com.github.benmanes.caffeine.cache.LoadingCache;
2627
import com.google.common.annotations.VisibleForTesting;
2728
import io.netty.util.concurrent.DefaultThreadFactory;
2829
import java.time.Instant;
@@ -523,6 +524,13 @@ public void invalidateAll() {
523524
existsCache.synchronous().invalidateAll();
524525
}
525526

527+
public void invalidateCaches(String...paths) {
528+
LoadingCache<String, List<String>> loadingCache = childrenCache.synchronous();
529+
for (String path : paths) {
530+
loadingCache.invalidate(path);
531+
}
532+
}
533+
526534
/**
527535
* Run the task in the executor thread and fail the future if the executor is shutting down.
528536
*/

0 commit comments

Comments
 (0)