Perform "mini scheduling run" after task has finished #11589
Conversation
The Workflow run is cancelling this PR. It is an earlier duplicate of run 1029499.
The Workflow run is cancelling this PR. It is an earlier duplicate of run 2794935.
The Workflow run is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks$,^Build docs$,^Spell check docs$,^Backport packages$,^Checks: Helm tests$,^Test OpenAPI*.
airflow/jobs/scheduler_job.py
Outdated
It seems to me that we should also move the comment. Now it has lost context.
This comment doesn't make sense when called on an (instance) method on DagRun, as that almost by definition only operates on a single dag run. The comment is kept here in the scheduler because that's where one might think we want to batch the queries up, but shouldn't.
airflow/models/dagrun.py
Outdated
Is there a reason we need to use TypedDict? NamedTuple is much easier to use in many cases.
No reason -- just happened to be the example we were looking at in PoolStats and copied that.
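To illustrate the trade-off being discussed, here is a minimal stdlib-only sketch of the two options (the field names are illustrative, not Airflow's actual PoolStats fields):

```python
from typing import NamedTuple, TypedDict


class PoolStatsDict(TypedDict):
    # TypedDict: instances are plain dicts, accessed as stats["open"]
    total: int
    open: int


class PoolStatsTuple(NamedTuple):
    # NamedTuple: attribute access, tuple unpacking, defaults, immutability
    total: int
    open: int = 0


d: PoolStatsDict = {"total": 10, "open": 4}
t = PoolStatsTuple(total=10, open=4)

print(d["open"], t.open)   # both give 4
total, open_slots = t      # NamedTuples unpack naturally; dicts don't
```

The NamedTuple version gives attribute access and a default value for free, which is the "much easier to use" point above.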
Force-pushed from 6a08a30 to 4a1d48d.
The Workflow run is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks$,^Build docs$,^Spell check docs$,^Backport packages$,^Checks: Helm tests$,^Test OpenAPI*.
Force-pushed from 58894bc to df73a15.
The Workflow run is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.
ashb left a comment:
We can also possibly reduce the duplication in tests between test_dagrun_fast_follow and test_dagrun_fast_follow_deactiveated by using @parameterized.expand and with conf_vars(...):
Oh also, these new tests should probably be in tests/models/test_taskinstance.py -- as the code we are testing is in TaskInstance primarily.
tests/models/test_dagrun.py
Outdated
- task_instance_a.task = dag.get_task(task_a.task_id)
+ task_instance_a.task = task_a
tests/models/test_dagrun.py
Outdated
- task_instance_b.task = dag.get_task(task_b.task_id)
+ task_instance_b.task = task_b
tests/models/test_dagrun.py
Outdated
Do we need this block? I think this test would be clearer if we instead just directly set task_instance_a to a runnable state. For example:

task_instance_a.state = State.QUEUED
session.commit()

My reason here is that this is the "pre-condition/setup" for the test, not part of what we are actually testing here, so by having these asserts and calling the scheduler job code we are not testing this feature in isolation.
(For this to work the TI would need to be attached to the session; you would need to pass session=session to dag_run.get_task_instance.)
Force-pushed from a2cc203 to 929e47d.
The Workflow run is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.
Force-pushed from 929e47d to d12b898.
Whoops, a lot of tests are failing: https://github.com/apache/airflow/pull/11589/checks?check_run_id=1330520635
Force-pushed from 1c060f0 to 78668d9.
airflow/models/dag.py
Outdated
- if include_direct_parents and not include_upstream:
-     also_include += t.get_flat_relatives(upstream=True, recurse=False)
+ elif include_direct_parents:
+     also_include += t.upstream_list
And then we don't need to add a recurse=True parameter to get_flat_relatives (the "flat" part is there to do the flattening/recursing).
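The distinction the suggestion relies on can be sketched with a minimal stand-in task graph (upstream_list and get_flat_relatives are Airflow's names; the Task class here is a simplified sketch, not the real BaseOperator):

```python
class Task:
    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream_list = []  # direct parents only

    def get_flat_relatives(self, upstream=True):
        # "flat" means recursively flattened: all ancestors, not just parents
        found = []
        for parent in self.upstream_list:
            found.append(parent)
            found.extend(parent.get_flat_relatives(upstream=upstream))
        return found


a, b, c = Task("a"), Task("b"), Task("c")
b.upstream_list = [a]
c.upstream_list = [b]

print([t.task_id for t in c.upstream_list])         # direct parents: ['b']
print([t.task_id for t in c.get_flat_relatives()])  # all ancestors: ['b', 'a']
```

So when only the direct parents are wanted, t.upstream_list already provides them and no recurse parameter is needed.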
airflow/models/dagrun.py
Outdated
- class _TISchedulingDecision(NamedTuple):
+ class TISchedulingDecision(NamedTuple):
I don't know why I made this "private" -- and it looks a bit odd to use it in a return value as it was
tests/models/test_dagrun.py
Outdated
Is this still needed, or can we revert the changes to this file?
tests/models/test_taskinstance.py
Outdated
Could you add a comment here saying what we're testing and why we expect B not to be scheduled, etc. -- this will help Future Us.
Force-pushed from b407901 to c5b046b.
Force-pushed from c5b046b to 21d6b47.
The PR needs to run all tests because it modifies the core of Airflow! Please rebase it to latest master or ask a committer to re-run it!
airflow/models/taskinstance.py
Outdated
How about encapsulating this into a separate method? The _run_raw_task is already a long one 😉
Yeah that's a good point. Will do.
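The refactor being agreed on here is an extract-method, roughly as sketched below. The TaskInstance class is a stub; _run_mini_scheduler_on_child_tasks is the method name the PR ends up using, but the bodies are placeholders:

```python
class TaskInstance:
    def __init__(self):
        self.events = []

    def _run_raw_task(self, session=None):
        self.events.append("task body ran")
        # ... success/failure handling elided ...
        # The post-run scheduling step is delegated rather than inlined,
        # keeping _run_raw_task from growing even longer.
        self._run_mini_scheduler_on_child_tasks(session)

    def _run_mini_scheduler_on_child_tasks(self, session=None):
        # All the "look at downstream tasks and schedule them" logic
        # lives here in its own unit.
        self.events.append("mini scheduler ran")


ti = TaskInstance()
ti._run_raw_task()
print(ti.events)  # ['task body ran', 'mini scheduler ran']
```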
airflow/models/taskinstance.py
Outdated
As a user I would be worried about seeing such info logs. Should it be debug?
I think it depends what the exception was
It will always be a "database exception", which is a rather critical one imho. And here we are telling users "your database refused something but you don't have to worry about it". I think we at least have to make it less "critical", like Skipping mini scheduling run due to exception: %s. But still, logging the exception will show a problem with the database...
The most likely case I expect here is a "cannot reach DB" network error.
But yeah, I like your message better.
airflow/models/dagrun.py
Outdated
Will it work for operators that inherit from DummyOperator?
No, there is already an issue for that though
airflow/models/dagrun.py
Outdated
Is this comment still valid?
No, not any more. Good catch
Force-pushed from 21d6b47 to 613da86.
  session.commit()
+ self._run_mini_scheduler_on_child_tasks(session)
It may happen that we will do a rollback on a session we already committed, is this expected @jhtimmins ?
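The concern about rollback ordering can be shown in miniature with sqlite3 (a generic stand-in for the SQLAlchemy session; the table and data are illustrative): once the task's result has been committed, a later rollback only undoes the new, empty transaction, so the committed work survives, but the ordering is worth being deliberate about.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE ti (state TEXT)")
conn.execute("INSERT INTO ti VALUES ('success')")
conn.commit()  # the task result is durable from this point on

try:
    raise RuntimeError("mini scheduler failed")  # simulated later failure
except RuntimeError:
    conn.rollback()  # rolls back only the (empty) new transaction

print(conn.execute("SELECT state FROM ti").fetchall())  # [('success',)]
```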
closes #11498
In order to further reduce intra-dag task scheduling lag we add an
optimization: when a task has just finished executing (success or
failure) we can look at the downstream tasks of just that task, and then
make scheduling decisions for those tasks there -- we've already got the
dag loaded, and we know they are likely actionable as we just finished.
We should set tasks to scheduled if we can (but no further, i.e. not to
queued, as the scheduler has to make that decision with info about the
Pool usage etc.).
Co-authored-by: Ash Berlin-Taylor [email protected]
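The optimization described above can be sketched as follows. All names here are illustrative stand-ins, not Airflow's real data structures; the key point is that the mini run only touches the finished task's direct children and only advances them to "scheduled", never to "queued":

```python
SCHEDULED, QUEUED, NONE, SUCCESS = "scheduled", "queued", "none", "success"


def upstream(task, downstream):
    # Direct parents of `task` in the (parent -> children) adjacency map.
    return [p for p, children in downstream.items() if task in children]


def mini_schedule(finished_task, states, downstream):
    # Look only at the downstream tasks of the task that just finished.
    for child in downstream[finished_task]:
        deps_met = all(states[p] == SUCCESS for p in upstream(child, downstream))
        if states[child] == NONE and deps_met:
            # Set to scheduled, but no further: moving to QUEUED needs
            # pool-usage info that only the real scheduler has.
            states[child] = SCHEDULED


states = {"a": SUCCESS, "b": NONE, "c": NONE}
downstream = {"a": ["b"], "b": ["c"], "c": []}
mini_schedule("a", states, downstream)
print(states)  # b becomes 'scheduled'; c is untouched since its parent b hasn't run
```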