Skip to content

Conversation

@uranusjr
Copy link
Member

@uranusjr uranusjr commented Feb 4, 2022

Need to get #21210 merged first and rebase this. Done

This rewrites how map() is implemented on _TaskDecorator. The previous implementation incorrectly assumed mapping a task decorator works like mapping a traditional operator, but in fact they have entirely different semantics.

When mapping a traditional operator, FooOperator.map(my_arg=[1, 2, 3]), the argument on FooOperator is mapped. But when mapping a task decorator, my_task.map(my_val=[1, 2, 3]), it’s the argument on the function wrapped by the task object being mapped. Therefore, mapped (and also partial-ed) values for my_val should go into the DecoratedOperator’s op_kwargs (and op_args) arguments instead.

An end-to-end test is provided to validate the simplest case of this. Also fixed a test where the end-to-end test becomes broken after running a DAG (because I need to run a second DAG!)

@boring-cyborg boring-cyborg bot added area:CLI provider:cncf-kubernetes Kubernetes (k8s) provider related issues area:Scheduler including HA (high availability) scheduler area:serialization labels Feb 4, 2022
@uranusjr uranusjr marked this pull request as ready for review February 4, 2022 14:32
@uranusjr uranusjr force-pushed the decorator-unmap branch 6 times, most recently from b52938c to f963a48 Compare February 6, 2022 12:11
@ashb
Copy link
Member

ashb commented Feb 7, 2022

The serialization needs some work for this approach. Here is an incomplete test:

def test_mapped_decorator_serde():
    from airflow.models.xcom_arg import XComArg
    from airflow.decorators import task

    with DAG("test-dag", start_date=datetime(2020, 1, 1)) as dag:
        task1 = BaseOperator(task_id="op1")
        xcomarg = XComArg(task1, "test_key")

        @task(retry_delay=30)
        def x(arg1, arg2):
            ...

    real_op = x.partial(arg1=1).map(arg2=xcomarg).operator

    serialized = SerializedBaseOperator._serialize(real_op)

    assert serialized == {
        '_is_dummy': False,
        '_is_mapped': True,
        '_task_module': 'airflow.decorators.python',
        '_task_type': '_PythonDecoratedOperator',
        'downstream_task_ids': [],
        'partial_kwargs': {
            'multiple_outputs': False,
            'op_args': [],
            'op_kwargs': {
                'arg1': [
                    1,
                    2,
                    {"__type": "dict", "__var": {'a': 'b'}},
                ],
            },
            'retry_delay': 30,
        },
        'mapped_kwargs': {
            'op_args': [],
            # We don't need the __type/__var here!
            'op_kwargs': {
                'arg2': {'__type': 'xcomref', '__var': {'task_id': 'op1', 'key': 'test_key'}},
            }
        },
        'task_id': 'x',
        'template_ext': [],
        'template_fields': ['op_args', 'op_kwargs'],
        # We don't want to include the python source code in the serialized representation
        # TODO? Where does `retry_delay` go? We need to separate 
    }


    op = SerializedBaseOperator.deserialize_operator(serialized)
    assert isinstance(op, MappedOperator)
    assert op.deps is MappedOperator.DEFAULT_DEPS

    # TODO: add some more asserts here

The serialization

Trying to call real_op.unmap() also throws a key error:

tests/serialization/test_dag_serialization.py:1670: in test_mapped_decorator_serde
    real_op.unmap()
airflow/models/baseoperator.py:1882: in unmap
    dag._remove_task(self.task_id)
airflow/models/dag.py:2167: in _remove_task
    task = self.task_dict.pop(task_id)
E   KeyError: 'x'

@uranusjr
Copy link
Member Author

uranusjr commented Feb 9, 2022

Serialisation format is revised. The KeyError is also fixed by implementing slightly smarter logic to merge op_kwargs from mapped_kwargs and partial_kwargs.

@uranusjr
Copy link
Member Author

uranusjr commented Feb 9, 2022

We should also implement some validation when a DAG is parsed to make sure the user pass reasonable values to partial_kwargs. This will be done in a separate PR.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
assert deserialized.retry_delay == timedelta(seconds=30)

(give or take.)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At this point retry_delay is still 30 verbatim; it become a timedelta only after unmapped. I’ll add a test elsewhere for this.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be deserialized as a timedelta -- it's fine for this value since the constructor for BaseOperator handles it, but other variables might behave differently

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

start_date for instance.

Copy link
Member Author

@uranusjr uranusjr Feb 9, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a test in tests/decorators/test_python.py for this

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

start_date for instance.

Hm some refactoring would be called for to extract the logic out of BaseOperator for reuse. (Note that this affects non-decorator MappedOperator as well.) I think this should be done in a separate PR.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah agreed. We can fix this up later, this unblocks a lot.

Mapping a traditional operator (where arguments go to the operator)
and a task flow operator (where arguments go to the *function*) have
very different semantics, so we need some special code for them.
Previously we were merging partial_kwargs and mapped_kwargs too naively
and did not correctly handle op_args and op_kwargs; those need special
logic due to the mapping semantics of decorated tasks.
Some attributes are removed from serialization to match the format
of the (unmapped) _PythonDecoratedOperator. Some simplication is
implemented to op_kwargs to save some space.
@github-actions
Copy link

github-actions bot commented Feb 9, 2022

The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.

@github-actions github-actions bot added the full tests needed We need to run full set of tests for this PR to merge label Feb 9, 2022
@uranusjr
Copy link
Member Author

Static check failures fixed in #21480.

@uranusjr uranusjr merged commit fded2ca into apache:main Feb 10, 2022
@uranusjr uranusjr deleted the decorator-unmap branch February 10, 2022 07:07
ferruzzi pushed a commit to ferruzzi/airflow that referenced this pull request Feb 11, 2022
@jedcunningham jedcunningham added changelog:skip Changes that should be skipped from the changelog (CI, tests, etc..) area:dynamic-task-mapping AIP-42 labels Feb 28, 2022
@jedcunningham jedcunningham added this to the Airflow 2.3.0 milestone Apr 26, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area:CLI area:dynamic-task-mapping AIP-42 area:Scheduler including HA (high availability) scheduler area:serialization changelog:skip Changes that should be skipped from the changelog (CI, tests, etc..) full tests needed We need to run full set of tests for this PR to merge provider:cncf-kubernetes Kubernetes (k8s) provider related issues

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants