Fully support running more than one scheduler concurrently #10956
Conversation
Cool. Looking at it.
We're working on a short doc that will provide an easy way to test the components/PRs of AIP-15 together, along with some preliminary benchmarks for the performance improvement; hopefully we'll have that out tomorrow.
airflow/jobs/scheduler_job.py
Outdated
- :rtype: int or None
+ :rtype: Optional[None]
Doc typing uses a different syntax from type comments (https://www.sphinx-doc.org/en/master/usage/restructuredtext/domains.html#info-field-lists):
Multiple types in a type field will be linked automatically if separated by the word "or":
:type an_arg: int or None
:vartype a_var: str or int
:rtype: float or str
Interesting, TIL 👌
This function returns a tuple or None; the description in the pydocstring is incorrect:
int or None != Optional[Tuple[int, int]]
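To illustrate the corrected style, a docstring for a function like the one under discussion might read as follows (the function name and fields here are hypothetical; only the info-field-list syntax is the point):

```python
def _next_window(attempt):
    """Return the (start, end) of the next window, if there is one.

    Hypothetical example of Sphinx info-field-list typing.

    :param attempt: which attempt this is
    :type attempt: int
    :rtype: tuple or None
    """
    if attempt < 1:
        return None
    return (attempt, attempt + 1)
```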
Process note: I intend to keep changes as fixups/separate commits even beyond rebasing/following master, hopefully to make it easier to re-review this in future. (I.e. I will use …)
airflow/models/dagbag.py
Outdated
Can you tell us a little more about this? How do you want serialization of the same object to the database to produce different results? Is it by adding the next_dagrun and next_dagrun_create_after fields to the DAG in the sync_to_db method?
@ashb can you please take a look here?
This change was a bit of a hack here, and we'll tidy it up.
Since the scheduler now requires DAG serialization to operate, we need a way for dag bags to load DAGs from given files only, and not from the DB, but then also a way to make them write things back to the DB.
We'll pull this out into a separate PR, essentially removing the STORE_SERIALIZED_DAGS option entirely, and then this bulk_sync_to_db can just always write to the DB.
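As a rough illustration of that split (the parameter and method names below follow what Airflow 2.x ended up with, but treat them as assumptions in the context of this in-flight PR):

```python
from airflow.models import DagBag

# Parse DAGs from the given files only, without consulting the DB
file_dagbag = DagBag(dag_folder="/path/to/dags", read_dags_from_db=False)

# ...then explicitly write the parsed DAGs (and their serialized form) back
file_dagbag.sync_to_db()

# The scheduler side, by contrast, reads only serialized DAGs from the DB
db_dagbag = DagBag(read_dags_from_db=True)
```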
Force-pushed from 50782a6 to 74dba8a
One of the parts of this PR: it moves creation of DAG runs out of the dag file parser process and into the scheduler -- if you have a large number of dag files (1000, for instance), all with the same schedule_interval, then the time to parse each one sequentially can introduce a significant delay in creating the DAG runs after the period ticks over.
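A minimal sketch of that change, using the column names this PR introduces (illustrative only, not the actual implementation):

```python
from airflow.models import DagModel
from airflow.utils import timezone

def create_due_dagruns(session, dagbag):
    """Create any DAG runs whose pre-computed creation time has passed."""
    due = (
        session.query(DagModel)
        .filter(DagModel.next_dagrun_create_after <= timezone.utcnow())
        .all()
    )
    for dag_model in due:
        # Uses the serialized DAG -- no re-parse of the file is needed here
        dag = dagbag.get_dag(dag_model.dag_id)
        dag.create_dagrun(
            run_type="scheduled",
            execution_date=dag_model.next_dagrun,
            state="running",
        )
```

Because this query touches only the small DagModel rows, run creation no longer waits for 1000 files to be re-parsed sequentially.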
XD-DENG
left a comment
Don't have time to do a thorough check yet, but I have a few minor suggestions. FYI
I'm working on updating/adding to the tests -- this fixup commit is going to be a bit large I'm afraid (at +444/-406 so far, about 60% through) -- as many tests have been re-organized between TestDagFileProcessor and TestSchedulerJob to reflect the location of the new code (for example, …)
As discussed with @potiuk, I'll extend this PR to add a config option to disable all locking (a.k.a. an escape hatch) in case of unforeseen MySQL performance issues. Locking (…)
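For context, a sketch of the kind of escape hatch meant here -- the option name use_row_level_locking is an assumption (the comment above is truncated), as is the exact wiring:

```python
from airflow.configuration import conf

# Assumed option name; defaults to locking being enabled
USE_ROW_LEVEL_LOCKING = conf.getboolean(
    "scheduler", "use_row_level_locking", fallback=True
)

def maybe_lock(query):
    """Apply SELECT ... FOR UPDATE SKIP LOCKED only when locking is enabled."""
    if USE_ROW_LEVEL_LOCKING:
        return query.with_for_update(skip_locked=True)
    return query
```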
Fantastic! I will have some time over the weekend to finally look through it myself :).
The Workflow run is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks$,^Build docs$,^Spell check docs$,^Backport packages$,^Checks: Helm tests$,^Test OpenAPI*.
We're trying to get this PR green before merging -- although it doesn't strictly matter, it feels like an important one to be able to say "yes, this is all good" (and we know it's possible to get it green).
We got bored waiting for the GitHub Actions lottery (build timeouts, 137/OOM kill errors). We did have a green build just before though: https://github.com/apache/airflow/actions/runs/297696260
🎉
@ashb does this mean the LocalExecutor is now thread-safe in its use of pools?
@raphaelauv That comment should be removed now -- multiple schedulers use a "mutex" (actually a lock on some DB rows) to avoid that exact problem. (Also Airflow doesn't use threads, but processes. Technicality though.)
@ashb Thanks, I will open a PR.
@ashb One more question: how do we customize the number of threads (which are in practice processes, because of the GIL) of the scheduler? There is no longer a max_threads option in http://apache-airflow-docs.s3-website.eu-central-1.amazonaws.com/docs/apache-airflow/latest/configurations-ref.html#scheduler. Thank you!
Here it is. The "big" one. Smaller than I thought, but still a very fundamental change!
Depends upon #10949; please ignore the first commit when reviewing.
This PR is still in draft -- there are one or two TODOs left in the code that will need to be fixed before the final merge, but while they are important, there is still enough here to start reviewing.
This PR implements scheduler HA as proposed in AIP-15. The high-level design is as follows:
- Schedulers take row-level write locks (SELECT ... FOR UPDATE) on the rows they are examining, using SKIP LOCKED for better performance when multiple schedulers are running. (MySQL < 8 and MariaDB don't support this.) A minimal sketch of this pattern follows this list.
- Schedulers operate just on the database, using the serialized DAG representation
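The sketch below shows the locking pattern with a stand-in SQLAlchemy model (Airflow's real query differs; this only demonstrates the mechanism):

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()

class DagRunRow(Base):
    """Stand-in for Airflow's DagRun model."""
    __tablename__ = "dag_run_example"
    id = Column(Integer, primary_key=True)
    state = Column(String(50))

engine = create_engine("postgresql:///airflow")  # assumed DSN

with Session(engine) as session:
    # Emits SELECT ... FOR UPDATE SKIP LOCKED: rows already locked by
    # another scheduler are skipped rather than blocking this one.
    runs = (
        session.query(DagRunRow)
        .filter(DagRunRow.state == "running")
        .limit(10)
        .with_for_update(skip_locked=True)
        .all()
    )
```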
DagFileProcessorProcess:
Previously this component was responsible for more than just parsing the
DAG files, as its name might imply. It was also responsible for creating
DagRuns, and for making scheduling decisions on TIs, sending them from
"None" to "scheduled" state.
This commit changes it so that the DagFileProcessorProcess will now
update the SerializedDAG row for this DAG, and make no scheduling
decisions itself.
To make the scheduler's job easier (so that it can make as many
decisions as possible without having to load the possibly-large
SerializedDAG row) we store/update some columns on the DagModel table:
- next_dagrun: the execution_date of the next dag run that should be created (or None)
- next_dagrun_create_after: the earliest point at which the next dag run can be created
Pre-computing these values (and updating them every time the DAG is
parsed) reduces the overall load on the DB, as many decisions can be taken
by selecting just these two columns/the small DagModel row.
In the case of max_active_runs being reached, or @once DAGs, these columns will be set to null, meaning "don't create any dag runs".
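To make the two columns concrete, here is the arithmetic for a daily cron schedule (illustrative only; Airflow's own logic lives on the DAG/DagModel, croniter is just used to show the values):

```python
from datetime import datetime
from croniter import croniter

schedule = "0 0 * * *"  # daily at midnight
last_execution_date = datetime(2020, 10, 1)

# execution_date of the next run that should be created
next_dagrun = croniter(schedule, last_execution_date).get_next(datetime)

# A run for execution_date X only becomes creatable once the interval
# X .. X+1 has fully elapsed, so "create after" is one interval later
next_dagrun_create_after = croniter(schedule, next_dagrun).get_next(datetime)

print(next_dagrun)               # 2020-10-02 00:00:00
print(next_dagrun_create_after)  # 2020-10-03 00:00:00
```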
SchedulerJob
The SchedulerJob used to only queue/send tasks to the executor after
they were parsed, and returned from the DagFileProcessorProcess.
This PR breaks the link between parsing and enqueuing of tasks. Instead
of looking at DAGs as they are parsed, we now:
- Add a last_scheduling_decision column on the DagRun table, signifying when a scheduler last examined a DagRun
- Pick the next n DagRuns via DagRun.next_dagruns_to_examine, prioritising DagRuns which haven't been touched by a scheduler in the longest period (a sketch follows this list)
- Use the serialized versions of the DAGs when making scheduling decisions
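A sketch of what that selection can look like (the real query lives in DagRun.next_dagruns_to_examine; the filter and locking details here are assumptions):

```python
from sqlalchemy import nullsfirst
from airflow.models import DagRun
from airflow.utils import timezone

def pick_dagruns_to_examine(session, n=20):
    """Pick the n running DagRuns least recently examined, locking the rows."""
    runs = (
        session.query(DagRun)
        .filter(DagRun.state == "running")
        # never-examined runs (NULL) come first, then the stalest ones
        .order_by(nullsfirst(DagRun.last_scheduling_decision))
        .limit(n)
        .with_for_update(skip_locked=True)
        .all()
    )
    for run in runs:
        run.last_scheduling_decision = timezone.utcnow()
    return runs
```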
Part of #9630