Skip to content

Commit 1507ca4

Browse files
viktorviauranusjr
andauthored
Fix StatD timing metric units (#21106)
Co-authored-by: Tzu-ping Chung <[email protected]> Co-authored-by: Tzu-ping Chung <[email protected]>
1 parent 6e96f04 commit 1507ca4

File tree

4 files changed

+35
-9
lines changed

4 files changed

+35
-9
lines changed

airflow/dag_processing/manager.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ class DagFileStat(NamedTuple):
7474
num_dags: int
7575
import_errors: int
7676
last_finish_time: Optional[datetime]
77-
last_duration: Optional[float]
77+
last_duration: Optional[timedelta]
7878
run_count: int
7979

8080

@@ -842,7 +842,7 @@ def get_last_runtime(self, file_path):
842842
:rtype: float
843843
"""
844844
stat = self._file_stats.get(file_path)
845-
return stat.last_duration if stat else None
845+
return stat.last_duration.total_seconds() if stat and stat.last_duration else None
846846

847847
def get_last_dag_count(self, file_path):
848848
"""
@@ -935,7 +935,7 @@ def _collect_results_from_processor(self, processor) -> None:
935935
count_import_errors = -1
936936
num_dags = 0
937937

938-
last_duration = (last_finish_time - processor.start_time).total_seconds()
938+
last_duration = last_finish_time - processor.start_time
939939
stat = DagFileStat(
940940
num_dags=num_dags,
941941
import_errors=count_import_errors,

airflow/sensors/smart_sensor.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -738,16 +738,17 @@ def execute(self, context: Context):
738738
for sensor_work in self.sensor_works:
739739
self._execute_sensor_work(sensor_work)
740740

741-
duration = (timezone.utcnow() - poke_start_time).total_seconds()
741+
duration = timezone.utcnow() - poke_start_time
742+
duration_seconds = duration.total_seconds()
742743

743-
self.log.info("Taking %s to execute %s tasks.", duration, len(self.sensor_works))
744+
self.log.info("Taking %s seconds to execute %s tasks.", duration_seconds, len(self.sensor_works))
744745

745746
Stats.timing("smart_sensor_operator.loop_duration", duration)
746747
Stats.gauge("smart_sensor_operator.executed_tasks", len(self.sensor_works))
747748
self._emit_loop_stats()
748749

749-
if duration < self.poke_interval:
750-
sleep(self.poke_interval - duration)
750+
if duration_seconds < self.poke_interval:
751+
sleep(self.poke_interval - duration_seconds)
751752
if (timezone.utcnow() - started_at).total_seconds() > self.timeout:
752753
self.log.info("Time is out for smart sensor.")
753754
return

tests/dag_processing/test_manager.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -427,7 +427,7 @@ def test_recently_modified_file_is_parsed_with_mtime_mode(
427427
# let's say the DAG was just parsed 2 seconds before the Freezed time
428428
last_finish_time = freezed_base_time - timedelta(seconds=10)
429429
manager._file_stats = {
430-
"file_1.py": DagFileStat(1, 0, last_finish_time, 1.0, 1),
430+
"file_1.py": DagFileStat(1, 0, last_finish_time, timedelta(seconds=1.0), 1),
431431
}
432432
with freeze_time(freezed_base_time):
433433
manager.set_file_paths(dag_files)
@@ -715,7 +715,9 @@ def test_send_file_processing_statsd_timing(self, statsd_timing_mock, tmpdir):
715715
child_pipe.close()
716716
parent_pipe.close()
717717

718-
statsd_timing_mock.assert_called_with('dag_processing.last_duration.temp_dag', last_runtime)
718+
statsd_timing_mock.assert_called_with(
719+
'dag_processing.last_duration.temp_dag', timedelta(seconds=last_runtime)
720+
)
719721

720722
def test_refresh_dags_dir_doesnt_delete_zipped_dags(self, tmpdir):
721723
"""Test DagFileProcessorManager._refresh_dag_dir method"""

tests/sensors/test_smart_sensor_operator.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import os
2121
import time
2222
import unittest
23+
from unittest import mock
2324
from unittest.mock import Mock
2425

2526
from freezegun import freeze_time
@@ -310,3 +311,25 @@ def test_register_in_sensor_service(self):
310311
assert sensor_instance is not None
311312
assert sensor_instance.state == State.SENSING
312313
assert sensor_instance.operator == "DummySensor"
314+
315+
@mock.patch('airflow.sensors.smart_sensor.Stats.timing')
316+
@mock.patch('airflow.sensors.smart_sensor.timezone.utcnow')
317+
def test_send_sensor_timing(self, timezone_utcnow_mock, statsd_timing_mock):
318+
initial_time = timezone.datetime(2022, 1, 5, 0, 0, 0)
319+
timezone_utcnow_mock.return_value = initial_time
320+
self._make_sensor_dag_run()
321+
smart = self._make_smart_operator(0)
322+
smart.timeout = 0
323+
duration = datetime.timedelta(seconds=3)
324+
timezone_utcnow_mock.side_effect = [
325+
# started_at
326+
initial_time,
327+
# poke_start_time
328+
initial_time,
329+
# duration
330+
initial_time + duration,
331+
# timeout check
332+
initial_time + duration,
333+
]
334+
smart.execute(None)
335+
statsd_timing_mock.assert_called_with('smart_sensor_operator.loop_duration', duration)

0 commit comments

Comments
 (0)