Skip to content

Commit 3396d1f

Browse files
Fix Deferrable stuck as "scheduled" during backfill (#26205)
Co-authored-by: Tzu-ping Chung <[email protected]>
1 parent f01eed6 commit 3396d1f

File tree

2 files changed

+30
-0
lines changed

2 files changed

+30
-0
lines changed

airflow/jobs/backfill_job.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,12 @@ def _update_counters(self, ti_status, session=None):
217217
tis_to_be_scheduled.append(ti)
218218
ti_status.running.pop(reduced_key)
219219
ti_status.to_run[ti.key] = ti
220+
# special case: Deferrable task can go from DEFERRED to SCHEDULED;
221+
# when that happens, we need to put it back as in UP_FOR_RESCHEDULE
222+
elif ti.state == TaskInstanceState.SCHEDULED:
223+
self.log.debug("Task instance %s is resumed from deferred state", ti)
224+
ti_status.running.pop(ti.key)
225+
ti_status.to_run[ti.key] = ti
220226

221227
# Batch schedule of task instances
222228
if tis_to_be_scheduled:

tests/jobs/test_backfill_job.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1415,6 +1415,30 @@ def test_update_counters(self, dag_maker, session):
14151415

14161416
ti_status.to_run.clear()
14171417

1418+
# test for scheduled
1419+
ti.set_state(State.SCHEDULED)
1420+
# Deferred tasks are put into scheduled by the triggerer
1421+
# Check that they are put into to_run
1422+
ti_status.running[ti.key] = ti
1423+
job._update_counters(ti_status=ti_status, session=session)
1424+
assert len(ti_status.running) == 0
1425+
assert len(ti_status.succeeded) == 0
1426+
assert len(ti_status.skipped) == 0
1427+
assert len(ti_status.failed) == 0
1428+
assert len(ti_status.to_run) == 1
1429+
1430+
ti_status.to_run.clear()
1431+
# test for deferred
1432+
# if a task is deferred and it's not yet time for the triggerer
1433+
# to reschedule it, we should leave it in ti_status.running
1434+
ti.set_state(State.DEFERRED)
1435+
ti_status.running[ti.key] = ti
1436+
job._update_counters(ti_status=ti_status, session=session)
1437+
assert len(ti_status.running) == 1
1438+
assert len(ti_status.succeeded) == 0
1439+
assert len(ti_status.skipped) == 0
1440+
assert len(ti_status.failed) == 0
1441+
assert len(ti_status.to_run) == 0
14181442
session.close()
14191443

14201444
def test_dag_dagrun_infos_between(self, dag_maker):

0 commit comments

Comments
 (0)