-
Notifications
You must be signed in to change notification settings - Fork 16.3k
Closed
Labels
area:corearea:dynamic-task-mappingAIP-42AIP-42kind:bugThis is a clearly a bugThis is a clearly a bug
Milestone
Description
Apache Airflow version
2.3.0 (latest released)
What happened
Returning a list from a PythonOperator and passing it on to a PostgresOperator using expand causes the scheduler to crash:
scheduler | [2022-05-06 15:44:47,317] {dag.py:2915} INFO - Setting next_dagrun for sum-dag to 2022-05-06T00:00:00+00:00, run_after=2022-05-07T00:00:00+00:00
scheduler | [2022-05-06 15:44:47,384] {scheduler_job.py:753} ERROR - Exception when executing SchedulerJob._run_scheduler_loop
scheduler | Traceback (most recent call last):
scheduler | File "/Users/example/aaa/vvv/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 736, in _execute
scheduler | self._run_scheduler_loop()
scheduler | File "/Users/example/aaa/vvv/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 824, in _run_scheduler_loop
scheduler | num_queued_tis = self._do_scheduling(session)
scheduler | File "/Users/example/aaa/vvv/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 906, in _do_scheduling
scheduler | callback_to_run = self._schedule_dag_run(dag_run, session)
scheduler | File "/Users/example/aaa/vvv/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 1148, in _schedule_dag_run
scheduler | schedulable_tis, callback_to_run = dag_run.update_state(session=session, execute_callbacks=False)
scheduler | File "/Users/example/aaa/vvv/lib/python3.9/site-packages/airflow/utils/session.py", line 68, in wrapper
scheduler | return func(*args, **kwargs)
scheduler | File "/Users/example/aaa/vvv/lib/python3.9/site-packages/airflow/models/dagrun.py", line 522, in update_state
scheduler | info = self.task_instance_scheduling_decisions(session)
scheduler | File "/Users/example/aaa/vvv/lib/python3.9/site-packages/airflow/utils/session.py", line 68, in wrapper
scheduler | return func(*args, **kwargs)
scheduler | File "/Users/example/aaa/vvv/lib/python3.9/site-packages/airflow/models/dagrun.py", line 658, in task_instance_scheduling_decisions
scheduler | schedulable_tis, changed_tis, expansion_happened = self._get_ready_tis(
scheduler | File "/Users/example/aaa/vvv/lib/python3.9/site-packages/airflow/models/dagrun.py", line 714, in _get_ready_tis
scheduler | expanded_tis, _ = schedulable.task.expand_mapped_task(self.run_id, session=session)
scheduler | File "/Users/example/aaa/vvv/lib/python3.9/site-packages/airflow/models/mappedoperator.py", line 609, in expand_mapped_task
scheduler | operator.mul, self._resolve_map_lengths(run_id, session=session).values()
scheduler | File "/Users/example/aaa/vvv/lib/python3.9/site-packages/airflow/models/mappedoperator.py", line 595, in _resolve_map_lengths
scheduler | raise RuntimeError(f"Failed to populate all mapping metadata; missing: {keys}")
scheduler | RuntimeError: Failed to populate all mapping metadata; missing: 'parameters'
scheduler | [2022-05-06 15:44:48,410] {process_utils.py:125} INFO - Sending Signals.SIGTERM to group 92610. PIDs of all processes in the group: [92610]
scheduler | [2022-05-06 15:44:48,412] {process_utils.py:80} INFO - Sending the signal Signals.SIGTERM to group 92610
scheduler | [2022-05-06 15:44:48,684] {process_utils.py:75} INFO - Process psutil.Process(pid=92610, status='terminated', exitcode=0, started='15:44:25') (92610) terminated with exit code 0
scheduler | [2022-05-06 15:44:48,685] {scheduler_job.py:765} INFO - Exited execute loop
What you think should happen instead
The scheduler should not crash and the PostgresOperator should get executed.
How to reproduce
The execution of this DAG fails with the exception posted above:
import pendulum
from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.decorators import task
with DAG(
dag_id="sum-dag",
start_date=pendulum.datetime(2021, 11, 19, tz="UTC"),
schedule_interval="@daily",
catchup=False,
) as dag:
@task
def input_list():
return [{"v": 1}]
PostgresOperator.partial(
task_id="test2",
sql="SELECT %(v)s",
).expand(parameters=input_list())Passing a list directly to the PostgresOperator (instead of having it returned from a PythonOperator) does work successfully:
import pendulum
from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.decorators import task
with DAG(
dag_id="sum-dag",
start_date=pendulum.datetime(2021, 11, 19, tz="UTC"),
schedule_interval="@daily",
catchup=False,
) as dag:
input_list_simple = [{"v": 1}]
PostgresOperator.partial(
task_id="test2",
sql="SELECT %(v)s",
).expand(parameters=input_list_simple)Operating System
macOS 12.3.1
Versions of Apache Airflow Providers
apache-airflow-providers-postgres==4.1.0
Deployment
Virtualenv installation
Deployment details
No response
Anything else
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct
Metadata
Metadata
Assignees
Labels
area:corearea:dynamic-task-mappingAIP-42AIP-42kind:bugThis is a clearly a bugThis is a clearly a bug