Skip to content

Commit 037bf81

Browse files
committed
lock by leadershipLock object
Signed-off-by: tison <[email protected]>
1 parent 8a76593 commit 037bf81

File tree

1 file changed

+84
-74
lines changed
  • curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader

1 file changed

+84
-74
lines changed

curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java

Lines changed: 84 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,8 @@ public class LeaderLatch implements Closeable {
7676
private final AtomicReference<Future<?>> startTask = new AtomicReference<>();
7777
private final ConnectionStateListener listener = (client, newState) -> handleStateChange(newState);
7878

79+
private final Object leadershipLock = new Object();
80+
7981
public enum State {
8082
LATENT,
8183
STARTED,
@@ -164,32 +166,34 @@ public void close() throws IOException {
164166
* @param closeMode allows the default close mode to be overridden at the time the latch is closed.
165167
* @throws IOException errors
166168
*/
167-
public synchronized void close(CloseMode closeMode) throws IOException {
168-
Preconditions.checkState(state.compareAndSet(State.STARTED, State.CLOSED), "Already closed or has not been started");
169-
Preconditions.checkNotNull(closeMode, "closeMode cannot be null");
170-
171-
cancelStartTask();
172-
173-
try {
174-
setNode(null);
175-
client.removeWatchers();
176-
} catch (Exception e) {
177-
ThreadUtils.checkInterrupted(e);
178-
throw new IOException(e);
179-
} finally {
180-
client.getConnectionStateListenable().removeListener(listener);
181-
182-
switch (closeMode) {
183-
case NOTIFY_LEADER: {
184-
setLeadership(false);
185-
listeners.clear();
186-
break;
187-
}
169+
public void close(CloseMode closeMode) throws IOException {
170+
synchronized (leadershipLock) {
171+
Preconditions.checkState(state.compareAndSet(State.STARTED, State.CLOSED), "Already closed or has not been started");
172+
Preconditions.checkNotNull(closeMode, "closeMode cannot be null");
188173

189-
case SILENT: {
190-
listeners.clear();
191-
setLeadership(false);
192-
break;
174+
cancelStartTask();
175+
176+
try {
177+
setNode(null);
178+
client.removeWatchers();
179+
} catch (Exception e) {
180+
ThreadUtils.checkInterrupted(e);
181+
throw new IOException(e);
182+
} finally {
183+
client.getConnectionStateListenable().removeListener(listener);
184+
185+
switch (closeMode) {
186+
case NOTIFY_LEADER: {
187+
setLeadership(false);
188+
listeners.clear();
189+
break;
190+
}
191+
192+
case SILENT: {
193+
listeners.clear();
194+
setLeadership(false);
195+
break;
196+
}
193197
}
194198
}
195199
}
@@ -275,9 +279,9 @@ public void removeListener(LeaderLatchListener listener) {
275279
* while waiting
276280
*/
277281
public void await() throws InterruptedException, EOFException {
278-
synchronized (this) {
282+
synchronized (leadershipLock) {
279283
while ((state.get() == State.STARTED) && !hasLeadership.get()) {
280-
wait();
284+
leadershipLock.wait();
281285
}
282286
}
283287
if (state.get() != State.STARTED) {
@@ -326,7 +330,7 @@ public void await() throws InterruptedException, EOFException {
326330
public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
327331
long waitNanos = TimeUnit.NANOSECONDS.convert(timeout, unit);
328332

329-
synchronized (this) {
333+
synchronized (leadershipLock) {
330334
while (true) {
331335
if (state.get() != State.STARTED) {
332336
return false;
@@ -341,7 +345,7 @@ public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
341345
}
342346

343347
long startNanos = System.nanoTime();
344-
TimeUnit.NANOSECONDS.timedWait(this, waitNanos);
348+
TimeUnit.NANOSECONDS.timedWait(leadershipLock, waitNanos);
345349
long elapsed = System.nanoTime() - startNanos;
346350
waitNanos -= elapsed;
347351
}
@@ -478,56 +482,60 @@ void reset() throws Exception {
478482
client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).inBackground(callback).forPath(ZKPaths.makePath(latchPath, LOCK_NAME), LeaderSelector.getIdBytes(id));
479483
}
480484

481-
private synchronized void internalStart() {
482-
if (state.get() == State.STARTED) {
483-
client.getConnectionStateListenable().addListener(listener);
484-
try {
485-
reset();
486-
} catch (Exception e) {
487-
ThreadUtils.checkInterrupted(e);
488-
log.error("An error occurred checking resetting leadership.", e);
485+
private void internalStart() {
486+
synchronized (leadershipLock) {
487+
if (state.get() == State.STARTED) {
488+
client.getConnectionStateListenable().addListener(listener);
489+
try {
490+
reset();
491+
} catch (Exception e) {
492+
ThreadUtils.checkInterrupted(e);
493+
log.error("An error occurred checking resetting leadership.", e);
494+
}
489495
}
490496
}
491497
}
492498

493499
@VisibleForTesting
494500
volatile CountDownLatch debugCheckLeaderShipLatch = null;
495501

496-
private synchronized void checkLeadership(List<String> children) throws Exception {
502+
private void checkLeadership(List<String> children) throws Exception {
497503
if (debugCheckLeaderShipLatch != null) {
498504
debugCheckLeaderShipLatch.await();
499505
}
500506

501-
final String localOurPath = ourPath.get();
502-
List<String> sortedChildren = LockInternals.getSortedChildren(LOCK_NAME, sorter, children);
503-
int ourIndex = (localOurPath != null) ? sortedChildren.indexOf(ZKPaths.getNodeFromPath(localOurPath)) : -1;
504-
if (ourIndex < 0) {
505-
log.error("Can't find our node. Resetting. Index: " + ourIndex);
506-
reset();
507-
} else if (ourIndex == 0) {
508-
lastPathIsLeader.set(localOurPath);
509-
setLeadership(true);
510-
} else {
511-
String watchPath = sortedChildren.get(ourIndex - 1);
512-
Watcher watcher = event -> {
513-
if (state.get() == State.STARTED && event.getType() == Watcher.Event.EventType.NodeDeleted) {
514-
try {
515-
getChildren();
516-
} catch (Exception ex) {
517-
ThreadUtils.checkInterrupted(ex);
518-
log.error("An error occurred checking the leadership.", ex);
507+
synchronized (leadershipLock) {
508+
final String localOurPath = ourPath.get();
509+
List<String> sortedChildren = LockInternals.getSortedChildren(LOCK_NAME, sorter, children);
510+
int ourIndex = (localOurPath != null) ? sortedChildren.indexOf(ZKPaths.getNodeFromPath(localOurPath)) : -1;
511+
if (ourIndex < 0) {
512+
log.error("Can't find our node. Resetting. Index: " + ourIndex);
513+
reset();
514+
} else if (ourIndex == 0) {
515+
lastPathIsLeader.set(localOurPath);
516+
setLeadership(true);
517+
} else {
518+
String watchPath = sortedChildren.get(ourIndex - 1);
519+
Watcher watcher = event -> {
520+
if (state.get() == State.STARTED && event.getType() == Watcher.Event.EventType.NodeDeleted) {
521+
try {
522+
getChildren();
523+
} catch (Exception ex) {
524+
ThreadUtils.checkInterrupted(ex);
525+
log.error("An error occurred checking the leadership.", ex);
526+
}
519527
}
520-
}
521-
};
528+
};
522529

523-
BackgroundCallback callback = (client, event) -> {
524-
if (event.getResultCode() == KeeperException.Code.NONODE.intValue()) {
525-
// previous node is gone - retry getChildren
526-
getChildren();
527-
}
528-
};
529-
// use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
530-
client.getData().usingWatcher(watcher).inBackground(callback).forPath(ZKPaths.makePath(latchPath, watchPath));
530+
BackgroundCallback callback = (client, event) -> {
531+
if (event.getResultCode() == KeeperException.Code.NONODE.intValue()) {
532+
// previous node is gone - retry getChildren
533+
getChildren();
534+
}
535+
};
536+
// use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
537+
client.getData().usingWatcher(watcher).inBackground(callback).forPath(ZKPaths.makePath(latchPath, watchPath));
538+
}
531539
}
532540
}
533541

@@ -575,16 +583,18 @@ protected void handleStateChange(ConnectionState newState) {
575583
}
576584
}
577585

578-
private synchronized void setLeadership(boolean newValue) {
579-
boolean oldValue = hasLeadership.getAndSet(newValue);
586+
private void setLeadership(boolean newValue) {
587+
synchronized (leadershipLock) {
588+
boolean oldValue = hasLeadership.getAndSet(newValue);
580589

581-
if (oldValue && !newValue) { // Lost leadership, was true, now false
582-
listeners.forEach(LeaderLatchListener::notLeader);
583-
} else if (!oldValue && newValue) { // Gained leadership, was false, now true
584-
listeners.forEach(LeaderLatchListener::isLeader);
585-
}
590+
if (oldValue && !newValue) { // Lost leadership, was true, now false
591+
listeners.forEach(LeaderLatchListener::notLeader);
592+
} else if (!oldValue && newValue) { // Gained leadership, was false, now true
593+
listeners.forEach(LeaderLatchListener::isLeader);
594+
}
586595

587-
notifyAll();
596+
leadershipLock.notifyAll();
597+
}
588598
}
589599

590600
private void setNode(String newValue) throws Exception {

0 commit comments

Comments
 (0)