Skip to content

Commit d810c48

Browse files
authored
Fix taskGroupQueue not being removed from inQueue when wakeup fails (#14200)
* Fix taskGroupQueue not being removed from inQueue when wakeup fails * Fix TaskGroupQueue not leaving the queue when the available taskGroup count is 0
1 parent c6ac356 commit d810c48

File tree

3 files changed

+87
-37
lines changed

3 files changed

+87
-37
lines changed

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java

Lines changed: 26 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -496,24 +496,33 @@ public void taskFinished(TaskInstance taskInstance) throws StateEventHandleExcep
496496
*
497497
*/
498498
public void releaseTaskGroup(TaskInstance taskInstance) throws RemotingException, InterruptedException {
// Releases the task group slot held by taskInstance through processService.releaseTaskGroup and wakes up
// the next waiting task instance it returns: a task of the same process instance gets a local
// WAKE_UP_TASK_GROUP state event, any other task gets a TaskWakeupRequest sent to the host of its
// process instance.
499-
if (taskInstance.getTaskGroupId() > 0) {
500-
TaskInstance nextTaskInstance = this.processService.releaseTaskGroup(taskInstance);
501-
if (nextTaskInstance != null) {
502-
if (nextTaskInstance.getProcessInstanceId() == taskInstance.getProcessInstanceId()) {
503-
TaskStateEvent nextEvent = TaskStateEvent.builder()
504-
.processInstanceId(processInstance.getId())
505-
.taskInstanceId(nextTaskInstance.getId())
506-
.type(StateEventType.WAKE_UP_TASK_GROUP)
507-
.build();
508-
this.stateEvents.add(nextEvent);
509-
} else {
510-
ProcessInstance processInstance =
511-
this.processService.findProcessInstanceById(nextTaskInstance.getProcessInstanceId());
512-
this.masterRpcClient.sendSyncCommand(Host.of(processInstance.getHost()),
513-
new TaskWakeupRequest(processInstance.getId(), nextTaskInstance.getId()).convert2Command());
514-
}
515-
}
499+
// todo: use Integer
500+
if (taskInstance.getTaskGroupId() <= 0) {
501+
log.info("The current TaskInstance: {} doesn't use taskGroup, no need to release taskGroup",
502+
taskInstance.getName());
// A task without a task group holds nothing to release: stop here, as the previous
// implementation did, instead of falling through to the task group lookup below.
return;
503+
}
504+
TaskInstance nextTaskInstance = processService.releaseTaskGroup(taskInstance);
505+
if (nextTaskInstance == null) {
506+
log.info(
507+
"The current TaskInstance: {} is the last taskInstance in the taskGroup, no need to wakeup next taskInstance",
508+
taskInstance.getName());
509+
return;
510+
}
511+
if (nextTaskInstance.getProcessInstanceId() == taskInstance.getProcessInstanceId()) {
512+
TaskStateEvent nextEvent = TaskStateEvent.builder()
513+
.processInstanceId(processInstance.getId())
514+
.taskInstanceId(nextTaskInstance.getId())
515+
.type(StateEventType.WAKE_UP_TASK_GROUP)
516+
.build();
517+
stateEvents.add(nextEvent);
518+
} else {
519+
ProcessInstance processInstance =
520+
processService.findProcessInstanceById(nextTaskInstance.getProcessInstanceId());
521+
masterRpcClient.sendSyncCommand(
522+
Host.of(processInstance.getHost()),
523+
new TaskWakeupRequest(processInstance.getId(), nextTaskInstance.getId()).convert2Command());
516524
}
525+
log.info("Success send wakeup message to next taskInstance: {}", nextTaskInstance.getId());
517526
}
518527

519528
/**

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskTimeoutOperator.java

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,18 @@
1919

2020
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
2121
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
22+
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
2223
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
24+
import org.apache.dolphinscheduler.remote.command.task.TaskKillRequest;
25+
import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
26+
import org.apache.dolphinscheduler.remote.utils.Host;
27+
import org.apache.dolphinscheduler.server.master.rpc.MasterRpcClient;
2328
import org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnable;
2429

30+
import org.apache.commons.lang3.StringUtils;
31+
32+
import java.util.Date;
33+
2534
import lombok.extern.slf4j.Slf4j;
2635

2736
import org.springframework.beans.factory.annotation.Autowired;
@@ -34,6 +43,9 @@ public class TaskTimeoutOperator implements TaskOperator {
3443
@Autowired
3544
private TaskInstanceDao taskInstanceDao;
3645

46+
@Autowired
47+
private MasterRpcClient masterRpcClient;
48+
3749
@Override
3850
public void handle(DefaultTaskExecuteRunnable taskExecuteRunnable) {
3951
// Right now, if the task is running in worker, the timeout strategy will be handled at worker side.
@@ -48,8 +60,27 @@ public void handle(DefaultTaskExecuteRunnable taskExecuteRunnable) {
4860
taskInstance.getName(), taskTimeoutStrategy.name());
4961
return;
5062
}
51-
taskExecuteRunnable.kill();
52-
log.info("TaskInstance: {} timeout, killed the task instance", taskInstance.getName());
63+
try {
64+
timeoutTaskInstanceInDB(taskInstance);
65+
killRemoteTaskInstanceInThreadPool(taskInstance);
66+
log.info("TaskInstance: {} timeout, killed the task instance", taskInstance.getName());
67+
} catch (Exception ex) {
68+
log.error("TaskInstance timeout {} failed", taskInstance.getName(), ex);
69+
}
5370

5471
}
72+
73+
/**
 * Records the timeout of the given task instance: sets its state to
 * {@code TaskExecutionStatus.FAILURE}, stamps the end time with the current
 * time and writes the instance back through {@code taskInstanceDao}.
 *
 * @param taskInstance the timed-out task instance; it is mutated in place before being saved
 */
private void timeoutTaskInstanceInDB(TaskInstance taskInstance) {
74+
taskInstance.setState(TaskExecutionStatus.FAILURE);
75+
taskInstance.setEndTime(new Date());
76+
taskInstanceDao.updateTaskInstance(taskInstance);
77+
}
78+
79+
/**
 * Sends a {@code TaskKillRequest} for the given task instance to the host
 * recorded on it. Does nothing when the task instance has no host set.
 *
 * @param taskInstance the task instance to kill; its id and host are read
 * @throws RemotingException declared by this method; presumably raised by
 *         {@code masterRpcClient.send} when the command cannot be delivered (confirm against MasterRpcClient)
 */
private void killRemoteTaskInstanceInThreadPool(TaskInstance taskInstance) throws RemotingException {
80+
// No host recorded on the task instance, so there is no address to send the kill command to.
if (StringUtils.isEmpty(taskInstance.getHost())) {
81+
return;
82+
}
83+
TaskKillRequest killCommand = new TaskKillRequest(taskInstance.getId());
84+
masterRpcClient.send(Host.of(taskInstance.getHost()), killCommand.convert2Command());
85+
}
5586
}

dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

Lines changed: 28 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2334,14 +2334,17 @@ public boolean acquireTaskGroup(int taskInstanceId,
23342334
TaskGroup taskGroup = taskGroupMapper.selectById(taskGroupId);
23352335
if (taskGroup == null) {
23362336
// we don't throw exception here, to avoid the task group has been deleted during workflow running
2337+
log.warn("The taskGroup is not exist no need to acquire taskGroup, taskGroupId: {}", taskGroupId);
23372338
return true;
23382339
}
23392340
// if task group is not applicable
23402341
if (taskGroup.getStatus() == Flag.NO.getCode()) {
2342+
log.warn("The taskGroup status is {}, no need to acquire taskGroup, taskGroupId: {}", taskGroup.getStatus(),
2343+
taskGroupId);
23412344
return true;
23422345
}
23432346
// Create a waiting taskGroupQueue, after acquire resource, we can update the status to ACQUIRE_SUCCESS
2344-
TaskGroupQueue taskGroupQueue = this.taskGroupQueueMapper.queryByTaskId(taskInstanceId);
2347+
TaskGroupQueue taskGroupQueue = taskGroupQueueMapper.queryByTaskId(taskInstanceId);
23452348
if (taskGroupQueue == null) {
23462349
taskGroupQueue = insertIntoTaskGroupQueue(
23472350
taskInstanceId,
@@ -2350,14 +2353,12 @@ public boolean acquireTaskGroup(int taskInstanceId,
23502353
workflowInstanceId,
23512354
taskGroupPriority,
23522355
TaskGroupQueueStatus.WAIT_QUEUE);
2356+
log.info("Insert TaskGroupQueue: {} successfully", taskGroupQueue.getId());
23532357
} else {
23542358
log.info("The task queue is already exist, taskId: {}", taskInstanceId);
23552359
if (taskGroupQueue.getStatus() == TaskGroupQueueStatus.ACQUIRE_SUCCESS) {
23562360
return true;
23572361
}
2358-
taskGroupQueue.setInQueue(Flag.NO.getCode());
2359-
taskGroupQueue.setStatus(TaskGroupQueueStatus.WAIT_QUEUE);
2360-
this.taskGroupQueueMapper.updateById(taskGroupQueue);
23612362
}
23622363
// check if there already exist higher priority tasks
23632364
List<TaskGroupQueue> highPriorityTasks = taskGroupQueueMapper.queryHighPriorityTasks(
@@ -2368,14 +2369,15 @@ public boolean acquireTaskGroup(int taskInstanceId,
23682369
return false;
23692370
}
23702371
// try to get taskGroup
2371-
int count = taskGroupMapper.selectAvailableCountById(taskGroupId);
2372-
if (count == 1 && robTaskGroupResource(taskGroupQueue)) {
2373-
log.info("Success acquire taskGroup, taskInstanceId: {}, taskGroupId: {}", taskInstanceId, taskGroupId);
2374-
return true;
2372+
int availableTaskGroupCount = taskGroupMapper.selectAvailableCountById(taskGroupId);
2373+
if (availableTaskGroupCount < 1) {
2374+
log.info(
2375+
"Failed to acquire taskGroup, there is no avaliable taskGroup, taskInstanceId: {}, taskGroupId: {}",
2376+
taskInstanceId, taskGroupId);
2377+
taskGroupQueueMapper.updateInQueue(Flag.NO.getCode(), taskGroupQueue.getId());
2378+
return false;
23752379
}
2376-
log.info("Failed to acquire taskGroup, taskInstanceId: {}, taskGroupId: {}", taskInstanceId, taskGroupId);
2377-
this.taskGroupQueueMapper.updateInQueue(Flag.NO.getCode(), taskGroupQueue.getId());
2378-
return false;
2380+
return robTaskGroupResource(taskGroupQueue);
23792381
}
23802382

23812383
/**
@@ -2387,10 +2389,13 @@ public boolean robTaskGroupResource(TaskGroupQueue taskGroupQueue) {
23872389
for (int i = 0; i < 10; i++) {
23882390
TaskGroup taskGroup = taskGroupMapper.selectById(taskGroupQueue.getGroupId());
23892391
if (taskGroup.getGroupSize() <= taskGroup.getUseSize()) {
2392+
// remove
2393+
taskGroupQueueMapper.updateInQueue(Flag.NO.getCode(), taskGroupQueue.getId());
23902394
log.info("The current task Group is full, taskGroup: {}", taskGroup);
23912395
return false;
23922396
}
2393-
int affectedCount = taskGroupMapper.robTaskGroupResource(taskGroup.getId(),
2397+
int affectedCount = taskGroupMapper.robTaskGroupResource(
2398+
taskGroup.getId(),
23942399
taskGroup.getUseSize(),
23952400
taskGroupQueue.getId(),
23962401
TaskGroupQueueStatus.WAIT_QUEUE.getCode());
@@ -2404,6 +2409,7 @@ public boolean robTaskGroupResource(TaskGroupQueue taskGroupQueue) {
24042409
}
24052410
}
24062411
log.info("Failed to rob taskGroup, taskGroupQueue: {}", taskGroupQueue);
2412+
taskGroupQueueMapper.updateInQueue(Flag.NO.getCode(), taskGroupQueue.getId());
24072413
return false;
24082414
}
24092415

@@ -2431,10 +2437,11 @@ public TaskInstance releaseTaskGroup(TaskInstance taskInstance) {
24312437
do {
24322438
taskGroup = taskGroupMapper.selectById(taskInstance.getTaskGroupId());
24332439
if (taskGroup == null) {
2434-
log.error("The taskGroup is null, taskGroupId: {}", taskInstance.getTaskGroupId());
2440+
log.error("The taskGroup is not exist no need to release taskGroup, taskGroupId: {}",
2441+
taskInstance.getTaskGroupId());
24352442
return null;
24362443
}
2437-
thisTaskGroupQueue = this.taskGroupQueueMapper.queryByTaskId(taskInstance.getId());
2444+
thisTaskGroupQueue = taskGroupQueueMapper.queryByTaskId(taskInstance.getId());
24382445
if (thisTaskGroupQueue.getStatus() == TaskGroupQueueStatus.RELEASE) {
24392446
log.info("The taskGroupQueue's status is release, taskInstanceId: {}", taskInstance.getId());
24402447
return null;
@@ -2458,20 +2465,22 @@ public TaskInstance releaseTaskGroup(TaskInstance taskInstance) {
24582465
changeTaskGroupQueueStatus(taskInstance.getId(), TaskGroupQueueStatus.RELEASE);
24592466
TaskGroupQueue taskGroupQueue;
24602467
do {
2461-
taskGroupQueue = this.taskGroupQueueMapper.queryTheHighestPriorityTasks(taskGroup.getId(),
2468+
taskGroupQueue = taskGroupQueueMapper.queryTheHighestPriorityTasks(
2469+
taskGroup.getId(),
24622470
TaskGroupQueueStatus.WAIT_QUEUE.getCode(),
24632471
Flag.NO.getCode(),
24642472
Flag.NO.getCode());
24652473
if (taskGroupQueue == null) {
2466-
log.info("The taskGroupQueue is null, taskGroup: {}", taskGroup.getId());
2474+
log.info("There is no taskGroupQueue need to be wakeup taskGroup: {}", taskGroup.getId());
24672475
return null;
24682476
}
2469-
} while (this.taskGroupQueueMapper.updateInQueueCAS(Flag.NO.getCode(),
2477+
} while (this.taskGroupQueueMapper.updateInQueueCAS(
2478+
Flag.NO.getCode(),
24702479
Flag.YES.getCode(),
24712480
taskGroupQueue.getId()) != 1);
24722481
log.info("Finished to release task group queue: taskGroupId: {}, taskGroupQueueId: {}",
24732482
taskInstance.getTaskGroupId(), taskGroupQueue.getId());
2474-
return this.taskInstanceMapper.selectById(taskGroupQueue.getTaskId());
2483+
return taskInstanceMapper.selectById(taskGroupQueue.getTaskId());
24752484
}
24762485

24772486
/**
@@ -2505,6 +2514,7 @@ public TaskGroupQueue insertIntoTaskGroupQueue(Integer taskInstanceId,
25052514
.processId(workflowInstanceId)
25062515
.priority(taskGroupPriority)
25072516
.status(status)
2517+
.forceStart(Flag.NO.getCode())
25082518
.inQueue(Flag.NO.getCode())
25092519
.createTime(now)
25102520
.updateTime(now)

0 commit comments

Comments
 (0)