@@ -65,7 +65,6 @@ public CuratorDruidLeaderSelector(CuratorFramework curator, @Self DruidNode self
6565 this .latchPath = latchPath ;
6666 this .leaderLatch .set (createNewLeaderLatch ());
6767
68- // Adding ConnectionStateListener to handle session changes using a method reference
6968 curator .getConnectionStateListenable ().addListener (this ::handleConnectionStateChanged );
7069 }
7170
@@ -97,13 +96,13 @@ public void isLeader()
9796
9897 // give others a chance to become leader.
9998 CloseableUtils .closeAndSuppressExceptions (
100- createNewLeaderLatchWithListener (),
101- e -> log .warn ("Could not close old leader latch; continuing with new one anyway." )
99+ createNewLeaderLatchWithListener (),
100+ e -> log .warn ("Could not close old leader latch; continuing with new one anyway." )
102101 );
103102
104103 leader = false ;
105104 try {
106- // Small delay before starting the latch so that others waiting are chosen to become leader.
105+ //Small delay before starting the latch so that others waiting are chosen to become leader.
107106 Thread .sleep (ThreadLocalRandom .current ().nextInt (1000 , 5000 ));
108107 leaderLatch .get ().start ();
109108 }
@@ -132,7 +131,9 @@ public void notLeader()
132131 log .makeAlert (ex , "listener.stopBeingLeader() failed. Unable to stopBeingLeader" ).emit ();
133132 }
134133 }
135- }, listenerExecutor );
134+ },
135+ listenerExecutor
136+ );
136137
137138 return leaderLatch .getAndSet (newLeaderLatch );
138139 }
@@ -209,7 +210,9 @@ public void unregisterListener()
209210 listenerExecutor .shutdownNow ();
210211 }
211212
212- // Method to handle connection state changes
213+ /**
214+ * Handles connection state changes. Recreates the leader latch if connection to zookeeper is lost.
215+ */
213216 private void handleConnectionStateChanged (CuratorFramework client , ConnectionState newState )
214217 {
215218 switch (newState ) {
@@ -228,17 +231,23 @@ private void handleConnectionStateChanged(CuratorFramework client, ConnectionSta
228231
229232 private void recreateLeaderLatch ()
230233 {
231- // Close existing leader latch
232- CloseableUtils .closeAndSuppressExceptions (leaderLatch .get (), e -> log .warn (e , "Failed to close LeaderLatch." ));
234+ // give others a chance to become leader.
235+ CloseableUtils .closeAndSuppressExceptions (
236+ createNewLeaderLatchWithListener (),
237+ e -> log .warn ("Could not close old leader latch; continuing with new one anyway." )
238+ );
233239
234- // Create and start a new leader latch
235- LeaderLatch newLeaderLatch = createNewLeaderLatchWithListener ();
240+ leader = false ;
236241 try {
237- newLeaderLatch .start ();
242+ //Small delay before starting the latch so that others waiting are chosen to become leader.
243+ Thread .sleep (ThreadLocalRandom .current ().nextInt (1000 , 5000 ));
244+ leaderLatch .get ().start ();
238245 }
239- catch (Exception ex ) {
240- throw new RuntimeException ("Failed to start new LeaderLatch after session change" , ex );
246+ catch (Exception e ) {
247+ // If an exception gets thrown out here, then the node will zombie out 'cause it won't be looking for
248+ // the latch anymore. I don't believe it's actually possible for an Exception to throw out here, but
249+ // Curator likes to have "throws Exception" on methods so it might happen...
250+ log .makeAlert (e , "I am a zombie" ).emit ();
241251 }
242- leaderLatch .set (newLeaderLatch );
243252 }
244253}
0 commit comments