Backfill mode with mapped tasks: "Failed to populate all mapping metadata" #25698

@Gollum999

Description

Apache Airflow version

2.3.3

What happened

I was backfilling some DAGs that use dynamic tasks when I got an exception like the following:

Traceback (most recent call last):
  File "/opt/conda/envs/production/bin/airflow", line 11, in <module>
    sys.exit(main())
  File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/__main__.py", line 38, in main
    args.func(args)
  File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/cli/cli_parser.py", line 51, in command
    return func(*args, **kwargs)
  File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/utils/cli.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/cli/commands/dag_command.py", line 107, in dag_backfill
    dag.run(
  File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/models/dag.py", line 2288, in run
    job.run()
  File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/jobs/base_job.py", line 244, in run
    self._execute()
  File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/utils/session.py", line 71, in wrapper
    return func(*args, session=session, **kwargs)
  File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/jobs/backfill_job.py", line 847, in _execute
    self._execute_dagruns(
  File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/utils/session.py", line 68, in wrapper
    return func(*args, **kwargs)
  File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/jobs/backfill_job.py", line 737, in _execute_dagruns
    processed_dag_run_dates = self._process_backfill_task_instances(
  File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/utils/session.py", line 68, in wrapper
    return func(*args, **kwargs)
  File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/jobs/backfill_job.py", line 612, in _process_backfill_task_instances
    for node, run_id, new_mapped_tis, max_map_index in self._manage_executor_state(
  File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/jobs/backfill_job.py", line 270, in _manage_executor_state
    new_tis, num_mapped_tis = node.expand_mapped_task(ti.run_id, session=session)
  File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/models/mappedoperator.py", line 614, in expand_mapped_task
    operator.mul, self._resolve_map_lengths(run_id, session=session).values()
  File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/models/mappedoperator.py", line 600, in _resolve_map_lengths
    raise RuntimeError(f"Failed to populate all mapping metadata; missing: {keys}")
RuntimeError: Failed to populate all mapping metadata; missing: 'x'

Digging further, it appears this always happens when the task used as input to an .expand() raises an exception. Airflow does not handle that exception gracefully the way it does for exceptions in "normal" tasks, which can lead to further errors from deeper within Airflow. Because this is not a "typical" failure case, options like --rerun-failed-tasks also do not work as expected.
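
For illustration, here is a distilled version of the trigger (hypothetical DAG and task names, not my real code; the full reproduction is below). The task whose output feeds .expand() raises, so the mapping metadata for the downstream mapped task is never recorded:

import datetime

from airflow.decorators import dag, task


@dag(schedule_interval='@daily', start_date=datetime.datetime(2022, 8, 12))
def expand_upstream_failure():
    @task
    def get_items():
        # fails before returning, so the map length for process() is never stored
        raise RuntimeError('upstream failure')

    @task
    def process(x=None):
        print(x)

    # expanding over the output of a failed upstream task leaves process un-expandable during backfill
    process.expand(x=get_items())


dag = expand_upstream_failure()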

What you think should happen instead

Airflow should fail gracefully if exceptions are raised in dynamic task generators.

How to reproduce

#!/usr/bin/env python3

import datetime
import logging

from airflow.decorators import dag, task


logger = logging.getLogger(__name__)


@dag(
    schedule_interval='@daily',
    start_date=datetime.datetime(2022, 8, 12),
    default_args={
        'retries': 5,
        'retry_delay': 5.0,
    },
)
def test_backfill():
    @task
    def get_tasks(ti=None):
        logger.info(f'{ti.try_number=}')
        # fail the first two tries to exercise retries before returning the mapped values
        if ti.try_number < 3:
            raise RuntimeError('')
        return ['a', 'b', 'c']

    @task
    def do_stuff(x=None, ti=None):
        logger.info(f'do_stuff: {x=}, {ti.try_number=}')
        if ti.try_number < 3:
            raise RuntimeError('')

    # map do_stuff over the output of an already-mapped do_stuff; the inner
    # .expand's input (get_tasks) fails on its first two tries
    do_stuff.expand(x=do_stuff.expand(x=get_tasks()))
    do_stuff() >> do_stuff()  # this works as expected


dag = test_backfill()


if __name__ == '__main__':
    dag.cli()

Backfill command:

airflow dags backfill test_backfill -s 2022-08-05 -e 2022-08-07 --rerun-failed-tasks

You can repeat the backfill command multiple times to slowly make progress through the DAG. Things will eventually succeed (assuming the exception that triggers this bug stops being raised), but obviously this is a pain when trying to backfill a non-trivial number of DAG Runs.

Operating System

CentOS Stream 8

Versions of Apache Airflow Providers

None

Deployment

Other

Deployment details

Standalone

Anything else

I was able to reproduce this both with SQLite + SequentialExecutor and with Postgres + LocalExecutor.

I haven't yet been able to reproduce this outside of backfill mode.

Possibly related since they mention the same exception text:

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Labels

area:Scheduler, kind:bug
