@@ -4099,42 +4099,42 @@ def test_catchup_works_correctly(self, dag_maker):
40994099 ) > (timezone .utcnow () - timedelta (days = 2 ))
41004100
41014101
4102- @pytest .mark .xfail ( reason = "Work out where this goes" )
4103- def test_task_with_upstream_skip_process_task_instances ( ):
4102+ @pytest .mark .need_serialized_dag
4103+ def test_schedule_dag_run_with_upstream_skip ( dag_maker , session ):
41044104 """
4105- Test if _process_task_instances puts a task instance into SKIPPED state if any of its
4105+ Test if _schedule_dag_run puts a task instance into SKIPPED state if any of its
41064106 upstream tasks are skipped according to TriggerRuleDep.
41074107 """
4108- clear_db_runs ()
4109- with DAG (
4110- dag_id = 'test_task_with_upstream_skip_dag' , start_date = DEFAULT_DATE , schedule_interval = None
4111- ) as dag :
4108+ with dag_maker (
4109+ dag_id = 'test_task_with_upstream_skip_process_task_instances' ,
4110+ start_date = DEFAULT_DATE ,
4111+ session = session ,
4112+ ):
41124113 dummy1 = EmptyOperator (task_id = 'dummy1' )
41134114 dummy2 = EmptyOperator (task_id = "dummy2" )
41144115 dummy3 = EmptyOperator (task_id = "dummy3" )
41154116 [dummy1 , dummy2 ] >> dummy3
4116-
41174117 # dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
4118- dag .clear ()
4119- dr = dag .create_dagrun (run_type = DagRunType .MANUAL , state = State .RUNNING , execution_date = DEFAULT_DATE )
4118+ dr = dag_maker .create_dagrun (state = State .RUNNING )
41204119 assert dr is not None
41214120
4122- with create_session () as session :
4123- tis = { ti . task_id : ti for ti in dr . get_task_instances ( session = session )}
4124- # Set dummy1 to skipped and dummy2 to success. dummy3 remains as none.
4125- tis [dummy1 .task_id ].state = State .SKIPPED
4126- tis [dummy2 .task_id ].state = State .SUCCESS
4127- assert tis [ dummy3 . task_id ]. state == State . NONE
4121+ tis = { ti . task_id : ti for ti in dr . get_task_instances ( session = session )}
4122+ # Set dummy1 to skipped and dummy2 to success. dummy3 remains as none.
4123+ tis [ dummy1 . task_id ]. state = State . SKIPPED
4124+ tis [dummy2 .task_id ].state = State .SUCCESS
4125+ assert tis [dummy3 .task_id ].state == State .NONE
4126+ session . flush ()
41284127
41294128 # dag_runs = DagRun.find(dag_id='test_task_with_upstream_skip_dag')
41304129 # dag_file_processor._process_task_instances(dag, dag_runs=dag_runs)
4131-
4132- with create_session () as session :
4133- tis = {ti .task_id : ti for ti in dr .get_task_instances (session = session )}
4134- assert tis [dummy1 .task_id ].state == State .SKIPPED
4135- assert tis [dummy2 .task_id ].state == State .SUCCESS
4136- # dummy3 should be skipped because dummy1 is skipped.
4137- assert tis [dummy3 .task_id ].state == State .SKIPPED
4130+ scheduler_job = SchedulerJob (subdir = os .devnull )
4131+ scheduler_job ._schedule_dag_run (dr , session )
4132+ session .flush ()
4133+ tis = {ti .task_id : ti for ti in dr .get_task_instances (session = session )}
4134+ assert tis [dummy1 .task_id ].state == State .SKIPPED
4135+ assert tis [dummy2 .task_id ].state == State .SUCCESS
4136+ # dummy3 should be skipped because dummy1 is skipped.
4137+ assert tis [dummy3 .task_id ].state == State .SKIPPED
41384138
41394139
41404140class TestSchedulerJobQueriesCount :
0 commit comments