Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 26 additions & 1 deletion airflow/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -2008,6 +2008,7 @@ def __init__(
self.verbose = verbose
self.conf = conf
self.rerun_failed_tasks = rerun_failed_tasks
self.dag_runs = []
super(BackfillJob, self).__init__(*args, **kwargs)

def _update_counters(self, ti_status):
Expand Down Expand Up @@ -2469,6 +2470,7 @@ def _execute_for_run_dates(self, run_dates, ti_status, executor, pickle_id,
session=session)
if dag_run is None:
continue
self.dag_runs.append(dag_run)

ti_status.active_runs.append(dag_run)
ti_status.to_run.update(tis_map or {})
Expand Down Expand Up @@ -2540,9 +2542,32 @@ def _execute(self, session=None):
self.dag_id
)
time.sleep(self.delay_on_limit_secs)

except (KeyboardInterrupt, SystemExit):
for dag_run in self.dag_runs:
dag_run.refresh_from_db(session)
make_transient(dag_run)

# Check all the tasks which are not in success state.
check_state = State.unfinished() + State.finished()
check_state.remove(State.SUCCESS)

unfinished_tasks = dag_run.get_task_instances(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can first check the whether the DagRun status is SUCCESS,
If SUCCESS, we can just continue

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the dagrun status won't set it here, but set in Basejob(outside loop) in the normal state. But if it encounters ctrl^c, it will not continue the BaseJob remaining function.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see.

state=check_state,
session=session
)
if unfinished_tasks:
# if there are unfinished tasks and ctrl^c
# set dag run state to failed
for task in unfinished_tasks:
task.set_state(State.FAILED, session)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this actually kill any running tasks?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

my understanding is that we mostly run backfill with local scheduler which spin off couples of worker multiprocess to run the task. Once we set the task state to failed, the task will get rerun next time(using clear first, or with --rerun_failed_task). And if we ctrol^c the main backfill process, it will kill the worker multiprocesses as well which won't leave any orphan task still keep running.

dag_run.state = State.FAILED
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should also kill any running tasks. There are a lot of orphaned tasks on TARS because of ctrl+c when running backfill.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated. set all the unfinished task state to failed.

else:
dag_run.state = State.SUCCESS
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if the DAG has failed tasks? We wouldn't want to mark the DagRun as SUCCESS in that case, right?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you are right...I thought the unfinished state should include failed task as well. let me do an update.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated.

session.merge(dag_run)
finally:
executor.end()
session.commit()
executor.end()

self.log.info("Backfill done. Exiting.")

Expand Down