Skip to content

Commit aac3877

Browse files
authored
Add metric for scheduling delay between first run task & expected start time (#9544)
Co-authored-by: Ace Haidrey <[email protected]>
1 parent 438b0a0 commit aac3877

File tree

3 files changed

+106
-13
lines changed

3 files changed

+106
-13
lines changed

airflow/models/dagrun.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -480,6 +480,7 @@ def update_state(
480480
else:
481481
self.set_state(State.RUNNING)
482482

483+
self._emit_true_scheduling_delay_stats_for_finished_state(finished_tasks)
483484
self._emit_duration_stats_for_finished_state()
484485

485486
session.merge(self)
@@ -565,6 +566,41 @@ def _are_premature_tis(
565566
return True
566567
return False
567568

569+
def _emit_true_scheduling_delay_stats_for_finished_state(self, finished_tis):
    """
    Emit the "true scheduling delay" timing stat for a finished DagRun.

    The delay is defined as the earliest ``start_date`` among ``finished_tis``
    minus the expected DagRun start time, which is computed as
    ``dag.following_schedule(self.execution_date)``.

    This method is called from ``update_state``. It does nothing while the
    DagRun is still in the RUNNING state, when the run was externally
    triggered (``external_trigger`` is truthy), when ``finished_tis`` is empty,
    or when none of the finished task instances has a ``start_date``.
    Negative delays are not emitted.

    The emitted data may contain outliers (e.g. when the first task was
    cleared, so a later task's start_date is used); these are expected to be
    filtered out on the stats/dashboard side.

    Any exception raised while computing or emitting the stat is logged as a
    warning and swallowed, so metric emission cannot break state updates.

    :param finished_tis: task instances of this DagRun that have finished
    """
    try:
        if self.state == State.RUNNING:
            return
        if self.external_trigger:
            return
        if not finished_tis:
            return
        dag = self.get_dag()
        # Only task instances that actually started carry a start_date; the
        # earliest one marks when the run really began executing. Using
        # min(..., default=None) avoids an IndexError (and a misleading
        # warning) when no finished task instance ever started.
        first_start_date = min(
            (ti.start_date for ti in finished_tis if ti.start_date),
            default=None,
        )
        if first_start_date is None:
            return
        # dag.following_schedule calculates the expected start datetime for a scheduled dagrun
        # i.e. a daily flow for execution date 1/1/20 actually runs on 1/2/20 hh:mm:ss,
        # and ti.start_date will be 1/2/20 hh:mm:ss, so the following schedule is the
        # value to compare against.
        expected_start_date = dag.following_schedule(self.execution_date)
        # NOTE(review): following_schedule presumably returns None for DAGs
        # without a schedule interval -- confirm; there is nothing to measure then.
        if expected_start_date is None:
            return
        true_delay = (first_start_date - expected_start_date).total_seconds()
        if true_delay >= 0:
            Stats.timing(f'dagrun.{dag.dag_id}.first_task_scheduling_delay', true_delay)
    except Exception as e:
        # Best-effort metric: never let a stats failure propagate to the caller.
        self.log.warning(f'Failed to record first_task_scheduling_delay metric:\n{e}')
603+
568604
def _emit_duration_stats_for_finished_state(self):
569605
if self.state == State.RUNNING:
570606
return

docs/logging-monitoring/metrics.rst

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -133,16 +133,17 @@ Name Description
133133
Timers
134134
------
135135

136-
=========================================== =================================================================
137-
Name Description
138-
=========================================== =================================================================
139-
``dagrun.dependency-check.<dag_id>`` Milliseconds taken to check DAG dependencies
140-
``dag.<dag_id>.<task_id>.duration`` Milliseconds taken to finish a task
141-
``dag_processing.last_duration.<dag_file>`` Milliseconds taken to load the given DAG file
142-
``dagrun.duration.success.<dag_id>`` Milliseconds taken for a DagRun to reach success state
143-
``dagrun.duration.failed.<dag_id>`` Milliseconds taken for a DagRun to reach failed state
144-
``dagrun.schedule_delay.<dag_id>`` Milliseconds of delay between the scheduled DagRun start date and
145-
the actual DagRun start date
146-
``scheduler.critical_section_duration`` Milliseconds spent in the critical section of scheduler loop --
147-
only a single scheduler can enter this loop at a time
148-
=========================================== =================================================================
136+
=================================================== ========================================================================
137+
Name Description
138+
=================================================== ========================================================================
139+
``dagrun.dependency-check.<dag_id>`` Milliseconds taken to check DAG dependencies
140+
``dag.<dag_id>.<task_id>.duration`` Milliseconds taken to finish a task
141+
``dag_processing.last_duration.<dag_file>`` Milliseconds taken to load the given DAG file
142+
``dagrun.duration.success.<dag_id>`` Milliseconds taken for a DagRun to reach success state
143+
``dagrun.duration.failed.<dag_id>`` Milliseconds taken for a DagRun to reach failed state
144+
``dagrun.schedule_delay.<dag_id>`` Milliseconds of delay between the scheduled DagRun
145+
start date and the actual DagRun start date
146+
``scheduler.critical_section_duration`` Milliseconds spent in the critical section of scheduler loop --
147+
only a single scheduler can enter this loop at a time
148+
``dagrun.<dag_id>.first_task_scheduling_delay`` Seconds elapsed between first task start_date and dagrun expected start
149+
=================================================== ========================================================================

tests/models/test_dagrun.py

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import datetime
2020
import unittest
2121
from unittest import mock
22+
from unittest.mock import call
2223

2324
from parameterized import parameterized
2425

@@ -27,8 +28,10 @@
2728
from airflow.models.dagrun import DagRun
2829
from airflow.operators.dummy_operator import DummyOperator
2930
from airflow.operators.python import ShortCircuitOperator
31+
from airflow.stats import Stats
3032
from airflow.utils import timezone
3133
from airflow.utils.callback_requests import DagCallbackRequest
34+
from airflow.utils.dates import days_ago
3235
from airflow.utils.state import State
3336
from airflow.utils.trigger_rule import TriggerRule
3437
from airflow.utils.types import DagRunType
@@ -714,3 +717,56 @@ def test_next_dagruns_to_examine_only_unpaused(self):
714717

715718
session.rollback()
716719
session.close()
720+
721+
@mock.patch.object(Stats, 'timing')
def test_no_scheduling_delay_for_nonscheduled_runs(self, stats_mock):
    """
    Tests that the first-task scheduling delay stat is not emitted if the dagrun
    is not a scheduled run. Simple test for sanity check.

    NOTE(review): this relies on ``self.create_dag_run`` creating a
    non-scheduled (manual) run -- confirm against that helper.
    """
    dag = DAG(dag_id='test_dagrun_stats', start_date=days_ago(1))
    dag_task = DummyOperator(task_id='dummy', dag=dag)

    initial_task_states = {
        dag_task.task_id: State.SUCCESS,
    }

    dag_run = self.create_dag_run(dag=dag, state=State.RUNNING, task_states=initial_task_states)
    dag_run.update_state()

    # Compare on the stat name only: a real emission is
    # ``Stats.timing(name, value)``, so matching against ``call(name)`` alone
    # could never be equal and the assertion would pass vacuously.
    metric_name = f'dagrun.{dag.dag_id}.first_task_scheduling_delay'
    emitted_names = [
        args[0] if args else kwargs.get('stat')
        for args, kwargs in stats_mock.call_args_list
    ]
    self.assertNotIn(metric_name, emitted_names)
737+
738+
@mock.patch.object(Stats, 'timing')
def test_emit_scheduling_delay(self, stats_mock):
    """
    Checks that a scheduled DagRun emits the first-task scheduling delay stat.

    ``dag_run.update_state()`` is what triggers
    ``_emit_true_scheduling_delay_stats_for_finished_state``; the test then
    verifies that ``Stats.timing`` received the expected name and delay.
    """
    dag = DAG(dag_id='test_emit_dag_stats', start_date=days_ago(1))
    dag_task = DummyOperator(task_id='dummy', dag=dag, owner='airflow')

    # Register the DAG in the ORM and create a scheduled run for it.
    session = settings.Session()
    session.add(
        DagModel(
            dag_id=dag.dag_id,
            has_task_concurrency_limits=False,
            next_dagrun=dag.start_date,
            next_dagrun_create_after=dag.following_schedule(dag.start_date),
            is_active=True,
        )
    )
    session.flush()
    dag_run = dag.create_dagrun(
        run_type=DagRunType.SCHEDULED,
        state=State.SUCCESS,
        execution_date=dag.start_date,
        start_date=dag.start_date,
        session=session,
    )

    # Mark the only task as finished so the run has a finished task instance.
    ti = dag_run.get_task_instance(dag_task.task_id)
    ti.set_state(State.SUCCESS, session)
    session.commit()
    session.close()

    dag_run.update_state()

    # Expected delay: task start minus the expected start of the scheduled run.
    expected_start = dag.following_schedule(dag_run.execution_date)
    true_delay = (ti.start_date - expected_start).total_seconds()
    stats_mock.assert_called()
    expected_call = call(f'dagrun.{dag.dag_id}.first_task_scheduling_delay', true_delay)
    self.assertIn(expected_call, stats_mock.mock_calls)

0 commit comments

Comments
 (0)