Skip to content

Conversation

@reele
Copy link
Contributor

@reele reele commented Apr 30, 2025

Purpose of the pull request

implement #16979

Brief change log

  1. Refactored DelayEntry.java,PriorityDelayQueue,PriorityAndDelayBasedTaskEntry,TimeBasedTaskExecutionRunnableComparableEntry with TaskDispatchEntryEvent,TaskDispatchEntryEventBus which inherited from AbstractDelayEventBus,AbstractDelayEvent

  2. Fix PriorityAndDelayBasedTaskEntry's compare issue, old object compare task priority after compare time in millisecond, cause the task priority only affect in same millisecond, now it will compare task priority first, then the create time.

Verify this pull request

the test codes were refactored.

Pull Request Notice

Pull Request Notice

If your pull request contains incompatible change, you should also add it to docs/docs/en/guide/upgrade/incompatible.md

Copy link
Member

@ruanwenjun ruanwenjun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we remove GlobalTaskDispatchWaitingQueue and GlobalTaskDispatchWaitingQueueLooper, this queue is used to handle the delay time, but bring some extra problems, e.g. there are existing many thread to deal with the task lifecycle in master, we need to do extra concurrency control, e.g. one task is in dispatching, and then receive a kill event.

Since TaskDispatchLifecycleEvent is already an DelayEvent so we can directly set the delay time to it, then we can removed TimeBasedTaskExecutionRunnableComparableEntry and WorkerGroupTaskDispatcher. We directly use ITaskExecutorClient to dispatch task in dispatchEventAction, if dispatch failed then regenerate a TaskDispatchLifecycleEvent with retry delay time.

Comment on lines 38 to 40
public int compareTo(@NotNull AbstractTaskDispatchEntryEvent<V> other) {
return super.compareTo(other);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may need to compare the data if super.compareTo(other) == 0.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, i'll handle it

@ruanwenjun ruanwenjun added this to the 3.3.1 milestone Apr 30, 2025
@ruanwenjun ruanwenjun added the improvement make more easy to user or prompt friendly label Apr 30, 2025
@ruanwenjun
Copy link
Member

ruanwenjun commented Apr 30, 2025

Good job, after this PR merged, the master code will be clearer.
Base on this, we can provide more EventBus to expose the workflow instance/task instance lifecycle event to exposed to outside.

@reele
Copy link
Contributor Author

reele commented Apr 30, 2025

Can we remove GlobalTaskDispatchWaitingQueue and GlobalTaskDispatchWaitingQueueLooper, this queue is used to handle the delay time, but bring some extra problems, e.g. there are existing many thread to deal with the task lifecycle in master, we need to do extra concurrency control, e.g. one task is in dispatching, and then receive a kill event.

Since TaskDispatchLifecycleEvent is already an DelayEvent so we can directly set the delay time to it, then we can removed TimeBasedTaskExecutionRunnableComparableEntry and WorkerGroupTaskDispatcher. We directly use ITaskExecutorClient to dispatch task in dispatchEventAction, if dispatch failed then regenerate a TaskDispatchLifecycleEvent with retry delay time.

i'll have a try.

@ruanwenjun
Copy link
Member

@reele Sorry, I miss something, we need to sort the tasks by priority under one worker group, so WorkerGroupTaskDispatcher cannot be removed, but GlobalTaskDispatchWaitingQueue and GlobalTaskDispatchWaitingQueueLooper can be removed.

@reele
Copy link
Contributor Author

reele commented May 6, 2025

@reele Sorry, I miss something, we need to sort the tasks by priority under one worker group, so WorkerGroupTaskDispatcher cannot be removed, but GlobalTaskDispatchWaitingQueue and GlobalTaskDispatchWaitingQueueLooper can be removed.

OK.

@ruanwenjun
Copy link
Member

@reele Sorry, I miss something, we need to sort the tasks by priority under one worker group, so WorkerGroupTaskDispatcher cannot be removed, but GlobalTaskDispatchWaitingQueue and GlobalTaskDispatchWaitingQueueLooper can be removed.

OK.

I'll figure out how to fix this prioritization problem, which is in fact very inaccurate, and it's not sorting globally.

@ruanwenjun
Copy link
Member

ruanwenjun commented May 13, 2025

I have removed GlobalTaskDispatchWaitingQueue at #17180

@reele
Copy link
Contributor Author

reele commented May 17, 2025

I have removed GlobalTaskDispatchWaitingQueue at #17180

Oh! sorry for not solving this pr in time......

@reele reele closed this May 17, 2025
@ruanwenjun ruanwenjun reopened this May 23, 2025
@ruanwenjun
Copy link
Member

I have removed GlobalTaskDispatchWaitingQueue at #17180

Oh! sorry for not solving this pr in time......

Hi @reele, I reopen this PR, this PR still needed.

@reele reele closed this Jun 2, 2025
@reele reele force-pushed the improvement-16979-unify-queue-objects branch from 66d005d to 22ceff8 Compare June 2, 2025 08:56
lile added 2 commits June 3, 2025 10:05
…jects' into improvement-16979-unify-queue-objects

# Conflicts:
#	dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcher.java
#	dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueue.java
@reele reele reopened this Jun 3, 2025
@reele
Copy link
Contributor Author

reele commented Jun 4, 2025

@ruanwenjun recommitted.

Copy link
Member

@ruanwenjun ruanwenjun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Some class names could be improved.

import static com.google.common.base.Preconditions.checkNotNull;

@Getter
public class TaskReadyForDispatchEvent<V extends Comparable<V>> extends AbstractDelayEvent {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
public class TaskReadyForDispatchEvent<V extends Comparable<V>> extends AbstractDelayEvent {
public class TaskDispatchableEvent<ITaskExecutionRunnable> extends AbstractDelayEvent {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

Comment on lines 25 to 27
public class WorkerGroupEventBus<V extends TaskReadyForDispatchEvent<T>, T extends Comparable<T>>
extends
AbstractDelayEventBus<V> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
public class WorkerGroupEventBus<V extends TaskReadyForDispatchEvent<T>, T extends Comparable<T>>
extends
AbstractDelayEventBus<V> {
public class TaskDispatchableEventBus
extends
AbstractDelayEventBus< TaskDispatchableEvent > {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

Copy link
Member

@ruanwenjun ruanwenjun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

import lombok.Getter;

@Getter
public class TaskDispatchableEvent<V extends Comparable<V>> extends AbstractDelayEvent {

Check warning

Code scanning / CodeQL

Inconsistent compareTo Warning

This class declares
compareTo
but inherits equals; the two could be inconsistent.
Copy link
Member

@SbloodyS SbloodyS left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

@sonarqubecloud
Copy link

sonarqubecloud bot commented Aug 7, 2025

@SbloodyS SbloodyS merged commit 2f57e05 into apache:dev Aug 7, 2025
71 checks passed
eco8848 pushed a commit to eco8848/dolphinscheduler that referenced this pull request Aug 8, 2025
davidzollo pushed a commit to davidzollo/dolphinscheduler that referenced this pull request Oct 27, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants