Skip to content

ExternalTaskMarker with user_defined_macros does not work in Airflow 2.0 or with store_serialized_dags = True in Airflow 1.10.* #13827

@yuqian90

Description

@yuqian90

When ExternalTaskMarker is used together with user_defined_macros, we get an unpleasant traceback when trying to clear tasks across DAGs.

This example (modified from example_dags/example_external_task_marker_dag.py) reproduces the issue. When a user clicks on parent_task and hits Clear, they get the following traceback because the serialized DAG obtained by the webserver has no user_defined_macros.

from airflow.models import DAG
from airflow.utils.dates import days_ago
from airflow.operators.bash_operator import BashOperator


def get_child():
    """Return the task id that the parent DAG's marker resolves via its macro."""
    child_task_id = "child_task1"
    return child_task_id


from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.sensors.external_task_sensor import ExternalTaskMarker, ExternalTaskSensor

# Parent DAG of the reproduction: it registers get_child as a user-defined
# macro and calls that macro inside a templated field of ExternalTaskMarker.
with DAG(
    "example_external_task_marker_parent",
    start_date=days_ago(2),
    # Makes get_child() callable from this DAG's Jinja templates. Per the
    # report, the serialized DAG seen by the webserver lacks these macros.
    user_defined_macros={"get_child": get_child},
) as parent_dag:
    parent_task = ExternalTaskMarker(
        task_id="parent_task",
        external_dag_id="example_external_task_marker_child",
        # Template rendered while clearing the task; raises
        # jinja2 UndefinedError when the get_child macro is not available.
        external_task_id="{{ get_child() }}",
    )

# Child DAG of the reproduction: child_task1 is a sensor that references
# parent_task in the parent DAG, and child_task2 is chained after it.
with DAG("example_external_task_marker_child", start_date=days_ago(2)) as child_dag:
    # task_id matches the string returned by get_child(), i.e. the task the
    # parent's marker points at once its template is rendered.
    child_task1 = ExternalTaskSensor(
        task_id="child_task1",
        external_dag_id=parent_dag.dag_id,
        external_task_id=parent_task.task_id,
        mode="reschedule",
    )
    child_task2 = DummyOperator(task_id="child_task2")
    child_task1 >> child_task2
Traceback (most recent call last):
  File "/python/site-packages/flask/app.py", line 2447, in wsgi_app
    response = self.full_dispatch_request()
  File "/python/site-packages/flask/app.py", line 1952, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "/python/site-packages/flask/app.py", line 1821, in handle_user_exception
    reraise(exc_type, exc_value, tb)
  File "/python/site-packages/flask/_compat.py", line 39, in reraise
    raise value
  File "/python/site-packages/flask/app.py", line 1950, in full_dispatch_request
    rv = self.dispatch_request()
  File "/python/site-packages/flask/app.py", line 1936, in dispatch_request
    return self.view_functions[rule.endpoint](**req.view_args)
  File "/python/site-packages/airflow/www_rbac/decorators.py", line 121, in wrapper
    return f(self, *args, **kwargs)
  File "/python/site-packages/flask_appbuilder/security/decorators.py", line 109, in wraps
    return f(self, *args, **kwargs)
  File "/python/site-packages/airflow/www_rbac/decorators.py", line 56, in wrapper
    return f(*args, **kwargs)
  File "/python/site-packages/airflow/www_rbac/views.py", line 1332, in clear
    return self._clear_dag_tis(dag, start_date, end_date, origin,
  File "/python/site-packages/airflow/www_rbac/views.py", line 1278, in _clear_dag_tis
    tis = dag.clear(
  File "/python/site-packages/airflow/utils/db.py", line 74, in wrapper
    return func(*args, **kwargs)
  File "/python/site-packages/airflow/models/dag.py", line 1086, in clear
    ti.render_templates()
  File "/python/site-packages/airflow/models/taskinstance.py", line 1424, in render_templates
    self.task.render_template_fields(context)
  File "/python/site-packages/airflow/models/baseoperator.py", line 714, in render_template_fields
    self._do_render_template_fields(self, self.template_fields, context, jinja_env, set())
  File "/python/site-packages/airflow/models/baseoperator.py", line 721, in _do_render_template_fields
    rendered_content = self.render_template(content, context, jinja_env, seen_oids)
  File "/python/site-packages/airflow/models/baseoperator.py", line 750, in render_template
    return jinja_env.from_string(content).render(**context)
  File "/python/site-packages/jinja2/asyncsupport.py", line 76, in render
    return original_render(self, *args, **kwargs)
  File "/python/site-packages/jinja2/environment.py", line 1008, in render
    return self.environment.handle_exception(exc_info, True)
  File "/python/site-packages/jinja2/environment.py", line 780, in handle_exception
    reraise(exc_type, exc_value, tb)
  File "/python/site-packages/jinja2/_compat.py", line 37, in reraise
    raise value.with_traceback(tb)
  File "<template>", line 1, in <module>
jinja2.exceptions.UndefinedError: 'get_child' is undefined

Apache Airflow version: Airflow 2.0 or 1.10.* with store_serialized_dags = True

Kubernetes version (if you are using kubernetes) (use kubectl version): NA

Metadata

Metadata

Assignees

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions