
Mapped TriggerDagRunOperator causes SerializationError due to operator_extra_links 'property' object is not iterable  #24653

@consciencee

Description


Apache Airflow version

2.3.2 (latest released)

What happened

Hi, I have an issue with launching several sub-DAGs via a mapped TriggerDagRunOperator (mapping over the conf parameter). Here is a demo example of my typical DAG:

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow import XComArg
from datetime import datetime

with DAG(
    'triggerer',
    schedule_interval=None,
    catchup=False,
    start_date=datetime(2019, 12, 2)
) as dag:

    t1 = PythonOperator(
        task_id='first',
        python_callable=lambda: [{"x": i} for i in range(10)],
    )

    t2 = TriggerDagRunOperator.partial(
        task_id='second',
        trigger_dag_id='mydag'
    ).expand(conf=XComArg(t1))

    t1 >> t2

But when Airflow tries to import such a DAG, it throws the following SerializationError (which I observed both in the UI and in $AIRFLOW_HOME/logs/scheduler/latest/<my_dag_name>.py.log):

Broken DAG: [/home/aliona/airflow/dags/triggerer_dag.py] Traceback (most recent call last):
  File "/home/aliona/airflow/venv/lib/python3.10/site-packages/airflow/serialization/serialized_objects.py", line 638, in _serialize_node
    serialize_op['_operator_extra_links'] = cls._serialize_operator_extra_links(
  File "/home/aliona/airflow/venv/lib/python3.10/site-packages/airflow/serialization/serialized_objects.py", line 933, in _serialize_operator_extra_links
    for operator_extra_link in operator_extra_links:
TypeError: 'property' object is not iterable

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/aliona/airflow/venv/lib/python3.10/site-packages/airflow/serialization/serialized_objects.py", line 1106, in to_dict
    json_dict = {"__version": cls.SERIALIZER_VERSION, "dag": cls.serialize_dag(var)}
  File "/home/aliona/airflow/venv/lib/python3.10/site-packages/airflow/serialization/serialized_objects.py", line 1014, in serialize_dag
    raise SerializationError(f'Failed to serialize DAG {dag.dag_id!r}: {e}')
airflow.exceptions.SerializationError: Failed to serialize DAG 'triggerer': 'property' object is not iterable

How it appears in the UI:
[screenshot: the "Broken DAG" error banner in the Airflow web UI]

What you think should happen instead

I think that a TriggerDagRunOperator mapped over the conf parameter should serialize and work correctly by default.

During debugging, I found out that a simple non-mapped TriggerDagRunOperator has the value ['Triggered DAG'] in its operator_extra_links field, so it is a list. But for a mapped TriggerDagRunOperator, it is a 'property' object. I have no idea why Airflow cannot get the value of this property during serialization, but I tried reinitializing the field with the value ['Triggered DAG'], and that fixed the issue.
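For illustration, here is a minimal sketch of how to observe the difference (the 'probe' dag_id is arbitrary; the comments reflect what I saw while debugging):

from datetime import datetime
from airflow import DAG
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

with DAG(
    'probe',
    schedule_interval=None,
    catchup=False,
    start_date=datetime(2019, 12, 2)
) as dag:
    plain = TriggerDagRunOperator(task_id='plain', trigger_dag_id='mydag')
    mapped = TriggerDagRunOperator.partial(
        task_id='mapped', trigger_dag_id='mydag'
    ).expand(conf=[{'x': 1}])

print(plain.operator_extra_links)   # a list of extra link objects
print(mapped.operator_extra_links)  # a bare 'property' object, which serialization then fails to iterate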

For now, for every use of a mapped TriggerDagRunOperator I also add code like this at the end of my DAG file:

# here 'second' is the task_id of the corresponding mapped TriggerDagRunOperator task (see the demo code above)
t2_patch = dag.task_dict['second']
t2_patch.operator_extra_links = ['Triggered DAG']
dag.task_dict.update({'second': t2_patch})

So, for every mapped TriggerDagRunOperator task I manually set the operator_extra_links property to ['Triggered DAG'], and as a result there is no SerializationError. I have a lot of such cases, and all of them work well with this fix: all sub-DAGs are launched and the mapped configuration is passed correctly. Whether or not I wait for the end of their execution, both options also work correctly. A generalized version of this patch is sketched below.
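Here is a sketch of a helper that applies the same patch to every mapped TriggerDagRunOperator in a DAG (patch_mapped_trigger_links is just a name I made up; this is my workaround, not an official API):

from airflow.models.mappedoperator import MappedOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

def patch_mapped_trigger_links(dag):
    # Overwrite the stored 'property' object with the literal link name,
    # exactly as in the per-task workaround above.
    for task in dag.task_dict.values():
        if isinstance(task, MappedOperator) and task.operator_class is TriggerDagRunOperator:
            task.operator_extra_links = ['Triggered DAG']

# call at the end of the DAG file, after all tasks are defined
patch_mapped_trigger_links(dag)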

How to reproduce

  1. Create a DAG with mapped TriggerDagRunOperator tasks (main parameters such as task_id, trigger_dag_id, and others go in the partial section; in the expand section use the conf parameter with a non-empty iterable value), for example:
t2 = TriggerDagRunOperator.partial(
        task_id='second',
        trigger_dag_id='mydag'
    ).expand(conf=[{'x': 1}])
  2. Try to serialize the DAG; the error will appear.

The full example of a failing DAG file:

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow import XComArg
from datetime import datetime

with DAG(
    'triggerer',
    schedule_interval=None,
    catchup=False,
    start_date=datetime(2019, 12, 2)
) as dag:

    t1 = PythonOperator(
        task_id='first',
        python_callable=lambda: [{"x": i} for i in range(10)],
    )

    t2 = TriggerDagRunOperator.partial(
        task_id='second',
        trigger_dag_id='mydag'
    ).expand(conf=[{'a': 1}])

    t1 >> t2

# uncomment these lines to fix the error
# t2_patch = dag.task_dict['second']
# t2_patch.operator_extra_links = ['Triggered DAG']
# dag.task_dict.update({'second': t2_patch})

As the sub-DAG ('mydag') I use this DAG:

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

with DAG(
    'mydag',
    schedule_interval=None,
    catchup=False,
    start_date=datetime(2019, 12, 2)
) as dag:
    t1 = PythonOperator(
        task_id='first',
        python_callable=lambda: print("first"),
    )
    t2 = PythonOperator(
        task_id='second',
        python_callable=lambda: print("second"),
    )
    t1 >> t2

Operating System

Ubuntu 22.04 LTS

Versions of Apache Airflow Providers

apache-airflow-providers-ftp==2.1.2
apache-airflow-providers-http==2.1.2
apache-airflow-providers-imap==2.2.3
apache-airflow-providers-sqlite==2.1.3

Deployment

Virtualenv installation

Deployment details

Python 3.10.4
pip 22.0.2

Anything else

Currently, for demonstration purposes, I am using a fully local Airflow installation: single node, SequentialExecutor, and a SQLite database backend. But the same issue also appeared in a multi-node installation with either CeleryExecutor or LocalExecutor and a PostgreSQL database backend.

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
