Skip to content

Commit 67c8d0e

Browse files
authored
Merge 38de349 into feb3023
2 parents feb3023 + 38de349 commit 67c8d0e

File tree

2 files changed

+104
-0
lines changed

2 files changed

+104
-0
lines changed

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@ public void notify(Event event) {
174174
log.info("Worker: {} added, currentNode : {}", path, workerAddress);
175175
} else if (type == Type.REMOVE) {
176176
log.info("Worker node : {} down.", path);
177+
removeSingleWorkerNode(workerAddress);
177178
alertDao.sendServerStoppedAlert(1, path, "WORKER");
178179
listenerEventAlertManager.publishServerDownListenerEvent(path, "WORKER");
179180
} else if (type == Type.UPDATE) {
@@ -193,6 +194,16 @@ private void syncSingleWorkerNodeInfo(String workerAddress, WorkerHeartBeat info
193194
workerNodeInfoWriteLock.unlock();
194195
}
195196
}
197+
198+
private void removeSingleWorkerNode(String workerAddress) {
199+
workerNodeInfoWriteLock.lock();
200+
try {
201+
workerNodeInfo.remove(workerAddress);
202+
log.info("remove worker node {} from workerNodeInfo when worker server down", workerAddress);
203+
} finally {
204+
workerNodeInfoWriteLock.unlock();
205+
}
206+
}
196207
}
197208

198209
class MasterDataListener implements SubscribeListener {
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.dolphinscheduler.server.master.registry;
19+
20+
import org.apache.dolphinscheduler.common.model.WorkerHeartBeat;
21+
import org.apache.dolphinscheduler.common.utils.JSONUtils;
22+
import org.apache.dolphinscheduler.dao.AlertDao;
23+
import org.apache.dolphinscheduler.registry.api.Event;
24+
import org.apache.dolphinscheduler.registry.api.RegistryClient;
25+
import org.apache.dolphinscheduler.service.alert.ListenerEventAlertManager;
26+
27+
import java.lang.reflect.InvocationTargetException;
28+
import java.lang.reflect.Method;
29+
import java.util.HashMap;
30+
import java.util.Map;
31+
32+
import org.junit.jupiter.api.Assertions;
33+
import org.junit.jupiter.api.Test;
34+
import org.mockito.InjectMocks;
35+
import org.mockito.Mock;
36+
import org.mockito.Mockito;
37+
import org.mockito.MockitoAnnotations;
38+
39+
public class ServerNodeManagerTest {
40+
41+
@Mock
42+
RegistryClient registryClient;
43+
44+
@Mock
45+
AlertDao alertDao;
46+
47+
@Mock
48+
ListenerEventAlertManager listenerEventAlertManager;
49+
50+
@InjectMocks
51+
ServerNodeManager serverNodeManager;
52+
53+
@Test
54+
public void updateWorkerNodesTest() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
55+
56+
MockitoAnnotations.initMocks(this);
57+
HashMap<String, String> workerNodeMaps = new HashMap<>();
58+
workerNodeMaps.put("worker-node-1", JSONUtils.toJsonString(new WorkerHeartBeat()));
59+
workerNodeMaps.put("worker-node-2", JSONUtils.toJsonString(new WorkerHeartBeat()));
60+
61+
Mockito.when(registryClient.getServerMaps(Mockito.any())).thenReturn(workerNodeMaps);
62+
Mockito.when(registryClient.isWorkerPath(Mockito.anyString())).thenReturn(true);
63+
64+
// two worker server running (worker-node-1, worker-node-2)
65+
Method updateWorkerNodes = serverNodeManager.getClass().getDeclaredMethod("updateWorkerNodes");
66+
updateWorkerNodes.setAccessible(true);
67+
updateWorkerNodes.invoke(serverNodeManager);
68+
69+
Map<String, WorkerHeartBeat> workerNodeInfo = serverNodeManager.getWorkerNodeInfo();
70+
Assertions.assertTrue(workerNodeInfo.containsKey("worker-node-1"));
71+
Assertions.assertTrue(workerNodeInfo.containsKey("worker-node-2"));
72+
73+
// receive remove event when worker-node-1 server stop
74+
ServerNodeManager.WorkerDataListener workerDataListener = serverNodeManager.new WorkerDataListener();
75+
Event event = new Event("", "/nodes/worker/worker-node-1", "", Event.Type.REMOVE);
76+
workerDataListener.notify(event);
77+
78+
// check worker-node-1 not exist in cache
79+
workerNodeInfo = serverNodeManager.getWorkerNodeInfo();
80+
Assertions.assertFalse(workerNodeInfo.containsKey("worker-node-1"));
81+
Assertions.assertTrue(workerNodeInfo.containsKey("worker-node-2"));
82+
83+
// worker-node-1 restart, getServerMaps(RegistryNodeType.WORKER) method return two worker
84+
updateWorkerNodes.invoke(serverNodeManager);
85+
86+
// check cache
87+
workerNodeInfo = serverNodeManager.getWorkerNodeInfo();
88+
Assertions.assertTrue(workerNodeInfo.containsKey("worker-node-1"));
89+
Assertions.assertTrue(workerNodeInfo.containsKey("worker-node-2"));
90+
91+
}
92+
93+
}

0 commit comments

Comments
 (0)