Skip to content

Commit 68bb197

Browse files
eye-guzhuangchong
authored and committed
[Fix-14008][registry] Fix etcd memory leak due to leaseId (#14034)
1 parent 64b9200 commit 68bb197

File tree

3 files changed

+156
-3
lines changed

3 files changed

+156
-3
lines changed
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,79 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.dolphinscheduler.plugin.registry.etcd;
19+
20+
import org.apache.dolphinscheduler.registry.api.RegistryException;
21+
22+
import java.util.Map;
23+
import java.util.Optional;
24+
import java.util.concurrent.ConcurrentHashMap;
25+
import java.util.concurrent.ExecutionException;
26+
27+
import lombok.extern.slf4j.Slf4j;
28+
import io.etcd.jetcd.Client;
29+
import io.etcd.jetcd.lease.LeaseKeepAliveResponse;
30+
import io.grpc.stub.StreamObserver;
31+
32+
@Slf4j
33+
public class EtcdKeepAliveLeaseManager {
34+
35+
private final Map<String, Long> keyLeaseCache = new ConcurrentHashMap<>();
36+
37+
private final Client client;
38+
39+
EtcdKeepAliveLeaseManager(Client client) {
40+
this.client = client;
41+
}
42+
43+
long getOrCreateKeepAliveLease(String key, long timeToLive) {
44+
return keyLeaseCache.computeIfAbsent(key, $ -> {
45+
try {
46+
long leaseId = client.getLeaseClient().grant(timeToLive).get().getID();
47+
client.getLeaseClient().keepAlive(leaseId, new StreamObserver<LeaseKeepAliveResponse>() {
48+
49+
@Override
50+
public void onNext(LeaseKeepAliveResponse value) {
51+
}
52+
53+
@Override
54+
public void onError(Throwable t) {
55+
log.error("Lease {} keep alive error, remove cache with key:{}", leaseId, key, t);
56+
keyLeaseCache.remove(key);
57+
}
58+
59+
@Override
60+
public void onCompleted() {
61+
log.error("Lease {} keep alive complete, remove cache with key:{}", leaseId, key);
62+
keyLeaseCache.remove(key);
63+
}
64+
});
65+
log.info("Lease {} keep alive create with key:{}", leaseId, key);
66+
return leaseId;
67+
} catch (InterruptedException e) {
68+
Thread.currentThread().interrupt();
69+
throw new RegistryException("Failed to create lease key: " + key, e);
70+
} catch (ExecutionException e) {
71+
throw new RegistryException("Failed to create lease key: " + key, e);
72+
}
73+
});
74+
}
75+
76+
Optional<Long> getKeepAliveLease(String key) {
77+
return Optional.ofNullable(keyLeaseCache.get(key));
78+
}
79+
}

dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdRegistry.java

Lines changed: 5 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -71,6 +71,9 @@ public class EtcdRegistry implements Registry {
7171
private static Logger LOGGER = LoggerFactory.getLogger(EtcdRegistry.class);
7272
private final Client client;
7373
private EtcdConnectionStateListener etcdConnectionStateListener;
74+
75+
private EtcdKeepAliveLeaseManager etcdKeepAliveLeaseManager;
76+
7477
public static final String FOLDER_SEPARATOR = "/";
7578
// save the lock info for thread
7679
// key:lockKey Value:leaseId
@@ -101,6 +104,7 @@ public EtcdRegistry(EtcdRegistryProperties registryProperties) {
101104
client = clientBuilder.build();
102105
LOGGER.info("Started Etcd Registry...");
103106
etcdConnectionStateListener = new EtcdConnectionStateListener(client);
107+
etcdKeepAliveLeaseManager = new EtcdKeepAliveLeaseManager(client);
104108
}
105109

106110
/**
@@ -186,9 +190,7 @@ public void put(String key, String value, boolean deleteOnDisconnect) {
186190
try {
187191
if (deleteOnDisconnect) {
188192
// keep the key by lease, if disconnected, the lease will expire and the key will delete
189-
long leaseId = client.getLeaseClient().grant(TIME_TO_LIVE_SECONDS).get().getID();
190-
client.getLeaseClient().keepAlive(leaseId, Observers.observer(response -> {
191-
}));
193+
long leaseId = etcdKeepAliveLeaseManager.getOrCreateKeepAliveLease(key, TIME_TO_LIVE_SECONDS);
192194
PutOption putOption = PutOption.newBuilder().withLeaseId(leaseId).build();
193195
client.getKVClient().put(byteSequence(key), byteSequence(value),putOption).get();
194196
} else {
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.dolphinscheduler.plugin.registry.etcd;
19+
20+
import java.io.IOException;
21+
import java.util.Optional;
22+
23+
import org.junit.jupiter.api.AfterAll;
24+
import org.junit.jupiter.api.Assertions;
25+
import org.junit.jupiter.api.BeforeAll;
26+
import org.junit.jupiter.api.Test;
27+
28+
import io.etcd.jetcd.Client;
29+
import io.etcd.jetcd.launcher.EtcdCluster;
30+
import io.etcd.jetcd.test.EtcdClusterExtension;
31+
32+
class EtcdKeepAliveLeaseManagerTest {
33+
34+
static EtcdClusterExtension server;
35+
36+
static Client client;
37+
38+
static EtcdKeepAliveLeaseManager etcdKeepAliveLeaseManager;
39+
@BeforeAll
40+
public static void before() throws Exception {
41+
server = EtcdClusterExtension.builder()
42+
.withNodes(1)
43+
.withImage("ibmcom/etcd:3.2.24")
44+
.build();
45+
server.restart();
46+
47+
client = Client.builder().endpoints(server.clientEndpoints()).build();
48+
49+
etcdKeepAliveLeaseManager = new EtcdKeepAliveLeaseManager(client);
50+
}
51+
52+
@Test
53+
void getOrCreateKeepAliveLeaseTest() throws Exception {
54+
long first = etcdKeepAliveLeaseManager.getOrCreateKeepAliveLease("/test", 3);
55+
long second = etcdKeepAliveLeaseManager.getOrCreateKeepAliveLease("/test", 3);
56+
Assertions.assertEquals(first, second);
57+
58+
client.getLeaseClient().revoke(first).get();
59+
60+
// wait for lease expire
61+
Thread.sleep(3000);
62+
Optional<Long> keepAliveLease = etcdKeepAliveLeaseManager.getKeepAliveLease("/test");
63+
Assertions.assertFalse(keepAliveLease.isPresent());
64+
}
65+
66+
@AfterAll
67+
public static void after() throws IOException {
68+
try (EtcdCluster closeServer = server.cluster()) {
69+
client.close();
70+
}
71+
}
72+
}

0 commit comments

Comments
 (0)