Skip to content

Commit 7813f99

Browse files
authored
Fix scheduler crash when expanding with mapped task that returned none (#23486)
When task is expanded from a mapped task that returned no value, it crashes the scheduler. This PR fixes it by first checking if there's a return value from the mapped task, if no returned value, then error in the task itself instead of crashing the scheduler
1 parent 4485393 commit 7813f99

File tree

2 files changed

+21
-1
lines changed

2 files changed

+21
-1
lines changed

airflow/models/taskinstance.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2329,10 +2329,12 @@ def _record_task_map_for_downstreams(self, task: "Operator", value: Any, *, sess
23292329
# currently possible for a downstream to depend on one individual mapped
23302330
# task instance, only a task as a whole. This will change in AIP-42
23312331
# Phase 2, and we'll need to further analyze the mapped task case.
2332-
if task.is_mapped or next(task.iter_mapped_dependants(), None) is None:
2332+
if next(task.iter_mapped_dependants(), None) is None:
23332333
return
23342334
if value is None:
23352335
raise XComForMappingNotPushed()
2336+
if task.is_mapped:
2337+
return
23362338
if not isinstance(value, collections.abc.Collection) or isinstance(value, (bytes, str)):
23372339
raise UnmappableXComTypePushed(value)
23382340
task_map = TaskMap.from_task_instance_xcom(self, value)

tests/models/test_taskinstance.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2831,3 +2831,21 @@ def add_one(x):
28312831

28322832
query = XCom.get_many(run_id=dagrun.run_id, task_ids=["add_one__1"], session=session)
28332833
assert [x.value for x in query.order_by(None).order_by(XCom.map_index)] == [3, 4, 5]
2834+
2835+
2836+
def test_ti_mapped_depends_on_mapped_xcom_arg_XXX(dag_maker, session):
2837+
with dag_maker(session=session) as dag:
2838+
2839+
@dag.task
2840+
def add_one(x):
2841+
x + 1
2842+
2843+
two_three_four = add_one.expand(x=[1, 2, 3])
2844+
add_one.expand(x=two_three_four)
2845+
2846+
dagrun = dag_maker.create_dagrun()
2847+
for map_index in range(3):
2848+
ti = dagrun.get_task_instance("add_one", map_index=map_index)
2849+
ti.refresh_from_task(dag.get_task("add_one"))
2850+
with pytest.raises(XComForMappingNotPushed):
2851+
ti.run()

0 commit comments

Comments
 (0)