-
Notifications
You must be signed in to change notification settings - Fork 5k
[Improvement-16979] Unify PriorityDelayQueue with AbstractDelayEventBus #17155
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Improvement-16979] Unify PriorityDelayQueue with AbstractDelayEventBus #17155
Conversation
.../org/apache/dolphinscheduler/server/master/runner/events/AbstractTaskDispatchEntryEvent.java
Fixed
Show fixed
Hide fixed
...ava/org/apache/dolphinscheduler/server/master/runner/events/TaskDispatchDelayEntryEvent.java
Fixed
Show fixed
Hide fixed
.../org/apache/dolphinscheduler/server/master/runner/events/TaskDispatchPriorityEntryEvent.java
Fixed
Show fixed
Hide fixed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we remove GlobalTaskDispatchWaitingQueue and GlobalTaskDispatchWaitingQueueLooper, this queue is used to handle the delay time, but bring some extra problems, e.g. there are existing many thread to deal with the task lifecycle in master, we need to do extra concurrency control, e.g. one task is in dispatching, and then receive a kill event.
Since TaskDispatchLifecycleEvent is already an DelayEvent so we can directly set the delay time to it, then we can removed TimeBasedTaskExecutionRunnableComparableEntry and WorkerGroupTaskDispatcher. We directly use ITaskExecutorClient to dispatch task in dispatchEventAction, if dispatch failed then regenerate a TaskDispatchLifecycleEvent with retry delay time.
| public int compareTo(@NotNull AbstractTaskDispatchEntryEvent<V> other) { | ||
| return super.compareTo(other); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We may need to compare the data if super.compareTo(other) == 0.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok, i'll handle it
|
Good job, after this PR merged, the master code will be clearer. |
i'll have a try. |
|
@reele Sorry, I miss something, we need to sort the tasks by priority under one worker group, so |
OK. |
I'll figure out how to fix this prioritization problem, which is in fact very inaccurate, and it's not sorting globally. |
|
I have removed |
Oh! sorry for not solving this pr in time...... |
66d005d to
22ceff8
Compare
…jects' into improvement-16979-unify-queue-objects # Conflicts: # dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcher.java # dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueue.java
|
@ruanwenjun recommitted. |
...ache/dolphinscheduler/server/master/engine/task/dispatcher/event/TaskDispatchEntryEvent.java
Fixed
Show fixed
Hide fixed
...ache/dolphinscheduler/server/master/engine/task/dispatcher/event/TaskDispatchEntryEvent.java
Outdated
Show resolved
Hide resolved
.../apache/dolphinscheduler/server/master/engine/task/dispatcher/TaskDispatchEntryEventBus.java
Outdated
Show resolved
Hide resolved
…ler/server/master/engine/task/dispatcher/event/TaskDispatchEntryEvent.java Co-authored-by: Wenjun Ruan <[email protected]>
…ler/server/master/engine/task/dispatcher/TaskDispatchEntryEventBus.java Co-authored-by: Wenjun Ruan <[email protected]>
ruanwenjun
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Some class names could be improved.
| import static com.google.common.base.Preconditions.checkNotNull; | ||
|
|
||
| @Getter | ||
| public class TaskReadyForDispatchEvent<V extends Comparable<V>> extends AbstractDelayEvent { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| public class TaskReadyForDispatchEvent<V extends Comparable<V>> extends AbstractDelayEvent { | |
| public class TaskDispatchableEvent<ITaskExecutionRunnable> extends AbstractDelayEvent { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
| public class WorkerGroupEventBus<V extends TaskReadyForDispatchEvent<T>, T extends Comparable<T>> | ||
| extends | ||
| AbstractDelayEventBus<V> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| public class WorkerGroupEventBus<V extends TaskReadyForDispatchEvent<T>, T extends Comparable<T>> | |
| extends | |
| AbstractDelayEventBus<V> { | |
| public class TaskDispatchableEventBus | |
| extends | |
| AbstractDelayEventBus< TaskDispatchableEvent > { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
...e/dolphinscheduler/server/master/engine/task/dispatcher/event/TaskReadyForDispatchEvent.java
Fixed
Show fixed
Hide fixed
ruanwenjun
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
| import lombok.Getter; | ||
|
|
||
| @Getter | ||
| public class TaskDispatchableEvent<V extends Comparable<V>> extends AbstractDelayEvent { |
Check warning
Code scanning / CodeQL
Inconsistent compareTo Warning
compareTo
SbloodyS
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
|



Purpose of the pull request
implement #16979
Brief change log
Refactored
DelayEntry.java,PriorityDelayQueue,PriorityAndDelayBasedTaskEntry,TimeBasedTaskExecutionRunnableComparableEntrywithTaskDispatchEntryEvent,TaskDispatchEntryEventBuswhich inherited fromAbstractDelayEventBus,AbstractDelayEventFix
PriorityAndDelayBasedTaskEntry's compare issue, old object compare task priority after compare time in millisecond, cause the task priority only affect in same millisecond, now it will compare task priority first, then the create time.Verify this pull request
the test codes were refactored.
Pull Request Notice
Pull Request Notice
If your pull request contains incompatible change, you should also add it to
docs/docs/en/guide/upgrade/incompatible.md