9191import org .apache .dolphinscheduler .service .expand .CuringParamsService ;
9292import org .apache .dolphinscheduler .service .model .TaskNode ;
9393import org .apache .dolphinscheduler .service .process .ProcessService ;
94- import org .apache .dolphinscheduler .service .queue .PeerTaskInstancePriorityQueue ;
94+ import org .apache .dolphinscheduler .service .queue .StandByTaskInstancePriorityQueue ;
9595import org .apache .dolphinscheduler .service .utils .DagHelper ;
9696
9797import org .apache .commons .collections4 .CollectionUtils ;
@@ -208,7 +208,8 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable {
208208 /**
209209 * The StandBy task list, will be executed, need to know, the taskInstance in this queue may doesn't have id.
210210 */
211- private final PeerTaskInstancePriorityQueue readyToSubmitTaskQueue = new PeerTaskInstancePriorityQueue ();
211+ private final StandByTaskInstancePriorityQueue standByTaskInstancePriorityQueue =
212+ new StandByTaskInstancePriorityQueue ();
212213
213214 /**
214215 * wait to retry taskInstance map, taskCode as key, taskInstance as value
@@ -249,7 +250,7 @@ public WorkflowExecuteRunnable(
249250 this .taskInstanceDao = taskInstanceDao ;
250251 this .defaultTaskExecuteRunnableFactory = defaultTaskExecuteRunnableFactory ;
251252 this .listenerEventAlertManager = listenerEventAlertManager ;
252- TaskMetrics .registerTaskPrepared (readyToSubmitTaskQueue ::size );
253+ TaskMetrics .registerTaskPrepared (standByTaskInstancePriorityQueue ::size );
253254 }
254255
255256 /**
@@ -1430,7 +1431,7 @@ && tryToTakeOverTaskInstance(existTaskInstance)) {
14301431 // if previous node success , post node submit
14311432 for (TaskInstance task : taskInstances ) {
14321433
1433- if (readyToSubmitTaskQueue .contains (task )) {
1434+ if (standByTaskInstancePriorityQueue .contains (task )) {
14341435 log .warn ("Task is already at submit queue, taskInstanceName: {}" , task .getName ());
14351436 continue ;
14361437 }
@@ -1665,7 +1666,7 @@ private boolean processFailed() {
16651666 return true ;
16661667 }
16671668 if (workflowInstance .getFailureStrategy () == FailureStrategy .CONTINUE ) {
1668- return readyToSubmitTaskQueue .size () == 0 && taskExecuteRunnableMap .size () == 0
1669+ return standByTaskInstancePriorityQueue .size () == 0 && taskExecuteRunnableMap .size () == 0
16691670 && waitToRetryTaskInstanceMap .size () == 0 ;
16701671 }
16711672 }
@@ -1688,7 +1689,7 @@ private WorkflowExecutionStatus processReadyPause() {
16881689
16891690 List <TaskInstance > pauseList = getCompleteTaskByState (TaskExecutionStatus .PAUSE );
16901691 if (CollectionUtils .isNotEmpty (pauseList ) || workflowInstance .isBlocked () || !isComplementEnd ()
1691- || readyToSubmitTaskQueue .size () > 0 ) {
1692+ || standByTaskInstancePriorityQueue .size () > 0 ) {
16921693 return WorkflowExecutionStatus .PAUSE ;
16931694 } else {
16941695 return WorkflowExecutionStatus .SUCCESS ;
@@ -1711,8 +1712,8 @@ private WorkflowExecutionStatus processReadyBlock() {
17111712 }
17121713 }
17131714 }
1714- if (readyToSubmitTaskQueue .size () > 0 ) {
1715- for (Iterator <TaskInstance > iter = readyToSubmitTaskQueue .iterator (); iter .hasNext ();) {
1715+ if (standByTaskInstancePriorityQueue .size () > 0 ) {
1716+ for (Iterator <TaskInstance > iter = standByTaskInstancePriorityQueue .iterator (); iter .hasNext ();) {
17161717 iter .next ().setState (TaskExecutionStatus .PAUSE );
17171718 }
17181719 }
@@ -1773,7 +1774,7 @@ private WorkflowExecutionStatus getProcessInstanceState(ProcessInstance instance
17731774 // success
17741775 if (state == WorkflowExecutionStatus .RUNNING_EXECUTION ) {
17751776 List <TaskInstance > killTasks = getCompleteTaskByState (TaskExecutionStatus .KILL );
1776- if (readyToSubmitTaskQueue .size () > 0 || waitToRetryTaskInstanceMap .size () > 0 ) {
1777+ if (standByTaskInstancePriorityQueue .size () > 0 || waitToRetryTaskInstanceMap .size () > 0 ) {
17771778 // tasks currently pending submission, no retries, indicating that depend is waiting to complete
17781779 return WorkflowExecutionStatus .RUNNING_EXECUTION ;
17791780 } else if (CollectionUtils .isNotEmpty (killTasks )) {
@@ -1878,7 +1879,7 @@ private DependResult getDependResultForTask(TaskInstance taskInstance) {
18781879 * @param taskInstance task instance
18791880 */
18801881 public void addTaskToStandByList (TaskInstance taskInstance ) {
1881- if (readyToSubmitTaskQueue .contains (taskInstance )) {
1882+ if (standByTaskInstancePriorityQueue .contains (taskInstance )) {
18821883 log .warn ("Task already exists in ready submit queue, no need to add again, task code:{}" ,
18831884 taskInstance .getTaskCode ());
18841885 return ;
@@ -1888,7 +1889,7 @@ public void addTaskToStandByList(TaskInstance taskInstance) {
18881889 taskInstance .getId (),
18891890 taskInstance .getTaskCode ());
18901891 TaskMetrics .incTaskInstanceByState ("submit" );
1891- readyToSubmitTaskQueue .put (taskInstance );
1892+ standByTaskInstancePriorityQueue .put (taskInstance );
18921893 }
18931894
18941895 /**
@@ -1897,7 +1898,7 @@ public void addTaskToStandByList(TaskInstance taskInstance) {
18971898 * @param taskInstance task instance
18981899 */
18991900 private boolean removeTaskFromStandbyList (TaskInstance taskInstance ) {
1900- return readyToSubmitTaskQueue .remove (taskInstance );
1901+ return standByTaskInstancePriorityQueue .remove (taskInstance );
19011902 }
19021903
19031904 /**
@@ -1906,7 +1907,7 @@ private boolean removeTaskFromStandbyList(TaskInstance taskInstance) {
19061907 * @return Boolean whether has retry task in standby
19071908 */
19081909 private boolean hasRetryTaskInStandBy () {
1909- for (Iterator <TaskInstance > iter = readyToSubmitTaskQueue .iterator (); iter .hasNext ();) {
1910+ for (Iterator <TaskInstance > iter = standByTaskInstancePriorityQueue .iterator (); iter .hasNext ();) {
19101911 if (iter .next ().getState ().isFailure ()) {
19111912 return true ;
19121913 }
@@ -1923,8 +1924,8 @@ public void killAllTasks() {
19231924 workflowInstance .getId (),
19241925 taskExecuteRunnableMap .size ());
19251926
1926- if (readyToSubmitTaskQueue .size () > 0 ) {
1927- readyToSubmitTaskQueue .clear ();
1927+ if (standByTaskInstancePriorityQueue .size () > 0 ) {
1928+ standByTaskInstancePriorityQueue .clear ();
19281929 }
19291930
19301931 for (long taskCode : taskExecuteRunnableMap .keySet ()) {
@@ -1965,7 +1966,7 @@ public boolean workFlowFinish() {
19651966 public void submitStandByTask () throws StateEventHandleException {
19661967 ProcessInstance workflowInstance = workflowExecuteContext .getWorkflowInstance ();
19671968 TaskInstance task ;
1968- while ((task = readyToSubmitTaskQueue .peek ()) != null ) {
1969+ while ((task = standByTaskInstancePriorityQueue .peek ()) != null ) {
19691970 // stop tasks which is retrying if forced success happens
19701971 if (task .getId () != null && task .taskCanRetry ()) {
19711972 TaskInstance retryTask = taskInstanceDao .queryById (task .getId ());
0 commit comments