Skip to content

Conversation

@jhtimmins
Copy link
Contributor

@jhtimmins jhtimmins commented Oct 16, 2020

closes #11498

In order to further reduce intra-dag task scheduling lag we add an
optimization: when a task has just finished executing (success or
failure) we can look at the downstream tasks of just that task, and then
make scheduling decisions for those tasks there -- we've already got the
dag loaded, and we know they are likely actionable as we just finished.

We should set tasks to scheduled if we can (but no further, i.e. not to
queued, as the scheduler has to make that decision with info about the
Pool usage etc.).

Co-authored-by: Ash Berlin-Taylor [email protected]


^ Add meaningful description above

Read the Pull Request Guidelines for more information.
In case of fundamental code change, Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in UPDATING.md.

@boring-cyborg boring-cyborg bot added the area:Scheduler including HA (high availability) scheduler label Oct 16, 2020
@jhtimmins jhtimmins closed this Oct 16, 2020
@jhtimmins jhtimmins reopened this Oct 16, 2020
@jhtimmins jhtimmins changed the title Perform "mini scheduling run" after task has finished WIP: Perform "mini scheduling run" after task has finished Oct 16, 2020
@github-actions
Copy link

The Workflow run is cancelling this PR. It in earlier duplicate of 1029499 run.

@github-actions
Copy link

The Workflow run is cancelling this PR. It in earlier duplicate of 2794935 run.

@github-actions
Copy link

The Workflow run is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks$,^Build docs$,^Spell check docs$,^Backport packages$,^Checks: Helm tests$,^Test OpenAPI*.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems to me that we should also move the comment. Now it has lost context.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment doesn't make sense when called on a (instance) method on DagRun, as that almost by definition only operators on a single dag run. The comment is kept here in the scheduler because that's where might think we want to batch the queries up, but shouldn't.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason we need to use TypedDict? NamedTuple is much easier to use in many cases.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No reason -- just happened to be the example we were looking at in PoolStats and copied that.

@kaxil kaxil added the AIP-15 label Oct 22, 2020
@jhtimmins jhtimmins force-pushed the mini-scheduler-after-task-completed branch from 6a08a30 to 4a1d48d Compare October 27, 2020 05:40
@github-actions
Copy link

The Workflow run is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks$,^Build docs$,^Spell check docs$,^Backport packages$,^Checks: Helm tests$,^Test OpenAPI*.

@jhtimmins jhtimmins changed the title WIP: Perform "mini scheduling run" after task has finished Perform "mini scheduling run" after task has finished Oct 27, 2020
@jhtimmins jhtimmins force-pushed the mini-scheduler-after-task-completed branch from 58894bc to df73a15 Compare October 28, 2020 03:20
@github-actions
Copy link

The Workflow run is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.

Copy link
Member

@ashb ashb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can also possibly reduce the duplication in tests between test_dagrun_fast_follow and test_dagrun_fast_follow_deactiveated by using @parameterized.expand and with conf_vars(...):

Oh also, these new tests should probably be in tests/models/test_taskinstance.py -- as the code we are testing is in TaskInstance primarily.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
task_instance_a.task = dag.get_task(task_a.task_id)
task_instance_a.task = task_a

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
task_instance_b.task = dag.get_task(task_b.task_id)
task_instance_b.task = task_b

Comment on lines 262 to 271
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this block? I think this test would be clearer if we instead just directly set task_instance_a to a runnable state: For example:

        task_instance_a.state = State.QUEUED
        session.commit()

My reason here is that this is the "pre-condition/setup" for the test, not part of what are actually testing here, so by having these asserts and calling the scheduler job code we are not testing this feature in isolation.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(For this to work the TI would need to be attached to the session you would need to pass session=session to dag_run.get_task_instance)

@jhtimmins jhtimmins force-pushed the mini-scheduler-after-task-completed branch 2 times, most recently from a2cc203 to 929e47d Compare October 29, 2020 18:50
@github-actions
Copy link

The Workflow run is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.

@jhtimmins jhtimmins force-pushed the mini-scheduler-after-task-completed branch from 929e47d to d12b898 Compare October 30, 2020 05:47
@kaxil
Copy link
Member

kaxil commented Oct 30, 2020

@jhtimmins jhtimmins force-pushed the mini-scheduler-after-task-completed branch 2 times, most recently from 1c060f0 to 78668d9 Compare October 31, 2020 01:27
Comment on lines 1446 to 1447
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if include_direct_parents and not include_upstream:
also_include += t.get_flat_relatives(upstream=True, recurse=False)
elif include_direct_parents:
also_include += t.upstream_list

And then we don't need to add recurse=True parameter to get_flat_relatives (the "flat" part is to do the flattening/recursing part)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
class _TISchedulingDecision(NamedTuple):
class TISchedulingDecision(NamedTuple):

I don't know why I made this "private" -- and it looks a bit odd to use it in a return value as it was

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this still needed, or can we revert the changes to this file?

Comment on lines 1778 to 1782
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add a comment here saying what we're testing, why we expect B to not be scheduled etc -- this will help Future Us

@jhtimmins jhtimmins force-pushed the mini-scheduler-after-task-completed branch from b407901 to c5b046b Compare November 1, 2020 00:11
@jhtimmins jhtimmins force-pushed the mini-scheduler-after-task-completed branch from c5b046b to 21d6b47 Compare November 2, 2020 15:19
@paolaperaza paolaperaza added this to the Airflow 2.0.0-beta1 milestone Nov 2, 2020
@github-actions
Copy link

github-actions bot commented Nov 2, 2020

The PR needs to run all tests because it modifies core of Airflow! Please rebase it to latest master or ask committer to re-run it!

@github-actions github-actions bot added the full tests needed We need to run full set of tests for this PR to merge label Nov 2, 2020
Comment on lines 1142 to 1194
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about encapsulating this into separate method? The _run_raw_task is already a long one 😉

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah that's a good point. Will do.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I user I would be worried about seeing such info logs. Should it be debug?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it depends what the exception was

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It always will be a "database exception" which is rather critical one imho. And here we are telling users "your database refused something but you don't have to worry about it". I think we either have to make it less "critical" like Skipping mini scheduling run due to exception: %s. But still, logging the exception will show problem with database...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The most likely case I expect here is a "cannot reach DB" network error.

But yeah, I like your message better.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will it work for operators that inherit from DummyOperator?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, there is already an issue for that though

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this comment still valid?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, not any more. Good catch

jhtimmins and others added 13 commits November 2, 2020 12:35
In order to further reduce intra-dag task scheduling lag we add an
optimization: when a task has just finished executing (success or
failure) we can look at the downstream tasks of just that task, and then
make scheduling decisions for those tasks there -- we've already got the
dag loaded, and we know they are likely actionable as we just finished.

We should set tasks to scheduled if we can (but no further, i.e. not to
queued, as the scheduler has to make that decision with info about the
Pool usage etc.).

Co-authored-by: Ash Berlin-Taylor <[email protected]>
@jhtimmins jhtimmins force-pushed the mini-scheduler-after-task-completed branch from 21d6b47 to 613da86 Compare November 2, 2020 20:36
@ashb ashb merged commit eea6c4f into apache:master Nov 3, 2020
Comment on lines 1140 to +1142
session.commit()

self._run_mini_scheduler_on_child_tasks(session)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It may happened that we will do rollback on session we already committed, is is expected @jhtimmins ?

@ashb ashb deleted the mini-scheduler-after-task-completed branch November 12, 2020 12:37
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area:Scheduler including HA (high availability) scheduler full tests needed We need to run full set of tests for this PR to merge

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Perform "mini scheduling run" after task has finished

7 participants