Contributor

@reele reele commented Jul 25, 2025

…nceId existence.

close #17355

Brief change log

Base reassignWorkflowInstanceHost's return value on whether the taskInstanceId exists rather than on whether a channel exists.

Verify this pull request

This pull request is code cleanup without any test coverage.

(or)

This pull request is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(or)

Pull Request Notice

If your pull request contains incompatible change, you should also add it to docs/docs/en/guide/upgrade/incompatible.md

Comment on lines 81 to 85
final Optional<ITaskExecutor> taskExecutorOptional = physicalTaskExecutorRepository.get(taskInstanceId);
physicalTaskExecutorEventReporter.reassignWorkflowInstanceHost(taskInstanceId, workflowHost);
if (taskExecutorOptional.isPresent()) {
    final ITaskExecutor taskExecutor = taskExecutorOptional.get();
    taskExecutor.getTaskExecutionContext().setWorkflowInstanceHost(workflowHost);
    return true;
}
return false;
Member

@ruanwenjun ruanwenjun Jul 25, 2025

Suggested change
- final Optional<ITaskExecutor> taskExecutorOptional = physicalTaskExecutorRepository.get(taskInstanceId);
- physicalTaskExecutorEventReporter.reassignWorkflowInstanceHost(taskInstanceId, workflowHost);
- if (taskExecutorOptional.isPresent()) {
-     final ITaskExecutor taskExecutor = taskExecutorOptional.get();
-     taskExecutor.getTaskExecutionContext().setWorkflowInstanceHost(workflowHost);
-     return true;
- }
- return false;
+ Optional<ITaskExecutor> iTaskExecutorOptional = physicalTaskExecutorRepository.get(taskInstanceId);
+ if (iTaskExecutorOptional.isPresent()) {
+     iTaskExecutorOptional.get().getTaskExecutionContext().setWorkflowInstanceHost(workflowHost);
+     return true;
+ }
+ return false;

It's better to only change the workflow instance host in TaskExecutionContext, and inject ITaskExecutorRepository into TaskExecutorLifecycleEventRemoteReporter.

We can remove workflowInstanceHost from IReportableTaskExecutorLifecycleEvent, as this event does not inherently require knowledge of the workflow instance host. Instead, when reporting the event, the reporter should retrieve the current host from a centralized repository. This approach simplifies workflow host management and helps prevent potential concurrency issues. Once the workflow host changes, the reporter can get the correct host when it reports an event.

Alternatively, we can store the workflow instance host in ReportableTaskExecutorLifecycleEventChannel. We would then need to register the channel with the reporter when the TaskExecutor is registered, and update the host in the channel when reassignWorkflowInstanceHost is called.
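
For illustration, a minimal sketch of the first option: the event carries no host, and the reporter resolves the current host at report time from one shared registry. Every name here (HostLookupSketch, WorkflowHostRegistry, HostFreeEvent, HostLookupReporter) is hypothetical and stands in for the real DolphinScheduler types; it is not the project's API.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: none of these types exist in DolphinScheduler.
final class HostLookupSketch {

    // Single source of truth for "which workflow host owns this task instance".
    static final class WorkflowHostRegistry {
        private final Map<Integer, String> hostByTaskInstanceId = new ConcurrentHashMap<>();

        void register(int taskInstanceId, String workflowHost) {
            hostByTaskInstanceId.put(taskInstanceId, workflowHost);
        }

        // Returns true only when the task instance is known to the registry.
        boolean reassign(int taskInstanceId, String newWorkflowHost) {
            return hostByTaskInstanceId.computeIfPresent(taskInstanceId, (id, oldHost) -> newWorkflowHost) != null;
        }

        Optional<String> currentHost(int taskInstanceId) {
            return Optional.ofNullable(hostByTaskInstanceId.get(taskInstanceId));
        }
    }

    // The event deliberately has no workflow host field.
    static final class HostFreeEvent {
        final int taskInstanceId;
        final String type;

        HostFreeEvent(int taskInstanceId, String type) {
            this.taskInstanceId = taskInstanceId;
            this.type = type;
        }
    }

    static final class HostLookupReporter {
        private final WorkflowHostRegistry registry;

        HostLookupReporter(WorkflowHostRegistry registry) {
            this.registry = registry;
        }

        // Looks up the host at report time, so a reassignment is picked up by the next report.
        Optional<String> resolveTarget(HostFreeEvent event) {
            return registry.currentHost(event.taskInstanceId);
        }
    }

    public static void main(String[] args) {
        WorkflowHostRegistry registry = new WorkflowHostRegistry();
        HostLookupReporter reporter = new HostLookupReporter(registry);
        HostFreeEvent event = new HostFreeEvent(1, "RUNNING");

        registry.register(1, "master-a:5678");
        System.out.println(reporter.resolveTarget(event));
        registry.reassign(1, "master-b:5678");
        System.out.println(reporter.resolveTarget(event));
    }
}
```

Because the lookup happens per report, there is no stale host copy inside queued events. The sketch does not address what happens when the registry entry is removed before all events are reported.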

Contributor Author

ok.

Contributor Author

Getting the host address from the TaskExecutor may not work: the TaskExecutor's lifecycle and the event's lifecycle do not completely overlap. The TaskExecutor is removed in onTaskExecutorFinalizeLifecycleEvent before all events have been reported.

The channel's lifecycle is similar to the event's, but it still needs a lock.

Member

@ruanwenjun ruanwenjun Jul 28, 2025

TaskExecutorFinalizeLifecycleEvent is the end event and doesn't rely on RPC; it's only a local event (used for resource cleanup). So is it OK to attach the host to the channel?

Contributor Author

@reele reele Jul 28, 2025

Yes. I mean, I tested getting the host address from the task executor: when the reporter handles the failed event or other final events, the task executor has already been removed from the repository.

Attaching the host to the channel is not OK either. The channel is removed once its current events have all been reported, and at that point reassigning the host will not work. This is a lifecycle-overlap issue: either the channel or the task executor must cover the task's entire lifecycle in order to hold the host.

I'm thinking of keeping the channel alive when it's empty, and injecting ITaskExecutorRepository into TaskExecutorLifecycleEventRemoteReporter to check whether the task executor exists. If the channel is empty and the task executor has been removed, then remove the channel.
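
The retention rule proposed in the last paragraph can be sketched roughly as follows. This is a single-threaded illustration with hypothetical names (ChannelRetentionSketch, sweep, and so on); the real reporter would need to guard these maps with its lock, and queued events are reduced to plain strings.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

// Hypothetical sketch of the rule "drop a channel only when it is empty AND its executor is gone".
final class ChannelRetentionSketch {
    private final Map<Integer, Queue<String>> channels = new HashMap<>();
    private final Set<Integer> liveExecutors = new HashSet<>();

    void registerExecutor(int taskInstanceId) {
        liveExecutors.add(taskInstanceId);
        channels.put(taskInstanceId, new ArrayDeque<>());
    }

    void removeExecutor(int taskInstanceId) {
        liveExecutors.remove(taskInstanceId);
    }

    // Queues an event; silently ignored if the channel no longer exists.
    void offer(int taskInstanceId, String event) {
        Queue<String> channel = channels.get(taskInstanceId);
        if (channel != null) {
            channel.add(event);
        }
    }

    // Stand-in for "the oldest event was reported and acknowledged".
    void ackOldest(int taskInstanceId) {
        Queue<String> channel = channels.get(taskInstanceId);
        if (channel != null) {
            channel.poll();
        }
    }

    // An empty channel survives as long as its executor is still registered.
    void sweep() {
        channels.entrySet().removeIf(e -> e.getValue().isEmpty() && !liveExecutors.contains(e.getKey()));
    }

    boolean hasChannel(int taskInstanceId) {
        return channels.containsKey(taskInstanceId);
    }

    public static void main(String[] args) {
        ChannelRetentionSketch sketch = new ChannelRetentionSketch();
        sketch.registerExecutor(7);
        sketch.offer(7, "RUNNING");
        sketch.ackOldest(7);
        sketch.sweep();
        System.out.println("channel kept while executor is live: " + sketch.hasChannel(7));
        sketch.removeExecutor(7);
        sketch.sweep();
        System.out.println("channel kept after executor removal: " + sketch.hasChannel(7));
    }
}
```

Under this rule a host reassignment can still find the channel between events, at the cost of the reporter having to consult the executor repository on every sweep.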

Contributor Author

I think it's because the reporter and the coordinator run in different threads and handle events asynchronously, so the task executor might be removed prematurely.

Member

@ruanwenjun ruanwenjun Jul 29, 2025

A TaskExecutor will only be removed after the TaskExecutorFinalizeLifecycleEvent is fired, and this event should be fired in synchronous mode.

Contributor Author

@reele reele Jul 30, 2025

> A TaskExecutor will only be removed after the TaskExecutorFinalizeLifecycleEvent is fired, and this event should be fired in synchronous mode.

so, how about moving the publishing of TaskExecutorFinalizeLifecycleEvent to TaskExecutorLifecycleEventRemoteReporter? When a task reaches a terminal state (Success/Killed/Paused/Failed), we could publish a new TaskExecutorFinishedLifecycleEvent to mark the TaskExecutor as invalid or inactive. Then, the final event TaskExecutorFinalizeLifecycleEvent would instead be published by TaskExecutorLifecycleEventRemoteReporter.

I've submitted the new changes implemented according to this flow.

Member

@ruanwenjun ruanwenjun Jul 30, 2025

Good idea. Once the TaskExecutor has received the ACK of a terminal event, its lifecycle is finished, and we can then publish a Finalize event to clean up its resources.

Contributor Author

submitted.

lile added 3 commits July 30, 2025 14:42
TaskExecutorFinalizeLifecycleEvent will be published by TaskExecutorLifecycleEventRemoteReporter.
TaskExecutorFinalizeLifecycleEvent will be published by TaskExecutorLifecycleEventRemoteReporter.
@SbloodyS SbloodyS added this to the 3.3.1 milestone Jul 30, 2025
@SbloodyS SbloodyS added the bug Something isn't working label Jul 30, 2025
@EqualsAndHashCode(callSuper = true)
@SuperBuilder
@NoArgsConstructor
public class TaskExecutorFinishedLifecycleEvent extends AbstractTaskExecutorLifecycleEvent {
Member

Please don't introduce a new event type here. We don't really need it.

Contributor Author

resubmitted.

Comment on lines 172 to 189
/**
 * Extend the lifecycle of the TaskExecutor to span the entire processing cycle of the task.
 * before TaskExecutor changes to inactive, all events will put into channel,
 * so we can just remove the TaskExecutor after the associated channel has been removed.
 */
private void finalizeFinishedTaskExecutors() {
    eventChannelsLock.lock();
    // check if the taskExecutor is inactive and no channel exist, then we can publish finalize event.
    for (final ITaskExecutor taskExecutor : taskExecutorRepository.getAllWaitingReport()) {
        final Integer taskExecutorId = taskExecutor.getId();
        if (!eventChannels.containsKey(taskExecutorId)) {
            taskExecutorRepository.finishReport(taskExecutorId);
            taskExecutor.getTaskExecutorEventBus().publish(TaskExecutorFinalizeLifecycleEvent.of(taskExecutor));
        }
    }
    eventChannelsLock.unlock();
}

Member

This is too complex. It's better to publish TaskExecutorFinalizeLifecycleEvent in receiveTaskExecutorLifecycleEventACK after receiving a finished ACK.
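
A rough sketch of that ACK-driven flow, under stated assumptions: the terminal states are the ones named earlier in this thread (Success/Killed/Paused/Failed), receiveAck stands in for receiveTaskExecutorLifecycleEventACK, and "publishing the finalize event" is reduced to recording the id in a list. AckDrivenFinalizeSketch and its members are hypothetical names, not the merged implementation.

```java
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: finalize a task executor only after a terminal event has been acknowledged.
final class AckDrivenFinalizeSketch {

    enum EventType { RUNNING, SUCCESS, FAILED, KILLED, PAUSED }

    private static final Set<EventType> TERMINAL =
            EnumSet.of(EventType.SUCCESS, EventType.FAILED, EventType.KILLED, EventType.PAUSED);

    // Stand-in for the task executor repository (id -> executor name).
    private final Map<Integer, String> executors = new HashMap<>();
    // Stand-in for "a finalize event was published for this id".
    private final List<Integer> finalized = new ArrayList<>();

    void register(int taskInstanceId) {
        executors.put(taskInstanceId, "executor-" + taskInstanceId);
    }

    // Stand-in for receiveTaskExecutorLifecycleEventACK.
    void receiveAck(int taskInstanceId, EventType ackedType) {
        if (!TERMINAL.contains(ackedType)) {
            return; // non-terminal ACK: the executor stays registered
        }
        // Remove first so that a duplicate ACK cannot finalize twice.
        if (executors.remove(taskInstanceId) != null) {
            finalized.add(taskInstanceId);
        }
    }

    boolean isRegistered(int taskInstanceId) {
        return executors.containsKey(taskInstanceId);
    }

    List<Integer> finalizedIds() {
        return finalized;
    }

    public static void main(String[] args) {
        AckDrivenFinalizeSketch sketch = new AckDrivenFinalizeSketch();
        sketch.register(3);
        sketch.receiveAck(3, EventType.RUNNING);
        System.out.println("registered after RUNNING ack: " + sketch.isRegistered(3));
        sketch.receiveAck(3, EventType.FAILED);
        System.out.println("registered after FAILED ack: " + sketch.isRegistered(3));
    }
}
```

Tying cleanup to the ACK means the executor, and therefore its host information, remains available until the master has confirmed the final event, with no periodic scan needed.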

Contributor Author

wow... it's pretty much better!

Contributor Author

resubmitted.


@Override
- public boolean reassignWorkflowInstanceHost(int taskInstanceId, String workflowHost) {
+ public void wake() {
Member

It's better to revert this method and change the implementation instead. Once the workflow host changes, we may need to reset the event's lastReportTime, or set a new flag marking that the event needs to be retried immediately, and then notify the reporter via taskExecutionEventEmptyCondition.signalAll().
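
One way this suggestion could look, as a hedged sketch rather than the merged code: pending events remember a lastReportTime, a host change resets it to zero so the events become due immediately, and a java.util.concurrent Condition wakes the reporter. HostChangeRetrySketch, PendingEvent, dueEvents and the eventsReady condition (standing in for taskExecutionEventEmptyCondition) are all hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch: make pending events immediately retryable after a workflow host change.
final class HostChangeRetrySketch {

    static final class PendingEvent {
        final String name;
        long lastReportTime; // epoch millis of the last report attempt; 0 means "retry now"

        PendingEvent(String name, long lastReportTime) {
            this.name = name;
            this.lastReportTime = lastReportTime;
        }
    }

    private final ReentrantLock lock = new ReentrantLock();
    // Plays the role of taskExecutionEventEmptyCondition in the comment above.
    private final Condition eventsReady = lock.newCondition();
    private final Map<Integer, List<PendingEvent>> channels = new HashMap<>();

    void offer(int taskInstanceId, PendingEvent event) {
        lock.lock();
        try {
            channels.computeIfAbsent(taskInstanceId, id -> new ArrayList<>()).add(event);
            eventsReady.signalAll();
        } finally {
            lock.unlock();
        }
    }

    // Resets retry timers for one task and wakes the reporter; returns false if no channel exists.
    boolean onWorkflowInstanceHostChanged(int taskInstanceId) {
        lock.lock();
        try {
            List<PendingEvent> events = channels.get(taskInstanceId);
            if (events == null) {
                return false;
            }
            for (PendingEvent event : events) {
                event.lastReportTime = 0L;
            }
            eventsReady.signalAll(); // must be called while holding the lock
            return true;
        } finally {
            lock.unlock();
        }
    }

    // Names of the events whose retry interval has elapsed at time `now`.
    List<String> dueEvents(long now, long retryIntervalMillis) {
        lock.lock();
        try {
            List<String> due = new ArrayList<>();
            for (List<PendingEvent> events : channels.values()) {
                for (PendingEvent event : events) {
                    if (now - event.lastReportTime >= retryIntervalMillis) {
                        due.add(event.name);
                    }
                }
            }
            return due;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        HostChangeRetrySketch sketch = new HostChangeRetrySketch();
        sketch.offer(5, new PendingEvent("FAILED", 1_000L));
        System.out.println("due before host change: " + sketch.dueEvents(1_500L, 60_000L));
        sketch.onWorkflowInstanceHostChanged(5);
        System.out.println("due after host change: " + sketch.dueEvents(1_500L, 60_000L));
    }
}
```

The try/finally around every lock() keeps the lock from leaking if an exception is thrown while the maps are being updated.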

Contributor Author

resubmitted.

lile added 5 commits July 30, 2025 23:59
TaskExecutorFinalizeLifecycleEvent will be published by TaskExecutorLifecycleEventRemoteReporter.
TaskExecutorFinalizeLifecycleEvent will be published by TaskExecutorLifecycleEventRemoteReporter.
@ruanwenjun ruanwenjun force-pushed the fix-reassign-host-issue branch from cef0070 to f63256d Compare July 30, 2025 15:59
ruanwenjun previously approved these changes Jul 31, 2025
Member

@ruanwenjun ruanwenjun left a comment

LGTM


@Override
- public boolean reassignWorkflowInstanceHost(int taskInstanceId, String workflowHost) {
+ public void resetAndReadyChannelEvents(int taskInstanceId) {
Member

Suggested change
- public void resetAndReadyChannelEvents(int taskInstanceId) {
+ public void onWorkflowInstanceHostChanged(int taskInstanceId) {

Contributor Author

Suggestion applied and committed.

ruanwenjun previously approved these changes Jul 31, 2025
Member

@SbloodyS SbloodyS left a comment

+1

sonarqubecloud bot commented Aug 5, 2025

Quality Gate failed

Failed conditions
1.7% Coverage on New Code (required ≥ 60%)

See analysis details on SonarQube Cloud

@SbloodyS SbloodyS merged commit e2a9e76 into apache:dev Aug 5, 2025
69 of 70 checks passed
eco8848 pushed a commit to eco8848/dolphinscheduler that referenced this pull request Aug 8, 2025
davidzollo pushed a commit to davidzollo/dolphinscheduler that referenced this pull request Oct 27, 2025
Labels

backend bug Something isn't working priority:high

Development

Successfully merging this pull request may close these issues.

[Bug] [TaskExecutorLifecycleEventRemoteReporter] worker-server's reassignWorkflowInstanceHost may can not find channel

3 participants