Apache Airflow version
2.3.3
What happened
I was backfilling some DAGs that use dynamic tasks when I got an exception like the following:
Traceback (most recent call last):
File "/opt/conda/envs/production/bin/airflow", line 11, in <module>
sys.exit(main())
File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/__main__.py", line 38, in main
args.func(args)
File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/cli/cli_parser.py", line 51, in command
return func(*args, **kwargs)
File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/utils/cli.py", line 99, in wrapper
return f(*args, **kwargs)
File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/cli/commands/dag_command.py", line 107, in dag_backfill
dag.run(
File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/models/dag.py", line 2288, in run
job.run()
File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/jobs/base_job.py", line 244, in run
self._execute()
File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/utils/session.py", line 71, in wrapper
return func(*args, session=session, **kwargs)
File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/jobs/backfill_job.py", line 847, in _execute
self._execute_dagruns(
File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/utils/session.py", line 68, in wrapper
return func(*args, **kwargs)
File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/jobs/backfill_job.py", line 737, in _execute_dagruns
processed_dag_run_dates = self._process_backfill_task_instances(
File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/utils/session.py", line 68, in wrapper
return func(*args, **kwargs)
File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/jobs/backfill_job.py", line 612, in _process_backfill_task_instances
for node, run_id, new_mapped_tis, max_map_index in self._manage_executor_state(
File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/jobs/backfill_job.py", line 270, in _manage_executor_state
new_tis, num_mapped_tis = node.expand_mapped_task(ti.run_id, session=session)
File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/models/mappedoperator.py", line 614, in expand_mapped_task
operator.mul, self._resolve_map_lengths(run_id, session=session).values()
File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/models/mappedoperator.py", line 600, in _resolve_map_lengths
raise RuntimeError(f"Failed to populate all mapping metadata; missing: {keys}")
RuntimeError: Failed to populate all mapping metadata; missing: 'x'
Digging further, this appears to happen whenever the task whose output is used as input to an .expand() raises an exception. Airflow does not handle that exception gracefully the way it does for exceptions in "normal" tasks, which can lead to further errors from deeper within Airflow. And because this is not a "typical" failure case, features like --rerun-failed-tasks do not work as expected.
What you think should happen instead
Airflow should fail gracefully if exceptions are raised in dynamic task generators.
How to reproduce
#!/usr/bin/env python3
import datetime
import logging

from airflow.decorators import dag, task

logger = logging.getLogger(__name__)


@dag(
    schedule_interval='@daily',
    start_date=datetime.datetime(2022, 8, 12),
    default_args={
        'retries': 5,
        'retry_delay': 5.0,
    },
)
def test_backfill():
    @task
    def get_tasks(ti=None):
        logger.info(f'{ti.try_number=}')
        if ti.try_number < 3:
            raise RuntimeError('')
        return ['a', 'b', 'c']

    @task
    def do_stuff(x=None, ti=None):
        logger.info(f'do_stuff: {x=}, {ti.try_number=}')
        if ti.try_number < 3:
            raise RuntimeError('')

    do_stuff.expand(x=do_stuff.expand(x=get_tasks()))
    do_stuff() >> do_stuff()  # this works as expected


dag = test_backfill()

if __name__ == '__main__':
    dag.cli()
airflow dags backfill test_backfill -s 2022-08-05 -e 2022-08-07 --rerun-failed-tasks
You can repeat the backfill command multiple times to slowly make progress through the DAG. Things will eventually succeed (assuming the exception that triggers this bug stops being raised), but obviously this is a pain when trying to backfill a non-trivial number of DAG Runs.
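Until this is fixed, the repetition can be scripted. A crude sketch (the attempt cap is arbitrary, and it simply shells out to the same backfill command as above):

```python
#!/usr/bin/env python3
# Stopgap sketch: re-run the backfill until it exits cleanly, automating the
# "repeat the command multiple times" approach described above.
import subprocess

cmd = [
    'airflow', 'dags', 'backfill', 'test_backfill',
    '-s', '2022-08-05', '-e', '2022-08-07', '--rerun-failed-tasks',
]

for attempt in range(10):  # arbitrary cap to avoid looping forever
    if subprocess.run(cmd).returncode == 0:
        break
```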
Operating System
CentOS Stream 8
Versions of Apache Airflow Providers
None
Deployment
Other
Deployment details
Standalone
Anything else
I was able to reproduce this both with SQLite + SequentialExecutor and with Postgres + LocalExecutor.
I haven't yet been able to reproduce this outside of backfill mode.
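One user-side mitigation I've considered is to keep exceptions from escaping the generator task at all, so the mapping metadata is always populated. A sketch (build_task_list is a hypothetical stand-in for the real generation logic; note this changes semantics, since on failure the mapped task expands over an empty list instead of being retried):

```python
import logging

from airflow.decorators import task

logger = logging.getLogger(__name__)


def build_task_list():
    # Hypothetical stand-in for the real work that may raise.
    raise RuntimeError('transient failure')


@task
def get_tasks():
    # Swallow generator failures so .expand() always has mapping metadata;
    # downstream then runs zero mapped instances instead of crashing backfill.
    try:
        return build_task_list()
    except Exception:
        logger.exception('task generation failed; expanding over an empty list')
        return []
```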
Possibly related since they mention the same exception text:
- Using expand on PostgresOperator with return value from PythonOperator crashes scheduler #23533
- Dynamic Task Crashes scheduler - Non Empty Return #23642
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct