@@ -76,6 +76,8 @@ public class LeaderLatch implements Closeable {
7676 private final AtomicReference <Future <?>> startTask = new AtomicReference <>();
7777 private final ConnectionStateListener listener = (client , newState ) -> handleStateChange (newState );
7878
79+ private final Object leadershipLock = new Object ();
80+
7981 public enum State {
8082 LATENT ,
8183 STARTED ,
@@ -164,32 +166,34 @@ public void close() throws IOException {
164166 * @param closeMode allows the default close mode to be overridden at the time the latch is closed.
165167 * @throws IOException errors
166168 */
167- public synchronized void close (CloseMode closeMode ) throws IOException {
168- Preconditions .checkState (state .compareAndSet (State .STARTED , State .CLOSED ), "Already closed or has not been started" );
169- Preconditions .checkNotNull (closeMode , "closeMode cannot be null" );
170-
171- cancelStartTask ();
172-
173- try {
174- setNode (null );
175- client .removeWatchers ();
176- } catch (Exception e ) {
177- ThreadUtils .checkInterrupted (e );
178- throw new IOException (e );
179- } finally {
180- client .getConnectionStateListenable ().removeListener (listener );
181-
182- switch (closeMode ) {
183- case NOTIFY_LEADER : {
184- setLeadership (false );
185- listeners .clear ();
186- break ;
187- }
169+ public void close (CloseMode closeMode ) throws IOException {
170+ synchronized (leadershipLock ) {
171+ Preconditions .checkState (state .compareAndSet (State .STARTED , State .CLOSED ), "Already closed or has not been started" );
172+ Preconditions .checkNotNull (closeMode , "closeMode cannot be null" );
188173
189- case SILENT : {
190- listeners .clear ();
191- setLeadership (false );
192- break ;
174+ cancelStartTask ();
175+
176+ try {
177+ setNode (null );
178+ client .removeWatchers ();
179+ } catch (Exception e ) {
180+ ThreadUtils .checkInterrupted (e );
181+ throw new IOException (e );
182+ } finally {
183+ client .getConnectionStateListenable ().removeListener (listener );
184+
185+ switch (closeMode ) {
186+ case NOTIFY_LEADER : {
187+ setLeadership (false );
188+ listeners .clear ();
189+ break ;
190+ }
191+
192+ case SILENT : {
193+ listeners .clear ();
194+ setLeadership (false );
195+ break ;
196+ }
193197 }
194198 }
195199 }
@@ -275,9 +279,9 @@ public void removeListener(LeaderLatchListener listener) {
275279 * while waiting
276280 */
277281 public void await () throws InterruptedException , EOFException {
278- synchronized (this ) {
282+ synchronized (leadershipLock ) {
279283 while ((state .get () == State .STARTED ) && !hasLeadership .get ()) {
280- wait ();
284+ leadershipLock . wait ();
281285 }
282286 }
283287 if (state .get () != State .STARTED ) {
@@ -326,7 +330,7 @@ public void await() throws InterruptedException, EOFException {
326330 public boolean await (long timeout , TimeUnit unit ) throws InterruptedException {
327331 long waitNanos = TimeUnit .NANOSECONDS .convert (timeout , unit );
328332
329- synchronized (this ) {
333+ synchronized (leadershipLock ) {
330334 while (true ) {
331335 if (state .get () != State .STARTED ) {
332336 return false ;
@@ -341,7 +345,7 @@ public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
341345 }
342346
343347 long startNanos = System .nanoTime ();
344- TimeUnit .NANOSECONDS .timedWait (this , waitNanos );
348+ TimeUnit .NANOSECONDS .timedWait (leadershipLock , waitNanos );
345349 long elapsed = System .nanoTime () - startNanos ;
346350 waitNanos -= elapsed ;
347351 }
@@ -478,56 +482,60 @@ void reset() throws Exception {
478482 client .create ().creatingParentContainersIfNeeded ().withProtection ().withMode (CreateMode .EPHEMERAL_SEQUENTIAL ).inBackground (callback ).forPath (ZKPaths .makePath (latchPath , LOCK_NAME ), LeaderSelector .getIdBytes (id ));
479483 }
480484
481- private synchronized void internalStart () {
482- if (state .get () == State .STARTED ) {
483- client .getConnectionStateListenable ().addListener (listener );
484- try {
485- reset ();
486- } catch (Exception e ) {
487- ThreadUtils .checkInterrupted (e );
488- log .error ("An error occurred checking resetting leadership." , e );
485+ private void internalStart () {
486+ synchronized (leadershipLock ) {
487+ if (state .get () == State .STARTED ) {
488+ client .getConnectionStateListenable ().addListener (listener );
489+ try {
490+ reset ();
491+ } catch (Exception e ) {
492+ ThreadUtils .checkInterrupted (e );
493+ log .error ("An error occurred checking resetting leadership." , e );
494+ }
489495 }
490496 }
491497 }
492498
493499 @ VisibleForTesting
494500 volatile CountDownLatch debugCheckLeaderShipLatch = null ;
495501
496- private synchronized void checkLeadership (List <String > children ) throws Exception {
502+ private void checkLeadership (List <String > children ) throws Exception {
497503 if (debugCheckLeaderShipLatch != null ) {
498504 debugCheckLeaderShipLatch .await ();
499505 }
500506
501- final String localOurPath = ourPath .get ();
502- List <String > sortedChildren = LockInternals .getSortedChildren (LOCK_NAME , sorter , children );
503- int ourIndex = (localOurPath != null ) ? sortedChildren .indexOf (ZKPaths .getNodeFromPath (localOurPath )) : -1 ;
504- if (ourIndex < 0 ) {
505- log .error ("Can't find our node. Resetting. Index: " + ourIndex );
506- reset ();
507- } else if (ourIndex == 0 ) {
508- lastPathIsLeader .set (localOurPath );
509- setLeadership (true );
510- } else {
511- String watchPath = sortedChildren .get (ourIndex - 1 );
512- Watcher watcher = event -> {
513- if (state .get () == State .STARTED && event .getType () == Watcher .Event .EventType .NodeDeleted ) {
514- try {
515- getChildren ();
516- } catch (Exception ex ) {
517- ThreadUtils .checkInterrupted (ex );
518- log .error ("An error occurred checking the leadership." , ex );
507+ synchronized (leadershipLock ) {
508+ final String localOurPath = ourPath .get ();
509+ List <String > sortedChildren = LockInternals .getSortedChildren (LOCK_NAME , sorter , children );
510+ int ourIndex = (localOurPath != null ) ? sortedChildren .indexOf (ZKPaths .getNodeFromPath (localOurPath )) : -1 ;
511+ if (ourIndex < 0 ) {
512+ log .error ("Can't find our node. Resetting. Index: " + ourIndex );
513+ reset ();
514+ } else if (ourIndex == 0 ) {
515+ lastPathIsLeader .set (localOurPath );
516+ setLeadership (true );
517+ } else {
518+ String watchPath = sortedChildren .get (ourIndex - 1 );
519+ Watcher watcher = event -> {
520+ if (state .get () == State .STARTED && event .getType () == Watcher .Event .EventType .NodeDeleted ) {
521+ try {
522+ getChildren ();
523+ } catch (Exception ex ) {
524+ ThreadUtils .checkInterrupted (ex );
525+ log .error ("An error occurred checking the leadership." , ex );
526+ }
519527 }
520- }
521- };
528+ };
522529
523- BackgroundCallback callback = (client , event ) -> {
524- if (event .getResultCode () == KeeperException .Code .NONODE .intValue ()) {
525- // previous node is gone - retry getChildren
526- getChildren ();
527- }
528- };
529- // use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
530- client .getData ().usingWatcher (watcher ).inBackground (callback ).forPath (ZKPaths .makePath (latchPath , watchPath ));
530+ BackgroundCallback callback = (client , event ) -> {
531+ if (event .getResultCode () == KeeperException .Code .NONODE .intValue ()) {
532+ // previous node is gone - retry getChildren
533+ getChildren ();
534+ }
535+ };
536+ // use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
537+ client .getData ().usingWatcher (watcher ).inBackground (callback ).forPath (ZKPaths .makePath (latchPath , watchPath ));
538+ }
531539 }
532540 }
533541
@@ -575,16 +583,18 @@ protected void handleStateChange(ConnectionState newState) {
575583 }
576584 }
577585
578- private synchronized void setLeadership (boolean newValue ) {
579- boolean oldValue = hasLeadership .getAndSet (newValue );
586+ private void setLeadership (boolean newValue ) {
587+ synchronized (leadershipLock ) {
588+ boolean oldValue = hasLeadership .getAndSet (newValue );
580589
581- if (oldValue && !newValue ) { // Lost leadership, was true, now false
582- listeners .forEach (LeaderLatchListener ::notLeader );
583- } else if (!oldValue && newValue ) { // Gained leadership, was false, now true
584- listeners .forEach (LeaderLatchListener ::isLeader );
585- }
590+ if (oldValue && !newValue ) { // Lost leadership, was true, now false
591+ listeners .forEach (LeaderLatchListener ::notLeader );
592+ } else if (!oldValue && newValue ) { // Gained leadership, was false, now true
593+ listeners .forEach (LeaderLatchListener ::isLeader );
594+ }
586595
587- notifyAll ();
596+ leadershipLock .notifyAll ();
597+ }
588598 }
589599
590600 private void setNode (String newValue ) throws Exception {
0 commit comments