-
Notifications
You must be signed in to change notification settings - Fork 5k
Fix TaskGroupQueue will never be wakeup due to wakeup failed at one time #15528
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix TaskGroupQueue will never be wakeup due to wakeup failed at one time #15528
Conversation
...in/java/org/apache/dolphinscheduler/server/master/runner/taskgroup/TaskGroupCoordinator.java
Fixed
Show fixed
Hide fixed
38ff493 to
212ba8d
Compare
|
Please retry analysis of this Pull-Request directly on SonarCloud |
2d17d56 to
98a8d8f
Compare
98a8d8f to
3cfd45c
Compare
| ProcessInstance processInstance = processInstanceDao.queryOptionalById(taskGroupQueue.getProcessId()) | ||
| .orElseThrow( | ||
| () -> new ServiceException(Status.PROCESS_INSTANCE_NOT_EXIST, taskGroupQueue.getProcessId())); |
Check notice
Code scanning / CodeQL
Unread local variable
59d46b2 to
82c1bce
Compare
ec952bc to
628f690
Compare
e36844f to
b9e452e
Compare
| if (taskGroupId == null) { | ||
| throw new IllegalArgumentException("taskGroupId cannot be null"); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is there any reason we do not use @NonNull directly for parameter taskGroupId?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| -- create idx_t_ds_task_group_queue_in_queue on t_ds_task_group_queue | ||
| DROP PROCEDURE IF EXISTS create_idx_t_ds_task_group_queue_in_queue; | ||
| delimiter d// | ||
| CREATE PROCEDURE create_idx_t_ds_task_group_queue_in_queue() | ||
| BEGIN | ||
| DECLARE index_exists INT DEFAULT 0; | ||
| SELECT COUNT(*) INTO index_exists FROM information_schema.statistics WHERE table_schema = (SELECT DATABASE()) AND table_name = 't_ds_task_group_queue' AND index_name = 'idx_t_ds_task_group_queue_in_queue'; | ||
| IF index_exists = 0 THEN CREATE INDEX idx_t_ds_task_group_queue_in_queue ON t_ds_task_group_queue(in_queue); | ||
| END IF; | ||
| END; | ||
| d// | ||
| delimiter ; | ||
| CALL create_idx_t_ds_task_group_queue_in_queue; | ||
| DROP PROCEDURE create_idx_t_ds_task_group_queue_in_queue; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we could directly remove the related 3.3.0 upgrade. I would merge almost all upgrade sql from 3.3.0 to 3.2.1. and we will not release 3.3.0 until we close or nearly close 3.2.x
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, we can remove the upgrade 3.3.0. We can use another PR to do this.
caishunfeng
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
b9e452e to
3681244
Compare
|
Gallardot
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good job! Rest LGTM.
| log.warn("The TaskInstance: {} state: {} finished, will release the TaskGroupQueue: {}", | ||
| taskInstance.getName(), taskInstance.getState(), taskGroupQueue); | ||
| releaseTaskGroupQueueSlot(taskGroupQueue); | ||
| continue; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Useless code
| taskGroupQueue.setInQueue(Flag.YES.getCode()); | ||
| taskGroupQueue.setStatus(TaskGroupQueueStatus.ACQUIRE_SUCCESS); | ||
| taskGroupQueue.setUpdateTime(new Date()); | ||
| taskGroupQueueDao.updateById(taskGroupQueue); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is some repetitive code in the state change that might be suitable for turning into a function.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good suggestion, I will use another PR to move these kind of code in the whole project into function.

Purpose of the pull request
Brief change log
In DS, we use TaskGroup to restrict the Task parallelism.



e.g. We have two task TaskA, TaskB, and they belong to the same task group, if we execute these two task at the same time, only one will get the taskGroup slot and the other will wait until be wakeup.
Unfortunately, if the TaskA wakeup TaskB failed, then TaskB will never be wakeup, since we use
putmode, the waiting task will only be wakeup by other task, it will not try to pull the slot.This PR will refactor this.

This PR add a new component
TaskGroupCoordinatorwhich use to deal with acquire/release task group slot and notify task.Once a task need to acquire TaskGroup slot, it should call the

TaskGroupCoordinator::acquireTaskGroupSlot, in this method, will insert a in queue TaskGroupQueue which status is WAITING.Once a task is finished it will call
TaskGroupCoordinator::releaseTaskGroupSlot, in this method, will move the TaskGroupQueue out queue.And TaskGroupCoordinator will loop the slot, if there exist available slot, it will try to wakeup a waiting task.
Verify this pull request
This pull request is code cleanup without any test coverage.
(or)
This pull request is already covered by existing tests, such as (please describe tests).
(or)
This change added tests and can be verified as follows:
(or)
If your pull request contain incompatible change, you should also add it to
docs/docs/en/guide/upgrede/incompatible.md