Skip to content

Commit 747b7ec

Browse files
authored
Merge c9a54e9 into e1fcd4e
2 parents e1fcd4e + c9a54e9 commit 747b7ec

File tree

3 files changed

+50
-47
lines changed

3 files changed

+50
-47
lines changed

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@
9191
import org.apache.dolphinscheduler.service.expand.CuringParamsService;
9292
import org.apache.dolphinscheduler.service.model.TaskNode;
9393
import org.apache.dolphinscheduler.service.process.ProcessService;
94-
import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue;
94+
import org.apache.dolphinscheduler.service.queue.StandByTaskInstancePriorityQueue;
9595
import org.apache.dolphinscheduler.service.utils.DagHelper;
9696

9797
import org.apache.commons.collections4.CollectionUtils;
@@ -208,7 +208,8 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable {
208208
/**
209209
* The StandBy task list, will be executed, need to know, the taskInstance in this queue may doesn't have id.
210210
*/
211-
private final PeerTaskInstancePriorityQueue readyToSubmitTaskQueue = new PeerTaskInstancePriorityQueue();
211+
private final StandByTaskInstancePriorityQueue standByTaskInstancePriorityQueue =
212+
new StandByTaskInstancePriorityQueue();
212213

213214
/**
214215
* wait to retry taskInstance map, taskCode as key, taskInstance as value
@@ -249,7 +250,7 @@ public WorkflowExecuteRunnable(
249250
this.taskInstanceDao = taskInstanceDao;
250251
this.defaultTaskExecuteRunnableFactory = defaultTaskExecuteRunnableFactory;
251252
this.listenerEventAlertManager = listenerEventAlertManager;
252-
TaskMetrics.registerTaskPrepared(readyToSubmitTaskQueue::size);
253+
TaskMetrics.registerTaskPrepared(standByTaskInstancePriorityQueue::size);
253254
}
254255

255256
/**
@@ -1430,7 +1431,7 @@ && tryToTakeOverTaskInstance(existTaskInstance)) {
14301431
// if previous node success , post node submit
14311432
for (TaskInstance task : taskInstances) {
14321433

1433-
if (readyToSubmitTaskQueue.contains(task)) {
1434+
if (standByTaskInstancePriorityQueue.contains(task)) {
14341435
log.warn("Task is already at submit queue, taskInstanceName: {}", task.getName());
14351436
continue;
14361437
}
@@ -1665,7 +1666,7 @@ private boolean processFailed() {
16651666
return true;
16661667
}
16671668
if (workflowInstance.getFailureStrategy() == FailureStrategy.CONTINUE) {
1668-
return readyToSubmitTaskQueue.size() == 0 && taskExecuteRunnableMap.size() == 0
1669+
return standByTaskInstancePriorityQueue.size() == 0 && taskExecuteRunnableMap.size() == 0
16691670
&& waitToRetryTaskInstanceMap.size() == 0;
16701671
}
16711672
}
@@ -1688,7 +1689,7 @@ private WorkflowExecutionStatus processReadyPause() {
16881689

16891690
List<TaskInstance> pauseList = getCompleteTaskByState(TaskExecutionStatus.PAUSE);
16901691
if (CollectionUtils.isNotEmpty(pauseList) || workflowInstance.isBlocked() || !isComplementEnd()
1691-
|| readyToSubmitTaskQueue.size() > 0) {
1692+
|| standByTaskInstancePriorityQueue.size() > 0) {
16921693
return WorkflowExecutionStatus.PAUSE;
16931694
} else {
16941695
return WorkflowExecutionStatus.SUCCESS;
@@ -1711,8 +1712,8 @@ private WorkflowExecutionStatus processReadyBlock() {
17111712
}
17121713
}
17131714
}
1714-
if (readyToSubmitTaskQueue.size() > 0) {
1715-
for (Iterator<TaskInstance> iter = readyToSubmitTaskQueue.iterator(); iter.hasNext();) {
1715+
if (standByTaskInstancePriorityQueue.size() > 0) {
1716+
for (Iterator<TaskInstance> iter = standByTaskInstancePriorityQueue.iterator(); iter.hasNext();) {
17161717
iter.next().setState(TaskExecutionStatus.PAUSE);
17171718
}
17181719
}
@@ -1773,7 +1774,7 @@ private WorkflowExecutionStatus getProcessInstanceState(ProcessInstance instance
17731774
// success
17741775
if (state == WorkflowExecutionStatus.RUNNING_EXECUTION) {
17751776
List<TaskInstance> killTasks = getCompleteTaskByState(TaskExecutionStatus.KILL);
1776-
if (readyToSubmitTaskQueue.size() > 0 || waitToRetryTaskInstanceMap.size() > 0) {
1777+
if (standByTaskInstancePriorityQueue.size() > 0 || waitToRetryTaskInstanceMap.size() > 0) {
17771778
// tasks currently pending submission, no retries, indicating that depend is waiting to complete
17781779
return WorkflowExecutionStatus.RUNNING_EXECUTION;
17791780
} else if (CollectionUtils.isNotEmpty(killTasks)) {
@@ -1878,7 +1879,7 @@ private DependResult getDependResultForTask(TaskInstance taskInstance) {
18781879
* @param taskInstance task instance
18791880
*/
18801881
public void addTaskToStandByList(TaskInstance taskInstance) {
1881-
if (readyToSubmitTaskQueue.contains(taskInstance)) {
1882+
if (standByTaskInstancePriorityQueue.contains(taskInstance)) {
18821883
log.warn("Task already exists in ready submit queue, no need to add again, task code:{}",
18831884
taskInstance.getTaskCode());
18841885
return;
@@ -1888,7 +1889,7 @@ public void addTaskToStandByList(TaskInstance taskInstance) {
18881889
taskInstance.getId(),
18891890
taskInstance.getTaskCode());
18901891
TaskMetrics.incTaskInstanceByState("submit");
1891-
readyToSubmitTaskQueue.put(taskInstance);
1892+
standByTaskInstancePriorityQueue.put(taskInstance);
18921893
}
18931894

18941895
/**
@@ -1897,7 +1898,7 @@ public void addTaskToStandByList(TaskInstance taskInstance) {
18971898
* @param taskInstance task instance
18981899
*/
18991900
private boolean removeTaskFromStandbyList(TaskInstance taskInstance) {
1900-
return readyToSubmitTaskQueue.remove(taskInstance);
1901+
return standByTaskInstancePriorityQueue.remove(taskInstance);
19011902
}
19021903

19031904
/**
@@ -1906,7 +1907,7 @@ private boolean removeTaskFromStandbyList(TaskInstance taskInstance) {
19061907
* @return Boolean whether has retry task in standby
19071908
*/
19081909
private boolean hasRetryTaskInStandBy() {
1909-
for (Iterator<TaskInstance> iter = readyToSubmitTaskQueue.iterator(); iter.hasNext();) {
1910+
for (Iterator<TaskInstance> iter = standByTaskInstancePriorityQueue.iterator(); iter.hasNext();) {
19101911
if (iter.next().getState().isFailure()) {
19111912
return true;
19121913
}
@@ -1923,8 +1924,8 @@ public void killAllTasks() {
19231924
workflowInstance.getId(),
19241925
taskExecuteRunnableMap.size());
19251926

1926-
if (readyToSubmitTaskQueue.size() > 0) {
1927-
readyToSubmitTaskQueue.clear();
1927+
if (standByTaskInstancePriorityQueue.size() > 0) {
1928+
standByTaskInstancePriorityQueue.clear();
19281929
}
19291930

19301931
for (long taskCode : taskExecuteRunnableMap.keySet()) {
@@ -1965,7 +1966,7 @@ public boolean workFlowFinish() {
19651966
public void submitStandByTask() throws StateEventHandleException {
19661967
ProcessInstance workflowInstance = workflowExecuteContext.getWorkflowInstance();
19671968
TaskInstance task;
1968-
while ((task = readyToSubmitTaskQueue.peek()) != null) {
1969+
while ((task = standByTaskInstancePriorityQueue.peek()) != null) {
19691970
// stop tasks which is retrying if forced success happens
19701971
if (task.getId() != null && task.taskCanRetry()) {
19711972
TaskInstance retryTask = taskInstanceDao.queryById(task.getId());

dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java renamed to dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/StandByTaskInstancePriorityQueue.java

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
package org.apache.dolphinscheduler.service.queue;
1919

20-
import org.apache.dolphinscheduler.common.constants.Constants;
2120
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
2221
import org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException;
2322

@@ -35,7 +34,7 @@
3534
* Task instances priority queue implementation
3635
* All the task instances are in the same process instance.
3736
*/
38-
public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue<TaskInstance> {
37+
public class StandByTaskInstancePriorityQueue implements TaskPriorityQueue<TaskInstance> {
3938

4039
/**
4140
* queue size
@@ -45,7 +44,8 @@ public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue<TaskInst
4544
/**
4645
* queue
4746
*/
48-
private final PriorityQueue<TaskInstance> queue = new PriorityQueue<>(QUEUE_MAX_SIZE, new TaskInfoComparator());
47+
private final PriorityQueue<TaskInstance> queue =
48+
new PriorityQueue<>(QUEUE_MAX_SIZE, new TaskInstancePriorityComparator());
4949
private final Set<String> taskInstanceIdentifySet = Collections.synchronizedSet(new HashSet<>());
5050

5151
/**
@@ -163,24 +163,25 @@ private String getTaskInstanceIdentify(TaskInstance taskInstance) {
163163
}
164164

165165
/**
166-
* TaskInfoComparator
166+
* This comparator is used to sort task instances in the standby queue.
167+
* If the TaskInstance is in the same taskGroup, then we will sort the TaskInstance by {@link TaskInstance#getTaskGroupPriority()} in the taskGroup.
168+
* Otherwise, we will sort the TaskInstance by {@link TaskInstance#getTaskInstancePriority()} in the workflow.
167169
*/
168-
private static class TaskInfoComparator implements Comparator<TaskInstance> {
169-
170-
/**
171-
* compare o1 o2
172-
*
173-
* @param o1 o1
174-
* @param o2 o2
175-
* @return compare result
176-
*/
170+
private static class TaskInstancePriorityComparator implements Comparator<TaskInstance> {
171+
177172
@Override
178173
public int compare(TaskInstance o1, TaskInstance o2) {
179-
if (o1.getTaskInstancePriority().equals(o2.getTaskInstancePriority())) {
180-
// larger number, higher priority
181-
return Constants.OPPOSITE_VALUE * Integer.compare(o1.getTaskGroupPriority(), o2.getTaskGroupPriority());
174+
int taskPriorityInTaskGroup = -1 * Integer.compare(o1.getTaskGroupPriority(), o2.getTaskGroupPriority());
175+
int taskInstancePriorityInWorkflow =
176+
Long.compare(o1.getTaskInstancePriority().getCode(), o2.getTaskInstancePriority().getCode());
177+
178+
if (o1.getTaskGroupId() == o2.getTaskGroupId()) {
179+
// If at the same taskGroup
180+
if (taskPriorityInTaskGroup != 0) {
181+
return taskPriorityInTaskGroup;
182+
}
182183
}
183-
return o1.getTaskInstancePriority().compareTo(o2.getTaskInstancePriority());
184+
return taskInstancePriorityInWorkflow;
184185
}
185186
}
186187
}

dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java renamed to dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/StandByTaskInstancePriorityQueueTest.java

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,11 @@
2626
import org.junit.jupiter.api.Assertions;
2727
import org.junit.jupiter.api.Test;
2828

29-
public class PeerTaskInstancePriorityQueueTest {
29+
public class StandByTaskInstancePriorityQueueTest {
3030

3131
@Test
3232
public void put() throws TaskPriorityQueueException {
33-
PeerTaskInstancePriorityQueue queue = new PeerTaskInstancePriorityQueue();
33+
StandByTaskInstancePriorityQueue queue = new StandByTaskInstancePriorityQueue();
3434
TaskInstance taskInstanceHigPriority = createTaskInstance("high", Priority.HIGH, 1);
3535
TaskInstance taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM, 1);
3636
queue.put(taskInstanceHigPriority);
@@ -42,30 +42,31 @@ public void put() throws TaskPriorityQueueException {
4242

4343
@Test
4444
public void take() throws Exception {
45-
PeerTaskInstancePriorityQueue queue = getPeerTaskInstancePriorityQueue();
45+
StandByTaskInstancePriorityQueue queue = getPeerTaskInstancePriorityQueue();
4646
int peekBeforeLength = queue.size();
4747
queue.take();
4848
Assertions.assertTrue(queue.size() < peekBeforeLength);
4949
}
5050

5151
@Test
5252
public void poll() throws Exception {
53-
PeerTaskInstancePriorityQueue queue = getPeerTaskInstancePriorityQueue();
53+
StandByTaskInstancePriorityQueue queue = getPeerTaskInstancePriorityQueue();
5454
Assertions.assertThrows(TaskPriorityQueueException.class, () -> {
5555
queue.poll(1000, TimeUnit.MILLISECONDS);
5656
});
5757
}
5858

5959
@Test
6060
public void peek() throws Exception {
61-
PeerTaskInstancePriorityQueue queue = getPeerTaskInstancePriorityQueue();
61+
StandByTaskInstancePriorityQueue queue = getPeerTaskInstancePriorityQueue();
6262
int peekBeforeLength = queue.size();
6363
Assertions.assertEquals(peekBeforeLength, queue.size());
6464
}
6565

6666
@Test
6767
public void peekTaskGroupPriority() throws Exception {
68-
PeerTaskInstancePriorityQueue queue = new PeerTaskInstancePriorityQueue();
68+
StandByTaskInstancePriorityQueue queue = new StandByTaskInstancePriorityQueue();
69+
6970
TaskInstance taskInstanceHigPriority = createTaskInstance("high", Priority.HIGH, 2);
7071
TaskInstance taskInstanceMediumPriority = createTaskInstance("medium", Priority.HIGH, 1);
7172
queue.put(taskInstanceMediumPriority);
@@ -80,23 +81,23 @@ public void peekTaskGroupPriority() throws Exception {
8081
queue.put(taskInstanceHigPriority);
8182
taskInstance = queue.peek();
8283
queue.clear();
83-
Assertions.assertEquals(taskInstance.getName(), "medium");
84+
Assertions.assertEquals("medium", taskInstance.getName());
8485

8586
taskInstanceHigPriority = createTaskInstance("high", Priority.HIGH, 1);
8687
taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM, 2);
8788
queue.put(taskInstanceMediumPriority);
8889
queue.put(taskInstanceHigPriority);
8990
taskInstance = queue.peek();
9091
queue.clear();
91-
Assertions.assertEquals(taskInstance.getName(), "high");
92+
Assertions.assertEquals("medium", taskInstance.getName());
9293

9394
taskInstanceHigPriority = createTaskInstance("high", Priority.HIGH, 1);
9495
taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM, 1);
9596
queue.put(taskInstanceMediumPriority);
9697
queue.put(taskInstanceHigPriority);
9798
taskInstance = queue.peek();
9899
queue.clear();
99-
Assertions.assertEquals(taskInstance.getName(), "high");
100+
Assertions.assertEquals("high", taskInstance.getName());
100101

101102
}
102103

@@ -107,7 +108,7 @@ public void size() throws Exception {
107108

108109
@Test
109110
public void contains() throws Exception {
110-
PeerTaskInstancePriorityQueue queue = new PeerTaskInstancePriorityQueue();
111+
StandByTaskInstancePriorityQueue queue = new StandByTaskInstancePriorityQueue();
111112
TaskInstance taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM, 1);
112113
queue.put(taskInstanceMediumPriority);
113114
Assertions.assertTrue(queue.contains(taskInstanceMediumPriority));
@@ -117,8 +118,8 @@ public void contains() throws Exception {
117118
}
118119

119120
@Test
120-
public void remove() throws Exception {
121-
PeerTaskInstancePriorityQueue queue = new PeerTaskInstancePriorityQueue();
121+
public void remove() {
122+
StandByTaskInstancePriorityQueue queue = new StandByTaskInstancePriorityQueue();
122123
TaskInstance taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM, 1);
123124
queue.put(taskInstanceMediumPriority);
124125
int peekBeforeLength = queue.size();
@@ -133,8 +134,8 @@ public void remove() throws Exception {
133134
* @return queue
134135
* @throws Exception
135136
*/
136-
private PeerTaskInstancePriorityQueue getPeerTaskInstancePriorityQueue() throws Exception {
137-
PeerTaskInstancePriorityQueue queue = new PeerTaskInstancePriorityQueue();
137+
private StandByTaskInstancePriorityQueue getPeerTaskInstancePriorityQueue() throws Exception {
138+
StandByTaskInstancePriorityQueue queue = new StandByTaskInstancePriorityQueue();
138139
TaskInstance taskInstanceHigPriority = createTaskInstance("high", Priority.HIGH, 1);
139140
TaskInstance taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM, 1);
140141
taskInstanceHigPriority.setTaskGroupPriority(3);

0 commit comments

Comments
 (0)