Mapped tasks with operator_extra_links property object cause a SerializationError #25243

@josh-fell

Description

Apache Airflow Provider(s)

amazon

Versions of Apache Airflow Providers

apache-airflow-providers-amazon==4.1.0

Apache Airflow version

main (development)

Operating System

Debian GNU/Linux 11 (bullseye)

Deployment

Other Docker-based deployment

Deployment details

Using Breeze on the main branch.

What happened

Attempting to create dynamically mapped BatchOperator tasks fails with the following DAG import error:

Broken DAG: [/files/dags/batchop_dtm.py] Traceback (most recent call last):
  File "/opt/airflow/airflow/serialization/serialized_objects.py", line 693, in _serialize_node
    op.operator_extra_links
  File "/opt/airflow/airflow/serialization/serialized_objects.py", line 999, in _serialize_operator_extra_links
    for operator_extra_link in operator_extra_links:
TypeError: 'property' object is not iterable

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/airflow/airflow/serialization/serialized_objects.py", line 1175, in to_dict
    json_dict = {"__version": cls.SERIALIZER_VERSION, "dag": cls.serialize_dag(var)}
  File "/opt/airflow/airflow/serialization/serialized_objects.py", line 1083, in serialize_dag
    raise SerializationError(f'Failed to serialize DAG {dag.dag_id!r}: {e}')
airflow.exceptions.SerializationError: Failed to serialize DAG 'batchop_dtm': 'property' object is not iterable
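
The TypeError at the root of this is plain Python descriptor behavior: mapped tasks are not instantiated at DAG parse time, so the serializer appears to read operator_extra_links off the operator class, and when the operator declares it as a property, that lookup returns the property object itself rather than the links. A minimal sketch (no Airflow involved) that reproduces the same error:

class Demo:
    @property
    def operator_extra_links(self):
        return ("link_a", "link_b")

# Instance access invokes the property getter and yields the tuple:
print(list(Demo().operator_extra_links))  # ['link_a', 'link_b']

# Class access returns the descriptor itself, which is not iterable:
for link in Demo.operator_extra_links:
    pass  # TypeError: 'property' object is not iterable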

What you think should happen instead

Users should be able to use Dynamic Task Mapping to generate BatchOperator tasks without a DAG import/serialization error.

How to reproduce

  1. Create a DAG similar to the following, in which BatchOperator tasks are dynamically mapped. Note this is a "toy" example, but the failure should apply to more "real-world" use cases as well.
from pendulum import datetime

from airflow.decorators import dag
from airflow.providers.amazon.aws.operators.batch import BatchOperator


@dag(start_date=datetime(2022, 1, 1), schedule_interval=None)
def batchop_dtm():
    BatchOperator.partial(
        task_id='submit_batch_job',
        job_queue="batch_job_queue_name",
        job_definition="batch_job_definition_name",
        overrides={},
        # Set this flag to False so each mapped task returns immediately after submitting its job
        wait_for_completion=False,
    ).expand(job_name=["job_1", "job_2", "job_3"])


_ = batchop_dtm()
  2. Start up an Airflow environment using Breeze: breeze start-airflow
  3. The following DAG import error is generated:

[Screenshot: the DAG import error banner shown in the Airflow UI]

Anything else

A similar issue was reported previously, with related fixes in #24676 and #25215.

I suspect the same behavior would occur using the BigQueryExecuteQueryOperator as well.
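
One way to check whether another operator is susceptible (a sketch, assuming only that the operator class is importable) is to inspect the raw class attribute without triggering the descriptor:

import inspect

from airflow.providers.amazon.aws.operators.batch import BatchOperator

# getattr_static returns the attribute as stored on the class, without
# invoking property getters, so an affected operator shows up as a property.
attr = inspect.getattr_static(BatchOperator, "operator_extra_links")
print(isinstance(attr, property))  # True -> susceptible to this SerializationError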

Are you willing to submit a PR?

  • Yes I am willing to submit a PR!

Code of Conduct

  • I agree to follow this project's Code of Conduct