Make airflow dags test be able to execute Mapped Tasks
#21210
No tests yet -- that's what I'm working on now.
airflow/executors/debug_executor.py
Outdated
I wonder if it’d be nicer to also implement unmap() on BaseOperator and have it simply return self.
I made it raise an exception (as it should never be called?), but it exists for typing.
I could have it return self and then remove the `if task.is_mapped` check -- WDYT?
My OOP training tells me polymorphism is better than an if clause, so my vote goes to return self. But ultimately I guess either makes sense (depending on your mental model of the logic).
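For anyone reading this thread later, the two options discussed above can be sketched roughly as follows. This is a minimal illustration, not the code in this PR: the two classes are simplified stand-ins for `BaseOperator` and `MappedOperator`, and `resolve_with_branch` / `resolve_polymorphic` are hypothetical helper names.

```python
from __future__ import annotations


class BaseOperator:
    """Simplified stand-in for a regular operator (not the real Airflow class)."""

    is_mapped = False

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id

    def unmap(self) -> BaseOperator:
        # The "return self" option: a regular operator is already unmapped.
        return self


class MappedOperator:
    """Simplified stand-in for a mapped operator (not the real Airflow class)."""

    is_mapped = True

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id

    def unmap(self) -> BaseOperator:
        # Build the real operator that the mapped task wraps.
        return BaseOperator(self.task_id)


def resolve_with_branch(task: BaseOperator | MappedOperator) -> BaseOperator:
    # Option 1: the caller checks is_mapped before calling unmap().
    if task.is_mapped:
        return task.unmap()
    return task


def resolve_polymorphic(task: BaseOperator | MappedOperator) -> BaseOperator:
    # Option 2: every operator implements unmap(), so no branch is needed.
    return task.unmap()
```

With option 2 the caller no longer needs to know which kind of operator it holds, which is the polymorphism argument made above.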
41d5177 to 083a26c
airflow/jobs/backfill_job.py
Outdated
Is this guaranteed (and why?) or just a temporary measure to get past Mypy?
Temporary for now, until we support mapping task groups (the DAG API supports it, but the runtime doesn't yet).
The why: because `expand_mapped_task` doesn't exist on BaseOperator, only on MappedOperator.
Makes sense
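To illustrate the typing point in this thread: the reviewed line itself is not shown here, so the sketch below assumes it is an `isinstance` narrowing of the kind Mypy needs before a method that exists on only one class can be called. The classes are simplified stand-ins, `expand` is a hypothetical helper, and this `expand_mapped_task` has a made-up, simplified signature rather than the real one.

```python
from __future__ import annotations

from typing import Union


class BaseOperator:
    """Simplified stand-in; deliberately has no expand_mapped_task()."""

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id


class MappedOperator:
    """Simplified stand-in that knows how to expand itself."""

    def __init__(self, task_id: str, values: list[object]) -> None:
        self.task_id = task_id
        self.values = values

    def expand_mapped_task(self) -> list[str]:
        # Hypothetical, simplified signature: one label per mapped value.
        return [f"{self.task_id}[{index}]" for index in range(len(self.values))]


def expand(task: Union[BaseOperator, MappedOperator]) -> list[str]:
    # Without this check a type checker rejects the call below, because
    # BaseOperator has no expand_mapped_task attribute.
    if not isinstance(task, MappedOperator):
        return [task.task_id]
    return task.expand_mapped_task()
```

The check is only "guaranteed" for as long as mapped operators are the only mapped node type, which matches the "temporary for now" answer above.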
The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.
In order to do this there were two steps required:

- The BackfillJob needs to know about mapped tasks, both to expand them and to update its TI tracking
- The DebugExecutor needed to "unmap" the mapped task to get the real operator back

I was testing this with the following dag:

```
from airflow import DAG
from airflow.decorators import task
from airflow.operators.python import PythonOperator
import pendulum

@task
def make_list():
    return list(map(lambda a: f'echo "{a!r}"', [1, 2, {'a': 'b'}]))

def consumer(*args):
    print(repr(args))

with DAG(dag_id='maptest', start_date=pendulum.DateTime(2022, 1, 18)) as dag:
    PythonOperator(task_id='consumer', python_callable=consumer).map(op_args=make_list())
```

It can't "unmap" decorated operators successfully yet, so we're using old-school PythonOperator

We also just pass the whole value to the operator, not just the current mapping value(s)
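The last limitation in the message above can be pictured in plain Python, with no Airflow involved. This is only a sketch under assumptions: it assumes one task instance per element of the mapped list, and `current_calls` / `eventual_calls` are hypothetical names. `consumer` returns the repr here instead of printing it so the result can be inspected.

```python
def make_list():
    # Same values the test DAG produces.
    return [f'echo "{a!r}"' for a in [1, 2, {"a": "b"}]]


def consumer(*args):
    # Returns instead of printing so the arguments can be compared.
    return repr(args)


values = make_list()

# Behaviour described above: every instance receives the whole value.
current_calls = [consumer(*values) for _ in values]

# Assumed eventual behaviour: each instance receives only its own value.
eventual_calls = [consumer(value) for value in values]
```

In the first list every entry is identical, since each call sees all three strings; in the second each entry holds a single string.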
And since TaskGroup is a DAGNode, we don't need to store parent group directly anymore -- it'll already be stored
Co-authored-by: Tzu-ping Chung <[email protected]>
d6b04d7 to c915beb
Note to self: if you want CI to run a test dag file, it helps if you commit it too.
A lot of SSH timeouts, and the MSSQL failures.
Yeah, those SSH things are popping up a lot recently, not sure what changed 😞
Yeah. I tried to address it via #21262 and it seemed to help a bit, but I believe this is a side effect of some unrelated test, which we should track down and fix.
We may have had some unexpected luck via #21292 (that, or whatever environmental issue was causing it has vanished).
Co-authored-by: Tzu-ping Chung <[email protected]>
Right, only SSH failures (which I have a PR open for) so I'm going to merge this now.