Skip to content

Commit 6da7725

Browse files
author
Evaristo Camarero
committed
Fix corner case in PersistentTtlNode
- Create the touch node as soon as the container node is created
1 parent 190cd65 commit 6da7725

2 files changed

Lines changed: 396 additions & 0 deletions

File tree

Lines changed: 277 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,277 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.curator.framework.recipes.nodes;
21+
22+
import java.io.Closeable;
23+
import java.io.IOException;
24+
import java.util.Objects;
25+
import java.util.concurrent.Executors;
26+
import java.util.concurrent.Future;
27+
import java.util.concurrent.ScheduledExecutorService;
28+
import java.util.concurrent.TimeUnit;
29+
import java.util.concurrent.atomic.AtomicReference;
30+
import org.apache.curator.framework.CuratorFramework;
31+
32+
import org.apache.curator.framework.recipes.watch.PersistentWatcher;
33+
import org.apache.curator.utils.ThreadUtils;
34+
import org.apache.curator.utils.ZKPaths;
35+
import org.apache.zookeeper.CreateMode;
36+
import org.apache.zookeeper.KeeperException;
37+
import org.apache.zookeeper.WatchedEvent;
38+
import org.apache.zookeeper.Watcher.Event.EventType;
39+
import org.slf4j.Logger;
40+
import org.slf4j.LoggerFactory;
41+
42+
/**
43+
* Manages a {@link PersistentNode} that uses {@link CreateMode#CONTAINER}. Asynchronously it
44+
* creates or updates a child on the persistent node that is marked with a provided TTL.
45+
*
46+
* <p>The effect of this is to have a node that can be watched, etc. The child node serves as a
47+
* method of having the parent node deleted if the TTL expires. i.e. if the process that is running
48+
* the PersistentTtlNode crashes and the TTL elapses, first the child node will be deleted due to
49+
* the TTL expiration and then the parent node will be deleted as it's a container node with no
50+
* children.
51+
*
52+
* <p>PersistentTtlNode is useful when you need to create a TTL node but don't want to keep it alive
53+
* manually by periodically setting data - PersistentTtlNode does that for you. Further the
54+
* keep-alive is done in a way that does not generate watch triggers on the parent node.
55+
*/
56+
public class PersistentTtlNodeWithWatcher implements Closeable {
57+
public static final String DEFAULT_CHILD_NODE_NAME = "touch";
58+
public static final int DEFAULT_TOUCH_SCHEDULE_FACTOR = 2;
59+
public static final boolean DEFAULT_USE_PARENT_CREATION = true;
60+
61+
private final Logger log = LoggerFactory.getLogger(getClass());
62+
private final PersistentNode node;
63+
private final CuratorFramework client;
64+
private final long ttlMs;
65+
private final int touchScheduleFactor;
66+
private final ScheduledExecutorService executorService;
67+
private final AtomicReference<Future<?>> futureRef = new AtomicReference<>();
68+
private final String childPath;
69+
private final PersistentWatcher nodeWatcher;
70+
71+
/**
72+
* @param client the client
73+
* @param path path for the parent ZNode
74+
* @param ttlMs max ttl for the node in milliseconds
75+
* @param initData data for the node
76+
*/
77+
public PersistentTtlNodeWithWatcher(
78+
CuratorFramework client, String path, long ttlMs, byte[] initData) {
79+
this(
80+
client,
81+
Executors.newSingleThreadScheduledExecutor(
82+
ThreadUtils.newThreadFactory("PersistentTtlNode")),
83+
path,
84+
ttlMs,
85+
initData,
86+
DEFAULT_CHILD_NODE_NAME,
87+
DEFAULT_TOUCH_SCHEDULE_FACTOR,
88+
DEFAULT_USE_PARENT_CREATION);
89+
}
90+
91+
/**
92+
* @param client the client
93+
* @param path path for the parent ZNode
94+
* @param ttlMs max ttl for the node in milliseconds
95+
* @param initData data for the node
96+
* @param useParentCreation if true, parent ZNode can be created without ancestors
97+
*/
98+
public PersistentTtlNodeWithWatcher(
99+
CuratorFramework client,
100+
String path,
101+
long ttlMs,
102+
byte[] initData,
103+
boolean useParentCreation) {
104+
this(
105+
client,
106+
Executors.newSingleThreadScheduledExecutor(
107+
ThreadUtils.newThreadFactory("PersistentTtlNode")),
108+
path,
109+
ttlMs,
110+
initData,
111+
DEFAULT_CHILD_NODE_NAME,
112+
DEFAULT_TOUCH_SCHEDULE_FACTOR,
113+
useParentCreation);
114+
}
115+
116+
/**
117+
* @param client the client
118+
* @param executorService ExecutorService to use for background thread. This service should be
119+
* single threaded, otherwise you may see inconsistent results.
120+
* @param path path for the parent ZNode
121+
* @param ttlMs max ttl for the node in milliseconds
122+
* @param initData data for the node
123+
* @param childNodeName name to use for the child node of the node created at <code>path</code>
124+
* @param touchScheduleFactor how ofter to set/create the child node as a factor of the ttlMs.
125+
* i.e. the child is touched every <code>(ttlMs / touchScheduleFactor)</code>
126+
*/
127+
public PersistentTtlNodeWithWatcher(
128+
CuratorFramework client,
129+
ScheduledExecutorService executorService,
130+
String path,
131+
long ttlMs,
132+
byte[] initData,
133+
String childNodeName,
134+
int touchScheduleFactor) {
135+
this(
136+
client,
137+
executorService,
138+
path,
139+
ttlMs,
140+
initData,
141+
childNodeName,
142+
touchScheduleFactor,
143+
DEFAULT_USE_PARENT_CREATION);
144+
}
145+
146+
/**
147+
* @param client the client
148+
* @param executorService ExecutorService to use for background thread. This service should be
149+
* single threaded, otherwise you may see inconsistent results.
150+
* @param path path for the parent ZNode
151+
* @param ttlMs max ttl for the node in milliseconds
152+
* @param initData data for the node
153+
* @param childNodeName name to use for the child node of the node created at <code>path</code>
154+
* @param touchScheduleFactor how ofter to set/create the child node as a factor of the ttlMs.
155+
* i.e. the child is touched every <code>(ttlMs / touchScheduleFactor)</code>
156+
* @param useParentCreation if true, parent ZNode can be created without ancestors
157+
*/
158+
public PersistentTtlNodeWithWatcher(
159+
CuratorFramework client,
160+
ScheduledExecutorService executorService,
161+
String path,
162+
long ttlMs,
163+
byte[] initData,
164+
String childNodeName,
165+
int touchScheduleFactor,
166+
boolean useParentCreation) {
167+
this.client = Objects.requireNonNull(client, "client cannot be null");
168+
this.ttlMs = ttlMs;
169+
this.touchScheduleFactor = touchScheduleFactor;
170+
node =
171+
new PersistentNode(client, CreateMode.CONTAINER, false, path, initData, useParentCreation) {
172+
@Override
173+
protected void deleteNode() {
174+
// NOP
175+
}
176+
};
177+
this.executorService =
178+
Objects.requireNonNull(executorService, "executorService cannot be null");
179+
childPath =
180+
ZKPaths.makePath(Objects.requireNonNull(path, "path cannot be null"), childNodeName);
181+
nodeWatcher = new PersistentWatcher(client, path, false);
182+
}
183+
184+
private void touch() {
185+
try {
186+
try {
187+
client.setData().forPath(childPath);
188+
} catch (KeeperException.NoNodeException e) {
189+
client
190+
.create()
191+
.orSetData()
192+
.withTtl(ttlMs)
193+
.withMode(CreateMode.PERSISTENT_WITH_TTL)
194+
.forPath(childPath);
195+
}
196+
} catch (KeeperException.NoNodeException ignore) {
197+
// ignore
198+
} catch (Exception e) {
199+
if (!ThreadUtils.checkInterrupted(e)) {
200+
log.debug("Could not touch child node", e);
201+
}
202+
}
203+
}
204+
205+
private void touchIfContainerCreated(WatchedEvent event) {
206+
if (event.getType().equals(EventType.NodeCreated)) {
207+
touch();
208+
}
209+
}
210+
211+
/** You must call start() to initiate the persistent ttl node */
212+
public void start() {
213+
nodeWatcher.getListenable().addListener(this::touchIfContainerCreated, executorService);
214+
nodeWatcher.start();
215+
node.start();
216+
Future<?> future =
217+
executorService.scheduleAtFixedRate(
218+
this::touch,
219+
ttlMs / touchScheduleFactor,
220+
ttlMs / touchScheduleFactor,
221+
TimeUnit.MILLISECONDS);
222+
futureRef.set(future);
223+
}
224+
225+
/**
226+
* Block until the either initial node creation initiated by {@link #start()} succeeds or the
227+
* timeout elapses.
228+
*
229+
* @param timeout the maximum time to wait
230+
* @param unit time unit
231+
* @return if the node was created before timeout
232+
* @throws InterruptedException if the thread is interrupted
233+
*/
234+
public boolean waitForInitialCreate(long timeout, TimeUnit unit) throws InterruptedException {
235+
return node.waitForInitialCreate(timeout, unit);
236+
}
237+
238+
/**
239+
* Set data that node should set in ZK also writes the data to the node. NOTE: it is an error to
240+
* call this method after {@link #start()} but before the initial create has completed. Use {@link
241+
* #waitForInitialCreate(long, TimeUnit)} to ensure initial creation.
242+
*
243+
* @param data new data value
244+
* @throws Exception errors
245+
*/
246+
public void setData(byte[] data) throws Exception {
247+
node.setData(data);
248+
}
249+
250+
/**
251+
* Return the current value of our data
252+
*
253+
* @return our data
254+
*/
255+
public byte[] getData() {
256+
return node.getData();
257+
}
258+
259+
/**
260+
* Call when you are done with the PersistentTtlNode. Note: the ZNode is <em>not</em> immediately
261+
* deleted. However, if no other PersistentTtlNode with the same path is running the node will get
262+
* deleted based on the ttl.
263+
*/
264+
@Override
265+
public void close() {
266+
Future<?> future = futureRef.getAndSet(null);
267+
if (future != null) {
268+
future.cancel(true);
269+
}
270+
nodeWatcher.close();
271+
try {
272+
node.close();
273+
} catch (IOException e) {
274+
throw new RuntimeException(e);
275+
}
276+
}
277+
}
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.curator.framework.recipes.nodes;
21+
22+
import static org.junit.jupiter.api.Assertions.assertNull;
23+
import static org.junit.jupiter.api.Assertions.assertTrue;
24+
25+
import java.util.concurrent.TimeUnit;
26+
27+
import org.apache.curator.framework.CuratorFramework;
28+
import org.apache.curator.framework.CuratorFrameworkFactory;
29+
import org.apache.curator.retry.RetryOneTime;
30+
import org.apache.curator.test.Timing;
31+
import org.apache.curator.test.compatibility.CuratorTestBase;
32+
import org.junit.jupiter.api.AfterEach;
33+
import org.junit.jupiter.api.BeforeAll;
34+
import org.junit.jupiter.api.BeforeEach;
35+
import org.junit.jupiter.api.Test;
36+
37+
public class TestPersistentTtlNodeWithStartStop extends CuratorTestBase {
38+
private final Timing timing = new Timing();
39+
40+
@BeforeAll
41+
public static void setUpClass() {
42+
System.setProperty("zookeeper.extendedTypesEnabled", "true");
43+
}
44+
45+
@BeforeEach
46+
@Override
47+
public void setup() throws Exception {
48+
System.setProperty("znode.container.checkIntervalMs", "1");
49+
super.setup();
50+
}
51+
52+
@AfterEach
53+
@Override
54+
public void teardown() throws Exception {
55+
System.clearProperty("znode.container.checkIntervalMs");
56+
super.teardown();
57+
}
58+
59+
@Test
60+
public void testWithPersistentTtlNode() throws Exception {
61+
try (CuratorFramework client =
62+
CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1))) {
63+
client.start();
64+
final long ttlMs = 1_000L;
65+
try (PersistentTtlNode node = new PersistentTtlNode(client, "/test", ttlMs, new byte[0])) {
66+
node.start();
67+
assertTrue(node.waitForInitialCreate(timing.session(), TimeUnit.MILLISECONDS));
68+
// Give some minor time for touch node to be created. Will worked after patch
69+
for (int i = 1; i <= 5; i++) {
70+
if (client.checkExists().forPath("/test") != null) {
71+
break;
72+
}
73+
Thread.sleep(10L);
74+
}
75+
}
76+
}
77+
try (CuratorFramework client1 =
78+
CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1))) {
79+
client1.start();
80+
assertTrue(client1.blockUntilConnected(2, TimeUnit.SECONDS));
81+
Thread.sleep(3_000L);
82+
assertNull(client1.checkExists().forPath("/test/touch"));
83+
assertNull(
84+
client1.checkExists().forPath("/test"),
85+
"Persistent TTL node NOT removed. The reason is that '/test/touch' was NOT create on time to make PerssistentTTLNode recipe to work");
86+
}
87+
}
88+
89+
@Test
90+
public void testWithPersistentTtlNodeWithWatcher() throws Exception {
91+
try (CuratorFramework client =
92+
CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1))) {
93+
client.start();
94+
final long ttlMs = 1_000L;
95+
try (PersistentTtlNodeWithWatcher node =
96+
new PersistentTtlNodeWithWatcher(client, "/test", ttlMs, new byte[0])) {
97+
node.start();
98+
assertTrue(node.waitForInitialCreate(timing.session(), TimeUnit.MILLISECONDS));
99+
// Give some minor time for touch node to be created. Will worked after patch
100+
for (int i = 1; i <= 5; i++) {
101+
if (client.checkExists().forPath("/test") == null) {
102+
break;
103+
}
104+
Thread.sleep(10L);
105+
}
106+
}
107+
}
108+
try (CuratorFramework client1 =
109+
CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1))) {
110+
client1.start();
111+
assertTrue(client1.blockUntilConnected(2, TimeUnit.SECONDS));
112+
Thread.sleep(3_000L);
113+
assertNull(client1.checkExists().forPath("/test/touch"));
114+
assertNull(
115+
client1.checkExists().forPath("/test"),
116+
"Persistent TTL node NOT removed. The reason is that '/test/touch' was NOT create on time to make PerssistentTTLNode recipe to work");
117+
}
118+
}
119+
}

0 commit comments

Comments
 (0)