Skip to content

Commit 69676b4

Browse files
pegasasfuchanghaiEricGao888rickchengx
authored
[Improvement][UT] Improve Worker registry coverage (#15380)
Co-authored-by: fuchanghai <[email protected]> Co-authored-by: Eric Gao <[email protected]> Co-authored-by: Rick Cheng <[email protected]>
1 parent 43a0652 commit 69676b4

File tree

5 files changed

+284
-17
lines changed

5 files changed

+284
-17
lines changed

dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerStopStrategy.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
import org.apache.dolphinscheduler.registry.api.RegistryClient;
2121
import org.apache.dolphinscheduler.registry.api.StrategyType;
22-
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
2322

2423
import lombok.extern.slf4j.Slf4j;
2524

@@ -34,8 +33,6 @@ public class WorkerStopStrategy implements WorkerConnectStrategy {
3433

3534
@Autowired
3635
public RegistryClient registryClient;
37-
@Autowired
38-
private WorkerConfig workerConfig;
3936

4037
@Override
4138
public void disconnect() {

dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerWaitingStrategy.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929

3030
import java.time.Duration;
3131

32+
import lombok.NonNull;
3233
import lombok.extern.slf4j.Slf4j;
3334

3435
import org.springframework.beans.factory.annotation.Autowired;
@@ -52,6 +53,16 @@ public class WorkerWaitingStrategy implements WorkerConnectStrategy {
5253
@Autowired
5354
private WorkerTaskExecutorThreadPool workerManagerThread;
5455

56+
public WorkerWaitingStrategy(@NonNull WorkerConfig workerConfig,
57+
@NonNull RegistryClient registryClient,
58+
@NonNull MessageRetryRunner messageRetryRunner,
59+
@NonNull WorkerTaskExecutorThreadPool workerManagerThread) {
60+
this.workerConfig = workerConfig;
61+
this.registryClient = registryClient;
62+
this.messageRetryRunner = messageRetryRunner;
63+
this.workerManagerThread = workerManagerThread;
64+
}
65+
5566
@Override
5667
public void disconnect() {
5768
try {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.dolphinscheduler.server.worker.registry;
19+
20+
import static org.mockito.Mockito.times;
21+
22+
import org.apache.dolphinscheduler.registry.api.ConnectionState;
23+
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
24+
25+
import org.junit.jupiter.api.Test;
26+
import org.junit.jupiter.api.extension.ExtendWith;
27+
import org.mockito.InjectMocks;
28+
import org.mockito.Mock;
29+
import org.mockito.Mockito;
30+
import org.mockito.junit.jupiter.MockitoExtension;
31+
import org.slf4j.Logger;
32+
import org.slf4j.LoggerFactory;
33+
34+
/**
35+
* worker registry test
36+
*/
37+
@ExtendWith(MockitoExtension.class)
38+
public class WorkerConnectionStateListenerTest {
39+
40+
private static final Logger log = LoggerFactory.getLogger(WorkerConnectionStateListenerTest.class);
41+
@InjectMocks
42+
private WorkerConnectionStateListener workerConnectionStateListener;
43+
@Mock
44+
private WorkerConfig workerConfig;
45+
@Mock
46+
private WorkerConnectStrategy workerConnectStrategy;
47+
48+
@Test
49+
public void testWorkerConnectionStateListener() {
50+
workerConnectionStateListener.onUpdate(ConnectionState.CONNECTED);
51+
52+
workerConnectionStateListener.onUpdate(ConnectionState.RECONNECTED);
53+
Mockito.verify(workerConnectStrategy, times(1)).reconnect();
54+
55+
workerConnectionStateListener.onUpdate(ConnectionState.SUSPENDED);
56+
57+
workerConnectionStateListener.onUpdate(ConnectionState.DISCONNECTED);
58+
Mockito.verify(workerConnectStrategy, times(1)).disconnect();
59+
}
60+
}

dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java

Lines changed: 26 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919

2020
import static org.mockito.BDDMockito.given;
2121

22+
import org.apache.dolphinscheduler.common.IStoppable;
23+
import org.apache.dolphinscheduler.common.model.Server;
2224
import org.apache.dolphinscheduler.common.utils.NetUtils;
2325
import org.apache.dolphinscheduler.meter.metrics.MetricsProvider;
2426
import org.apache.dolphinscheduler.meter.metrics.SystemMetrics;
@@ -29,6 +31,9 @@
2931
import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorThreadPool;
3032

3133
import java.time.Duration;
34+
import java.util.ArrayList;
35+
import java.util.Arrays;
36+
import java.util.Optional;
3237

3338
import org.junit.jupiter.api.Assertions;
3439
import org.junit.jupiter.api.Test;
@@ -37,33 +42,33 @@
3742
import org.mockito.Mock;
3843
import org.mockito.Mockito;
3944
import org.mockito.junit.jupiter.MockitoExtension;
45+
import org.slf4j.Logger;
46+
import org.slf4j.LoggerFactory;
4047

4148
/**
4249
* worker registry test
4350
*/
4451
@ExtendWith(MockitoExtension.class)
4552
public class WorkerRegistryClientTest {
4653

54+
private static final Logger log = LoggerFactory.getLogger(WorkerRegistryClientTest.class);
4755
@InjectMocks
4856
private WorkerRegistryClient workerRegistryClient;
49-
5057
@Mock
5158
private RegistryClient registryClient;
52-
5359
@Mock
5460
private WorkerConfig workerConfig;
55-
5661
@Mock
5762
private MetricsProvider metricsProvider;
58-
5963
@Mock
6064
private WorkerTaskExecutorThreadPool workerManagerThread;
61-
6265
@Mock
6366
private WorkerConnectStrategy workerConnectStrategy;
67+
@Mock
68+
private IStoppable stoppable;
6469

6570
@Test
66-
public void testStart() {
71+
public void testWorkerRegistryClientbasic() {
6772

6873
given(workerConfig.getWorkerAddress()).willReturn(NetUtils.getAddr(1234));
6974
given(workerConfig.getMaxHeartbeatInterval()).willReturn(Duration.ofSeconds(1));
@@ -75,16 +80,23 @@ public void testStart() {
7580
workerRegistryClient.initWorkRegistry();
7681
workerRegistryClient.start();
7782

78-
Assertions.assertTrue(true);
83+
workerRegistryClient.setRegistryStoppable(stoppable);
7984
}
8085

8186
@Test
82-
public void testUnRegistry() {
83-
84-
}
85-
86-
@Test
87-
public void testGetWorkerZkPaths() {
88-
87+
public void testWorkerRegistryClientgetAlertServerAddress() {
88+
given(registryClient.getServerList(Mockito.any(RegistryNodeType.class)))
89+
.willReturn(new ArrayList<Server>());
90+
Assertions.assertEquals(workerRegistryClient.getAlertServerAddress(), Optional.empty());
91+
Mockito.reset(registryClient);
92+
String host = "test";
93+
Integer port = 1;
94+
Server server = new Server();
95+
server.setHost(host);
96+
server.setPort(port);
97+
given(registryClient.getServerList(Mockito.any(RegistryNodeType.class)))
98+
.willReturn(new ArrayList<Server>(Arrays.asList(server)));
99+
Assertions.assertEquals(workerRegistryClient.getAlertServerAddress().get().getAddress(),
100+
String.format("%s:%d", host, port));
89101
}
90102
}
Lines changed: 187 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.dolphinscheduler.server.worker.registry;
19+
20+
import static org.mockito.ArgumentMatchers.any;
21+
import static org.mockito.ArgumentMatchers.anyString;
22+
import static org.mockito.BDDMockito.given;
23+
import static org.mockito.Mockito.doNothing;
24+
25+
import org.apache.dolphinscheduler.common.IStoppable;
26+
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleException;
27+
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
28+
import org.apache.dolphinscheduler.registry.api.ConnectStrategyProperties;
29+
import org.apache.dolphinscheduler.registry.api.RegistryClient;
30+
import org.apache.dolphinscheduler.registry.api.RegistryException;
31+
import org.apache.dolphinscheduler.registry.api.StrategyType;
32+
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
33+
import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
34+
import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcServer;
35+
import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorThreadPool;
36+
37+
import java.time.Duration;
38+
39+
import org.junit.jupiter.api.Assertions;
40+
import org.junit.jupiter.api.Test;
41+
import org.junit.jupiter.api.extension.ExtendWith;
42+
import org.mockito.Mock;
43+
import org.mockito.MockedStatic;
44+
import org.mockito.Mockito;
45+
import org.mockito.junit.jupiter.MockitoExtension;
46+
import org.slf4j.Logger;
47+
import org.slf4j.LoggerFactory;
48+
49+
/**
50+
* worker registry test
51+
*/
52+
@ExtendWith(MockitoExtension.class)
53+
public class WorkerStrategyTest {
54+
55+
private static final Logger log = LoggerFactory.getLogger(WorkerStrategyTest.class);
56+
@Mock
57+
private RegistryClient registryClient;
58+
@Mock
59+
private IStoppable stoppable;
60+
@Mock
61+
private WorkerConfig workerConfig;
62+
@Mock
63+
private WorkerRpcServer workerRpcServer;
64+
@Mock
65+
private MessageRetryRunner messageRetryRunner;
66+
@Mock
67+
private WorkerTaskExecutorThreadPool workerManagerThread;
68+
@Mock
69+
private ConnectStrategyProperties connectStrategyProperties;
70+
71+
@Test
72+
public void testWorkerStopStrategy() {
73+
given(registryClient.getStoppable())
74+
.willReturn(stoppable);
75+
WorkerStopStrategy workerStopStrategy = new WorkerStopStrategy();
76+
workerStopStrategy.registryClient = registryClient;
77+
workerStopStrategy.reconnect();
78+
workerStopStrategy.disconnect();
79+
Assertions.assertEquals(workerStopStrategy.getStrategyType(), StrategyType.STOP);
80+
}
81+
82+
@Test
83+
public void testWorkerWaitingStrategyreconnect() {
84+
WorkerWaitingStrategy workerWaitingStrategy = new WorkerWaitingStrategy(
85+
workerConfig,
86+
registryClient,
87+
messageRetryRunner,
88+
workerManagerThread);
89+
Assertions.assertEquals(workerWaitingStrategy.getStrategyType(), StrategyType.WAITING);
90+
91+
try (
92+
MockedStatic<ServerLifeCycleManager> serverLifeCycleManagerMockedStatic =
93+
Mockito.mockStatic(ServerLifeCycleManager.class)) {
94+
serverLifeCycleManagerMockedStatic
95+
.when(() -> ServerLifeCycleManager.isRunning())
96+
.thenReturn(true);
97+
workerWaitingStrategy.reconnect();
98+
99+
}
100+
101+
try (
102+
MockedStatic<ServerLifeCycleManager> serverLifeCycleManagerMockedStatic =
103+
Mockito.mockStatic(ServerLifeCycleManager.class)) {
104+
doNothing().when(stoppable).stop(anyString());
105+
given(registryClient.getStoppable())
106+
.willReturn(stoppable);
107+
serverLifeCycleManagerMockedStatic
108+
.when(() -> ServerLifeCycleManager.recoverFromWaiting())
109+
.thenThrow(new ServerLifeCycleException(""));
110+
workerWaitingStrategy.reconnect();
111+
}
112+
113+
try (
114+
MockedStatic<ServerLifeCycleManager> serverLifeCycleManagerMockedStatic =
115+
Mockito.mockStatic(ServerLifeCycleManager.class)) {
116+
serverLifeCycleManagerMockedStatic
117+
.when(() -> ServerLifeCycleManager.recoverFromWaiting())
118+
.thenAnswer(invocation -> null);
119+
workerWaitingStrategy.reconnect();
120+
}
121+
}
122+
123+
@Test
124+
public void testWorkerWaitingStrategydisconnect() {
125+
WorkerWaitingStrategy workerWaitingStrategy = new WorkerWaitingStrategy(
126+
workerConfig,
127+
registryClient,
128+
messageRetryRunner,
129+
workerManagerThread);
130+
Assertions.assertEquals(workerWaitingStrategy.getStrategyType(), StrategyType.WAITING);
131+
132+
try (
133+
MockedStatic<ServerLifeCycleManager> serverLifeCycleManagerMockedStatic =
134+
Mockito.mockStatic(ServerLifeCycleManager.class)) {
135+
doNothing().when(stoppable).stop(anyString());
136+
given(registryClient.getStoppable())
137+
.willReturn(stoppable);
138+
serverLifeCycleManagerMockedStatic
139+
.when(() -> ServerLifeCycleManager.toWaiting())
140+
.thenThrow(new ServerLifeCycleException(""));
141+
workerWaitingStrategy.disconnect();
142+
}
143+
144+
try (
145+
MockedStatic<ServerLifeCycleManager> serverLifeCycleManagerMockedStatic =
146+
Mockito.mockStatic(ServerLifeCycleManager.class)) {
147+
given(connectStrategyProperties.getMaxWaitingTime()).willReturn(Duration.ofSeconds(1));
148+
given(workerConfig.getRegistryDisconnectStrategy()).willReturn(connectStrategyProperties);
149+
Mockito.reset(registryClient);
150+
doNothing().when(registryClient).connectUntilTimeout(any());
151+
serverLifeCycleManagerMockedStatic
152+
.when(() -> ServerLifeCycleManager.toWaiting())
153+
.thenAnswer(invocation -> null);
154+
workerWaitingStrategy.disconnect();
155+
}
156+
157+
try (
158+
MockedStatic<ServerLifeCycleManager> serverLifeCycleManagerMockedStatic =
159+
Mockito.mockStatic(ServerLifeCycleManager.class)) {
160+
given(connectStrategyProperties.getMaxWaitingTime()).willReturn(Duration.ofSeconds(1));
161+
given(workerConfig.getRegistryDisconnectStrategy()).willReturn(connectStrategyProperties);
162+
Mockito.reset(registryClient);
163+
doNothing().when(stoppable).stop(anyString());
164+
given(registryClient.getStoppable())
165+
.willReturn(stoppable);
166+
Mockito.doThrow(new RegistryException("TEST")).when(registryClient).connectUntilTimeout(any());
167+
serverLifeCycleManagerMockedStatic
168+
.when(() -> ServerLifeCycleManager.toWaiting())
169+
.thenAnswer(invocation -> null);
170+
workerWaitingStrategy.disconnect();
171+
}
172+
173+
try (
174+
MockedStatic<ServerLifeCycleManager> serverLifeCycleManagerMockedStatic =
175+
Mockito.mockStatic(ServerLifeCycleManager.class)) {
176+
Mockito.reset(workerConfig);
177+
given(workerConfig.getRegistryDisconnectStrategy()).willThrow(new NullPointerException(""));
178+
doNothing().when(stoppable).stop(anyString());
179+
given(registryClient.getStoppable())
180+
.willReturn(stoppable);
181+
serverLifeCycleManagerMockedStatic
182+
.when(() -> ServerLifeCycleManager.toWaiting())
183+
.thenAnswer(invocation -> null);
184+
workerWaitingStrategy.disconnect();
185+
}
186+
}
187+
}

0 commit comments

Comments
 (0)