[WIP][AIRFLOW-2701] Clean up backfill dangling dagrun #3562
```diff
@@ -2008,6 +2008,7 @@ def __init__(
         self.verbose = verbose
         self.conf = conf
         self.rerun_failed_tasks = rerun_failed_tasks
+        self.dag_runs = []
         super(BackfillJob, self).__init__(*args, **kwargs)

     def _update_counters(self, ti_status):
```
```diff
@@ -2469,6 +2470,7 @@ def _execute_for_run_dates(self, run_dates, ti_status, executor, pickle_id,
                 session=session)
             if dag_run is None:
                 continue
+            self.dag_runs.append(dag_run)

             ti_status.active_runs.append(dag_run)
             ti_status.to_run.update(tis_map or {})
```
```diff
@@ -2540,9 +2542,32 @@ def _execute(self, session=None):
                         self.dag_id
                     )
                     time.sleep(self.delay_on_limit_secs)
+        except (KeyboardInterrupt, SystemExit):
+            for dag_run in self.dag_runs:
+                dag_run.refresh_from_db(session)
+                make_transient(dag_run)
+
+                # Check all the tasks which are not in success state.
+                check_state = State.unfinished() + State.finished()
+                check_state.remove(State.SUCCESS)
+
+                unfinished_tasks = dag_run.get_task_instances(
+                    state=check_state,
+                    session=session
+                )
+                if unfinished_tasks:
+                    # If there are unfinished tasks and ctrl-c was hit,
+                    # set the dag run state to failed.
+                    for task in unfinished_tasks:
+                        task.set_state(State.FAILED, session)
```
Contributor: Does this actually kill any running tasks?

Author: My understanding is that we mostly run backfill with the local scheduler, which spins off a couple of worker processes to run the tasks. Once we set a task's state to failed, the task will get rerun next time (using clear first, or with --rerun_failed_tasks). And if we ctrl-c the main backfill process, it kills the worker processes as well, so it won't leave any orphaned tasks still running.
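The rerun path the author describes can be sketched as follows. This is a simplified illustration under stated assumptions: `tasks_to_run` and the plain-dict task instances are hypothetical names for this sketch, not Airflow's actual scheduler internals.

```python
def tasks_to_run(task_instances, rerun_failed_tasks):
    """Pick the task instances a backfill pass would attempt."""
    # Tasks with no state yet are always eligible.
    runnable = [ti for ti in task_instances if ti["state"] is None]
    if rerun_failed_tasks:
        # With the flag on, previously failed tasks are picked up again
        # instead of being skipped.
        runnable += [ti for ti in task_instances if ti["state"] == "failed"]
    return runnable
```

This is why marking interrupted tasks as failed is enough for recovery: a later backfill with --rerun_failed_tasks (or after a clear) simply selects them again.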
```diff
+                    dag_run.state = State.FAILED
```
Contributor: We should also kill any running tasks. There are a lot of orphaned tasks on TARS because of ctrl-c when running backfill.

Author: Updated. All the unfinished tasks are now set to the failed state.
```diff
+                else:
+                    dag_run.state = State.SUCCESS
```
Contributor: What if the DAG has failed tasks? We wouldn't want to mark the DagRun as SUCCESS in that case, right?

Author: You are right... I thought the unfinished states would include failed tasks as well. Let me do an update.

Author: Updated.
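The fix discussed in this thread is that `check_state` is built from both the unfinished and the finished states minus SUCCESS, so a task that had already FAILED before the interrupt also prevents the run from being marked SUCCESS. A minimal sketch of that set construction, using assumed stand-in lists rather than Airflow's definitive `State.unfinished()` / `State.finished()` contents:

```python
# Illustrative stand-ins for Airflow's State.unfinished() / State.finished();
# the exact membership of these lists is an assumption for this sketch.
UNFINISHED = ["none", "scheduled", "queued", "running", "up_for_retry"]
FINISHED = ["success", "failed", "skipped"]

def non_success_states():
    # Every state except SUCCESS counts against the DagRun, which is why
    # a previously FAILED task also blocks marking the run SUCCESS.
    states = UNFINISHED + FINISHED
    states.remove("success")
    return states
```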
```diff
+                session.merge(dag_run)
         finally:
-            executor.end()
+            session.commit()
+            executor.end()

         self.log.info("Backfill done. Exiting.")
```
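Putting the new except-branch together, here is a minimal, self-contained sketch of the cleanup logic, using hypothetical stand-in classes in place of Airflow's `State`, `DagRun`, and session (these are assumptions for illustration, not the real Airflow API):

```python
# Hypothetical stand-ins for Airflow's State constants and DagRun model.
SUCCESS, FAILED, RUNNING, QUEUED = "success", "failed", "running", "queued"

class Task:
    def __init__(self, state):
        self.state = state

class DagRun:
    def __init__(self, tasks):
        self.tasks = tasks
        self.state = RUNNING

    def get_task_instances(self, state):
        # Return the task instances whose state is in the given list.
        return [t for t in self.tasks if t.state in state]

def cleanup_on_interrupt(dag_runs):
    """Mirrors the except-branch above: any task not in SUCCESS is marked
    FAILED and drags its DagRun to FAILED; a fully-successful run is
    marked SUCCESS."""
    for dag_run in dag_runs:
        # Everything except SUCCESS counts as "unfinished" here.
        check_state = [RUNNING, QUEUED, FAILED]
        unfinished_tasks = dag_run.get_task_instances(state=check_state)
        if unfinished_tasks:
            for task in unfinished_tasks:
                task.state = FAILED
            dag_run.state = FAILED
        else:
            dag_run.state = SUCCESS
```

The design choice matches the review discussion: failed tasks are left in FAILED (so --rerun_failed_tasks can retry them), and a DagRun only becomes SUCCESS when every one of its tasks succeeded.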
Reviewer: I think we can first check whether the DagRun status is SUCCESS; if it is SUCCESS, we can just continue.

Author: The DagRun status isn't set here in the normal case; it is set in BaseJob (outside the loop). But if the job hits ctrl-c, the remaining BaseJob code never runs.

Reviewer: I see.