[Fix-17355] Fix reassignWorkflowInstanceHost may failed when no events in channel #17372
| final Optional<ITaskExecutor> taskExecutorOptional = physicalTaskExecutorRepository.get(taskInstanceId); | ||
| physicalTaskExecutorEventReporter.reassignWorkflowInstanceHost(taskInstanceId, workflowHost); | ||
| if (taskExecutorOptional.isPresent()) { | ||
| final ITaskExecutor taskExecutor = taskExecutorOptional.get(); | ||
| taskExecutor.getTaskExecutionContext().setWorkflowInstanceHost(workflowHost); | ||
| return true; | ||
| } | ||
| return false; |
| final Optional<ITaskExecutor> taskExecutorOptional = physicalTaskExecutorRepository.get(taskInstanceId); | |
| physicalTaskExecutorEventReporter.reassignWorkflowInstanceHost(taskInstanceId, workflowHost); | |
| if (taskExecutorOptional.isPresent()) { | |
| final ITaskExecutor taskExecutor = taskExecutorOptional.get(); | |
| taskExecutor.getTaskExecutionContext().setWorkflowInstanceHost(workflowHost); | |
| return true; | |
| } | |
| return false; | |
| Optional<ITaskExecutor> iTaskExecutorOptional = physicalTaskExecutorRepository.get(taskInstanceId); | |
| if (iTaskExecutorOptional.isPresent()) { | |
| iTaskExecutorOptional.get().getTaskExecutionContext().setWorkflowInstanceHost(workflowHost); | |
| return true; | |
| } | |
| return false; |
It would be better to change the workflow instance host only in TaskExecutionContext, and to inject ITaskExecutorRepository into TaskExecutorLifecycleEventRemoteReporter.
We can remove workflowInstanceHost from IReportableTaskExecutorLifecycleEvent, since the event does not inherently need to know the workflow instance host. Instead, the reporter should retrieve the current host from a centralized repository when it reports the event. This simplifies workflow host management and helps prevent potential concurrency issues: once the workflow host has changed, the reporter gets the correct host when it reports an event.
Alternatively, we can store the workflow instance host in ReportableTaskExecutorLifecycleEventChannel. In that case we need to register the channel with the reporter when the TaskExecutor is registered, and change the host in the channel in reassignWorkflowInstanceHost.
ok.
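A minimal sketch of the first proposal in this thread (events carry no host, and the reporter looks the host up at report time). The names `WorkflowHostRegistry`, `LifecycleEvent`, and `HostResolvingReporter` are hypothetical stand-ins and are not DolphinScheduler classes; the real reporter, repository, and RPC layer are not modeled.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical centralized store of workflow instance hosts, keyed by task instance id.
final class WorkflowHostRegistry {
    private final Map<Integer, String> hosts = new ConcurrentHashMap<>();

    void register(int taskInstanceId, String workflowHost) {
        hosts.put(taskInstanceId, workflowHost);
    }

    // Returns true only when the task instance is known,
    // independent of whether any events are currently queued.
    boolean reassign(int taskInstanceId, String workflowHost) {
        return hosts.computeIfPresent(taskInstanceId, (id, old) -> workflowHost) != null;
    }

    Optional<String> currentHost(int taskInstanceId) {
        return Optional.ofNullable(hosts.get(taskInstanceId));
    }
}

// Hypothetical event type: it deliberately carries no workflow host.
final class LifecycleEvent {
    final int taskInstanceId;
    final String type;

    LifecycleEvent(int taskInstanceId, String type) {
        this.taskInstanceId = taskInstanceId;
        this.type = type;
    }
}

// Hypothetical reporter: the destination is resolved when the event is sent,
// so a reassignment between enqueue and send is picked up automatically.
final class HostResolvingReporter {
    private final WorkflowHostRegistry registry;

    HostResolvingReporter(WorkflowHostRegistry registry) {
        this.registry = registry;
    }

    Optional<String> resolveDestination(LifecycleEvent event) {
        return registry.currentHost(event.taskInstanceId);
    }
}
```

The sketch only works if the registry entry outlives every pending event for that task, which is exactly the lifecycle question raised in the replies that follow.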
Getting the host address from the TaskExecutor may not work. The TaskExecutor's lifecycle and the events' lifecycle do not completely overlap: the TaskExecutor is removed in onTaskExecutorFinalizeLifecycleEvent before all of its events have been reported.
The channel's lifecycle is similar to the events', but it would still need a lock.
TaskExecutorFinalizeLifecycleEvent is the end event and doesn't rely on RPC; it's only a local event (used to do some resource cleanup). So is it OK to attach the host to the channel?
Yes. What I mean is that I tested getting the host address from the task executor: when the reporter handles the failed event or other final events, the task executor has already been removed from the repository.
Attaching the host to the channel is not OK either. The channel is removed once its current events have all been reported, and at that point reassigning the host will not work. This is a lifecycle overlap issue: either the channel or the task executor must span the task's entire lifecycle in order to hold the host.
I'm thinking of keeping the channel alive when it is empty, and injecting ITaskExecutorRepository into TaskExecutorLifecycleEventRemoteReporter to check whether the task executor still exists. If the channel is empty and the task executor has been removed, then remove the channel.
I think this is because the reporter and the coordinator run in different threads and handle events asynchronously, so the task executor might be removed prematurely.
A TaskExecutor will only be removed after the TaskExecutorFinalizeLifecycleEvent is fired, and this event should be fired in synchronous mode.
> A TaskExecutor will only be removed after the TaskExecutorFinalizeLifecycleEvent is fired, and this event should be fired in synchronous mode.
So how about moving the publishing of TaskExecutorFinalizeLifecycleEvent to TaskExecutorLifecycleEventRemoteReporter? When a task reaches a terminal state (Success/Killed/Paused/Failed), we could publish a new TaskExecutorFinishedLifecycleEvent to mark the TaskExecutor as invalid or inactive. The final event, TaskExecutorFinalizeLifecycleEvent, would then be published by TaskExecutorLifecycleEventRemoteReporter instead.
I've submitted the new changes implemented according to this flow.
Good idea. Once the TaskExecutor has received the ACK of a terminal event, its lifecycle is finished, so we can then publish a Finalize event to clean up its resources.
submitted.
…cutor to contain the channel's lifecycle.
…x-reassign-host-issue
...n/java/org/apache/dolphinscheduler/extract/master/TaskExecutorEventRemoteReporterClient.java
TaskExecutorFinalizeLifecycleEvent will be published by TaskExecutorLifecycleEventRemoteReporter.
…x-reassign-host-issue
TaskExecutorFinalizeLifecycleEvent will be published by TaskExecutorLifecycleEventRemoteReporter.
...ava/org/apache/dolphinscheduler/task/executor/events/TaskExecutorFinishedLifecycleEvent.java
| @EqualsAndHashCode(callSuper = true) | ||
| @SuperBuilder | ||
| @NoArgsConstructor | ||
| public class TaskExecutorFinishedLifecycleEvent extends AbstractTaskExecutorLifecycleEvent { |
Please don't introduce a new Event type here. We don't really need it.
resubmitted.
| /** | ||
| * Extend the lifecycle of the TaskExecutor to span the entire processing cycle of the task. | ||
| * before TaskExecutor changes to inactive, all events will put into channel, | ||
| * so we can just remove the TaskExecutor after the associated channel has been removed. | ||
| */ | ||
| private void finalizeFinishedTaskExecutors() { | ||
| eventChannelsLock.lock(); | ||
| // check if the taskExecutor is inactive and no channel exist, then we can publish finalize event. | ||
| for (final ITaskExecutor taskExecutor : taskExecutorRepository.getAllWaitingReport()) { | ||
| final Integer taskExecutorId = taskExecutor.getId(); | ||
| if (!eventChannels.containsKey(taskExecutorId)) { | ||
| taskExecutorRepository.finishReport(taskExecutorId); | ||
| taskExecutor.getTaskExecutorEventBus().publish(TaskExecutorFinalizeLifecycleEvent.of(taskExecutor)); | ||
| } | ||
| } | ||
| eventChannelsLock.unlock(); | ||
| } | ||
This is too complex. It would be better to publish TaskExecutorFinalizeLifecycleEvent in receiveTaskExecutorLifecycleEventACK after receiving a finished ACK.
Wow, that's much better!
resubmitted.
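The suggestion in this thread, finalizing a task executor from the ACK handler once a terminal event has been acknowledged, can be sketched roughly as follows. `EventType` and `AckDrivenFinalizer` are hypothetical names for illustration; the `finalized` list stands in for publishing TaskExecutorFinalizeLifecycleEvent on the executor's event bus, and the real receiveTaskExecutorLifecycleEventACK signature is not reproduced here.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical event types; the terminal ones follow the states named in the discussion.
enum EventType {
    RUNNING, SUCCESS, FAILED, KILLED, PAUSED;

    boolean isTerminal() {
        return this != RUNNING;
    }
}

// Hypothetical reporter-side bookkeeping.
final class AckDrivenFinalizer {
    private final Set<Integer> registeredExecutors = new HashSet<>();

    // Stand-in for "publish the finalize event": records which executors were finalized.
    final List<Integer> finalized = new ArrayList<>();

    synchronized void register(int taskExecutorId) {
        registeredExecutors.add(taskExecutorId);
    }

    synchronized boolean isRegistered(int taskExecutorId) {
        return registeredExecutors.contains(taskExecutorId);
    }

    // Called when the master acknowledges an event. Only the ACK of a terminal
    // event ends the executor's lifecycle, so cleanup happens here and nowhere else.
    synchronized void receiveAck(int taskExecutorId, EventType ackedType) {
        if (!ackedType.isTerminal()) {
            return;
        }
        // remove() returns false for an unknown or already finalized executor,
        // which makes a duplicate ACK harmless.
        if (registeredExecutors.remove(taskExecutorId)) {
            finalized.add(taskExecutorId);
        }
    }
}
```

Because the executor stays registered until the terminal ACK arrives, a host reassignment that happens while events are still being retried can still find it.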
| @Override | ||
| public boolean reassignWorkflowInstanceHost(int taskInstanceId, String workflowHost) { | ||
| public void wake() { |
It would be better to revert this method and change the implementation instead. Once the workflow host has changed, we may need to reset the event's lastReportTime, or set a new flag to mark the event as needing an immediate retry, and then notify the reporter via taskExecutionEventEmptyCondition.signalAll().
resubmitted.
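A rough sketch of the reset-and-signal idea from this thread, under stated assumptions: `PendingEvent` and `RetryingChannel` are hypothetical classes, `eventsReady` plays the role of the condition the reporter thread waits on, and a `lastReportTime` of 0 is taken to mean "never reported, send now". The actual retry policy in the PR may differ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical pending event with the time of its last report attempt (epoch millis).
final class PendingEvent {
    final int taskInstanceId;
    long lastReportTime;

    PendingEvent(int taskInstanceId, long lastReportTime) {
        this.taskInstanceId = taskInstanceId;
        this.lastReportTime = lastReportTime;
    }
}

// Hypothetical channel holding events that are retried after a fixed interval.
final class RetryingChannel {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition eventsReady = lock.newCondition();
    private final List<PendingEvent> events = new ArrayList<>();
    private final long retryIntervalMillis;

    RetryingChannel(long retryIntervalMillis) {
        this.retryIntervalMillis = retryIntervalMillis;
    }

    void add(PendingEvent event) {
        lock.lock();
        try {
            events.add(event);
            eventsReady.signalAll();
        } finally {
            lock.unlock();
        }
    }

    // After a host change, make every pending event of the task due immediately
    // and wake the reporter thread instead of waiting out the retry interval.
    void onWorkflowInstanceHostChanged(int taskInstanceId) {
        lock.lock();
        try {
            for (PendingEvent event : events) {
                if (event.taskInstanceId == taskInstanceId) {
                    event.lastReportTime = 0L;
                }
            }
            eventsReady.signalAll();
        } finally {
            lock.unlock();
        }
    }

    // Events whose retry interval has elapsed at time `now`.
    List<PendingEvent> dueEvents(long now) {
        lock.lock();
        try {
            List<PendingEvent> due = new ArrayList<>();
            for (PendingEvent event : events) {
                if (now - event.lastReportTime >= retryIntervalMillis) {
                    due.add(event);
                }
            }
            return due;
        } finally {
            lock.unlock();
        }
    }
}
```

Releasing the lock in a `finally` block keeps the channel usable even if the loop body throws.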
…cutor to contain the channel's lifecycle.
TaskExecutorFinalizeLifecycleEvent will be published by TaskExecutorLifecycleEventRemoteReporter.
TaskExecutorFinalizeLifecycleEvent will be published by TaskExecutorLifecycleEventRemoteReporter.
cef0070 to
f63256d
Compare
…x-reassign-host-issue
…ifecycleEventRemoteReporter.
…ifecycleEventRemoteReporter.
…ifecycleEventRemoteReporter.
ruanwenjun
left a comment
LGTM
| @Override | ||
| public boolean reassignWorkflowInstanceHost(int taskInstanceId, String workflowHost) { | ||
| public void resetAndReadyChannelEvents(int taskInstanceId) { |
| public void resetAndReadyChannelEvents(int taskInstanceId) { | |
| public void onWorkflowInstanceHostChanged(int taskInstanceId) { |
Applied and committed.
SbloodyS
left a comment
+1
…nceId existence.
close #17355
Brief change log
Make reassignWorkflowInstanceHost return a value based on whether the taskInstanceId exists rather than on whether a channel exists.
Verify this pull request
This pull request is code cleanup without any test coverage.
(or)
This pull request is already covered by existing tests, such as (please describe tests).
(or)
This change added tests and can be verified as follows:
(or)
Pull Request Notice
If your pull request contains incompatible change, you should also add it to
docs/docs/en/guide/upgrade/incompatible.md