Description
Apache Airflow version
Other Airflow 2 version
What happened
Airflow Version: 2.2.5
MySQL Version: 8.0.18
In the scheduler, we are seeing cases where MySQL picks an inefficient plan for the critical section task queuing query. When a large number of task instances are scheduled, MySQL fails to use the ti_state index to filter the task_instance table, resulting in a full table scan (about 7.3 million rows).
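As a rough cross-check, the 7.3 million figure and the roughly 12 second runtime can both be recovered from the slow EXPLAIN ANALYZE output shown later in this issue. The sketch below only multiplies numbers copied from that plan. The per-loop values are averages reported by MySQL, so the results are estimates, and the variable names are illustrative.

```python
# Figures copied from the slow EXPLAIN ANALYZE output in this issue.
outer_loops = 13264          # unpaused dag rows driving the nested loop
avg_rows_per_lookup = 553    # task_instance rows read per PRIMARY lookup (average)
avg_ms_per_lookup = 0.909    # time to filter one dag's task instances, in ms (average)

# Estimated total work done on task_instance by the slow plan.
rows_examined = outer_loops * avg_rows_per_lookup
seconds_in_lookups = outer_loops * avg_ms_per_lookup / 1000

# Rows read by the ti_state index lookup in the fast plan.
fast_plan_rows = 222

print(f"task_instance rows examined: {rows_examined:,}")
print(f"time spent in lookups: {seconds_in_lookups:.2f} s")
print(f"rows examined vs. fast plan: {rows_examined / fast_plan_rows:,.0f}x")
```

The estimate suggests that nearly all of the query time goes into reading task_instance rows by primary key and discarding those that are not in the scheduled state.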
Normally, when the critical section query runs, the index on task_instance.state is used to filter scheduled task_instances.
| -> Limit: 512 row(s) (actual time=5.290..5.413 rows=205 loops=1)
    -> Sort row IDs: <temporary>.tmp_field_0, <temporary>.execution_date, limit input to 512 row(s) per chunk (actual time=5.289..5.391 rows=205 loops=1)
        -> Table scan on <temporary> (actual time=0.003..0.113 rows=205 loops=1)
            -> Temporary table (actual time=5.107..5.236 rows=205 loops=1)
                -> Nested loop inner join (cost=20251.99 rows=1741) (actual time=0.100..4.242 rows=205 loops=1)
                    -> Nested loop inner join (cost=161.70 rows=12) (actual time=0.071..2.436 rows=205 loops=1)
                        -> Index lookup on task_instance using ti_state (state='scheduled') (cost=80.85 rows=231) (actual time=0.051..1.992 rows=222 loops=1)
                        -> Filter: ((dag_run.run_type <> 'backfill') and (dag_run.state = 'running')) (cost=0.25 rows=0) (actual time=0.002..0.002 rows=1 loops=222)
                            -> Single-row index lookup on dag_run using dag_run_dag_id_run_id_key (dag_id=task_instance.dag_id, run_id=task_instance.run_id) (cost=0.25 rows=1) (actual time=0.001..0.001 rows=1 loops=222)
                    -> Filter: ((dag.is_paused = 0) and (task_instance.dag_id = dag.dag_id)) (cost=233.52 rows=151) (actual time=0.008..0.008 rows=1 loops=205)
                        -> Index range scan on dag (re-planned for each iteration) (cost=233.52 rows=15072) (actual time=0.008..0.008 rows=1 loops=205)
1 row in set, 1 warning (0.03 sec)

When a large number of task_instances are in scheduled state at the same time, the index on task_instance.state is not being used to filter scheduled task_instances.
| -> Limit: 512 row(s) (actual time=12110.251..12110.573 rows=512 loops=1)
    -> Sort row IDs: <temporary>.tmp_field_0, <temporary>.execution_date, limit input to 512 row(s) per chunk (actual time=12110.250..12110.526 rows=512 loops=1)
        -> Table scan on <temporary> (actual time=0.005..0.800 rows=1176 loops=1)
            -> Temporary table (actual time=12109.022..12109.940 rows=1176 loops=1)
                -> Nested loop inner join (cost=10807.83 rows=3) (actual time=1.328..12097.528 rows=1176 loops=1)
                    -> Nested loop inner join (cost=10785.34 rows=64) (actual time=1.293..12084.371 rows=1193 loops=1)
                        -> Filter: (dag.is_paused = 0) (cost=1371.40 rows=1285) (actual time=0.087..22.409 rows=13264 loops=1)
                            -> Table scan on dag (cost=1371.40 rows=12854) (actual time=0.085..15.796 rows=13508 loops=1)
                        -> Filter: ((task_instance.state = 'scheduled') and (task_instance.dag_id = dag.dag_id)) (cost=0.32 rows=0) (actual time=0.907..0.909 rows=0 loops=13264)
                            -> Index lookup on task_instance using PRIMARY (dag_id=dag.dag_id) (cost=0.32 rows=70) (actual time=0.009..0.845 rows=553 loops=13264)
                    -> Filter: ((dag_run.run_type <> 'backfill') and (dag_run.state = 'running')) (cost=0.25 rows=0) (actual time=0.010..0.011 rows=1 loops=1193)
                        -> Single-row index lookup on dag_run using dag_run_dag_id_run_id_key (dag_id=task_instance.dag_id, run_id=task_instance.run_id) (cost=0.25 rows=1) (actual time=0.009..0.010 rows=1 loops=1193)
1 row in set, 1 warning (12.14 sec)

What you think should happen instead
To resolve this, I patched scheduler_job.py to add a MySQL index hint that makes the query use the ti_state index.
--- /usr/local/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py
+++ /usr/local/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py
@@ -293,6 +293,7 @@ class SchedulerJob(BaseJob):
         # and the dag is not paused
         query = (
             session.query(TI)
+            .with_hint(TI, 'USE INDEX (ti_state)', dialect_name='mysql')
             .join(TI.dag_run)
             .filter(DR.run_type != DagRunType.BACKFILL_JOB, DR.state == DagRunState.RUNNING)
             .join(TI.dag_model)

I think it makes sense to add this index hint upstream.
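To illustrate how the hint behaves, here is a minimal standalone sketch, assuming SQLAlchemy 1.4 or newer is installed. It uses a hypothetical, trimmed-down `task_instance` table rather than Airflow's real model, and compiles the same statement for two dialects to show that `dialect_name='mysql'` keeps the hint out of the SQL generated for other backends.

```python
from sqlalchemy import Column, MetaData, String, Table, select
from sqlalchemy.dialects import mysql, sqlite

metadata = MetaData()

# Hypothetical stand-in for Airflow's task_instance table (columns trimmed).
task_instance = Table(
    "task_instance",
    metadata,
    Column("dag_id", String(250), primary_key=True),
    Column("task_id", String(250), primary_key=True),
    Column("run_id", String(250), primary_key=True),
    Column("state", String(20)),
)

# Attach the index hint only when the statement is compiled for MySQL.
stmt = (
    select(task_instance.c.task_id)
    .with_hint(task_instance, "USE INDEX (ti_state)", dialect_name="mysql")
    .where(task_instance.c.state == "scheduled")
)

# Render the statement for each dialect without connecting to a database.
mysql_sql = str(stmt.compile(dialect=mysql.dialect()))
sqlite_sql = str(stmt.compile(dialect=sqlite.dialect()))

print(mysql_sql)   # hint text follows the table name in the FROM clause
print(sqlite_sql)  # no hint is rendered for a non-MySQL dialect
```

Because the hint is scoped to one dialect, PostgreSQL and SQLite deployments would see the query unchanged. Note that `USE INDEX` only restricts which indexes MySQL considers. It does not by itself guarantee that the optimizer avoids a table scan.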
How to reproduce
Schedule a large number of dag runs and tasks in a short period of time.
Operating System
Debian GNU/Linux 10 (buster)
Versions of Apache Airflow Providers
No response
Deployment
Other 3rd-party Helm chart
Deployment details
Airflow 2.2.5 on Kubernetes
MySQL Version: 8.0.18
Anything else
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct