Skip to content

Commit ff4ec29

Browse files
committed
temp checkin
1 parent 4b7b365 commit ff4ec29

6 files changed

Lines changed: 257 additions & 76 deletions

File tree

curator-framework/src/main/java/com/netflix/curator/framework/imps/CreateBuilderImpl.java

Lines changed: 23 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -448,30 +448,9 @@ public void processResult(int rc, String path, Object ctx, String name)
448448
{
449449
trace.commit();
450450

451-
boolean retry = false;
452451
if ( (rc == KeeperException.Code.NONODE.intValue()) && createParentsIfNeeded )
453452
{
454-
try
455-
{
456-
ZKPaths.mkdirs(client.getZooKeeper(), operationAndData.getData().getPath(), false);
457-
retry = true;
458-
}
459-
catch ( Exception e )
460-
{
461-
client.logError("Could not create parents for path: " + operationAndData.getData().getPath(), e);
462-
}
463-
}
464-
465-
if ( retry )
466-
{
467-
try
468-
{
469-
performBackgroundOperation(operationAndData);
470-
}
471-
catch ( Exception e )
472-
{
473-
client.logError("Could not create node after creating parents for path: " + operationAndData.getData().getPath(), e);
474-
}
453+
backgroundCreateParentsThenNode(operationAndData);
475454
}
476455
else
477456
{
@@ -483,6 +462,28 @@ public void processResult(int rc, String path, Object ctx, String name)
483462
);
484463
}
485464

465+
/**
 * Queues a helper background operation that first runs {@code ZKPaths.mkdirs} for the
 * main operation's path and then re-queues the main operation itself so that the
 * create is attempted again.
 *
 * Called from {@code processResult} when the result code is NONODE and
 * {@code createParentsIfNeeded} is set.
 *
 * @param mainOperationAndData the original create operation; its path is passed to
 *                             {@code ZKPaths.mkdirs} and it is re-queued afterwards
 */
private void backgroundCreateParentsThenNode(final OperationAndData<PathAndBytes> mainOperationAndData)
466+
{
467+
BackgroundOperation<PathAndBytes> operation = new BackgroundOperation<PathAndBytes>()
468+
{
469+
@Override
470+
// The 'dummy' argument is not used; the captured mainOperationAndData is used instead.
public void performBackgroundOperation(OperationAndData<PathAndBytes> dummy) throws Exception
471+
{
472+
try
473+
{
474+
// NOTE(review): the trailing 'false' presumably means "do not create the last path
// element itself, only its parents" - confirm against ZKPaths.mkdirs.
ZKPaths.mkdirs(client.getZooKeeper(), mainOperationAndData.getData().getPath(), false);
475+
}
476+
catch ( KeeperException e )
477+
{
478+
// ignore - a KeeperException from mkdirs is deliberately swallowed (not logged);
// the main operation is re-queued below regardless of whether mkdirs succeeded
479+
}
480+
// Hand the original create back to the client's queue so it is retried.
client.queueOperation(mainOperationAndData);
481+
}
482+
};
483+
// Wrap the helper with the same data as the main operation. The two null arguments are
// presumably the callback and context (none needed here) - TODO confirm against the
// OperationAndData constructor.
OperationAndData<PathAndBytes> parentOperation = new OperationAndData<PathAndBytes>(operation, mainOperationAndData.getData(), null, null);
484+
client.queueOperation(parentOperation);
485+
}
486+
486487
private void sendBackgroundResponse(int rc, String path, Object ctx, String name, OperationAndData<PathAndBytes> operationAndData)
487488
{
488489
path = client.unfixForNamespace(path);

curator-framework/src/main/java/com/netflix/curator/framework/imps/CuratorFrameworkImpl.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -428,14 +428,14 @@ <DATA_TYPE> void processBackgroundOperation(OperationAndData<DATA_TYPE> operatio
428428
return;
429429
}
430430

431-
boolean queueOperation = false;
431+
boolean doQueueOperation = false;
432432
do
433433
{
434434
if ( RetryLoop.shouldRetry(event.getResultCode()) )
435435
{
436436
if ( client.getRetryPolicy().allowRetry(operationAndData.getThenIncrementRetryCount(), operationAndData.getElapsedTimeMs(), operationAndData) )
437437
{
438-
queueOperation = true;
438+
doQueueOperation = true;
439439
}
440440
else
441441
{
@@ -471,12 +471,17 @@ <DATA_TYPE> void processBackgroundOperation(OperationAndData<DATA_TYPE> operatio
471471
processEvent(event);
472472
} while ( false );
473473

474-
if ( queueOperation )
474+
if ( doQueueOperation )
475475
{
476-
backgroundOperations.offer(operationAndData);
476+
queueOperation(operationAndData);
477477
}
478478
}
479479

480+
/**
 * Adds the given operation to {@code backgroundOperations} via {@code offer()}.
 * The value returned by {@code offer()} is not checked.
 *
 * @param operationAndData the operation to enqueue
 */
<DATA_TYPE> void queueOperation(OperationAndData<DATA_TYPE> operationAndData)
481+
{
482+
backgroundOperations.offer(operationAndData);
483+
}
484+
480485
void logError(String reason, final Throwable e)
481486
{
482487
if ( (reason == null) || (reason.length() == 0) )

curator-recipes/src/main/java/com/netflix/curator/framework/recipes/leader/LeaderLatch.java

Lines changed: 77 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,16 @@
1818

1919
import com.google.common.base.Preconditions;
2020
import com.netflix.curator.framework.CuratorFramework;
21+
import com.netflix.curator.framework.api.BackgroundCallback;
22+
import com.netflix.curator.framework.api.CuratorEvent;
2123
import com.netflix.curator.framework.recipes.locks.LockInternals;
2224
import com.netflix.curator.framework.recipes.locks.LockInternalsSorter;
2325
import com.netflix.curator.framework.recipes.locks.StandardLockInternalsDriver;
2426
import com.netflix.curator.framework.state.ConnectionState;
2527
import com.netflix.curator.framework.state.ConnectionStateListener;
2628
import com.netflix.curator.utils.ZKPaths;
2729
import org.apache.zookeeper.CreateMode;
30+
import org.apache.zookeeper.KeeperException;
2831
import org.apache.zookeeper.WatchedEvent;
2932
import org.apache.zookeeper.Watcher;
3033
import org.slf4j.Logger;
@@ -53,7 +56,6 @@ public class LeaderLatch implements Closeable
5356
private final String latchPath;
5457
private final String id;
5558
private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
56-
private final AtomicReference<ConnectionState> connectionState = new AtomicReference<ConnectionState>(ConnectionState.CONNECTED);
5759
private final AtomicBoolean hasLeadership = new AtomicBoolean(false);
5860

5961
private final ConnectionStateListener listener = new ConnectionStateListener()
@@ -116,9 +118,7 @@ public void start() throws Exception
116118
Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
117119

118120
client.getConnectionStateListenable().addListener(listener);
119-
120-
client.newNamespaceAwareEnsurePath(latchPath).ensure(client.getZookeeperClient());
121-
internalStart();
121+
reset();
122122
}
123123

124124
/**
@@ -305,86 +305,123 @@ public Participant getLeader() throws Exception
305305
*/
306306
public boolean hasLeadership()
307307
{
308-
return (state.get() == State.STARTED) && hasLeadership.get() && (connectionState.get() == ConnectionState.CONNECTED);
308+
return (state.get() == State.STARTED) && hasLeadership.get();
309309
}
310310

311-
private void internalStart() throws Exception
311+
private void reset() throws Exception
312312
{
313-
hasLeadership.set(false);
313+
setLeadership(false);
314314
if ( ourPath != null )
315315
{
316316
client.delete().guaranteed().inBackground().forPath(ourPath);
317317
}
318-
ourPath = client.create().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(ZKPaths.makePath(latchPath, LOCK_NAME), LeaderSelector.getIdBytes(id));
318+
ourPath = null;
319319

320-
checkForLeadership();
320+
BackgroundCallback callback = new BackgroundCallback()
321+
{
322+
@Override
323+
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
324+
{
325+
if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
326+
{
327+
ourPath = event.getName();
328+
getChildren();
329+
}
330+
else
331+
{
332+
// TBD
333+
System.out.println(event.getResultCode());
334+
}
335+
}
336+
};
337+
client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).inBackground(callback).forPath(ZKPaths.makePath(latchPath, LOCK_NAME), LeaderSelector.getIdBytes(id));
321338
}
322-
323-
private void checkForLeadership() throws Exception
339+
340+
private void checkLeadership(List<String> children) throws Exception
324341
{
325-
List<String> sortedChildren = LockInternals.getSortedChildren(client, latchPath, LOCK_NAME, sorter);
326-
if ( sortedChildren.size() == 0 )
342+
List<String> sortedChildren = LockInternals.getSortedChildren(LOCK_NAME, sorter, children);
343+
int ourIndex = (ourPath != null) ? sortedChildren.indexOf(ZKPaths.getNodeFromPath(ourPath)) : -1;
344+
if ( ourIndex < 0 )
327345
{
328-
throw new Exception("no children - unexpected state");
346+
log.error("Can't find our node. Resetting. Index: " + ourIndex);
347+
reset();
329348
}
330-
331-
int ourIndex = sortedChildren.indexOf(ZKPaths.getNodeFromPath(ourPath));
332-
if ( ourIndex == 0 )
349+
else if ( ourIndex == 0 )
333350
{
334351
setLeadership(true);
335352
}
336353
else
337354
{
338-
final String ourPathWhenWatched = ourPath; // protected against a lost/suspended connection and an old watcher - I'm not sure if this is possible but it can't hurt
339355
String watchPath = sortedChildren.get(ourIndex - 1);
340356
Watcher watcher = new Watcher()
341357
{
342358
@Override
343359
public void process(WatchedEvent event)
344360
{
345-
if ( (event.getType() == Event.EventType.NodeDeleted) && (ourPath != null) && ourPath.equals(ourPathWhenWatched) )
361+
if ( state.get() == State.STARTED )
346362
{
347-
try
348-
{
349-
checkForLeadership();
350-
}
351-
catch(Exception ex)
352-
{
353-
log.error("An error ocurred checking the leadership.", ex);
354-
}
363+
try
364+
{
365+
getChildren();
366+
}
367+
catch(Exception ex)
368+
{
369+
log.error("An error ocurred checking the leadership.", ex);
370+
}
355371
}
356372
}
357373
};
358-
if ( client.checkExists().usingWatcher(watcher).forPath(ZKPaths.makePath(latchPath, watchPath)) == null )
374+
375+
BackgroundCallback callback = new BackgroundCallback()
359376
{
360-
//the previous Participant may be down, so we need to reevaluate the list
361-
//to get the actual previous Participant or get the leadership
362-
checkForLeadership();
363-
}
377+
@Override
378+
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
379+
{
380+
if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() )
381+
{
382+
// previous node is gone - reset
383+
reset();
384+
}
385+
}
386+
};
387+
client.checkExists().usingWatcher(watcher).inBackground(callback).forPath(ZKPaths.makePath(latchPath, watchPath));
364388
}
365389
}
366390

367-
private void handleStateChange(ConnectionState newState)
391+
private void getChildren() throws Exception
368392
{
369-
if ( newState == ConnectionState.RECONNECTED )
393+
BackgroundCallback callback = new BackgroundCallback()
370394
{
371-
newState = ConnectionState.CONNECTED;
372-
}
395+
@Override
396+
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
397+
{
398+
if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
399+
{
400+
checkLeadership(event.getChildren());
401+
}
402+
}
403+
};
404+
client.getChildren().inBackground(callback).forPath(latchPath);
405+
}
373406

374-
ConnectionState previousState = connectionState.getAndSet(newState);
375-
if ( (previousState == ConnectionState.LOST) && (newState == ConnectionState.CONNECTED) )
407+
private void handleStateChange(ConnectionState newState)
408+
{
409+
if ( newState == ConnectionState.RECONNECTED )
376410
{
377411
try
378412
{
379-
internalStart();
413+
reset();
380414
}
381415
catch ( Exception e )
382416
{
383417
log.error("Could not restart leader latch", e);
384-
connectionState.set(ConnectionState.LOST);
385418
setLeadership(false);
386419
}
387420
}
421+
else
422+
{
423+
setLeadership(false);
424+
}
388425
}
389426

390427
private synchronized void setLeadership(boolean newValue)

curator-recipes/src/main/java/com/netflix/curator/framework/recipes/locks/LockInternals.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.netflix.curator.RetryLoop;
2424
import com.netflix.curator.framework.CuratorFramework;
2525
import com.netflix.curator.framework.api.CuratorWatcher;
26+
import com.netflix.curator.framework.imps.CuratorFrameworkState;
2627
import com.netflix.curator.utils.ZKPaths;
2728
import org.apache.zookeeper.CreateMode;
2829
import org.apache.zookeeper.KeeperException;
@@ -163,6 +164,24 @@ public int compare(String lhs, String rhs)
163164
return sortedList;
164165
}
165166

167+
/**
 * Returns a sorted copy of an already-fetched list of child node names.
 *
 * The children are copied into a new list, so the list passed in is not reordered.
 * Ordering is by natural {@code String} comparison of the value that
 * {@code sorter.fixForSorting(child, lockName)} yields for each child.
 *
 * @param lockName the lock name passed through to {@code sorter.fixForSorting}
 * @param sorter   supplies the sort key for each child name
 * @param children the child node names to sort
 * @return a new list holding the same names in sorted order
 */
public static List<String> getSortedChildren(final String lockName, final LockInternalsSorter sorter, List<String> children)
168+
{
169+
// Copy first so the sort below does not touch the caller's list.
List<String> sortedList = Lists.newArrayList(children);
170+
Collections.sort
171+
(
172+
sortedList,
173+
new Comparator<String>()
174+
{
175+
@Override
176+
public int compare(String lhs, String rhs)
177+
{
178+
// Compare the sorter-derived keys rather than the raw node names.
return sorter.fixForSorting(lhs, lockName).compareTo(sorter.fixForSorting(rhs, lockName));
179+
}
180+
}
181+
);
182+
return sortedList;
183+
}
184+
166185
List<String> getSortedChildren() throws Exception
167186
{
168187
return getSortedChildren(client, basePath, lockName, driver);
@@ -258,7 +277,7 @@ private boolean internalLockLoop(long startMillis, Long millisToWait, String our
258277
client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
259278
}
260279

261-
while ( client.isStarted() && !haveTheLock )
280+
while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
262281
{
263282
List<String> children = getSortedChildren();
264283
String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash

curator-recipes/src/test/java/com/netflix/curator/framework/recipes/leader/TestLeaderLatch.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -88,8 +88,7 @@ public void stateChanged(CuratorFramework client, ConnectionState newState)
8888
Assert.assertEquals(getLeaders(latches).size(), 0);
8989

9090
server = new TestingServer(server.getPort(), server.getTempDirectory());
91-
waitForALeader(latches, timing); // should reconnect
92-
Assert.assertEquals(getLeaders(latches).size(), 1);
91+
Assert.assertEquals(waitForALeader(latches, timing).size(), 1); // should reconnect
9392
}
9493
finally
9594
{
@@ -220,7 +219,7 @@ private enum Mode
220219

221220
private void basic(Mode mode) throws Exception
222221
{
223-
final int PARTICIPANT_QTY = 10;
222+
final int PARTICIPANT_QTY = 1;//0;
224223

225224
List<LeaderLatch> latches = Lists.newArrayList();
226225

@@ -263,9 +262,7 @@ public Object call() throws Exception
263262

264263
while ( latches.size() > 0 )
265264
{
266-
waitForALeader(latches, timing);
267-
268-
List<LeaderLatch> leaders = getLeaders(latches);
265+
List<LeaderLatch> leaders = waitForALeader(latches, timing);
269266
Assert.assertEquals(leaders.size(), 1); // there can only be one leader
270267
LeaderLatch theLeader = leaders.get(0);
271268
if ( mode == Mode.START_IMMEDIATELY )
@@ -286,16 +283,18 @@ public Object call() throws Exception
286283
}
287284
}
288285

289-
private void waitForALeader(List<LeaderLatch> latches, Timing timing) throws InterruptedException
286+
private List<LeaderLatch> waitForALeader(List<LeaderLatch> latches, Timing timing) throws InterruptedException
290287
{
291288
for ( int i = 0; i < MAX_LOOPS; ++i )
292289
{
293-
if ( getLeaders(latches).size() != 0 )
290+
List<LeaderLatch> leaders = getLeaders(latches);
291+
if ( leaders.size() != 0 )
294292
{
295-
break;
293+
return leaders;
296294
}
297295
timing.sleepABit();
298296
}
297+
return Lists.newArrayList();
299298
}
300299

301300
private List<LeaderLatch> getLeaders(Collection<LeaderLatch> latches)

0 commit comments

Comments
 (0)