@ashb
Member

@ashb ashb commented Jan 29, 2022

In order to do this, a few changes were required:

  • The BackfillJob needs to know about mapped tasks, both to expand them and to update its TI tracking.
  • The DebugExecutor, task_command.py, and LocalTaskJob needed to "unmap" the mapped task to get the real operator back at the right time.

I was testing this with the following dag:

from airflow import DAG
from airflow.decorators import task
from airflow.operators.python import PythonOperator
import pendulum

@task
def make_list():
    return list(map(lambda a: f'echo "{a!r}"', [1, 2, {'a': 'b'}]))

def consumer(*args):
    print(repr(args))

with DAG(dag_id='maptest', start_date=pendulum.DateTime(2022, 1, 18)) as dag:
    PythonOperator(task_id='consumer', python_callable=consumer).map(op_args=make_list())

It can't "unmap" decorated operators successfully yet, so we're using the old-school PythonOperator.

We also just pass the whole value to the operator, not just the current mapping value(s)
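A plain-Python sketch (no Airflow involved) of what "the whole value" means for the example above. The `per_value` part is only an assumption about what one call per mapped value might eventually look like; it is not behaviour described in this PR.

```python
def make_list():
    # Same body as the make_list task above, minus the @task decorator.
    return list(map(lambda a: f'echo "{a!r}"', [1, 2, {'a': 'b'}]))


def consumer(*args):
    # Return the repr instead of printing so the result can be inspected.
    return repr(args)


values = make_list()

# Behaviour described above: the whole list is handed over as op_args,
# so consumer receives every element in a single call.
whole_value = consumer(*values)

# Assumption for illustration only: one call per mapped value.
per_value = [consumer(value) for value in values]
```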



@boring-cyborg boring-cyborg bot added the area:Scheduler including HA (high availability) scheduler label Jan 29, 2022
@ashb
Member Author

ashb commented Jan 29, 2022

No tests yet -- that's what I'm working on now.

Comment on lines 82 to 83
Member

I wonder if it’d be nicer to also implement unmap() on BaseOperator and have it simply return self.

Member Author

I made it raise an exception (as it should never be called?), but it exists for typing.

I could have it return self and then remove the if task.is_mapped -- WDYT?

Member

@uranusjr uranusjr Feb 2, 2022

My OOP training tells me polymorphism is better than an if clause, so my vote goes to return self. But ultimately I guess either makes sense (depending on your mental model of the logic).
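A minimal sketch of the "return self" option discussed here, using toy stand-in classes rather than Airflow's real BaseOperator and MappedOperator. The constructor arguments (`partial_kwargs`, `mapped_kwargs`) and the `resolve` helper are hypothetical names chosen for illustration.

```python
from typing import Any, Dict, Type


class BaseOperator:
    """Toy stand-in for a regular operator; not Airflow's real class."""

    def __init__(self, task_id: str, **kwargs: Any) -> None:
        self.task_id = task_id
        self.kwargs = kwargs

    def unmap(self) -> "BaseOperator":
        # A regular operator is already the real thing, so this is a no-op.
        return self


class MappedOperator:
    """Toy stand-in for a mapped task wrapping an operator class."""

    def __init__(
        self,
        operator_class: Type[BaseOperator],
        task_id: str,
        partial_kwargs: Dict[str, Any],
        mapped_kwargs: Dict[str, Any],
    ) -> None:
        self.operator_class = operator_class
        self.task_id = task_id
        self.partial_kwargs = partial_kwargs
        self.mapped_kwargs = mapped_kwargs

    def unmap(self) -> BaseOperator:
        # Rebuild the real operator from the stored arguments.
        return self.operator_class(
            task_id=self.task_id, **self.partial_kwargs, **self.mapped_kwargs
        )


def resolve(task):
    # Hypothetical caller: no "if task.is_mapped" branch is needed,
    # because both classes answer unmap().
    return task.unmap()
```

With this shape the caller treats both kinds of task the same way; the alternative keeps the `if task.is_mapped` check and lets the base implementation raise.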

@ashb ashb force-pushed the run-mapped-tasks-debug-executor branch from 41d5177 to 083a26c Compare February 2, 2022 13:14
@ashb ashb marked this pull request as ready for review February 2, 2022 13:22
Member

@uranusjr uranusjr Feb 3, 2022

Is this guaranteed (and why?) or just a temporary measure to get past Mypy?

Member Author

Temporary for now until we support mapping task groups (the DAG api supports it, but the runtime doesn't yet)

Member Author

The why: Because expand_mapped_task doesn't exist on BaseOperator, only MappedOperator
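A rough sketch of the typing point, assuming the line under review is an isinstance-style assertion (the thread does not show it). The classes are toy stand-ins, and the `expand_mapped_task` signature below is invented for illustration; it is not Airflow's real signature.

```python
from typing import List, Union


class BaseOperator:
    """Toy stand-in; deliberately has no expand_mapped_task method."""

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id


class MappedOperator:
    """Toy stand-in for the only class that can expand itself."""

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id

    def expand_mapped_task(self, length: int) -> List[str]:
        # Invented signature: return one label per expanded task instance.
        return [f"{self.task_id}[{i}]" for i in range(length)]


def expand(task: Union[BaseOperator, MappedOperator], length: int) -> List[str]:
    # Without this assertion a type checker rejects the call below,
    # because BaseOperator does not define expand_mapped_task.
    assert isinstance(task, MappedOperator)
    return task.expand_mapped_task(length)
```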

Member

Makes sense

@github-actions

github-actions bot commented Feb 3, 2022

The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.

@github-actions github-actions bot added the full tests needed We need to run full set of tests for this PR to merge label Feb 3, 2022
ashb and others added 7 commits February 3, 2022 11:35
In order to do this there were two steps required:

- The BackfillJob needs to know about mapped tasks, both to expand them,
  and in order to update its TI tracking
- The DebugExecutor needed to "unmap" the mapped task to get the real
  operator back

I was testing this with the following dag:

```
from airflow import DAG
from airflow.decorators import task
from airflow.operators.python import PythonOperator
import pendulum

@task
def make_list():
    return list(map(lambda a: f'echo "{a!r}"', [1, 2, {'a': 'b'}]))

def consumer(*args):
    print(repr(args))

with DAG(dag_id='maptest', start_date=pendulum.DateTime(2022, 1, 18)) as dag:
    PythonOperator(task_id='consumer', python_callable=consumer).map(op_args=make_list())
```

It can't "unmap" decorated operators successfully yet, so we're using
old-school PythonOperator

We also just pass the whole value to the operator, not just the current
mapping value(s)
And since TaskGroup is a DAGNode, we don't need to store parent group
directly anymore -- it'll already be stored
@ashb ashb force-pushed the run-mapped-tasks-debug-executor branch from d6b04d7 to c915beb Compare February 3, 2022 11:35
@ashb
Member Author

ashb commented Feb 3, 2022

Note to self: if you want CI to run a test dag file, it helps if you commit it too.

@ashb
Member Author

ashb commented Feb 3, 2022

A lot of SSH timeouts, and the MSSQL failures.

@uranusjr
Member

uranusjr commented Feb 3, 2022

Yeah those SSH things are popping up a lot recently, not sure what had changed 😞

@potiuk
Member

potiuk commented Feb 3, 2022

> Yeah those SSH things are popping up a lot recently, not sure what had changed 😞

Yeah. I tried to address it via #21262 and it seemed to help a bit but I believe this is a side effect of some unrelated test, which we should track down and fix.

@ashb
Member Author

ashb commented Feb 3, 2022

May have had some unexpected luck via #21292 (that, or whatever environmental issue was causing it has vanished)

Co-authored-by: Tzu-ping Chung <[email protected]>
@ashb
Member Author

ashb commented Feb 4, 2022

Right, only SSH failures (which I have a PR open for) so I'm going to merge this now.

@ashb ashb merged commit 6fc6edf into apache:main Feb 4, 2022
@ashb ashb deleted the run-mapped-tasks-debug-executor branch February 4, 2022 14:24
@jedcunningham jedcunningham added the changelog:skip Changes that should be skipped from the changelog (CI, tests, etc..) label Mar 1, 2022
@jedcunningham jedcunningham added this to the Airflow 2.3.0 milestone Apr 26, 2022
Labels

area:dynamic-task-mapping AIP-42 area:Scheduler including HA (high availability) scheduler changelog:skip Changes that should be skipped from the changelog (CI, tests, etc..) full tests needed We need to run full set of tests for this PR to merge
