Labels: affected_version:2.5, area:core, kind:bug
Description
Apache Airflow Provider(s)
amazon
Versions of Apache Airflow Providers
apache-airflow-providers-amazon-aws==4.1.0
Apache Airflow version
main (development)
Operating System
Debian GNU/Linux 11 (bullseye)
Deployment
Other Docker-based deployment
Deployment details
Using Breeze on main branch.
What happened
Attempting to create dynamically-mapped tasks using the BatchOperator fails with the following DAG import error:
```
Broken DAG: [/files/dags/batchop_dtm.py] Traceback (most recent call last):
  File "/opt/airflow/airflow/serialization/serialized_objects.py", line 693, in _serialize_node
    op.operator_extra_links
  File "/opt/airflow/airflow/serialization/serialized_objects.py", line 999, in _serialize_operator_extra_links
    for operator_extra_link in operator_extra_links:
TypeError: 'property' object is not iterable

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/airflow/airflow/serialization/serialized_objects.py", line 1175, in to_dict
    json_dict = {"__version": cls.SERIALIZER_VERSION, "dag": cls.serialize_dag(var)}
  File "/opt/airflow/airflow/serialization/serialized_objects.py", line 1083, in serialize_dag
    raise SerializationError(f'Failed to serialize DAG {dag.dag_id!r}: {e}')
airflow.exceptions.SerializationError: Failed to serialize DAG 'batchop_dtm': 'property' object is not iterable
```
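The traceback suggests that, for dynamically-mapped tasks, the serializer reads operator_extra_links off the operator class, while BatchOperator declares it as an instance @property. A minimal standalone sketch (hypothetical class, not the actual provider code) reproducing that exact TypeError:

```python
class FakeOperator:
    @property
    def operator_extra_links(self):
        # Works fine when accessed on an *instance*:
        return ("some_link",)


assert FakeOperator().operator_extra_links == ("some_link",)

# Accessed on the *class* (as the serializer does for mapped operators),
# the attribute is the property descriptor itself, not the tuple:
links = FakeOperator.operator_extra_links
for link in links:  # TypeError: 'property' object is not iterable
    pass
```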
What you think should happen instead
Users should be able to use Dynamic Task Mapping to generate BatchOperator tasks without a DAG import/serialization error.
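For comparison, the same mapping pattern imports cleanly with an operator whose operator_extra_links is a plain class attribute. This BashOperator DAG is a hypothetical illustration, not part of the original report:

```python
from pendulum import datetime

from airflow.decorators import dag
from airflow.operators.bash import BashOperator


@dag(start_date=datetime(2022, 1, 1), schedule_interval=None)
def bash_dtm():
    # Dynamic Task Mapping works here because BashOperator inherits
    # operator_extra_links as a class attribute, not an instance property.
    BashOperator.partial(task_id="echo").expand(
        bash_command=["echo 1", "echo 2", "echo 3"]
    )


_ = bash_dtm()
```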
How to reproduce
- Create a DAG similar to the following, in which BatchOperator tasks are dynamically mapped. Note this is a "toy" example, but it should be applicable to more "real-world" use cases.
```python
from pendulum import datetime

from airflow.decorators import dag
from airflow.providers.amazon.aws.operators.batch import BatchOperator


@dag(start_date=datetime(2022, 1, 1), schedule_interval=None)
def batchop_dtm():
    BatchOperator.partial(
        task_id="submit_batch_job",
        job_queue="batch_job_queue_name",
        job_definition="batch_job_definition_name",
        overrides={},
        # Set this flag to False, so we can test the sensor below
        wait_for_completion=False,
    ).expand(job_name=["job_1", "job_2", "job_3"])


_ = batchop_dtm()
```

- Start up an Airflow environment using Breeze:
```
breeze start-airflow
```

- The DAG import error shown under "What happened" above is generated.
Anything else
A similar issue was created previously with related fixes in #24676 and #25215.
I suspect the same behavior would occur using the BigQueryExecuteQueryOperator as well.
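One plausible direction for a fix, mirroring the earlier fixes referenced above: declare the links as a class-level tuple rather than a @property, so the attribute remains iterable when read from the class. A rough sketch under that assumption (the names here are hypothetical, not the real patch):

```python
from airflow.models.baseoperator import BaseOperator, BaseOperatorLink


class ExampleLink(BaseOperatorLink):
    """Hypothetical extra link, standing in for the operator's real links."""

    name = "Example"

    def get_link(self, operator, *, ti_key):
        return "https://example.com"


class FixedOperator(BaseOperator):
    # A class-level tuple is iterable whether accessed on the class or an
    # instance, unlike a @property, so serializing mapped tasks succeeds.
    operator_extra_links = (ExampleLink(),)

    def execute(self, context):
        pass
```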
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct