|
18 | 18 |
|
19 | 19 | import com.google.common.base.Preconditions; |
20 | 20 | import com.netflix.curator.framework.CuratorFramework; |
| 21 | +import com.netflix.curator.framework.api.BackgroundCallback; |
| 22 | +import com.netflix.curator.framework.api.CuratorEvent; |
21 | 23 | import com.netflix.curator.framework.recipes.locks.LockInternals; |
22 | 24 | import com.netflix.curator.framework.recipes.locks.LockInternalsSorter; |
23 | 25 | import com.netflix.curator.framework.recipes.locks.StandardLockInternalsDriver; |
24 | 26 | import com.netflix.curator.framework.state.ConnectionState; |
25 | 27 | import com.netflix.curator.framework.state.ConnectionStateListener; |
26 | 28 | import com.netflix.curator.utils.ZKPaths; |
27 | 29 | import org.apache.zookeeper.CreateMode; |
| 30 | +import org.apache.zookeeper.KeeperException; |
28 | 31 | import org.apache.zookeeper.WatchedEvent; |
29 | 32 | import org.apache.zookeeper.Watcher; |
30 | 33 | import org.slf4j.Logger; |
@@ -53,7 +56,6 @@ public class LeaderLatch implements Closeable |
53 | 56 | private final String latchPath; |
54 | 57 | private final String id; |
55 | 58 | private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT); |
56 | | - private final AtomicReference<ConnectionState> connectionState = new AtomicReference<ConnectionState>(ConnectionState.CONNECTED); |
57 | 59 | private final AtomicBoolean hasLeadership = new AtomicBoolean(false); |
58 | 60 |
|
59 | 61 | private final ConnectionStateListener listener = new ConnectionStateListener() |
@@ -116,9 +118,7 @@ public void start() throws Exception |
116 | 118 | Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once"); |
117 | 119 |
|
118 | 120 | client.getConnectionStateListenable().addListener(listener); |
119 | | - |
120 | | - client.newNamespaceAwareEnsurePath(latchPath).ensure(client.getZookeeperClient()); |
121 | | - internalStart(); |
| 121 | + reset(); |
122 | 122 | } |
123 | 123 |
|
124 | 124 | /** |
@@ -305,86 +305,123 @@ public Participant getLeader() throws Exception |
305 | 305 | */ |
306 | 306 | public boolean hasLeadership() |
307 | 307 | { |
308 | | - return (state.get() == State.STARTED) && hasLeadership.get() && (connectionState.get() == ConnectionState.CONNECTED); |
| 308 | + return (state.get() == State.STARTED) && hasLeadership.get(); |
309 | 309 | } |
310 | 310 |
|
311 | | - private void internalStart() throws Exception |
| 311 | + private void reset() throws Exception |
312 | 312 | { |
313 | | - hasLeadership.set(false); |
| 313 | + setLeadership(false); |
314 | 314 | if ( ourPath != null ) |
315 | 315 | { |
316 | 316 | client.delete().guaranteed().inBackground().forPath(ourPath); |
317 | 317 | } |
318 | | - ourPath = client.create().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(ZKPaths.makePath(latchPath, LOCK_NAME), LeaderSelector.getIdBytes(id)); |
| 318 | + ourPath = null; |
319 | 319 |
|
320 | | - checkForLeadership(); |
| 320 | + BackgroundCallback callback = new BackgroundCallback() |
| 321 | + { |
| 322 | + @Override |
| 323 | + public void processResult(CuratorFramework client, CuratorEvent event) throws Exception |
| 324 | + { |
| 325 | + if ( event.getResultCode() == KeeperException.Code.OK.intValue() ) |
| 326 | + { |
| 327 | + ourPath = event.getName(); |
| 328 | + getChildren(); |
| 329 | + } |
| 330 | + else |
| 331 | + { |
| 332 | + // TBD |
| 333 | + System.out.println(event.getResultCode()); |
| 334 | + } |
| 335 | + } |
| 336 | + }; |
| 337 | + client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).inBackground(callback).forPath(ZKPaths.makePath(latchPath, LOCK_NAME), LeaderSelector.getIdBytes(id)); |
321 | 338 | } |
322 | | - |
323 | | - private void checkForLeadership() throws Exception |
| 339 | + |
| 340 | + private void checkLeadership(List<String> children) throws Exception |
324 | 341 | { |
325 | | - List<String> sortedChildren = LockInternals.getSortedChildren(client, latchPath, LOCK_NAME, sorter); |
326 | | - if ( sortedChildren.size() == 0 ) |
| 342 | + List<String> sortedChildren = LockInternals.getSortedChildren(LOCK_NAME, sorter, children); |
| 343 | + int ourIndex = (ourPath != null) ? sortedChildren.indexOf(ZKPaths.getNodeFromPath(ourPath)) : -1; |
| 344 | + if ( ourIndex < 0 ) |
327 | 345 | { |
328 | | - throw new Exception("no children - unexpected state"); |
| 346 | + log.error("Can't find our node. Resetting. Index: " + ourIndex); |
| 347 | + reset(); |
329 | 348 | } |
330 | | - |
331 | | - int ourIndex = sortedChildren.indexOf(ZKPaths.getNodeFromPath(ourPath)); |
332 | | - if ( ourIndex == 0 ) |
| 349 | + else if ( ourIndex == 0 ) |
333 | 350 | { |
334 | 351 | setLeadership(true); |
335 | 352 | } |
336 | 353 | else |
337 | 354 | { |
338 | | - final String ourPathWhenWatched = ourPath; // protected against a lost/suspended connection and an old watcher - I'm not sure if this is possible but it can't hurt |
339 | 355 | String watchPath = sortedChildren.get(ourIndex - 1); |
340 | 356 | Watcher watcher = new Watcher() |
341 | 357 | { |
342 | 358 | @Override |
343 | 359 | public void process(WatchedEvent event) |
344 | 360 | { |
345 | | - if ( (event.getType() == Event.EventType.NodeDeleted) && (ourPath != null) && ourPath.equals(ourPathWhenWatched) ) |
| 361 | + if ( state.get() == State.STARTED ) |
346 | 362 | { |
347 | | - try |
348 | | - { |
349 | | - checkForLeadership(); |
350 | | - } |
351 | | - catch(Exception ex) |
352 | | - { |
353 | | - log.error("An error ocurred checking the leadership.", ex); |
354 | | - } |
| 363 | + try |
| 364 | + { |
| 365 | + getChildren(); |
| 366 | + } |
| 367 | + catch(Exception ex) |
| 368 | + { |
| 369 | + log.error("An error ocurred checking the leadership.", ex); |
| 370 | + } |
355 | 371 | } |
356 | 372 | } |
357 | 373 | }; |
358 | | - if ( client.checkExists().usingWatcher(watcher).forPath(ZKPaths.makePath(latchPath, watchPath)) == null ) |
| 374 | + |
| 375 | + BackgroundCallback callback = new BackgroundCallback() |
359 | 376 | { |
360 | | - //the previous Participant may be down, so we need to reevaluate the list |
361 | | - //to get the actual previous Participant or get the leadership |
362 | | - checkForLeadership(); |
363 | | - } |
| 377 | + @Override |
| 378 | + public void processResult(CuratorFramework client, CuratorEvent event) throws Exception |
| 379 | + { |
| 380 | + if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() ) |
| 381 | + { |
| 382 | + // previous node is gone - reset |
| 383 | + reset(); |
| 384 | + } |
| 385 | + } |
| 386 | + }; |
| 387 | + client.checkExists().usingWatcher(watcher).inBackground(callback).forPath(ZKPaths.makePath(latchPath, watchPath)); |
364 | 388 | } |
365 | 389 | } |
366 | 390 |
|
367 | | - private void handleStateChange(ConnectionState newState) |
| 391 | + private void getChildren() throws Exception |
368 | 392 | { |
369 | | - if ( newState == ConnectionState.RECONNECTED ) |
| 393 | + BackgroundCallback callback = new BackgroundCallback() |
370 | 394 | { |
371 | | - newState = ConnectionState.CONNECTED; |
372 | | - } |
| 395 | + @Override |
| 396 | + public void processResult(CuratorFramework client, CuratorEvent event) throws Exception |
| 397 | + { |
| 398 | + if ( event.getResultCode() == KeeperException.Code.OK.intValue() ) |
| 399 | + { |
| 400 | + checkLeadership(event.getChildren()); |
| 401 | + } |
| 402 | + } |
| 403 | + }; |
| 404 | + client.getChildren().inBackground(callback).forPath(latchPath); |
| 405 | + } |
373 | 406 |
|
374 | | - ConnectionState previousState = connectionState.getAndSet(newState); |
375 | | - if ( (previousState == ConnectionState.LOST) && (newState == ConnectionState.CONNECTED) ) |
| 407 | + private void handleStateChange(ConnectionState newState) |
| 408 | + { |
| 409 | + if ( newState == ConnectionState.RECONNECTED ) |
376 | 410 | { |
377 | 411 | try |
378 | 412 | { |
379 | | - internalStart(); |
| 413 | + reset(); |
380 | 414 | } |
381 | 415 | catch ( Exception e ) |
382 | 416 | { |
383 | 417 | log.error("Could not restart leader latch", e); |
384 | | - connectionState.set(ConnectionState.LOST); |
385 | 418 | setLeadership(false); |
386 | 419 | } |
387 | 420 | } |
| 421 | + else |
| 422 | + { |
| 423 | + setLeadership(false); |
| 424 | + } |
388 | 425 | } |
389 | 426 |
|
390 | 427 | private synchronized void setLeadership(boolean newValue) |
|
0 commit comments