Skip to content

Commit 5b22432

Browse files
committed
ZOOKEEPER-3188: fix LeaderElection to work with multiple election addresses
1 parent 7bfbe7e commit 5b22432

2 files changed

Lines changed: 80 additions & 29 deletions

File tree

zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumCnxManager.java

Lines changed: 79 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,17 @@
2626
import java.io.DataInputStream;
2727
import java.io.DataOutputStream;
2828
import java.io.IOException;
29+
import java.net.InetAddress;
2930
import java.net.InetSocketAddress;
31+
import java.net.NoRouteToHostException;
3032
import java.net.ServerSocket;
3133
import java.net.Socket;
3234
import java.net.SocketException;
3335
import java.net.SocketTimeoutException;
3436
import java.nio.BufferUnderflowException;
3537
import java.nio.ByteBuffer;
3638
import java.nio.channels.UnresolvedAddressException;
39+
import java.util.ArrayList;
3740
import java.util.Collections;
3841
import java.util.Enumeration;
3942
import java.util.HashSet;
@@ -197,11 +200,11 @@ static public class Message {
197200
*/
198201
static public class InitialMessage {
199202
public Long sid;
200-
public InetSocketAddress electionAddr;
203+
public List<InetSocketAddress> electionAddr;
201204

202-
InitialMessage(Long sid, InetSocketAddress address) {
205+
InitialMessage(Long sid, List<InetSocketAddress> addresses) {
203206
this.sid = sid;
204-
this.electionAddr = address;
207+
this.electionAddr = addresses;
205208
}
206209

207210
@SuppressWarnings("serial")
@@ -237,28 +240,33 @@ static public InitialMessage parse(Long protocolVersion, DataInputStream din)
237240
num_read, remaining, sid);
238241
}
239242

240-
String addr = new String(b);
241-
String[] host_port;
242-
try {
243-
host_port = ConfigUtils.getHostAndPort(addr);
244-
} catch (ConfigException e) {
245-
throw new InitialMessageException("Badly formed address: %s", addr);
246-
}
243+
String[] addressStrings = new String(b).split(",");
244+
List<InetSocketAddress> addresses = new ArrayList<>(addressStrings.length);
245+
for(String addr : addressStrings) {
247246

248-
if (host_port.length != 2) {
249-
throw new InitialMessageException("Badly formed address: %s", addr);
250-
}
247+
String[] host_port;
248+
try {
249+
host_port = ConfigUtils.getHostAndPort(addr);
250+
} catch (ConfigException e) {
251+
throw new InitialMessageException("Badly formed address: %s", addr);
252+
}
251253

252-
int port;
253-
try {
254-
port = Integer.parseInt(host_port[1]);
255-
} catch (NumberFormatException e) {
256-
throw new InitialMessageException("Bad port number: %s", host_port[1]);
257-
} catch (ArrayIndexOutOfBoundsException e) {
258-
throw new InitialMessageException("No port number in: %s", addr);
254+
if (host_port.length != 2) {
255+
throw new InitialMessageException("Badly formed address: %s", addr);
256+
}
257+
258+
int port;
259+
try {
260+
port = Integer.parseInt(host_port[1]);
261+
} catch (NumberFormatException e) {
262+
throw new InitialMessageException("Bad port number: %s", host_port[1]);
263+
} catch (ArrayIndexOutOfBoundsException e) {
264+
throw new InitialMessageException("No port number in: %s", addr);
265+
}
266+
addresses.add(new InetSocketAddress(host_port[0], port));
259267
}
260268

261-
return new InitialMessage(sid, new InetSocketAddress(host_port[0], port));
269+
return new InitialMessage(sid, addresses);
262270
}
263271
}
264272

@@ -424,8 +432,8 @@ private boolean startConnection(Socket sock, Long sid)
424432
// represents protocol version (in other words - message type)
425433
dout.writeLong(PROTOCOL_VERSION);
426434
dout.writeLong(self.getId());
427-
InetSocketAddress address = self.getElectionAddress().getReachableOrOne();
428-
String addr = formatInetAddr(address);
435+
String addr = self.getElectionAddress().getAllAddresses().stream()
436+
.map(NetUtils::formatInetAddr).collect(Collectors.joining(","));
429437
byte[] addr_bytes = addr.getBytes();
430438
dout.writeInt(addr_bytes.length);
431439
dout.write(addr_bytes);
@@ -532,7 +540,7 @@ public void run() {
532540
private void handleConnection(Socket sock, DataInputStream din)
533541
throws IOException {
534542
Long sid = null, protocolVersion = null;
535-
InetSocketAddress electionAddr = null;
543+
MultipleAddresses electionAddr = null;
536544

537545
try {
538546
protocolVersion = din.readLong();
@@ -542,7 +550,7 @@ private void handleConnection(Socket sock, DataInputStream din)
542550
try {
543551
InitialMessage init = InitialMessage.parse(protocolVersion, din);
544552
sid = init.sid;
545-
electionAddr = init.electionAddr;
553+
electionAddr = new MultipleAddresses(init.electionAddr);
546554
} catch (InitialMessage.InitialMessageException ex) {
547555
LOG.error(ex.toString());
548556
closeSocket(sock);
@@ -585,7 +593,7 @@ private void handleConnection(Socket sock, DataInputStream din)
585593
closeSocket(sock);
586594

587595
if (electionAddr != null) {
588-
connectOne(sid, new MultipleAddresses(electionAddr));
596+
connectOne(sid, electionAddr);
589597
} else {
590598
connectOne(sid);
591599
}
@@ -648,6 +656,10 @@ public void toSend(Long sid, ByteBuffer b) {
648656
synchronized boolean connectOne(long sid, MultipleAddresses electionAddr){
649657
if (senderWorkerMap.get(sid) != null) {
650658
LOG.debug("There is a connection already for server {}", sid);
659+
// since ZOOKEEPER-3188 we can use multiple election addresses to reach a server. It is possible, that the
660+
// one we are using is already dead and if we need to clean-up, so when we will create a new connection
661+
// then we will choose an other one, which is actually reachable
662+
senderWorkerMap.get(sid).asyncValidateIfSocketIsStillReachable();
651663
return true;
652664
}
653665

@@ -660,7 +672,7 @@ synchronized boolean connectOne(long sid, MultipleAddresses electionAddr){
660672
sock = new Socket();
661673
}
662674
setSockOpts(sock);
663-
sock.connect(electionAddr.getReachableOrOne(), cnxTO);
675+
sock.connect(electionAddr.getReachableAddress(), cnxTO);
664676
if (sock instanceof SSLSocket) {
665677
SSLSocket sslSock = (SSLSocket) sock;
666678
sslSock.startHandshake();
@@ -692,6 +704,10 @@ synchronized boolean connectOne(long sid, MultipleAddresses electionAddr){
692704
+ " at election address " + electionAddr, e);
693705
closeSocket(sock);
694706
return false;
707+
} catch (NoRouteToHostException e) {
708+
LOG.warn("None of the addresses ({}) are reachable for sid {}", electionAddr, sid, e);
709+
closeSocket(sock);
710+
return false;
695711
} catch (IOException e) {
696712
LOG.warn("Cannot open channel to " + sid
697713
+ " at election address " + electionAddr,
@@ -709,6 +725,10 @@ synchronized boolean connectOne(long sid, MultipleAddresses electionAddr){
709725
synchronized void connectOne(long sid){
710726
if (senderWorkerMap.get(sid) != null) {
711727
LOG.debug("There is a connection already for server {}", sid);
728+
// since ZOOKEEPER-3188 we can use multiple election addresses to reach a server. It is possible, that the
729+
// one we are using is already dead and if we need to clean-up, so when we will create a new connection
730+
// then we will choose an other one, which is actually reachable
731+
senderWorkerMap.get(sid).asyncValidateIfSocketIsStillReachable();
712732
return;
713733
}
714734
synchronized (self.QV_LOCK) {
@@ -1100,6 +1120,7 @@ class SendWorker extends ZooKeeperThread {
11001120
RecvWorker recvWorker;
11011121
volatile boolean running = true;
11021122
DataOutputStream dout;
1123+
AtomicBoolean ongoingAsyncValidation = new AtomicBoolean(false);
11031124

11041125
/**
11051126
* An instance of this thread receives messages to send
@@ -1239,6 +1260,37 @@ public void run() {
12391260
this.finish();
12401261
LOG.warn("Send worker leaving thread " + " id " + sid + " my id = " + self.getId());
12411262
}
1263+
1264+
public void asyncValidateIfSocketIsStillReachable() {
1265+
if(ongoingAsyncValidation.compareAndSet(false, true)) {
1266+
Thread validator = new Thread(() -> {
1267+
LOG.debug("validate if destination address is reachable for sid {}", sid);
1268+
if(sock != null) {
1269+
InetAddress address = sock.getInetAddress();
1270+
try {
1271+
if (address.isReachable(500)) {
1272+
LOG.debug("destination address {} is reachable for sid {}", address.toString(), sid);
1273+
return;
1274+
}
1275+
} catch (NullPointerException | IOException ignored) {
1276+
}
1277+
LOG.warn("destination address {} not reachable anymore, shutting down the SendWorker for sid {}", address.toString(), sid);
1278+
this.finish();
1279+
}
1280+
});
1281+
validator.start();
1282+
try {
1283+
validator.join();
1284+
} catch (InterruptedException ignored) {
1285+
// we don't care if the validation was interrupted. If SenderWorker is not working, we will
1286+
// try to connect and re-validate later
1287+
}
1288+
ongoingAsyncValidation.set(false);
1289+
} else {
1290+
LOG.debug("validation of destination address for sid {} is skipped (it is already running)", sid);
1291+
}
1292+
}
1293+
12421294
}
12431295

12441296
/**

zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -480,8 +480,7 @@ public void testBadPeerAddressInQuorum() throws Exception {
480480
LineNumberReader r = new LineNumberReader(new StringReader(os.toString()));
481481
String line;
482482
boolean found = false;
483-
Pattern p =
484-
Pattern.compile(".*Cannot open channel to .* at election address .*");
483+
Pattern p = Pattern.compile(".*None of the addresses .* are reachable for sid 2");
485484
while ((line = r.readLine()) != null) {
486485
found = p.matcher(line).matches();
487486
if (found) {

0 commit comments

Comments
 (0)