Closed
Description
Search before asking
- I had searched in the issues and found no similar issues.
What happened
The eventChannel in TaskExecutorLifecycleEventRemoteReporter is removed as soon as it becomes empty:
public void receiveTaskExecutorLifecycleEventACK(final TaskExecutorLifecycleEventAck eventAck) {
    final int taskExecutorId = eventAck.getTaskExecutorId();
    eventChannelsLock.lock();
    try {
        final ReportableTaskExecutorLifecycleEventChannel eventChannel = eventChannels.get(taskExecutorId);
        if (eventChannel == null) {
            return;
        }
        final IReportableTaskExecutorLifecycleEvent removed =
                eventChannel.remove(eventAck.getTaskExecutorLifecycleEventType());
        if (removed != null) {
            log.info("Success removed {} by ack: {}", removed, eventAck);
        } else {
            log.info("Failed removed ReportableTaskExecutorLifecycleEvent by ack: {}", eventAck);
        }
        // ---------- here the channel is removed ----------
        if (eventChannel.isEmpty()) {
            eventChannels.remove(taskExecutorId);
            log.debug("Removed ReportableTaskExecutorLifecycleEventChannel: {}", taskExecutorId);
        }
        // --------------------------------------------------
        taskExecutionEventEmptyCondition.signalAll();
    } finally {
        eventChannelsLock.unlock();
    }
}
So if there are no events waiting to be reported, the channel no longer exists and reassignWorkflowInstanceHost returns false:
public boolean reassignWorkflowInstanceHost(int taskInstanceId, String workflowHost) {
    eventChannelsLock.lock();
    try {
        // ---------- the channel cannot be found if there is no pending event ----------
        final ReportableTaskExecutorLifecycleEventChannel eventChannel = eventChannels.get(taskInstanceId);
        if (eventChannel == null) {
            return false;
        }
        // -------------------------------------------------------------------------------
        eventChannel.taskExecutionEventsQueue.forEach(event -> event.setWorkflowInstanceHost(workflowHost));
        return true;
    } finally {
        eventChannelsLock.unlock();
    }
}
So if the master restarts while the task is still running, master failover fails to take over the old task instance and instead creates a new task instance and a new TaskExecutionRunnable. When the old task later finishes, its event gets stuck in the eventChannel, because the old TaskExecutionRunnable no longer exists on the master:
[WI-0][TI-28459975] - 2025-07-22 16:39:21.465 ERROR [PhysicalTaskExecutorLifecycleEventReporter] o.a.d.e.m.TaskExecutorEventRemoteReporterClient:[68] - Report ITaskExecutorLifecycleEvent: TaskExecutorSuccessLifecycleEvent(super=AbstractTaskExecutorLifecycleEvent(super=AbstractDelayEvent(delayTime=0, createTimeInNano=544710287716565, expiredTimeInNano=544710287716816), taskInstanceId=28459975, eventCreateTime=1753173561446, type=SUCCESS), workflowInstanceId=4842042, workflowInstanceHost=10.0.6.23:5678, taskInstanceHost=10.0.6.23:1234, endTime=1753173561446, varPool=[], latestReportTime=1753173561455) to master failed
org.apache.dolphinscheduler.extract.base.exception.MethodInvocationException: Cannot find the TaskExecuteRunnable: 28459975
	at org.apache.dolphinscheduler.extract.base.exception.MethodInvocationException.of(MethodInvocationException.java:27)
	at org.apache.dolphinscheduler.extract.base.client.SyncClientMethodInvoker.invoke(SyncClientMethodInvoker.java:53)
	at org.apache.dolphinscheduler.extract.base.client.ClientInvocationHandler.invoke(ClientInvocationHandler.java:56)
	at com.sun.proxy.$Proxy152.onTaskExecutorSuccess(Unknown Source)
	at org.apache.dolphinscheduler.extract.master.TaskExecutorEventRemoteReporterClient.reportTaskSuccessEventToMaster(TaskExecutorEventRemoteReporterClient.java:118)
	at org.apache.dolphinscheduler.extract.master.TaskExecutorEventRemoteReporterClient.reportTaskExecutionEventToMaster(TaskExecutorEventRemoteReporterClient.java:61)
	at org.apache.dolphinscheduler.task.executor.eventbus.TaskExecutorLifecycleEventRemoteReporter.handleTaskExecutionEventChannel(TaskExecutorLifecycleEventRemoteReporter.java:180)
	at org.apache.dolphinscheduler.task.executor.eventbus.TaskExecutorLifecycleEventRemoteReporter.run(TaskExecutorLifecycleEventRemoteReporter.java:84)
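To make the interaction between the two methods concrete, here is a minimal, single-threaded Java model of them. It is a sketch under simplifying assumptions, not DolphinScheduler's code: the class ChannelRemovalSketch, its nested Reporter and Event types, the report/ack/reassign methods, the "RUNNING" event type and the host strings are all hypothetical stand-ins. It shows that once the only pending event of a task has been acknowledged, the channel is gone and a later host reassignment for that task fails, even though the task itself is still running.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, simplified model of the reporter; not the real DolphinScheduler classes.
public class ChannelRemovalSketch {

    // Hypothetical stand-in for IReportableTaskExecutorLifecycleEvent.
    static final class Event {
        final String type;
        String workflowInstanceHost;

        Event(String type, String workflowInstanceHost) {
            this.type = type;
            this.workflowInstanceHost = workflowInstanceHost;
        }
    }

    // One queue ("channel") of pending events per task, guarded by a single lock.
    static final class Reporter {
        private final Map<Integer, Queue<Event>> eventChannels = new HashMap<>();
        private final ReentrantLock lock = new ReentrantLock();

        // Queue an event that still has to be acknowledged by the master.
        void report(int taskId, Event event) {
            lock.lock();
            try {
                eventChannels.computeIfAbsent(taskId, id -> new ArrayDeque<>()).add(event);
            } finally {
                lock.unlock();
            }
        }

        // Mirrors receiveTaskExecutorLifecycleEventACK: drop the acked event,
        // then remove the whole channel once it is empty.
        void ack(int taskId, String type) {
            lock.lock();
            try {
                Queue<Event> channel = eventChannels.get(taskId);
                if (channel == null) {
                    return;
                }
                channel.removeIf(e -> e.type.equals(type));
                if (channel.isEmpty()) {
                    eventChannels.remove(taskId);
                }
            } finally {
                lock.unlock();
            }
        }

        // Mirrors reassignWorkflowInstanceHost: fails whenever no channel exists.
        boolean reassign(int taskId, String newHost) {
            lock.lock();
            try {
                Queue<Event> channel = eventChannels.get(taskId);
                if (channel == null) {
                    return false;
                }
                channel.forEach(e -> e.workflowInstanceHost = newHost);
                return true;
            } finally {
                lock.unlock();
            }
        }
    }

    public static void main(String[] args) {
        Reporter reporter = new Reporter();
        int taskId = 1;

        // The task reports an event and the master acks it: the channel becomes empty and is removed.
        reporter.report(taskId, new Event("RUNNING", "old-master:5678"));
        reporter.ack(taskId, "RUNNING");

        // The master restarts and failover tries to take over the still-running task.
        boolean takenOver = reporter.reassign(taskId, "new-master:5678");
        System.out.println("reassign succeeded: " + takenOver); // prints "reassign succeeded: false"
    }
}
```

Calling reassign before the ack would return true, which suggests that in this model whether failover can take over a task depends only on whether an unacknowledged event happened to be pending at that moment.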
What you expected to happen
How to reproduce
Anything else
No response
Version
dev
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct