-
Notifications
You must be signed in to change notification settings - Fork 16.3k
Rewrite decorated task mapping #21328
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
af50c6b to
6991022
Compare
b52938c to
f963a48
Compare
|
The serialization needs some work for this approach. Here is an incomplete test: def test_mapped_decorator_serde():
from airflow.models.xcom_arg import XComArg
from airflow.decorators import task
with DAG("test-dag", start_date=datetime(2020, 1, 1)) as dag:
task1 = BaseOperator(task_id="op1")
xcomarg = XComArg(task1, "test_key")
@task(retry_delay=30)
def x(arg1, arg2):
...
real_op = x.partial(arg1=1).map(arg2=xcomarg).operator
serialized = SerializedBaseOperator._serialize(real_op)
assert serialized == {
'_is_dummy': False,
'_is_mapped': True,
'_task_module': 'airflow.decorators.python',
'_task_type': '_PythonDecoratedOperator',
'downstream_task_ids': [],
'partial_kwargs': {
'multiple_outputs': False,
'op_args': [],
'op_kwargs': {
'arg1': [
1,
2,
{"__type": "dict", "__var": {'a': 'b'}},
],
},
'retry_delay': 30,
},
'mapped_kwargs': {
'op_args': [],
# We don't need the __type/__var here!
'op_kwargs': {
'arg2': {'__type': 'xcomref', '__var': {'task_id': 'op1', 'key': 'test_key'}},
}
},
'task_id': 'x',
'template_ext': [],
'template_fields': ['op_args', 'op_kwargs'],
# We don't want to include the python source code in the serialized representation
# TODO? Where does `retry_delay` go? We need to separate
}
op = SerializedBaseOperator.deserialize_operator(serialized)
assert isinstance(op, MappedOperator)
assert op.deps is MappedOperator.DEFAULT_DEPS
# TODO: add some more asserts hereThe serialization Trying to call |
f963a48 to
511f81e
Compare
|
Serialisation format is revised. The KeyError is also fixed by implementing slightly smarter logic to merge |
|
We should also implement some validation when a DAG is parsed to make sure the user pass reasonable values to |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| assert deserialized.retry_delay == timedelta(seconds=30) |
(give or take.)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
At this point retry_delay is still 30 verbatim; it become a timedelta only after unmapped. I’ll add a test elsewhere for this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It should be deserialized as a timedelta -- it's fine for this value since the constructor for BaseOperator handles it, but other variables might behave differently
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
start_date for instance.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added a test in tests/decorators/test_python.py for this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
start_datefor instance.
Hm some refactoring would be called for to extract the logic out of BaseOperator for reuse. (Note that this affects non-decorator MappedOperator as well.) I think this should be done in a separate PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah agreed. We can fix this up later, this unblocks a lot.
Mapping a traditional operator (where arguments go to the operator) and a task flow operator (where arguments go to the *function*) have very different semantics, so we need some special code for them.
Previously we were merging partial_kwargs and mapped_kwargs too naively and did not correctly handle op_args and op_kwargs; those need special logic due to the mapping semantics of decorated tasks.
f4da368 to
1bd6eb4
Compare
Some attributes are removed from serialization to match the format of the (unmapped) _PythonDecoratedOperator. Some simplication is implemented to op_kwargs to save some space.
1bd6eb4 to
0d3c624
Compare
|
The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease. |
|
Static check failures fixed in #21480. |
Need to get #21210 merged first and rebase this.DoneThis rewrites how
map()is implemented on_TaskDecorator. The previous implementation incorrectly assumed mapping a task decorator works like mapping a traditional operator, but in fact they have entirely different semantics.When mapping a traditional operator,
FooOperator.map(my_arg=[1, 2, 3]), the argument onFooOperatoris mapped. But when mapping a task decorator,my_task.map(my_val=[1, 2, 3]), it’s the argument on the function wrapped by the task object being mapped. Therefore, mapped (and also partial-ed) values formy_valshould go into theDecoratedOperator’sop_kwargs(andop_args) arguments instead.An end-to-end test is provided to validate the simplest case of this. Also fixed a test where the end-to-end test becomes broken after running a DAG (because I need to run a second DAG!)