Skip to content

Using expand on PostgresOperator with return value from PythonOperator crashes scheduler #23533

@hammerhead

Description

@hammerhead

Apache Airflow version

2.3.0 (latest released)

What happened

Returning a list from a PythonOperator and passing it on to a PostgresOperator using expand causes the scheduler to crash:

scheduler | [2022-05-06 15:44:47,317] {dag.py:2915} INFO - Setting next_dagrun for sum-dag to 2022-05-06T00:00:00+00:00, run_after=2022-05-07T00:00:00+00:00
scheduler | [2022-05-06 15:44:47,384] {scheduler_job.py:753} ERROR - Exception when executing SchedulerJob._run_scheduler_loop
scheduler | Traceback (most recent call last):
scheduler | File "/Users/example/aaa/vvv/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 736, in _execute
scheduler | self._run_scheduler_loop()
scheduler | File "/Users/example/aaa/vvv/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 824, in _run_scheduler_loop
scheduler | num_queued_tis = self._do_scheduling(session)
scheduler | File "/Users/example/aaa/vvv/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 906, in _do_scheduling
scheduler | callback_to_run = self._schedule_dag_run(dag_run, session)
scheduler | File "/Users/example/aaa/vvv/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 1148, in _schedule_dag_run
scheduler | schedulable_tis, callback_to_run = dag_run.update_state(session=session, execute_callbacks=False)
scheduler | File "/Users/example/aaa/vvv/lib/python3.9/site-packages/airflow/utils/session.py", line 68, in wrapper
scheduler | return func(*args, **kwargs)
scheduler | File "/Users/example/aaa/vvv/lib/python3.9/site-packages/airflow/models/dagrun.py", line 522, in update_state
scheduler | info = self.task_instance_scheduling_decisions(session)
scheduler | File "/Users/example/aaa/vvv/lib/python3.9/site-packages/airflow/utils/session.py", line 68, in wrapper
scheduler | return func(*args, **kwargs)
scheduler | File "/Users/example/aaa/vvv/lib/python3.9/site-packages/airflow/models/dagrun.py", line 658, in task_instance_scheduling_decisions
scheduler | schedulable_tis, changed_tis, expansion_happened = self._get_ready_tis(
scheduler | File "/Users/example/aaa/vvv/lib/python3.9/site-packages/airflow/models/dagrun.py", line 714, in _get_ready_tis
scheduler | expanded_tis, _ = schedulable.task.expand_mapped_task(self.run_id, session=session)
scheduler | File "/Users/example/aaa/vvv/lib/python3.9/site-packages/airflow/models/mappedoperator.py", line 609, in expand_mapped_task
scheduler | operator.mul, self._resolve_map_lengths(run_id, session=session).values()
scheduler | File "/Users/example/aaa/vvv/lib/python3.9/site-packages/airflow/models/mappedoperator.py", line 595, in _resolve_map_lengths
scheduler | raise RuntimeError(f"Failed to populate all mapping metadata; missing: {keys}")
scheduler | RuntimeError: Failed to populate all mapping metadata; missing: 'parameters'
scheduler | [2022-05-06 15:44:48,410] {process_utils.py:125} INFO - Sending Signals.SIGTERM to group 92610. PIDs of all processes in the group: [92610]
scheduler | [2022-05-06 15:44:48,412] {process_utils.py:80} INFO - Sending the signal Signals.SIGTERM to group 92610
scheduler | [2022-05-06 15:44:48,684] {process_utils.py:75} INFO - Process psutil.Process(pid=92610, status='terminated', exitcode=0, started='15:44:25') (92610) terminated with exit code 0
scheduler | [2022-05-06 15:44:48,685] {scheduler_job.py:765} INFO - Exited execute loop

What you think should happen instead

The scheduler should not crash and the PostgresOperator should get executed.

How to reproduce

The execution of this DAG fails with the exception posted above:

import pendulum
from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.decorators import task


with DAG(
    dag_id="sum-dag",
    start_date=pendulum.datetime(2021, 11, 19, tz="UTC"),
    schedule_interval="@daily",
    catchup=False,
) as dag:


    @task
    def input_list():
        return [{"v": 1}]


    PostgresOperator.partial(
        task_id="test2",
        sql="SELECT %(v)s",
    ).expand(parameters=input_list())

Passing a list directly to the PostgresOperator (instead of having it returned from a PythonOperator) does work successfully:

import pendulum
from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.decorators import task


with DAG(
    dag_id="sum-dag",
    start_date=pendulum.datetime(2021, 11, 19, tz="UTC"),
    schedule_interval="@daily",
    catchup=False,
) as dag:


    input_list_simple = [{"v": 1}]
    PostgresOperator.partial(
        task_id="test2",
        sql="SELECT %(v)s",
    ).expand(parameters=input_list_simple)

Operating System

macOS 12.3.1

Versions of Apache Airflow Providers

apache-airflow-providers-postgres==4.1.0

Deployment

Virtualenv installation

Deployment details

No response

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions