Detect orphaned task instances by SchedulerJob id and heartbeat #10729
c81a3bf to
b5c067c
Compare
airflow/migrations/versions/b247b1e3d1ed_add_queued_by_job_id_to_ti.py
kaxil
left a comment
Just some minor non-code suggestions. LGTM otherwise.
XD-DENG
left a comment
My 2 cents. Let me know if I misunderstood anything please.
Looks like this breaks on SQLite -- will need to take a look.
This is probably because the SQLite dialect doesn't support multiple-table criteria within UPDATE (log line https://github.com/apache/airflow/runs/1073216815#step:6:1491).
I need to extend the unit tests so that they actually cover the new functionality (not sure how I missed that) -- namely that TIs from a SchedulerJob that has "stalled"/stopped heartbeating are reset.
4b05a14 to
61017d0
Compare
@ashb lemme know if you'd like me to look into the SQLite issue as well.
Please. I suspect we will have to have an … In the latest push I tried using a CTE - that seems to have also broken MySQL, so switching back to a subquery is the first step.
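For illustration, here is a minimal sketch of the subquery form of such an UPDATE, run against an in-memory SQLite database with the standard-library `sqlite3` module. It is not the query from this PR: the `job` and `task_instance` tables are cut-down, hypothetical stand-ins, the heartbeat is stored as plain seconds, and `reset_orphaned_tis` and `demo` are names made up for the example. The only thing it is meant to show is that filtering on a second table through `IN (SELECT ...)` avoids the multiple-table UPDATE form that SQLite rejects.

```python
import sqlite3


def reset_orphaned_tis(conn, now, heartbeat_timeout):
    """Reset queued/scheduled TIs whose queuing job looks dead.

    The second table is reached through a subquery rather than a
    multiple-table UPDATE. Table and column names are assumptions
    made for this sketch.
    """
    cutoff = now - heartbeat_timeout
    cur = conn.execute(
        """
        UPDATE task_instance
           SET state = NULL
         WHERE state IN ('queued', 'scheduled')
           AND queued_by_job_id IN (
               SELECT id FROM job
                WHERE state != 'running' OR latest_heartbeat < ?
           )
        """,
        (cutoff,),
    )
    conn.commit()
    return cur.rowcount  # number of task instances that were reset


def demo():
    conn = sqlite3.connect(":memory:")
    conn.executescript(
        """
        CREATE TABLE job (
            id INTEGER PRIMARY KEY, state TEXT, latest_heartbeat REAL
        );
        CREATE TABLE task_instance (
            task_id TEXT PRIMARY KEY, state TEXT, queued_by_job_id INTEGER
        );
        """
    )
    now = 1000.0
    # Job 1 heartbeated 5s ago; job 2 has been silent for 600s.
    conn.executemany(
        "INSERT INTO job VALUES (?, ?, ?)",
        [(1, "running", now - 5), (2, "running", now - 600)],
    )
    conn.executemany(
        "INSERT INTO task_instance VALUES (?, ?, ?)",
        [("a", "queued", 1), ("b", "queued", 2), ("c", "running", 2)],
    )
    reset = reset_orphaned_tis(conn, now, heartbeat_timeout=30)
    rows = conn.execute(
        "SELECT task_id, state FROM task_instance ORDER BY task_id"
    ).fetchall()
    return reset, rows


if __name__ == "__main__":
    print(demo())
```

In this sketch only task instance `b` is reset: `a` belongs to a job that is still heartbeating, and `c` is already running, so it is not in a resettable state.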
54f808a to
dea0e30
Compare
c6e3b0e to
1862ce5
Compare
7ba1561 to
5b811e8
Compare
Once HA mode for the scheduler lands, we can no longer reset orphaned tasks by looking at the tasks in (the memory of) the current executor. This changes it to keep track of which (Scheduler)Job queued/scheduled a TaskInstance (the new "queued_by_job_id" column on the TaskInstance table), and then we can use the existing heartbeat mechanism for jobs to notice when a TI should be reset. As part of this, the existing implementation of `reset_state_for_orphaned_tasks` has been moved out of BaseJob into BackfillJob, as only this and SchedulerJob had these methods, and the SchedulerJob version now operates differently.
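The idea described above can be sketched without a database. The following is a simplified model, not the code in this PR: the `Job` and `TaskInstance` dataclasses, the `is_alive` and `reset_orphans` helpers, and the `grace` period are all hypothetical. It assumes that a task instance counts as orphaned when the job recorded in `queued_by_job_id` is no longer running or has not heartbeated within the grace period, and it leaves task instances with no recorded job untouched.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict, List, Optional


@dataclass
class Job:
    id: int
    state: str
    latest_heartbeat: datetime


@dataclass
class TaskInstance:
    task_id: str
    state: Optional[str]
    queued_by_job_id: Optional[int] = None


def is_alive(job: Job, now: datetime, grace: timedelta) -> bool:
    """A job is alive if it is running and heartbeated recently enough."""
    return job.state == "running" and now - job.latest_heartbeat <= grace


def reset_orphans(
    tis: List[TaskInstance],
    jobs: Dict[int, Job],
    now: datetime,
    grace: timedelta,
) -> List[str]:
    """Clear the state of queued/scheduled TIs whose owning job is dead.

    No executor memory is consulted: ownership comes only from the
    queued_by_job_id recorded on each task instance.
    """
    reset = []
    for ti in tis:
        if ti.state not in ("queued", "scheduled"):
            continue  # running/finished TIs are not reset in this sketch
        job = jobs.get(ti.queued_by_job_id)
        if job is not None and not is_alive(job, now, grace):
            ti.state = None
            ti.queued_by_job_id = None
            reset.append(ti.task_id)
    return reset


if __name__ == "__main__":
    now = datetime(2020, 9, 8, 12, 0, 0)
    jobs = {
        1: Job(1, "running", now - timedelta(seconds=5)),
        2: Job(2, "running", now - timedelta(minutes=10)),  # stalled
    }
    tis = [
        TaskInstance("a", "queued", 1),
        TaskInstance("b", "scheduled", 2),
        TaskInstance("c", "running", 2),
    ]
    print(reset_orphans(tis, jobs, now, grace=timedelta(seconds=30)))
```

Because ownership lives on the task instance row rather than in one executor's memory, any scheduler process could run a check like this against shared state, which is the property the HA scheduler needs.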
96b3647 to
83315a9
Compare
Something slightly odd is happening here. In apache#10729 it passed the mypy tests with this in place, but now if I make a change to this file (or to scheduler_job.py, which imports DagRun) I get an error _with_ this.