Description
Apache Airflow version
2.3.2 (latest released)
What happened
Hi, I have an issue with launching several subDAGs by mapping TriggerDagRunOperator (mapping over the conf parameter). Here is a demo example of my typical DAG:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow import XComArg
from datetime import datetime
with DAG(
    'triggerer',
    schedule_interval=None,
    catchup=False,
    start_date=datetime(2019, 12, 2)
) as dag:
    t1 = PythonOperator(
        task_id='first',
        python_callable=lambda : list(map(lambda i: {"x": i}, list(range(10)))),
    )
    t2 = TriggerDagRunOperator.partial(
        task_id='second',
        trigger_dag_id='mydag'
    ).expand(conf=XComArg(t1))
    t1 >> t2
But when Airflow tries to import this DAG, it throws the following SerializationError (which I observed both in the UI and in $AIRFLOW_HOME/logs/scheduler/latest/<my_dag_name>.py.log):
Broken DAG: [/home/aliona/airflow/dags/triggerer_dag.py] Traceback (most recent call last):
  File "/home/aliona/airflow/venv/lib/python3.10/site-packages/airflow/serialization/serialized_objects.py", line 638, in _serialize_node
    serialize_op['_operator_extra_links'] = cls._serialize_operator_extra_links(
  File "/home/aliona/airflow/venv/lib/python3.10/site-packages/airflow/serialization/serialized_objects.py", line 933, in _serialize_operator_extra_links
    for operator_extra_link in operator_extra_links:
TypeError: 'property' object is not iterable

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/aliona/airflow/venv/lib/python3.10/site-packages/airflow/serialization/serialized_objects.py", line 1106, in to_dict
    json_dict = {"__version": cls.SERIALIZER_VERSION, "dag": cls.serialize_dag(var)}
  File "/home/aliona/airflow/venv/lib/python3.10/site-packages/airflow/serialization/serialized_objects.py", line 1014, in serialize_dag
    raise SerializationError(f'Failed to serialize DAG {dag.dag_id!r}: {e}')
airflow.exceptions.SerializationError: Failed to serialize DAG 'triggerer': 'property' object is not iterable
What you think should happen instead
I think that a TriggerDagRunOperator mapped over the conf parameter should serialize and work by default.
While debugging, I found that a plain non-mapped TriggerDagRunOperator has the value ['Triggered DAG'] in its operator_extra_links field, so it is a list. For a mapped TriggerDagRunOperator, however, the field holds a 'property' object. I don't know why Airflow cannot get the value of this property during serialization, but reinitializing the field with ['Triggered DAG'] fixed the issue for me.
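A minimal sketch in plain Python (no Airflow involved) of how a "'property' object is not iterable" error can arise in general. The class and function names are hypothetical, and it is only an assumption that the mapped operator ends up reading the attribute from the class in a similar way:

```python
class LinkedOperator:
    """Hypothetical operator that exposes its extra links through a property."""

    @property
    def operator_extra_links(self):
        return ["Triggered DAG"]


def collect_links(operator_extra_links):
    """Iterate over the links, roughly as a serializer would."""
    return [link for link in operator_extra_links]


# Read from an instance: the property is evaluated and a list comes back.
instance_links = collect_links(LinkedOperator().operator_extra_links)

# Read from the class: the raw property object comes back instead.
class_level_value = LinkedOperator.operator_extra_links

try:
    collect_links(class_level_value)
    error_message = None
except TypeError as exc:
    error_message = str(exc)

print(instance_links)
print(type(class_level_value).__name__)
print(error_message)
```

The instance access behaves like the non-mapped case described above, while the class-level access yields an object that cannot be iterated.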
For now, wherever I use a mapped TriggerDagRunOperator, I also add the following code at the end of my DAG file:
# here 'second' is the task_id of the corresponding mapped TriggerDagRunOperator task (see demo code above)
t2_patch = dag.task_dict['second']
t2_patch.operator_extra_links = ['Triggered DAG']
dag.task_dict.update({'second': t2_patch})
So, for every mapped TriggerDagRunOperator task I manually set the operator_extra_links property to ['Triggered DAG'], and as a result there is no SerializationError. I have a lot of such cases, and all of them work with this fix: all subDAGs are launched and the mapped configuration is passed correctly. Both waiting and not waiting for the end of their execution also work correctly.
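Since the same patch is repeated for every mapped task, it could be wrapped in a small helper. The sketch below is hypothetical (the function name and the stand-in objects are not part of Airflow) and has not been run against a real Airflow DAG. It assumes only that the DAG exposes a task_dict mapping and that the affected field holds a raw property object, as observed above:

```python
from types import SimpleNamespace


def patch_extra_links(dag, task_ids, links=("Triggered DAG",)):
    """Replace a non-iterable operator_extra_links value on the given tasks.

    Hypothetical helper: `dag` only needs a `task_dict` mapping, as used in
    the workaround above. Returns the ids of the tasks that were patched.
    """
    patched = []
    for task_id in task_ids:
        task = dag.task_dict[task_id]
        # Only touch tasks whose field holds a raw property object.
        if isinstance(task.operator_extra_links, property):
            task.operator_extra_links = list(links)
            patched.append(task_id)
    return patched


# Stand-in objects so the sketch runs without Airflow installed.
demo_dag = SimpleNamespace(
    task_dict={
        "first": SimpleNamespace(operator_extra_links=[]),
        "second": SimpleNamespace(operator_extra_links=property(lambda self: [])),
    }
)
patched_ids = patch_extra_links(demo_dag, ["first", "second"])
print(patched_ids)
```

In a real DAG file the call would presumably go at the end of the file, after all tasks are defined, with the task ids of the mapped TriggerDagRunOperator tasks.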
How to reproduce
- Create a DAG with a mapped TriggerDagRunOperator task (main parameters such as task_id, trigger_dag_id and others go in the partial section; in the expand section, use the conf parameter with a non-empty iterable value), for example:
  t2 = TriggerDagRunOperator.partial(
      task_id='second',
      trigger_dag_id='mydag'
  ).expand(conf=[{'x': 1}])
- Try to serialize the DAG, and the error will appear.
The full example of a failing DAG file:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow import XComArg
from datetime import datetime
with DAG(
    'triggerer',
    schedule_interval=None,
    catchup=False,
    start_date=datetime(2019, 12, 2)
) as dag:
    t1 = PythonOperator(
        task_id='first',
        python_callable=lambda : list(map(lambda i: {"x": i}, list(range(10)))),
    )
    t2 = TriggerDagRunOperator.partial(
        task_id='second',
        trigger_dag_id='mydag'
    ).expand(conf=[{'a': 1}])
    t1 >> t2
# uncomment these lines to fix the error
# t2_patch = dag.task_dict['second']
# t2_patch.operator_extra_links=['Triggered DAG']
# dag.task_dict.update({'second': t2_patch})
As the subDAG ('mydag') I use this DAG:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
with DAG(
    'mydag',
    schedule_interval=None,
    catchup=False,
    start_date=datetime(2019, 12, 2)
) as dag:
    t1 = PythonOperator(
        task_id='first',
        python_callable=lambda : print("first"),
    )
    t2 = PythonOperator(
        task_id='second',
        python_callable=lambda : print("second"),
    )
    t1 >> t2
Operating System
Ubuntu 22.04 LTS
Versions of Apache Airflow Providers
apache-airflow-providers-ftp==2.1.2
apache-airflow-providers-http==2.1.2
apache-airflow-providers-imap==2.2.3
apache-airflow-providers-sqlite==2.1.3
Deployment
Virtualenv installation
Deployment details
Python 3.10.4
pip 22.0.2
Anything else
Currently, for demonstration purposes, I am using a fully local Airflow installation: single node, SequentialExecutor and a SQLite database backend. But the same issue also appeared on a multi-node installation with either CeleryExecutor or LocalExecutor and a PostgreSQL database backend.
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct
