Skip to content

Commit dbe80c8

Browse files
authored
Fix xfail test in test_scheduler.py (#23731)
1 parent 337863f commit dbe80c8

File tree

1 file changed

+23
-23
lines changed

1 file changed

+23
-23
lines changed

tests/jobs/test_scheduler_job.py

Lines changed: 23 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -4099,42 +4099,42 @@ def test_catchup_works_correctly(self, dag_maker):
40994099
) > (timezone.utcnow() - timedelta(days=2))
41004100

41014101

4102-
@pytest.mark.xfail(reason="Work out where this goes")
4103-
def test_task_with_upstream_skip_process_task_instances():
4102+
@pytest.mark.need_serialized_dag
4103+
def test_schedule_dag_run_with_upstream_skip(dag_maker, session):
41044104
"""
4105-
Test if _process_task_instances puts a task instance into SKIPPED state if any of its
4105+
Test if _schedule_dag_run puts a task instance into SKIPPED state if any of its
41064106
upstream tasks are skipped according to TriggerRuleDep.
41074107
"""
4108-
clear_db_runs()
4109-
with DAG(
4110-
dag_id='test_task_with_upstream_skip_dag', start_date=DEFAULT_DATE, schedule_interval=None
4111-
) as dag:
4108+
with dag_maker(
4109+
dag_id='test_task_with_upstream_skip_process_task_instances',
4110+
start_date=DEFAULT_DATE,
4111+
session=session,
4112+
):
41124113
dummy1 = EmptyOperator(task_id='dummy1')
41134114
dummy2 = EmptyOperator(task_id="dummy2")
41144115
dummy3 = EmptyOperator(task_id="dummy3")
41154116
[dummy1, dummy2] >> dummy3
4116-
41174117
# dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
4118-
dag.clear()
4119-
dr = dag.create_dagrun(run_type=DagRunType.MANUAL, state=State.RUNNING, execution_date=DEFAULT_DATE)
4118+
dr = dag_maker.create_dagrun(state=State.RUNNING)
41204119
assert dr is not None
41214120

4122-
with create_session() as session:
4123-
tis = {ti.task_id: ti for ti in dr.get_task_instances(session=session)}
4124-
# Set dummy1 to skipped and dummy2 to success. dummy3 remains as none.
4125-
tis[dummy1.task_id].state = State.SKIPPED
4126-
tis[dummy2.task_id].state = State.SUCCESS
4127-
assert tis[dummy3.task_id].state == State.NONE
4121+
tis = {ti.task_id: ti for ti in dr.get_task_instances(session=session)}
4122+
# Set dummy1 to skipped and dummy2 to success. dummy3 remains as none.
4123+
tis[dummy1.task_id].state = State.SKIPPED
4124+
tis[dummy2.task_id].state = State.SUCCESS
4125+
assert tis[dummy3.task_id].state == State.NONE
4126+
session.flush()
41284127

41294128
# dag_runs = DagRun.find(dag_id='test_task_with_upstream_skip_dag')
41304129
# dag_file_processor._process_task_instances(dag, dag_runs=dag_runs)
4131-
4132-
with create_session() as session:
4133-
tis = {ti.task_id: ti for ti in dr.get_task_instances(session=session)}
4134-
assert tis[dummy1.task_id].state == State.SKIPPED
4135-
assert tis[dummy2.task_id].state == State.SUCCESS
4136-
# dummy3 should be skipped because dummy1 is skipped.
4137-
assert tis[dummy3.task_id].state == State.SKIPPED
4130+
scheduler_job = SchedulerJob(subdir=os.devnull)
4131+
scheduler_job._schedule_dag_run(dr, session)
4132+
session.flush()
4133+
tis = {ti.task_id: ti for ti in dr.get_task_instances(session=session)}
4134+
assert tis[dummy1.task_id].state == State.SKIPPED
4135+
assert tis[dummy2.task_id].state == State.SUCCESS
4136+
# dummy3 should be skipped because dummy1 is skipped.
4137+
assert tis[dummy3.task_id].state == State.SKIPPED
41384138

41394139

41404140
class TestSchedulerJobQueriesCount:

0 commit comments

Comments
 (0)