Skip to content

Commit d325974

Browse files
authored
Merge 656988f into 23b2c8a
2 parents 23b2c8a + 656988f commit d325974

File tree

2 files changed

+31
-22
lines changed

2 files changed

+31
-22
lines changed

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstanceDispatchOperationFunction.java

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -62,19 +62,23 @@ public LogicTaskDispatchResponse operate(LogicTaskDispatchRequest taskDispatchRe
6262
LogUtils.setTaskInstanceLogFullPathMDC(taskExecutionContext.getLogPath());
6363

6464
MasterTaskExecutionContextHolder.putTaskExecutionContext(taskExecutionContext);
65-
// todo: calculate the delay in master dispatcher then we don't need to use a queue to store the task
66-
final long remainTime =
67-
DateUtils.getRemainTime(DateUtils.timeStampToDate(taskExecutionContext.getFirstSubmitTime()),
68-
TimeUnit.SECONDS.toMillis(taskExecutionContext.getDelayTime()));
69-
if (remainTime > 0) {
70-
log.info("Current taskInstance: {} is choosing delay execution, delay time: {}/ms, remainTime: {}/ms",
71-
taskExecutionContext.getTaskName(),
72-
TimeUnit.SECONDS.toMillis(taskExecutionContext.getDelayTime()), remainTime);
73-
taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.DELAY_EXECUTION);
74-
// todo: send delay execution message
75-
return LogicTaskDispatchResponse.success(taskExecutionContext.getTaskInstanceId());
76-
}
7765

66+
int delayTime = taskExecutionContext.getDelayTime();
67+
if (delayTime > 0) {
68+
// todo: calculate the delay in master dispatcher then we don't need to use a queue to store the task
69+
final long remainTime =
70+
DateUtils.getRemainTime(DateUtils.timeStampToDate(taskExecutionContext.getFirstSubmitTime()),
71+
TimeUnit.SECONDS.toMillis(delayTime));
72+
if (remainTime > 0) {
73+
log.info(
74+
"Current taskInstance: {} is choosing delay execution, delay time: {}/ms, remainTime: {}/ms",
75+
taskExecutionContext.getTaskName(),
76+
TimeUnit.SECONDS.toMillis(taskExecutionContext.getDelayTime()), remainTime);
77+
taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.DELAY_EXECUTION);
78+
// todo: send delay execution message
79+
return LogicTaskDispatchResponse.success(taskExecutionContext.getTaskInstanceId());
80+
}
81+
}
7882
final MasterDelayTaskExecuteRunnable masterDelayTaskExecuteRunnable =
7983
masterTaskExecuteRunnableFactoryBuilder
8084
.createWorkerDelayTaskExecuteRunnableFactory(taskExecutionContext.getTaskType())

dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/GlobalTaskInstanceDispatchQueueLooper.java

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -75,16 +75,21 @@ public void run() {
7575
TaskExecutionContext taskExecutionContext = globalTaskInstanceDispatchQueue.take();
7676
LogUtils.setTaskInstanceLogFullPathMDC(taskExecutionContext.getLogPath());
7777
LogUtils.setTaskInstanceIdMDC(taskExecutionContext.getTaskInstanceId());
78-
// delay task process
79-
long remainTime =
80-
DateUtils.getRemainTime(DateUtils.timeStampToDate(taskExecutionContext.getFirstSubmitTime()),
81-
taskExecutionContext.getDelayTime() * 60L);
82-
if (remainTime > 0) {
83-
log.info("Current taskInstance is choose delay execution, delay time: {}s", remainTime);
84-
taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.DELAY_EXECUTION);
85-
// todo: use delay running event
86-
workerMessageSender.sendMessage(taskExecutionContext,
87-
ITaskInstanceExecutionEvent.TaskInstanceExecutionEventType.FINISH);
78+
79+
int delayTime = taskExecutionContext.getDelayTime();
80+
if (delayTime > 0) {
81+
// delay task process
82+
long remainTime =
83+
DateUtils.getRemainTime(
84+
DateUtils.timeStampToDate(taskExecutionContext.getFirstSubmitTime()),
85+
delayTime * 60L);
86+
if (remainTime > 0) {
87+
log.info("Current taskInstance is choose delay execution, delay time: {}s", remainTime);
88+
taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.DELAY_EXECUTION);
89+
// todo: use delay running event
90+
workerMessageSender.sendMessage(taskExecutionContext,
91+
ITaskInstanceExecutionEvent.TaskInstanceExecutionEventType.FINISH);
92+
}
8893
}
8994
WorkerDelayTaskExecuteRunnable workerTaskExecuteRunnable = WorkerTaskExecuteRunnableFactoryBuilder
9095
.createWorkerDelayTaskExecuteRunnableFactory(

0 commit comments

Comments
 (0)