2626import java .io .DataInputStream ;
2727import java .io .DataOutputStream ;
2828import java .io .IOException ;
29+ import java .net .InetAddress ;
2930import java .net .InetSocketAddress ;
31+ import java .net .NoRouteToHostException ;
3032import java .net .ServerSocket ;
3133import java .net .Socket ;
3234import java .net .SocketException ;
3335import java .net .SocketTimeoutException ;
3436import java .nio .BufferUnderflowException ;
3537import java .nio .ByteBuffer ;
3638import java .nio .channels .UnresolvedAddressException ;
39+ import java .util .ArrayList ;
3740import java .util .Collections ;
3841import java .util .Enumeration ;
3942import java .util .HashSet ;
@@ -197,11 +200,11 @@ static public class Message {
197200 */
198201 static public class InitialMessage {
199202 public Long sid ;
200- public InetSocketAddress electionAddr ;
203+ public List < InetSocketAddress > electionAddr ;
201204
202- InitialMessage (Long sid , InetSocketAddress address ) {
205+ InitialMessage (Long sid , List < InetSocketAddress > addresses ) {
203206 this .sid = sid ;
204- this .electionAddr = address ;
207+ this .electionAddr = addresses ;
205208 }
206209
207210 @ SuppressWarnings ("serial" )
@@ -237,28 +240,33 @@ static public InitialMessage parse(Long protocolVersion, DataInputStream din)
237240 num_read , remaining , sid );
238241 }
239242
240- String addr = new String (b );
241- String [] host_port ;
242- try {
243- host_port = ConfigUtils .getHostAndPort (addr );
244- } catch (ConfigException e ) {
245- throw new InitialMessageException ("Badly formed address: %s" , addr );
246- }
243+ String [] addressStrings = new String (b ).split ("," );
244+ List <InetSocketAddress > addresses = new ArrayList <>(addressStrings .length );
245+ for (String addr : addressStrings ) {
247246
248- if (host_port .length != 2 ) {
249- throw new InitialMessageException ("Badly formed address: %s" , addr );
250- }
247+ String [] host_port ;
248+ try {
249+ host_port = ConfigUtils .getHostAndPort (addr );
250+ } catch (ConfigException e ) {
251+ throw new InitialMessageException ("Badly formed address: %s" , addr );
252+ }
251253
252- int port ;
253- try {
254- port = Integer .parseInt (host_port [1 ]);
255- } catch (NumberFormatException e ) {
256- throw new InitialMessageException ("Bad port number: %s" , host_port [1 ]);
257- } catch (ArrayIndexOutOfBoundsException e ) {
258- throw new InitialMessageException ("No port number in: %s" , addr );
254+ if (host_port .length != 2 ) {
255+ throw new InitialMessageException ("Badly formed address: %s" , addr );
256+ }
257+
258+ int port ;
259+ try {
260+ port = Integer .parseInt (host_port [1 ]);
261+ } catch (NumberFormatException e ) {
262+ throw new InitialMessageException ("Bad port number: %s" , host_port [1 ]);
263+ } catch (ArrayIndexOutOfBoundsException e ) {
264+ throw new InitialMessageException ("No port number in: %s" , addr );
265+ }
266+ addresses .add (new InetSocketAddress (host_port [0 ], port ));
259267 }
260268
261- return new InitialMessage (sid , new InetSocketAddress ( host_port [ 0 ], port ) );
269+ return new InitialMessage (sid , addresses );
262270 }
263271 }
264272
@@ -424,8 +432,8 @@ private boolean startConnection(Socket sock, Long sid)
424432 // represents protocol version (in other words - message type)
425433 dout .writeLong (PROTOCOL_VERSION );
426434 dout .writeLong (self .getId ());
427- InetSocketAddress address = self .getElectionAddress ().getReachableOrOne ();
428- String addr = formatInetAddr ( address );
435+ String addr = self .getElectionAddress ().getAllAddresses (). stream ()
436+ . map ( NetUtils :: formatInetAddr ). collect ( Collectors . joining ( "," ) );
429437 byte [] addr_bytes = addr .getBytes ();
430438 dout .writeInt (addr_bytes .length );
431439 dout .write (addr_bytes );
@@ -532,7 +540,7 @@ public void run() {
532540 private void handleConnection (Socket sock , DataInputStream din )
533541 throws IOException {
534542 Long sid = null , protocolVersion = null ;
535- InetSocketAddress electionAddr = null ;
543+ MultipleAddresses electionAddr = null ;
536544
537545 try {
538546 protocolVersion = din .readLong ();
@@ -542,7 +550,7 @@ private void handleConnection(Socket sock, DataInputStream din)
542550 try {
543551 InitialMessage init = InitialMessage .parse (protocolVersion , din );
544552 sid = init .sid ;
545- electionAddr = init .electionAddr ;
553+ electionAddr = new MultipleAddresses ( init .electionAddr ) ;
546554 } catch (InitialMessage .InitialMessageException ex ) {
547555 LOG .error (ex .toString ());
548556 closeSocket (sock );
@@ -585,7 +593,7 @@ private void handleConnection(Socket sock, DataInputStream din)
585593 closeSocket (sock );
586594
587595 if (electionAddr != null ) {
588- connectOne (sid , new MultipleAddresses ( electionAddr ) );
596+ connectOne (sid , electionAddr );
589597 } else {
590598 connectOne (sid );
591599 }
@@ -648,6 +656,10 @@ public void toSend(Long sid, ByteBuffer b) {
648656 synchronized boolean connectOne (long sid , MultipleAddresses electionAddr ){
649657 if (senderWorkerMap .get (sid ) != null ) {
650658 LOG .debug ("There is a connection already for server {}" , sid );
659+ // since ZOOKEEPER-3188 we can use multiple election addresses to reach a server. It is possible, that the
660+ // one we are using is already dead and if we need to clean-up, so when we will create a new connection
661+ // then we will choose an other one, which is actually reachable
662+ senderWorkerMap .get (sid ).asyncValidateIfSocketIsStillReachable ();
651663 return true ;
652664 }
653665
@@ -660,7 +672,7 @@ synchronized boolean connectOne(long sid, MultipleAddresses electionAddr){
660672 sock = new Socket ();
661673 }
662674 setSockOpts (sock );
663- sock .connect (electionAddr .getReachableOrOne (), cnxTO );
675+ sock .connect (electionAddr .getReachableAddress (), cnxTO );
664676 if (sock instanceof SSLSocket ) {
665677 SSLSocket sslSock = (SSLSocket ) sock ;
666678 sslSock .startHandshake ();
@@ -692,6 +704,10 @@ synchronized boolean connectOne(long sid, MultipleAddresses electionAddr){
692704 + " at election address " + electionAddr , e );
693705 closeSocket (sock );
694706 return false ;
707+ } catch (NoRouteToHostException e ) {
708+ LOG .warn ("None of the addresses ({}) are reachable for sid {}" , electionAddr , sid , e );
709+ closeSocket (sock );
710+ return false ;
695711 } catch (IOException e ) {
696712 LOG .warn ("Cannot open channel to " + sid
697713 + " at election address " + electionAddr ,
@@ -709,6 +725,10 @@ synchronized boolean connectOne(long sid, MultipleAddresses electionAddr){
709725 synchronized void connectOne (long sid ){
710726 if (senderWorkerMap .get (sid ) != null ) {
711727 LOG .debug ("There is a connection already for server {}" , sid );
728+ // since ZOOKEEPER-3188 we can use multiple election addresses to reach a server. It is possible, that the
729+ // one we are using is already dead and if we need to clean-up, so when we will create a new connection
730+ // then we will choose an other one, which is actually reachable
731+ senderWorkerMap .get (sid ).asyncValidateIfSocketIsStillReachable ();
712732 return ;
713733 }
714734 synchronized (self .QV_LOCK ) {
@@ -1100,6 +1120,7 @@ class SendWorker extends ZooKeeperThread {
11001120 RecvWorker recvWorker ;
11011121 volatile boolean running = true ;
11021122 DataOutputStream dout ;
1123+ AtomicBoolean ongoingAsyncValidation = new AtomicBoolean (false );
11031124
11041125 /**
11051126 * An instance of this thread receives messages to send
@@ -1239,6 +1260,37 @@ public void run() {
12391260 this .finish ();
12401261 LOG .warn ("Send worker leaving thread " + " id " + sid + " my id = " + self .getId ());
12411262 }
1263+
1264+ public void asyncValidateIfSocketIsStillReachable () {
1265+ if (ongoingAsyncValidation .compareAndSet (false , true )) {
1266+ Thread validator = new Thread (() -> {
1267+ LOG .debug ("validate if destination address is reachable for sid {}" , sid );
1268+ if (sock != null ) {
1269+ InetAddress address = sock .getInetAddress ();
1270+ try {
1271+ if (address .isReachable (500 )) {
1272+ LOG .debug ("destination address {} is reachable for sid {}" , address .toString (), sid );
1273+ return ;
1274+ }
1275+ } catch (NullPointerException | IOException ignored ) {
1276+ }
1277+ LOG .warn ("destination address {} not reachable anymore, shutting down the SendWorker for sid {}" , address .toString (), sid );
1278+ this .finish ();
1279+ }
1280+ });
1281+ validator .start ();
1282+ try {
1283+ validator .join ();
1284+ } catch (InterruptedException ignored ) {
1285+ // we don't care if the validation was interrupted. If SenderWorker is not working, we will
1286+ // try to connect and re-validate later
1287+ }
1288+ ongoingAsyncValidation .set (false );
1289+ } else {
1290+ LOG .debug ("validation of destination address for sid {} is skipped (it is already running)" , sid );
1291+ }
1292+ }
1293+
12421294 }
12431295
12441296 /**
0 commit comments