
Unable to access operator attrs within Jinja context for mapped tasks #24388

@josh-fell

Apache Airflow version

2.3.2 (latest released)

What happened

When attempting to generate mapped SQL tasks using a Jinja-templated query that accesses operator attributes, an exception like the following is thrown:

jinja2.exceptions.UndefinedError: 'airflow.models.mappedoperator.MappedOperator object' has no attribute '<operator attribute>'

For example, mapping SQLValueCheckOperator tasks over database with the query SELECT COUNT(*) FROM {{ task.database }}.tbl; raises:
jinja2.exceptions.UndefinedError: 'airflow.models.mappedoperator.MappedOperator object' has no attribute 'database'

Likewise, mapping SnowflakeOperator tasks over parameters with the query SELECT * FROM {{ task.parameters.tbl }}; raises:
jinja2.exceptions.UndefinedError: 'airflow.models.mappedoperator.MappedOperator object' has no attribute 'parameters'
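A minimal, Airflow-free sketch of why the lookup fails, assuming (as the workaround under "Anything else" suggests) that a mapped operator keeps its .expand() arguments in a mapped_kwargs dict rather than setting them as instance attributes. FakeMappedOperator is a hypothetical stand-in, not Airflow's real MappedOperator class.

```python
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class FakeMappedOperator:
    """Hypothetical stand-in for a mapped operator (not Airflow's real class).

    Arguments given to .partial() and .expand() live in two dicts, so they
    are not exposed as plain instance attributes.
    """

    task_id: str
    partial_kwargs: Dict[str, Any] = field(default_factory=dict)
    mapped_kwargs: Dict[str, Any] = field(default_factory=dict)


task = FakeMappedOperator(
    task_id="check_row_count",
    partial_kwargs={"conn_id": "snowflake", "pass_value": 20000},
    mapped_kwargs={"database": ["dev", "production"]},
)

# Jinja resolves {{ task.database }} with an attribute lookup (falling back to
# item access). Both fail here: "database" is only a key inside mapped_kwargs.
print(hasattr(task, "database"))  # False

# The mapped values are still reachable, but only through the dict.
print(task.mapped_kwargs["database"])  # ['dev', 'production']
```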

What you think should happen instead

When using Jinja-templated SQL queries, the attribute being used for the mapping should be accessible via {{ task.<operator attribute> }}. Executing the same SQL query with classic, non-mapped tasks allows this operator attribute access from the task context object.

Ideally, the same interface should apply to both non-mapped and mapped tasks. Also, since parameters is preferred over params in SQL-type operators, being able to map over parameters would help users move from params to parameters.
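One possible shape for a uniform interface, sketched without Airflow: a hypothetical resolve_for_index() helper that merges the partial arguments with the map_index-th element of the expanded list, so a template could use task.database or task.parameters again. The helper name and merge rule are assumptions for illustration, not Airflow's implementation.

```python
from types import SimpleNamespace
from typing import Any, Dict, List


def resolve_for_index(
    partial_kwargs: Dict[str, Any],
    mapped_kwargs: Dict[str, List[Any]],
    map_index: int,
) -> SimpleNamespace:
    """Hypothetical helper: build a per-index 'task' view for templating.

    Assumes exactly one expanded argument, as in both examples in this issue.
    (With several expanded arguments Airflow maps over their cross product,
    which this sketch does not model.)
    """
    if len(mapped_kwargs) != 1:
        raise ValueError("this sketch only handles a single expanded argument")
    resolved = dict(partial_kwargs)
    for name, values in mapped_kwargs.items():
        # Pick the value this mapped task instance was created for.
        resolved[name] = values[map_index]
    return SimpleNamespace(**resolved)


# Mirrors mapped_snowflake_task, second mapped instance (map_index=1).
task_view = resolve_for_index(
    partial_kwargs={"snowflake_conn_id": "snowflake"},
    mapped_kwargs={"parameters": [{"tbl": "foo"}, {"tbl": "bar"}]},
    map_index=1,
)
# In Jinja, task.parameters.tbl works on a dict via item-access fallback;
# plain Python needs the explicit ["tbl"].
print(f"SELECT * FROM {task_view.parameters['tbl']};")  # SELECT * FROM bar;
```

With a view like this, the classic and mapped tasks would expose the same attribute names to the template, which is the behavior requested above.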

How to reproduce

Consider the following DAG:

from pendulum import datetime

from airflow.decorators import dag
from airflow.operators.sql import SQLValueCheckOperator
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator


CORE_SQL = "SELECT COUNT(*) FROM {{ task.database }}.tbl;"
SNOWFLAKE_SQL = """SELECT * FROM {{ task.parameters.tbl }};"""


@dag(dag_id="map-city", start_date=datetime(2022, 6, 7), schedule_interval=None)
def map_city():
    classic_sql_value_check = SQLValueCheckOperator(
        task_id="classic_sql_value_check",
        conn_id="snowflake",
        sql=CORE_SQL,
        database="dev",
        pass_value=20000,
    )

    mapped_value_check = SQLValueCheckOperator.partial(
        task_id="check_row_count",
        conn_id="snowflake",
        sql=CORE_SQL,
        pass_value=20000,
    ).expand(database=["dev", "production"])

    classic_snowflake_task = SnowflakeOperator(
        task_id="classic_snowflake_task",
        snowflake_conn_id="snowflake",
        sql=SNOWFLAKE_SQL,
        parameters={"tbl": "foo"},
    )

    mapped_snowflake_task = SnowflakeOperator.partial(
        task_id="mapped_snowflake_task", snowflake_conn_id="snowflake", sql=SNOWFLAKE_SQL
    ).expand(
        parameters=[
            {"tbl": "foo"},
            {"tbl": "bar"},
        ]
    )


_ = map_city()

SQLValueCheckOperator tasks
The logs for the non-mapped "classic_sql_value_check" task show the query executing as expected:
[2022-06-11, 02:01:03 UTC] {sql.py:204} INFO - Executing SQL check: SELECT COUNT(*) FROM dev.tbl;
while the mapped "check_row_count" task fails with the following exception:

[2022-06-11, 02:01:03 UTC] {standard_task_runner.py:79} INFO - Running: ['airflow', 'tasks', 'run', 'map-city', 'check_row_count', 'manual__2022-06-11T02:01:01.831761+00:00', '--job-id', '350', '--raw', '--subdir', 'DAGS_FOLDER/map_city.py', '--cfg-path', '/tmp/tmpm5bg9mt5', '--map-index', '0', '--error-file', '/tmp/tmp2kbilt2l']
[2022-06-11, 02:01:03 UTC] {standard_task_runner.py:80} INFO - Job 350: Subtask check_row_count
[2022-06-11, 02:01:03 UTC] {task_command.py:370} INFO - Running <TaskInstance: map-city.check_row_count manual__2022-06-11T02:01:01.831761+00:00 map_index=0 [running]> on host 569596df5be5
[2022-06-11, 02:01:03 UTC] {taskinstance.py:1889} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1451, in _run_raw_task
    self._execute_task_with_callbacks(context, test_mode)
  File "/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1555, in _execute_task_with_callbacks
    task_orig = self.render_templates(context=context)
  File "/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 2212, in render_templates
    rendered_task = self.task.render_template_fields(context)
  File "/usr/local/lib/python3.9/site-packages/airflow/models/mappedoperator.py", line 726, in render_template_fields
    self._do_render_template_fields(
  File "/usr/local/lib/python3.9/site-packages/airflow/utils/session.py", line 68, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/airflow/models/abstractoperator.py", line 344, in _do_render_template_fields
    rendered_content = self.render_template(
  File "/usr/local/lib/python3.9/site-packages/airflow/models/abstractoperator.py", line 391, in render_template
    return render_template_to_string(template, context)
  File "/usr/local/lib/python3.9/site-packages/airflow/utils/helpers.py", line 296, in render_template_to_string
    return render_template(template, context, native=False)
  File "/usr/local/lib/python3.9/site-packages/airflow/utils/helpers.py", line 291, in render_template
    return "".join(nodes)
  File "<template>", line 13, in root
  File "/usr/local/lib/python3.9/site-packages/jinja2/runtime.py", line 903, in _fail_with_undefined_error
    raise self._undefined_exception(self._undefined_message)
jinja2.exceptions.UndefinedError: 'airflow.models.mappedoperator.MappedOperator object' has no attribute 'database'

SnowflakeOperator tasks
Similarly, the non-mapped "classic_snowflake_task" task executes the SQL query as expected:
[2022-06-11, 02:01:04 UTC] {snowflake.py:324} INFO - Running statement: SELECT * FROM foo;, parameters: {'tbl': 'foo'}
while the mapped "mapped_snowflake_task" task fails to execute the query:

[2022-06-11, 02:01:03 UTC] {standard_task_runner.py:79} INFO - Running: ['airflow', 'tasks', 'run', 'map-city', 'mapped_snowflake_task', 'manual__2022-06-11T02:01:01.831761+00:00', '--job-id', '347', '--raw', '--subdir', 'DAGS_FOLDER/map_city.py', '--cfg-path', '/tmp/tmp6kmqs5ew', '--map-index', '0', '--error-file', '/tmp/tmpkufg9xqx']
[2022-06-11, 02:01:03 UTC] {standard_task_runner.py:80} INFO - Job 347: Subtask mapped_snowflake_task
[2022-06-11, 02:01:03 UTC] {task_command.py:370} INFO - Running <TaskInstance: map-city.mapped_snowflake_task manual__2022-06-11T02:01:01.831761+00:00 map_index=0 [running]> on host 569596df5be5
[2022-06-11, 02:01:03 UTC] {taskinstance.py:1889} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1451, in _run_raw_task
    self._execute_task_with_callbacks(context, test_mode)
  File "/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1555, in _execute_task_with_callbacks
    task_orig = self.render_templates(context=context)
  File "/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 2212, in render_templates
    rendered_task = self.task.render_template_fields(context)
  File "/usr/local/lib/python3.9/site-packages/airflow/models/mappedoperator.py", line 726, in render_template_fields
    self._do_render_template_fields(
  File "/usr/local/lib/python3.9/site-packages/airflow/utils/session.py", line 68, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/airflow/models/abstractoperator.py", line 344, in _do_render_template_fields
    rendered_content = self.render_template(
  File "/usr/local/lib/python3.9/site-packages/airflow/models/abstractoperator.py", line 391, in render_template
    return render_template_to_string(template, context)
  File "/usr/local/lib/python3.9/site-packages/airflow/utils/helpers.py", line 296, in render_template_to_string
    return render_template(template, context, native=False)
  File "/usr/local/lib/python3.9/site-packages/airflow/utils/helpers.py", line 291, in render_template
    return "".join(nodes)
  File "<template>", line 13, in root
  File "/usr/local/lib/python3.9/site-packages/jinja2/sandbox.py", line 326, in getattr
    value = getattr(obj, attribute)
  File "/usr/local/lib/python3.9/site-packages/jinja2/runtime.py", line 910, in __getattr__
    return self._fail_with_undefined_error()
  File "/usr/local/lib/python3.9/site-packages/jinja2/runtime.py", line 903, in _fail_with_undefined_error
    raise self._undefined_exception(self._undefined_message)
jinja2.exceptions.UndefinedError: 'airflow.models.mappedoperator.MappedOperator object' has no attribute 'parameters'

Operating System

Debian GNU/Linux 10 (buster)

Versions of Apache Airflow Providers

apache-airflow-providers-snowflake==2.7.0

Deployment

Astronomer

Deployment details

Astronomer Runtime 5.0.3

Anything else

Even though the {{ task.<operator attr> }} approach does not work for mapped tasks, there is a workaround. Given the SnowflakeOperator example above, which attempts to execute SELECT * FROM {{ task.parameters.tbl }};, users can change the templated query to SELECT * FROM {{ task.mapped_kwargs.parameters[ti.map_index].tbl }}; for successful execution. This workaround isn't very obvious, though, and requires some solid digging into the new 2.3.0 code.
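To make the workaround's lookup chain explicit, here is a plain-Python sketch (no Airflow or Jinja required) of what the template evaluates for each mapped task instance. task and ti are hypothetical SimpleNamespace stand-ins, and the sketch assumes, as in the example DAG, that parameters is the only expanded argument, so ti.map_index indexes straight into the list.

```python
from types import SimpleNamespace

# Hypothetical stand-in for the mapped operator Jinja sees as `task`.
task = SimpleNamespace(
    mapped_kwargs={"parameters": [{"tbl": "foo"}, {"tbl": "bar"}]},
)


def render_workaround(task: SimpleNamespace, map_index: int) -> str:
    """Mimic {{ task.mapped_kwargs.parameters[ti.map_index].tbl }} in Python.

    Jinja resolves `.parameters` and `.tbl` on dicts by falling back to item
    access, so both become ["..."] lookups here.
    """
    ti = SimpleNamespace(map_index=map_index)  # hypothetical task instance
    tbl = task.mapped_kwargs["parameters"][ti.map_index]["tbl"]
    return f"SELECT * FROM {tbl};"


for idx in range(len(task.mapped_kwargs["parameters"])):
    print(idx, render_workaround(task, idx))
# 0 SELECT * FROM foo;
# 1 SELECT * FROM bar;
```

Because the lookup goes through mapped_kwargs, a template written this way is tied to the mapped task; the classic task has no such attribute, so the two cannot share one query string.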

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
