Skip to content

Commit 0f95678

Browse files
symatMate Szalay-Beko
authored andcommitted
ZOOKEEPER-3188: skip unreachable addresses when Learner connects to Leader
1 parent e232c55 commit 0f95678

4 files changed

Lines changed: 61 additions & 12 deletions

File tree

zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -255,14 +255,14 @@ protected void sockConnect(Socket sock, InetSocketAddress addr, int timeout) thr
255255
* Establish a connection with the LearnerMaster found by findLearnerMaster.
256256
* Followers only connect to Leaders, Observers can connect to any active LearnerMaster.
257257
* Retries until either initLimit time has elapsed or 5 tries have happened.
258-
* @param addr - the address of the Peer to connect to.
258+
* @param multiAddr - the address of the Peer to connect to.
259259
* @throws IOException - if the socket connection fails on the 5th attempt
260260
* if there is an authentication failure while connecting to leader
261261
*/
262-
protected void connectToLeader(MultipleAddresses addr, String hostname) throws IOException {
262+
protected void connectToLeader(MultipleAddresses multiAddr, String hostname) throws IOException {
263263

264-
this.leaderAddr = addr;
265-
Set<InetSocketAddress> addresses = addr.getAllAddresses();
264+
this.leaderAddr = multiAddr;
265+
Set<InetSocketAddress> addresses = multiAddr.getAllReachableAddresses();
266266
ExecutorService executor = Executors.newFixedThreadPool(addresses.size());
267267
CountDownLatch latch = new CountDownLatch(addresses.size());
268268
AtomicReference<Socket> socket = new AtomicReference<>(null);
@@ -284,15 +284,14 @@ protected void connectToLeader(MultipleAddresses addr, String hostname) throws I
284284
}
285285

286286
if (socket.get() == null) {
287-
throw new IOException("Failed connect to " + addr);
287+
throw new IOException("Failed connect to " + multiAddr);
288288
} else {
289289
sock = socket.get();
290290
}
291291

292292
self.authLearner.authenticate(sock, hostname);
293293

294-
leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream(
295-
sock.getInputStream()));
294+
leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream(sock.getInputStream()));
296295
bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
297296
leaderOs = BinaryOutputArchive.getArchive(bufferedOutput);
298297
}
@@ -315,9 +314,13 @@ public void run() {
315314
Thread.currentThread().setName("LeaderConnector-" + address);
316315
Socket sock = connectToLeader();
317316

318-
if (sock != null && sock.isConnected() && !socket.compareAndSet(null, sock)) {
319-
LOG.info("Connection to the leader is already established, close the redundant connection");
320-
sock.close();
317+
if (sock != null && sock.isConnected()) {
318+
if (socket.compareAndSet(null, sock)) {
319+
LOG.info("Successfully connected to leader, using address: {}", address);
320+
} else {
321+
LOG.info("Connection to the leader is already established, close the redundant connection");
322+
sock.close();
323+
}
321324
}
322325

323326
} catch (Exception e) {

zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/MultipleAddresses.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,13 +120,25 @@ public void addAddress(InetSocketAddress address) {
120120
* @throws NoRouteToHostException if none of the addresses are reachable
121121
*/
122122
public InetSocketAddress getReachableAddress() throws NoRouteToHostException {
123-
// using parallelStream() + findAny() will help to minimize the time spent, but
123+
// using parallelStream() + findAny() will help to minimize the time spent on network operations
124124
return addresses.parallelStream()
125125
.filter(this::checkIfAddressIsReachable)
126126
.findAny()
127127
.orElseThrow(() -> new NoRouteToHostException("No valid address among " + addresses));
128128
}
129129

130+
/**
131+
* Returns a set of all reachable addresses. If none is reachable than returns empty set.
132+
*
133+
* @return all addresses which are reachable.
134+
*/
135+
public Set<InetSocketAddress> getAllReachableAddresses() {
136+
// using parallelStream() will help to minimize the time spent on network operations
137+
return addresses.parallelStream()
138+
.filter(this::checkIfAddressIsReachable)
139+
.collect(Collectors.toSet());
140+
}
141+
130142
/**
131143
* Returns a reachable address or an arbitrary one, if none is reachable. It throws an exception
132144
* if there are no addresses registered. The function is nondeterministic in the sense that the

zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumCnxManager.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -666,7 +666,8 @@ synchronized boolean connectOne(long sid, MultipleAddresses electionAddr) {
666666
sslSock.getSession().getCipherSuite());
667667
}
668668

669-
LOG.debug("Connected to server {}", sid);
669+
LOG.debug("Connected to server {} using election address: {}:{}",
670+
sid, sock.getInetAddress(), sock.getPort());
670671
// Sends connection request asynchronously if the quorum
671672
// sasl authentication is enabled. This is required because
672673
// sasl server authentication process may take few seconds to

zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/MultipleAddressesTest.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@
2424
import java.net.UnknownHostException;
2525
import java.util.ArrayList;
2626
import java.util.Arrays;
27+
import java.util.HashSet;
2728
import java.util.List;
29+
import java.util.Set;
2830
import java.util.stream.Collectors;
2931
import java.util.stream.IntStream;
3032
import org.apache.commons.collections.CollectionUtils;
@@ -133,6 +135,37 @@ public void testRecreateSocketAddressesWithWrongAddresses() {
133135
Assert.assertEquals(address, multipleAddresses.getOne());
134136
}
135137

138+
@Test
139+
public void testAlwaysGetReachableAddress() throws Exception{
140+
InetSocketAddress reachableHost = new InetSocketAddress("127.0.0.1", 1234);
141+
InetSocketAddress unreachableHost1 = new InetSocketAddress("unreachable1.address.zookeeper.apache.com", 1234);
142+
InetSocketAddress unreachableHost2 = new InetSocketAddress("unreachable2.address.zookeeper.apache.com", 1234);
143+
InetSocketAddress unreachableHost3 = new InetSocketAddress("unreachable3.address.zookeeper.apache.com", 1234);
144+
145+
MultipleAddresses multipleAddresses = new MultipleAddresses(
146+
Arrays.asList(unreachableHost1, unreachableHost2, unreachableHost3, reachableHost));
147+
148+
// we call the getReachableAddress() function multiple times, to make sure we
149+
// always got back a reachable address and not just a random one
150+
for (int i = 0; i < 10; i++) {
151+
Assert.assertEquals(reachableHost, multipleAddresses.getReachableAddress());
152+
}
153+
}
154+
155+
@Test
156+
public void testGetAllReachableAddresses() throws Exception {
157+
InetSocketAddress reachableHost1 = new InetSocketAddress("127.0.0.1", 1234);
158+
InetSocketAddress reachableHost2 = new InetSocketAddress("127.0.0.1", 2345);
159+
InetSocketAddress unreachableHost1 = new InetSocketAddress("unreachable1.address.zookeeper.apache.com", 1234);
160+
InetSocketAddress unreachableHost2 = new InetSocketAddress("unreachable2.address.zookeeper.apache.com", 1234);
161+
162+
MultipleAddresses multipleAddresses = new MultipleAddresses(
163+
Arrays.asList(unreachableHost1, unreachableHost2, reachableHost1, reachableHost2));
164+
165+
Set<InetSocketAddress> reachableHosts = new HashSet<>(Arrays.asList(reachableHost1, reachableHost2));
166+
Assert.assertEquals(reachableHosts, multipleAddresses.getAllReachableAddresses());
167+
}
168+
136169
@Test
137170
public void testEquals() {
138171
List<InetSocketAddress> addresses = getAddressList();

0 commit comments

Comments
 (0)