Allow CeleryExecutor to "adopt" an orphaned queued or running task #10949
Conversation
Force-pushed from b58fac4 to c1a3f9f.
Force-pushed from 4f3cf22 to 627207a.
Force-pushed from 627207a to 2871b48.
XD-DENG left a comment:
A minor suggestion
airflow/migrations/versions/e1a11ece99cc_add_external_executor_id_to_ti.py (outdated; resolved)
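For context, the migration referenced above adds a column for storing the Celery-generated task id on each task instance. Below is a minimal, illustrative sketch of what such an Alembic migration could look like; the revision id is taken from the filename, while the down-revision, column length, and other details are assumptions rather than the merged migration's exact contents.

```python
"""Illustrative sketch: add external_executor_id to task_instance.

The revision id comes from the filename above; down_revision and the
column length are placeholders, not necessarily what Airflow ships.
"""
import sqlalchemy as sa
from alembic import op

# Revision identifiers used by Alembic.
revision = "e1a11ece99cc"
down_revision = None  # placeholder -- the real migration points at its predecessor
branch_labels = None
depends_on = None


def upgrade():
    # Record the id Celery assigned to the task so that a freshly started
    # scheduler can re-attach ("adopt") the task later via AsyncResult.
    op.add_column(
        "task_instance",
        sa.Column("external_executor_id", sa.String(length=250), nullable=True),
    )


def downgrade():
    op.drop_column("task_instance", "external_executor_id")
```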
Force-pushed from 2871b48 to 12debd5.
The doc tests are now failing with:
But I didn't change the imports in that file :/ Oh, I missed Kaxil's earlier fix in my rebase + force-push.
Force-pushed from bbb6f5f to 03a9290.
Force-pushed from 03a9290 to 7d13295.
Force-pushed from 64c2215 to 7c05b37.
Looks good to me 👍
XD-DENG left a comment:
One more minor comment.
Force-pushed from 3f4b2ea to f1967f6.
Right, final rebase done, will merge once tests are green. (I wish GitHub had a button for that.)
Co-authored-by: Kaxil Naik <[email protected]>
Force-pushed from f1967f6 to 092b9c0.
This can happen when a task is enqueued by one executor, and then that
scheduler dies/exits.
The default fallback behaviour is unchanged -- queued tasks are
cleared and then later rescheduled.
But for Celery we can do better -- if we record the Celery-generated
task_id, we can then re-create the AsyncResult objects for orphaned
tasks at a later date.
However since Celery just reports all AsyncResult as "PENDING", even if
they aren't tasks currently in the broker queue, we need to apply a
timeout to "unblock" these tasks in case they never actually made it to
the Celery broker.
This all means that we can adopt tasks that have been enqueued by another
CeleryExecutor if it dies, without having to clear the tasks and slow
things down. This is especially useful as the task may have already started
running, and while clearing it would stop it, it's better if we don't
have to reset it!
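To make the mechanism above concrete, here is a minimal sketch of the adoption idea, not the actual Airflow implementation: it assumes the Celery-generated id has already been stored on each orphaned task instance (here as an `external_executor_id` attribute), and the function name and timeout value are illustrative.

```python
from datetime import datetime, timedelta

from celery.result import AsyncResult

# Illustrative value: how long an adopted task may stay in PENDING before we
# assume it never reached the broker and hand it back for rescheduling.
ADOPTED_TASK_TIMEOUT = timedelta(minutes=10)


def try_adopt_orphaned_tasks(task_instances, adopted_at, now=None):
    """Sketch: re-attach to Celery tasks that were enqueued by a dead scheduler.

    ``task_instances`` is any iterable of objects carrying the
    Celery-generated id in ``external_executor_id``.
    Returns two lists: (adopted, to_reschedule).
    """
    now = now or datetime.utcnow()
    adopted, to_reschedule = [], []

    for ti in task_instances:
        if not ti.external_executor_id:
            # No recorded Celery id: nothing to re-attach to, so fall back to
            # the default behaviour of clearing and rescheduling the task.
            to_reschedule.append(ti)
            continue

        # Re-create the AsyncResult from the stored id. Celery reports any
        # unknown id as PENDING, so PENDING alone does not prove the task
        # ever made it to the broker...
        result = AsyncResult(ti.external_executor_id)
        if result.state == "PENDING" and now - adopted_at > ADOPTED_TASK_TIMEOUT:
            # ...hence the timeout: unblock tasks that have been PENDING for
            # too long by giving them back to the scheduler.
            to_reschedule.append(ti)
        else:
            adopted.append(ti)

    return adopted, to_reschedule
```

The key trade-off this sketch illustrates is the PENDING ambiguity: Celery cannot distinguish "queued but not yet started" from "never received", so the timeout bounds how long an adopted-but-silent task can occupy its slot before falling back to the normal clear-and-reschedule path.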
Part of #9630
Read the Pull Request Guidelines for more information.
In case of fundamental code change, Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in UPDATING.md.