Skip to content

Commit dd29f27

Browse files
authored
Merge f111449 into 040ab34
2 parents 040ab34 + f111449 commit dd29f27

File tree

4 files changed

+131
-9
lines changed

4 files changed

+131
-9
lines changed

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueue.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import java.util.concurrent.DelayQueue;
2121

22+
import lombok.SneakyThrows;
2223
import lombok.extern.slf4j.Slf4j;
2324

2425
import org.springframework.stereotype.Component;
@@ -37,7 +38,8 @@ public void submitTaskExecuteRunnable(DefaultTaskExecuteRunnable priorityTaskExe
3738
queue.put(priorityTaskExecuteRunnable);
3839
}
3940

40-
public DefaultTaskExecuteRunnable takeTaskExecuteRunnable() throws InterruptedException {
41+
@SneakyThrows
42+
public DefaultTaskExecuteRunnable takeTaskExecuteRunnable() {
4143
return queue.take();
4244
}
4345

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueLooper.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
2121
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
22+
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
2223
import org.apache.dolphinscheduler.server.master.runner.dispatcher.TaskDispatchFactory;
2324
import org.apache.dolphinscheduler.server.master.runner.dispatcher.TaskDispatcher;
2425

@@ -65,14 +66,15 @@ public synchronized void start() {
6566
public void run() {
6667
DefaultTaskExecuteRunnable defaultTaskExecuteRunnable;
6768
while (RUNNING_FLAG.get()) {
69+
defaultTaskExecuteRunnable = globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable();
6870
try {
69-
defaultTaskExecuteRunnable = globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable();
70-
} catch (InterruptedException e) {
71-
log.warn("Get waiting dispatch task failed, the current thread has been interrupted, will stop loop");
72-
Thread.currentThread().interrupt();
73-
break;
74-
}
75-
try {
71+
TaskExecutionStatus status = defaultTaskExecuteRunnable.getTaskInstance().getState();
72+
if (status != TaskExecutionStatus.SUBMITTED_SUCCESS) {
73+
log.warn("The TaskInstance {} state is : {}, will not dispatch",
74+
defaultTaskExecuteRunnable.getTaskInstance().getName(), status);
75+
continue;
76+
}
77+
7678
TaskDispatcher taskDispatcher =
7779
taskDispatchFactory.getTaskDispatcher(defaultTaskExecuteRunnable.getTaskInstance());
7880
taskDispatcher.dispatchTask(defaultTaskExecuteRunnable);
@@ -86,14 +88,15 @@ public void run() {
8688
log.error("Dispatch Task: {} failed", defaultTaskExecuteRunnable.getTaskInstance().getName(), e);
8789
}
8890
}
89-
log.info("GlobalTaskDispatchWaitingQueueLooper started...");
9091
}
9192

9293
@Override
9394
public void close() throws Exception {
9495
if (RUNNING_FLAG.compareAndSet(true, false)) {
9596
log.info("GlobalTaskDispatchWaitingQueueLooper stopping...");
9697
log.info("GlobalTaskDispatchWaitingQueueLooper stopped...");
98+
} else {
99+
log.error("GlobalTaskDispatchWaitingQueueLooper is not started");
97100
}
98101
}
99102
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.dolphinscheduler.server.master.runner;
19+
20+
import static java.time.Duration.ofSeconds;
21+
import static org.awaitility.Awaitility.await;
22+
import static org.mockito.ArgumentMatchers.any;
23+
import static org.mockito.Mockito.atLeastOnce;
24+
import static org.mockito.Mockito.doNothing;
25+
import static org.mockito.Mockito.mock;
26+
import static org.mockito.Mockito.never;
27+
import static org.mockito.Mockito.verify;
28+
import static org.mockito.Mockito.when;
29+
30+
import org.apache.dolphinscheduler.common.utils.JSONUtils;
31+
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
32+
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
33+
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
34+
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
35+
import org.apache.dolphinscheduler.server.master.runner.dispatcher.TaskDispatchFactory;
36+
import org.apache.dolphinscheduler.server.master.runner.dispatcher.TaskDispatcher;
37+
import org.apache.dolphinscheduler.server.master.runner.operator.TaskExecuteRunnableOperatorManager;
38+
39+
import java.util.HashMap;
40+
41+
import org.junit.jupiter.api.Test;
42+
import org.junit.jupiter.api.extension.ExtendWith;
43+
import org.mockito.InjectMocks;
44+
import org.mockito.Mock;
45+
import org.mockito.junit.jupiter.MockitoExtension;
46+
import org.mockito.junit.jupiter.MockitoSettings;
47+
import org.mockito.quality.Strictness;
48+
49+
@ExtendWith(MockitoExtension.class)
50+
@MockitoSettings(strictness = Strictness.LENIENT)
51+
class GlobalTaskDispatchWaitingQueueLooperTest {
52+
53+
@InjectMocks
54+
private GlobalTaskDispatchWaitingQueueLooper globalTaskDispatchWaitingQueueLooper;
55+
56+
@Mock
57+
private GlobalTaskDispatchWaitingQueue globalTaskDispatchWaitingQueue;
58+
59+
@Mock
60+
private TaskDispatchFactory taskDispatchFactory;
61+
62+
@Test
63+
void testTaskExecutionRunnableStatusIsNotSubmitted() throws Exception {
64+
ProcessInstance processInstance = new ProcessInstance();
65+
TaskInstance taskInstance = new TaskInstance();
66+
taskInstance.setState(TaskExecutionStatus.KILL);
67+
taskInstance.setTaskParams(JSONUtils.toJsonString(new HashMap<>()));
68+
TaskExecutionContext taskExecutionContext = new TaskExecutionContext();
69+
TaskExecuteRunnableOperatorManager taskExecuteRunnableOperatorManager =
70+
new TaskExecuteRunnableOperatorManager();
71+
DefaultTaskExecuteRunnable defaultTaskExecuteRunnable = new DefaultTaskExecuteRunnable(processInstance,
72+
taskInstance, taskExecutionContext, taskExecuteRunnableOperatorManager);
73+
74+
TaskDispatcher taskDispatcher = mock(TaskDispatcher.class);
75+
when(taskDispatchFactory.getTaskDispatcher(taskInstance)).thenReturn(taskDispatcher);
76+
doNothing().when(taskDispatcher).dispatchTask(any());
77+
78+
when(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable()).thenReturn(defaultTaskExecuteRunnable);
79+
globalTaskDispatchWaitingQueueLooper.start();
80+
await().during(ofSeconds(1))
81+
.untilAsserted(() -> verify(taskDispatchFactory, never()).getTaskDispatcher(taskInstance));
82+
globalTaskDispatchWaitingQueueLooper.close();
83+
}
84+
85+
@Test
86+
void testTaskExecutionRunnableStatusIsSubmitted() throws Exception {
87+
ProcessInstance processInstance = new ProcessInstance();
88+
TaskInstance taskInstance = new TaskInstance();
89+
taskInstance.setState(TaskExecutionStatus.SUBMITTED_SUCCESS);
90+
taskInstance.setTaskParams(JSONUtils.toJsonString(new HashMap<>()));
91+
TaskExecutionContext taskExecutionContext = new TaskExecutionContext();
92+
TaskExecuteRunnableOperatorManager taskExecuteRunnableOperatorManager =
93+
new TaskExecuteRunnableOperatorManager();
94+
DefaultTaskExecuteRunnable defaultTaskExecuteRunnable = new DefaultTaskExecuteRunnable(processInstance,
95+
taskInstance, taskExecutionContext, taskExecuteRunnableOperatorManager);
96+
97+
TaskDispatcher taskDispatcher = mock(TaskDispatcher.class);
98+
when(taskDispatchFactory.getTaskDispatcher(taskInstance)).thenReturn(taskDispatcher);
99+
doNothing().when(taskDispatcher).dispatchTask(any());
100+
101+
when(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable()).thenReturn(defaultTaskExecuteRunnable);
102+
globalTaskDispatchWaitingQueueLooper.start();
103+
await().atMost(ofSeconds(1)).untilAsserted(() -> {
104+
verify(taskDispatchFactory, atLeastOnce()).getTaskDispatcher(any(TaskInstance.class));
105+
verify(taskDispatcher, atLeastOnce()).dispatchTask(any(TaskExecuteRunnable.class));
106+
});
107+
globalTaskDispatchWaitingQueueLooper.close();
108+
109+
}
110+
}

pom.xml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@
8888
<exec-maven-plugin.version>3.0.0</exec-maven-plugin.version>
8989
<owasp-dependency-check-maven.version>7.1.2</owasp-dependency-check-maven.version>
9090
<lombok.version>1.18.20</lombok.version>
91+
<awaitility.version>4.2.0</awaitility.version>
9192
<docker.hub>apache</docker.hub>
9293
<docker.repo>${project.name}</docker.repo>
9394
<docker.tag>${project.version}</docker.tag>
@@ -365,6 +366,12 @@
365366
<version>${lombok.version}</version>
366367
<scope>provided</scope>
367368
</dependency>
369+
<dependency>
370+
<groupId>org.awaitility</groupId>
371+
<artifactId>awaitility</artifactId>
372+
<version>${awaitility.version}</version>
373+
<scope>test</scope>
374+
</dependency>
368375
</dependencies>
369376

370377
<build>

0 commit comments

Comments
 (0)