Skip to content

Conversation

@ashb
Copy link
Member

@ashb ashb commented Sep 15, 2020

Here it is. The "big" one. Smaller than I thought, but still a very fundamental change!

Depends upon #10949, ignore first commit when reviewing please.

This PR is still in draft -- there are one or two TODOs left in the code that will need to be fixed before final merge, and while they are import, there is still enough here to start reviewing.

  • Test that my forward-port of the changes actually works again. (I wanted to get the code up for review, so there may be some transcription errors breaking things.)
  • DAG SLAs need to happen via the parsing process
  • Adding back dagrun_timeout support
  • dag_run.verify_integrity is slow, and we don't want to call it every time, just when the dag structure changes (which we can know now thanks to DAG Serialization)
  • Add a savepoint in verify_integrity (to avoid rollback killing the whole transaction and releasing the locks.
  • Produce benchmark figures against this branch, not a 4 month old version of master branch.
  • Unit test the hell out of this.
  • Add config option to disable all locking (a.k.a. escape hatch) in case of unforseen MySQL performance issues.

This PR implements scheduler HA as proposed in AIP-15. The high level design is as follows:

  • Move all scheduling decisions into SchedulerJob (requiring DAG serialization in the scheduler)
  • Use row-level locks to ensure schedulers don't stomp on each other
    (SELECT ... FOR UPDATE)
  • Use SKIP LOCKED for better performance when multiple schedulers are
    running. (Mysql < 8 and MariaDB don't support this)
  • Scheduling decisions are not tied to the parsing speed, but can
    operate just on the database

DagFileProcessorProcess:

Previously this component was responsible for more than just parsing the
DAG files as it's name might imply. It also was responsible for creating
DagRuns, and also making scheduling decisions of TIs, sending them from
"None" to "scheduled" state.

This commit changes it so that the DagFileProcessorProcess now will
update the SerializedDAG row for this DAG, and make no scheduling
decisions itself.

To make the scheduler's job easier (so that it can make as many
decisions as possible without having to load the possibly-large
SerializedDAG row) we store/update some columns on the DagModel table:

  • next_dagrun: The execution_date of the next dag run that should be created (or
    None)
  • next_dagrun_create_after: The earliest point at which the next dag
    run can be created

Pre-computing these values (and updating them every time the DAG is
parsed) reduce the overall load on the DB as many decisions can be taken
by selecting just these two columns/the small DagModel row.

In case of max_active_runs, or @once these columns will be set to
null, meaning "don't create any dag runs"

SchedulerJob

The SchedulerJob used to only queue/send tasks to the executor after
they were parsed, and returned from the DagFileProcessorProcess.

This PR breaks the link between parsing and enqueuing of tasks, instead
of looking at DAGs as they are parsed, we now:

  • store a new datetime column, last_scheduling_decision on DagRun
    table, signifying when a scheduler last examined a DagRun
  • Each time around the loop the scheduler will get (and lock) the next
    n DagRuns via DagRun.next_dagruns_to_examine, prioritising DagRuns
    which haven't been touched by a scheduler in the longest period
  • SimpleTaskInstance etc have been almost entirely removed now, as we
    use the serialized versions

Part of #9630


^ Add meaningful description above

Read the Pull Request Guidelines for more information.
In case of fundamental code change, Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in UPDATING.md.

@boring-cyborg boring-cyborg bot added area:docs area:Scheduler including HA (high availability) scheduler area:serialization labels Sep 15, 2020
@mik-laj mik-laj added the AIP-15 label Sep 15, 2020
@potiuk
Copy link
Member

potiuk commented Sep 15, 2020

Cool. Looking at it

@ryw
Copy link
Member

ryw commented Sep 15, 2020

We're working on a short doc that will provide an easy way to test the components/PRs of AIP-15 together, and some preliminary benchmarks for performance improvement, hopefully have that out tomorrow.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
:rtype: int or None
:rtype: Optional[None]

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doc typing uses a different syntax to type comments https://www.sphinx-doc.org/en/master/usage/restructuredtext/domains.html#info-field-lists:

Multiple types in a type field will be linked automatically if separated by the word “or”:

:type an_arg: int or None
:vartype a_var: str or int
:rtype: float or str

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting, TIL 👌

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function returns tuple or None. The description in the pydocstring is incorrect.
int or None != Optional[Tuple[int, int]]

@ashb
Copy link
Member Author

ashb commented Sep 16, 2020

Process note: I intend to keep changes as fixups/separate commits even beyond rebasing/following master, hopefully to to make it easier to re-review this in future. (I.e. I will use git rebase -i origin/master --no-autosquash)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you tell a little more about it? How do you want serialization of the same object to a database to produce different results? Is it by adding the next_dagrun, next_dagrun_create_after fields to the DAG in the sync_to_db method?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ashb can you please take a look here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change was a bit of a hack here, and we'll tidy it up.

Since the scheduler now requires DAG serialization to operate, we need a way for dag bags to Load dags from given files only, and not from the DB, but then also a way to make them write things back to the DB.

We'll pull this out in to a separate PR, essentially removing the STORE_SERIALIZED_DAGS option entirely, and then this bulk_sync_to_db can then just always write to the DB.

@ashb ashb force-pushed the scheduler-ha branch 2 times, most recently from 50782a6 to 74dba8a Compare September 16, 2020 14:56
@ashb
Copy link
Member Author

ashb commented Sep 17, 2020

One of the parts of this PR: it moves creating of DAG runs out of the dag file parser process, and in to the scheduler -- if you have a large number of dag files (1000, for instance), all with the same scheduler_interval, then the time to parse each one sequentially can introduce significant delay in creating the DAG runs after the period ticks over.

Copy link
Member

@XD-DENG XD-DENG left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't have time to do thorough check yet, but have got a few minor suggestions. FYI

@ashb
Copy link
Member Author

ashb commented Sep 18, 2020

I'm working on updating/adding to the tests -- this fixup commit is going to be a bit large I'm afraid (at +444/-406 so far, about 60% through) -- as many tests have been re-organized between TestDagFileProcessor and TestSchedulerJob to reflect the location of new code (for example, test_dag_with_system_exit doesn't make sense in TestSchedulerJob anymore, as the scheduler doesn't parse the dags at all)

@ashb
Copy link
Member Author

ashb commented Sep 18, 2020

As discussed with @potiuk , I'll extend this PR to add config option to disable all locking (a.k.a. escape hatch) in case of unforeseen MySQL performance issues. Locking (SELECT ... FOR UPDATE) will be on by default, so out of the box on Mysql 8+ and Postgres you can Just Run More Schedulers.

@potiuk
Copy link
Member

potiuk commented Sep 18, 2020

Fantastic! I will have some time over the weekend to finally look through it myself :).

@github-actions
Copy link

github-actions bot commented Oct 9, 2020

The Workflow run is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks$,^Build docs$,^Spell check docs$,^Backport packages$,^Checks: Helm tests$,^Test OpenAPI*.

@github-actions
Copy link

github-actions bot commented Oct 9, 2020

The Workflow run is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks$,^Build docs$,^Spell check docs$,^Backport packages$,^Checks: Helm tests$,^Test OpenAPI*.

@ashb
Copy link
Member Author

ashb commented Oct 9, 2020

We're trying to get this PR green before merging, because although it doesn't matter we know it's possible and it feels like an important one to say "yes, this is all good" (and we know it's possible to get it green)

@ashb
Copy link
Member Author

ashb commented Oct 9, 2020

We got bored waiting for the Github Actions lottery (build timeouts, 137/OOM kill errors. We did have a green build just before though - https://github.com/apache/airflow/actions/runs/297696260

@ashb ashb merged commit 73b9163 into apache:master Oct 9, 2020
@potiuk
Copy link
Member

potiuk commented Oct 9, 2020

🎉

@raphaelauv
Copy link
Contributor

@ashb is this mean the localExecutor is now thread-safe in the use of pools ?

https://airflow.apache.org/docs/stable/concepts.html#pools

Pools are not thread-safe , in case of more than one scheduler in localExecutor Mode you can’t ensure the non-scheduling of task even if the pool is full.

@ashb
Copy link
Member Author

ashb commented Nov 29, 2020

@raphaelauv That comment should be removed now -- multiple schedulers use a "mutex" (actually a lock on some DB rows) to avoid that exact problem.

(Also Airflow doesn't use threads, but processes. Technicality though)

@raphaelauv
Copy link
Contributor

@ashb thank I will PR

@raphaelauv
Copy link
Contributor

@ashb One more question , how do we custom the number of threads ( that are in practice processes because of the GIL ) of the scheduler ?

there is no more max_threads in http://apache-airflow-docs.s3-website.eu-central-1.amazonaws.com/docs/apache-airflow/latest/configurations-ref.html#scheduler

Thank you

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area:performance area:Scheduler including HA (high availability) scheduler area:serialization

Projects

None yet

Development

Successfully merging this pull request may close these issues.