Apache Airflow version
2.4.2
What happened
A mapped task is getting marked as upstream_failed when none of its upstream tasks are failed or upstream_failed.
In the graph view of the DAG below, if first_task finishes before second_task, first_task immediately tries to expand middle_task. Note: this is an important step to reproduce; the order in which the tasks finish matters.
Note that the value of the Airflow configuration variable schedule_after_task_execution must be True (the default) for this to occur.
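For triage, the effective value can be checked through Airflow's configuration API; a minimal sketch (this option lives in the [scheduler] section):

from airflow.configuration import conf

# Should print True (the default) for the bug to reproduce.
print(conf.getboolean("scheduler", "schedule_after_task_execution"))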
The expansion occurs when the task supervisor performs the "mini scheduler" step, in this line in dagrun.py.
The mini scheduler then marks middle_task as upstream_failed in this line in mappedoperator.py:
# If the map length cannot be calculated (due to unavailable
# upstream sources), fail the unmapped task.
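To make the failure mode concrete, here is a hypothetical, heavily simplified sketch of that decision (illustrative only, with made-up names; not the actual Airflow source):

from enum import Enum

class TIState(str, Enum):
    SCHEDULED = "scheduled"
    UPSTREAM_FAILED = "upstream_failed"

def expand_mapped_task(upstream_xcom):
    # The buggy path treats "map length not yet available" (the upstream
    # XCom has not been pushed) the same as "upstream failed", instead of
    # deferring expansion until the upstream task finishes.
    if upstream_xcom is None:
        return TIState.UPSTREAM_FAILED
    return [TIState.SCHEDULED] * len(upstream_xcom)

print(expand_mapped_task(None))       # upstream_failed, though nothing failed
print(expand_mapped_task([0, 1, 2]))  # three scheduled map indexes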
I believe this was introduced by the PR "Fail task if mapping upstream fails".
What you think should happen instead
The dynamic tasks should execute successfully. I don't think the mapped task should be expanded at that point, because its upstream task hasn't completed when the expansion is attempted. If the upstream task were to complete earlier, the expansion would succeed.
How to reproduce
Execute this DAG, making sure the Airflow configuration option schedule_after_task_execution is set to its default value of True.
from datetime import datetime, timedelta
import time

from airflow import DAG, XComArg
from airflow.operators.python import PythonOperator


class PrintIdOperator(PythonOperator):
    """PythonOperator that forwards a (possibly mapped) ``id`` into op_kwargs."""

    def __init__(self, id, **kwargs) -> None:
        super().__init__(**kwargs)
        self.op_kwargs["id"] = id


DAG_ID = "test_upstream_failed_on_mapped_operator_expansion"

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "retry_delay": timedelta(minutes=1),
    "retries": 0,
}


def nop(id):
    print(f"{id=}")


def get_ids(delay: int = 0):
    print(f"Delaying {delay} seconds...")
    time.sleep(delay)
    print("Done!")
    return [0, 1, 2]


with DAG(
    dag_id=DAG_ID,
    default_args=default_args,
    start_date=datetime(2022, 8, 3),
    catchup=False,
    schedule=None,
    max_active_runs=1,
) as dag:
    # second_task sleeps, so first_task finishes first and its mini-scheduler
    # pass tries to expand middle_task before second_task's XCom exists.
    second_task = PythonOperator(
        task_id="second_task",
        python_callable=get_ids,
        op_kwargs={"delay": 10},
    )
    first_task = PythonOperator(
        task_id="first_task",
        python_callable=get_ids,
    )
    middle_task = PrintIdOperator.partial(
        task_id="middle_task",
        python_callable=nop,
    ).expand(id=XComArg(second_task))
    last_task = PythonOperator(
        task_id="last_task",
        python_callable=nop,
        op_kwargs={"id": 1},
    )
    [first_task, middle_task] >> last_task
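When this DAG runs, first_task finishes roughly 10 seconds before second_task, the mini scheduler triggered by first_task attempts to expand middle_task while second_task is still sleeping, and middle_task (and consequently last_task) ends up upstream_failed even though every task upstream of it eventually succeeds.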
Operating System
Debian Buster
Versions of Apache Airflow Providers
No response
Deployment
Docker-Compose
Deployment details
No response
Anything else
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct
