Description
Search before asking
- I have searched the issues and found no similar issues.
What happened
Our cluster runs a large number of workflows with the serial wait execution strategy. We found that some tasks had completed but their workflows never finished. The logs contained the following entries.
At 17:12:43.517, the master logged that its synchronous ACK for the execution result of task 204531 had timed out after waiting 3000 ms for the worker's response:
[WARN] 2023-12-07 17:12:43.517 +0800 org.apache.dolphinscheduler.extract.base.future.ResponseFuture:[78] - [WorkflowInstance-184914][TaskInstance-204531] - Wait response in 3000/ms timeout, request id 83365
[ERROR] 2023-12-07 17:12:43.517 +0800 org.apache.dolphinscheduler.server.master.processor.queue.TaskExecuteRunnable:[64] - [WorkflowInstance-184914][TaskInstance-204531] - Handle task event error, this event will be removed, event: TaskEvent(taskInstanceId=204531, workerAddress=dolphinscheduler-worker-0.dolphinscheduler-worker-headless:1234, state=TaskExecutionStatus{code=7, desc='success'}, startTime=Thu Dec 07 17:11:47 CST 2023, endTime=Thu Dec 07 17:12:39 CST 2023, executePath=/tmp/dolphinscheduler/exec/process/default/10579449480160/11310276585441_19/184914/204531, logPath=/opt/dolphinscheduler/logs/20231207/11310276585441/19/184914/204531.log, processId=0, appIds=null, event=RESULT, varPool=[], cacheTaskInstanceId=0, processInstanceId=184914)
org.apache.dolphinscheduler.server.master.event.TaskEventHandleError: Handle task result event error, save taskInstance to db error
at org.apache.dolphinscheduler.server.master.event.TaskResultEventHandler.handleTaskEvent(TaskResultEventHandler.java:105)
at org.apache.dolphinscheduler.server.master.processor.queue.TaskExecuteRunnable.run(TaskExecuteRunnable.java:56)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.reflect.UndeclaredThrowableException: null
at com.sun.proxy.$Proxy246.handleTaskInstanceExecutionFinishEventAck(Unknown Source)
at org.apache.dolphinscheduler.server.master.event.TaskResultEventHandler.sendAckToWorker(TaskResultEventHandler.java:121)
at org.apache.dolphinscheduler.server.master.event.TaskResultEventHandler.handleTaskEvent(TaskResultEventHandler.java:102)
... 6 common frames omitted
Caused by: org.apache.dolphinscheduler.extract.base.exception.RemotingTimeoutException: wait response on the channel Host(ip=dolphinscheduler-worker-0.dolphinscheduler-worker-headless, port=1234) timeout 3000
at org.apache.dolphinscheduler.extract.base.NettyRemotingClient.sendSync(NettyRemotingClient.java:215)
at org.apache.dolphinscheduler.extract.base.client.SyncClientMethodInvoker.invoke(SyncClientMethodInvoker.java:46)
at org.apache.dolphinscheduler.extract.base.client.ClientInvocationHandler.invoke(ClientInvocationHandler.java:55)
... 9 common frames omitted
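The timeout above comes from a synchronous call: the master blocks for up to 3000 ms waiting for the worker's reply and then throws, whether or not the worker eventually processes the request. The sketch below models that behaviour with a plain Future.get(timeout). It does not use DolphinScheduler's NettyRemotingClient; the class, method and field names are hypothetical, and the delays are scaled down (100 ms timeout, 500 ms worker delay) so it runs quickly.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical model of a synchronous request whose caller gives up before the callee finishes. */
public class SyncAckTimeoutDemo {

    /** What each side observed. */
    static final class Outcome {
        final boolean callerTimedOut; // the "master" saw a timeout
        final boolean workerHandled;  // the "worker" still processed the request

        Outcome(boolean callerTimedOut, boolean workerHandled) {
            this.callerTimedOut = callerTimedOut;
            this.workerHandled = workerHandled;
        }
    }

    static Outcome simulate(long workerDelayMillis, long timeoutMillis) throws Exception {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        CountDownLatch handled = new CountDownLatch(1);
        try {
            Future<Object> reply = worker.submit(() -> {
                Thread.sleep(workerDelayMillis); // a heavily loaded worker
                handled.countDown();             // the request is processed anyway
                return null;
            });
            boolean timedOut = false;
            try {
                reply.get(timeoutMillis, TimeUnit.MILLISECONDS); // bounded wait, like a sync RPC
            } catch (TimeoutException e) {
                timedOut = true; // get() gives up, but the submitted task keeps running
            }
            boolean workerHandled = handled.await(5, TimeUnit.SECONDS);
            return new Outcome(timedOut, workerHandled);
        } finally {
            worker.shutdown(); // lets the already-submitted task finish
        }
    }

    public static void main(String[] args) throws Exception {
        Outcome o = simulate(500, 100);
        System.out.println("caller timed out: " + o.callerTimedOut);
        System.out.println("worker handled request: " + o.workerHandled);
    }
}
```

Running main reports both a caller timeout and a handled request: the same split outcome that the master and worker logs show here.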
At 17:13:02.140, the worker logged the line below, showing that it did receive the ACK for task 204531, but only about 19 seconds after the master had stopped waiting. The worker's system load was relatively high at the time.
[INFO] 2023-12-07 17:13:02.140 +0800 o.a.d.server.worker.runner.listener.TaskInstanceExecutionFinishEventAckListenFunction:[44] - [WorkflowInstance-0][TaskInstance-204531] - Receive TaskInstanceExecutionFinishEventAck: TaskInstanceExecutionFinishEventAck(taskInstanceId=204531, success=true)
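Reading the code, we found that when sendAckToWorker(taskEvent) throws, the exception escapes the try block shown below: the in-memory task instance is restored from oldTaskInstance, a TaskEventHandleError is thrown, and TaskExecuteRunnable removes the event. The rest of the result handling therefore never runs and the workflow never ends, even though taskInstanceDao.updateById(taskInstance) has already persisted the result. Meanwhile, the worker did receive the ACK, only late, so it treats the result as delivered and never resends it. Nothing is left to retry the event.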
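The failure mode described above can be reproduced in a small, single-threaded model. Everything here is hypothetical: Worker, Master, handleResult and the in-memory maps are stand-ins, not DolphinScheduler APIs. The model only mirrors the order of operations in TaskResultEventHandler.handleTaskEvent (persist, then ACK, then follow-up processing) and assumes that the worker resends a result only while it is still un-ACKed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical, single-threaded model of the lost result event. */
public class LostResultEventDemo {

    static class Worker {
        // Results not yet ACKed by the master; only these are ever resent (assumption).
        final Set<Integer> unackedResults = new HashSet<>();

        void onAck(int taskId) {
            unackedResults.remove(taskId);
        }

        List<Integer> resultsToResend() {
            return new ArrayList<>(unackedResults);
        }
    }

    static class Master {
        final Map<Integer, String> db = new HashMap<>();       // stands in for the task instance table
        final Set<Integer> workflowNotified = new HashSet<>(); // stands in for the follow-up processing
        boolean ackCallTimesOut;

        void handleResult(Worker worker, int taskId) {
            try {
                db.put(taskId, "SUCCESS"); // the result is persisted first
                sendAck(worker, taskId);   // then the ACK is sent synchronously
            } catch (RuntimeException e) {
                // In the real handler the in-memory instance is rolled back and the event is removed.
                return;
            }
            workflowNotified.add(taskId);  // skipped whenever sendAck throws
        }

        private void sendAck(Worker worker, int taskId) {
            worker.onAck(taskId); // the worker does get the ACK (in reality, late)
            if (ackCallTimesOut) {
                throw new IllegalStateException("wait response timeout 3000");
            }
        }
    }

    public static void main(String[] args) {
        Worker worker = new Worker();
        Master master = new Master();
        master.ackCallTimesOut = true;

        worker.unackedResults.add(204531);
        master.handleResult(worker, 204531);

        System.out.println("db: " + master.db.get(204531));                                   // db: SUCCESS
        System.out.println("workflow notified: " + master.workflowNotified.contains(204531)); // workflow notified: false
        System.out.println("worker will resend: " + worker.resultsToResend());                // worker will resend: []
    }
}
```

The end state matches the report: the result is stored, the workflow never hears about it, and the worker has nothing left to resend.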
TaskResultEventHandler.java, lines 90 to 106 in ce11674:
    try {
        taskInstance.setStartTime(taskEvent.getStartTime());
        taskInstance.setHost(taskEvent.getWorkerAddress());
        taskInstance.setLogPath(taskEvent.getLogPath());
        taskInstance.setExecutePath(taskEvent.getExecutePath());
        taskInstance.setPid(taskEvent.getProcessId());
        taskInstance.setAppLink(taskEvent.getAppIds());
        taskInstance.setState(taskEvent.getState());
        taskInstance.setEndTime(taskEvent.getEndTime());
        taskInstance.setVarPool(taskEvent.getVarPool());
        processService.changeOutParam(taskInstance);
        taskInstanceDao.updateById(taskInstance);
        sendAckToWorker(taskEvent);
    } catch (Exception ex) {
        TaskInstanceUtils.copyTaskInstance(oldTaskInstance, taskInstance);
        throw new TaskEventHandleError("Handle task result event error, save taskInstance to db error", ex);
    }
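One possible direction, sketched under stated assumptions and not taken from the project: keep the rollback for persistence failures, but treat the ACK as best effort so that a timeout no longer skips the follow-up processing. TaskRecord, persist, sendAck and notified are hypothetical stand-ins for the real TaskInstance, TaskInstanceDao and workflow notification. The sketch also assumes the master can safely handle the same result twice if the worker resends it after a genuinely lost ACK.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.logging.Logger;

/** Hypothetical handler sketch: DB failures roll back, ACK failures are only logged. */
public class AckTolerantHandlerSketch {

    private static final Logger LOG = Logger.getLogger(AckTolerantHandlerSketch.class.getName());

    static class TaskRecord {
        String state;

        TaskRecord(String state) {
            this.state = state;
        }
    }

    final Map<Integer, String> db = new HashMap<>();
    final List<Integer> notified = new ArrayList<>(); // tasks whose completion reached the workflow
    BiConsumer<Integer, String> persist = db::put;      // swap in a failing writer to test rollback
    Consumer<Integer> sendAck = taskId -> { };          // swap in a failing sender to test ACK timeouts

    void handleResult(int taskId, TaskRecord inMemory, String newState) {
        String oldState = inMemory.state;
        try {
            inMemory.state = newState;
            persist.accept(taskId, newState);
        } catch (RuntimeException e) {
            inMemory.state = oldState; // roll back only when persisting fails
            throw new IllegalStateException("save taskInstance to db error", e);
        }
        try {
            sendAck.accept(taskId);
        } catch (RuntimeException e) {
            // Assumption: if the ACK was really lost, the worker resends the result,
            // so handling the same result twice must be idempotent.
            LOG.warning("ACK for task " + taskId + " failed: " + e.getMessage());
        }
        notified.add(taskId); // runs whether or not the ACK call succeeded
    }
}
```

This only reorders the steps already in the handler; whether a duplicate result event is harmless in the real master would need checking before adopting anything like it.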
What you expected to happen
The workflow should finish once its tasks have completed, even if the master's ACK to the worker times out.
How to reproduce
...
Anything else
No response
Version
dev
Are you willing to submit a PR?
- Yes, I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct