Skip to content

Commit f672d5d

Browse files
authored
fix: Improve the reliability of alerts & reports (#25239)
1 parent a724850 commit f672d5d

File tree

4 files changed

+106
-67
lines changed

4 files changed

+106
-67
lines changed

superset/config.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -914,6 +914,10 @@ class D3Format(TypedDict, total=False):
914914
[86400, "24 hours"],
915915
]
916916

917+
# This is used as a workaround for the alerts & reports scheduler task to get the time
918+
# at which celery beat triggered it; see https://github.com/celery/celery/issues/6974 for details
919+
CELERY_BEAT_SCHEDULER_EXPIRES = timedelta(weeks=1)
920+
917921
# Default celery config is to use SQLA as a broker, in a production setting
918922
# you'll want to use a proper broker as specified here:
919923
# https://docs.celeryq.dev/en/stable/getting-started/backends-and-brokers/index.html
@@ -942,6 +946,7 @@ class CeleryConfig: # pylint: disable=too-few-public-methods
942946
"reports.scheduler": {
943947
"task": "reports.scheduler",
944948
"schedule": crontab(minute="*", hour="*"),
949+
"options": {"expires": int(CELERY_BEAT_SCHEDULER_EXPIRES.total_seconds())},
945950
},
946951
"reports.prune_log": {
947952
"task": "reports.prune_log",

superset/tasks/cron_util.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
import logging
1919
from collections.abc import Iterator
20-
from datetime import datetime, timedelta, timezone as dt_timezone
20+
from datetime import datetime, timedelta
2121

2222
from croniter import croniter
2323
from pytz import timezone as pytz_timezone, UnknownTimeZoneError
@@ -27,10 +27,10 @@
2727
logger = logging.getLogger(__name__)
2828

2929

30-
def cron_schedule_window(cron: str, timezone: str) -> Iterator[datetime]:
30+
def cron_schedule_window(
31+
triggered_at: datetime, cron: str, timezone: str
32+
) -> Iterator[datetime]:
3133
window_size = app.config["ALERT_REPORTS_CRON_WINDOW_SIZE"]
32-
# create a time-aware datetime in utc
33-
time_now = datetime.now(tz=dt_timezone.utc)
3434
try:
3535
tz = pytz_timezone(timezone)
3636
except UnknownTimeZoneError:
@@ -39,9 +39,9 @@ def cron_schedule_window(cron: str, timezone: str) -> Iterator[datetime]:
3939
logger.warning("Timezone %s was invalid. Falling back to 'UTC'", timezone)
4040
utc = pytz_timezone("UTC")
4141
# convert the trigger time to the user's local time for comparison
42-
time_now = time_now.astimezone(tz)
43-
start_at = time_now - timedelta(seconds=1)
44-
stop_at = time_now + timedelta(seconds=window_size)
42+
time_now = triggered_at.astimezone(tz)
43+
start_at = time_now - timedelta(seconds=window_size / 2)
44+
stop_at = time_now + timedelta(seconds=window_size / 2)
4545
crons = croniter(cron, start_at)
4646
for schedule in crons.all_next(datetime):
4747
if schedule >= stop_at:

superset/tasks/scheduler.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
# specific language governing permissions and limitations
1616
# under the License.
1717
import logging
18+
from datetime import datetime
1819

1920
from celery import Celery
2021
from celery.exceptions import SoftTimeLimitExceeded
@@ -47,9 +48,15 @@ def scheduler() -> None:
4748
return
4849
with session_scope(nullpool=True) as session:
4950
active_schedules = ReportScheduleDAO.find_active(session)
51+
triggered_at = (
52+
datetime.fromisoformat(scheduler.request.expires)
53+
- app.config["CELERY_BEAT_SCHEDULER_EXPIRES"]
54+
if scheduler.request.expires
55+
else datetime.utcnow()
56+
)
5057
for active_schedule in active_schedules:
5158
for schedule in cron_schedule_window(
52-
active_schedule.crontab, active_schedule.timezone
59+
triggered_at, active_schedule.crontab, active_schedule.timezone
5360
):
5461
logger.info(
5562
"Scheduling alert %s eta: %s", active_schedule.name, schedule

tests/unit_tests/tasks/test_cron_util.py

Lines changed: 86 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,9 @@
1414
# KIND, either express or implied. See the License for the
1515
# specific language governing permissions and limitations
1616
# under the License.
17+
from datetime import datetime
1718

1819
import pytest
19-
import pytz
20-
from dateutil import parser
21-
from freezegun import freeze_time
2220
from freezegun.api import FakeDatetime
2321

2422
from superset.tasks.cron_util import cron_schedule_window
@@ -27,23 +25,28 @@
2725
@pytest.mark.parametrize(
2826
"current_dttm, cron, expected",
2927
[
30-
("2020-01-01T08:59:01Z", "0 1 * * *", []),
28+
("2020-01-01T08:59:01+00:00", "0 1 * * *", []),
3129
(
32-
"2020-01-01T08:59:02Z",
30+
"2020-01-01T08:59:32+00:00",
3331
"0 1 * * *",
3432
[FakeDatetime(2020, 1, 1, 9, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
3533
),
3634
(
37-
"2020-01-01T08:59:59Z",
35+
"2020-01-01T08:59:59+00:00",
3836
"0 1 * * *",
3937
[FakeDatetime(2020, 1, 1, 9, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
4038
),
4139
(
42-
"2020-01-01T09:00:00Z",
40+
"2020-01-01T09:00:00+00:00",
4341
"0 1 * * *",
4442
[FakeDatetime(2020, 1, 1, 9, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
4543
),
46-
("2020-01-01T09:00:01Z", "0 1 * * *", []),
44+
(
45+
"2020-01-01T09:00:01+00:00",
46+
"0 1 * * *",
47+
[FakeDatetime(2020, 1, 1, 9, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
48+
),
49+
("2020-01-01T09:00:30+00:00", "0 1 * * *", []),
4750
],
4851
)
4952
def test_cron_schedule_window_los_angeles(
@@ -53,34 +56,40 @@ def test_cron_schedule_window_los_angeles(
5356
Reports scheduler: Test cron schedule window for "America/Los_Angeles"
5457
"""
5558

56-
with freeze_time(current_dttm):
57-
datetimes = cron_schedule_window(cron, "America/Los_Angeles")
58-
assert (
59-
list(cron.strftime("%A, %d %B %Y, %H:%M:%S") for cron in datetimes)
60-
== expected
61-
)
59+
datetimes = cron_schedule_window(
60+
datetime.fromisoformat(current_dttm), cron, "America/Los_Angeles"
61+
)
62+
assert (
63+
list(cron.strftime("%A, %d %B %Y, %H:%M:%S") for cron in datetimes) == expected
64+
)
6265

6366

6467
@pytest.mark.parametrize(
6568
"current_dttm, cron, expected",
6669
[
67-
("2020-01-01T00:59:01Z", "0 1 * * *", []),
70+
("2020-01-01T00:59:01+00:00", "0 1 * * *", []),
71+
("2020-01-01T00:59:02+00:00", "0 1 * * *", []),
72+
(
73+
"2020-01-01T00:59:59+00:00",
74+
"0 1 * * *",
75+
[FakeDatetime(2020, 1, 1, 1, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
76+
),
6877
(
69-
"2020-01-01T00:59:02Z",
78+
"2020-01-01T01:00:00+00:00",
7079
"0 1 * * *",
7180
[FakeDatetime(2020, 1, 1, 1, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
7281
),
7382
(
74-
"2020-01-01T00:59:59Z",
83+
"2020-01-01T01:00:01+00:00",
7584
"0 1 * * *",
7685
[FakeDatetime(2020, 1, 1, 1, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
7786
),
7887
(
79-
"2020-01-01T01:00:00Z",
88+
"2020-01-01T01:00:29+00:00",
8089
"0 1 * * *",
8190
[FakeDatetime(2020, 1, 1, 1, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
8291
),
83-
("2020-01-01T01:00:01Z", "0 1 * * *", []),
92+
("2020-01-01T01:00:30+00:00", "0 1 * * *", []),
8493
],
8594
)
8695
def test_cron_schedule_window_invalid_timezone(
@@ -90,35 +99,41 @@ def test_cron_schedule_window_invalid_timezone(
9099
Reports scheduler: Test cron schedule window for "invalid timezone"
91100
"""
92101

93-
with freeze_time(current_dttm):
94-
datetimes = cron_schedule_window(cron, "invalid timezone")
95-
# it should default to UTC
96-
assert (
97-
list(cron.strftime("%A, %d %B %Y, %H:%M:%S") for cron in datetimes)
98-
== expected
99-
)
102+
datetimes = cron_schedule_window(
103+
datetime.fromisoformat(current_dttm), cron, "invalid timezone"
104+
)
105+
# it should default to UTC
106+
assert (
107+
list(cron.strftime("%A, %d %B %Y, %H:%M:%S") for cron in datetimes) == expected
108+
)
100109

101110

102111
@pytest.mark.parametrize(
103112
"current_dttm, cron, expected",
104113
[
105-
("2020-01-01T05:59:01Z", "0 1 * * *", []),
114+
("2020-01-01T05:59:01+00:00", "0 1 * * *", []),
115+
("2020-01-01T05:59:02+00:00", "0 1 * * *", []),
116+
(
117+
"2020-01-01T05:59:59+00:00",
118+
"0 1 * * *",
119+
[FakeDatetime(2020, 1, 1, 6, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
120+
),
106121
(
107-
"2020-01-01T05:59:02Z",
122+
"2020-01-01T06:00:00+00:00",
108123
"0 1 * * *",
109124
[FakeDatetime(2020, 1, 1, 6, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
110125
),
111126
(
112-
"2020-01-01T5:59:59Z",
127+
"2020-01-01T06:00:01+00:00",
113128
"0 1 * * *",
114129
[FakeDatetime(2020, 1, 1, 6, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
115130
),
116131
(
117-
"2020-01-01T6:00:00",
132+
"2020-01-01T06:00:29+00:00",
118133
"0 1 * * *",
119134
[FakeDatetime(2020, 1, 1, 6, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
120135
),
121-
("2020-01-01T6:00:01Z", "0 1 * * *", []),
136+
("2020-01-01T06:00:30+00:00", "0 1 * * *", []),
122137
],
123138
)
124139
def test_cron_schedule_window_new_york(
@@ -128,34 +143,40 @@ def test_cron_schedule_window_new_york(
128143
Reports scheduler: Test cron schedule window for "America/New_York"
129144
"""
130145

131-
with freeze_time(current_dttm, tz_offset=0):
132-
datetimes = cron_schedule_window(cron, "America/New_York")
133-
assert (
134-
list(cron.strftime("%A, %d %B %Y, %H:%M:%S") for cron in datetimes)
135-
== expected
136-
)
146+
datetimes = cron_schedule_window(
147+
datetime.fromisoformat(current_dttm), cron, "America/New_York"
148+
)
149+
assert (
150+
list(cron.strftime("%A, %d %B %Y, %H:%M:%S") for cron in datetimes) == expected
151+
)
137152

138153

139154
@pytest.mark.parametrize(
140155
"current_dttm, cron, expected",
141156
[
142-
("2020-01-01T06:59:01Z", "0 1 * * *", []),
157+
("2020-01-01T06:59:01+00:00", "0 1 * * *", []),
158+
("2020-01-01T06:59:02+00:00", "0 1 * * *", []),
159+
(
160+
"2020-01-01T06:59:59+00:00",
161+
"0 1 * * *",
162+
[FakeDatetime(2020, 1, 1, 7, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
163+
),
143164
(
144-
"2020-01-01T06:59:02Z",
165+
"2020-01-01T07:00:00+00:00",
145166
"0 1 * * *",
146167
[FakeDatetime(2020, 1, 1, 7, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
147168
),
148169
(
149-
"2020-01-01T06:59:59Z",
170+
"2020-01-01T07:00:01+00:00",
150171
"0 1 * * *",
151172
[FakeDatetime(2020, 1, 1, 7, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
152173
),
153174
(
154-
"2020-01-01T07:00:00",
175+
"2020-01-01T07:00:29+00:00",
155176
"0 1 * * *",
156177
[FakeDatetime(2020, 1, 1, 7, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
157178
),
158-
("2020-01-01T07:00:01Z", "0 1 * * *", []),
179+
("2020-01-01T07:00:30+00:00", "0 1 * * *", []),
159180
],
160181
)
161182
def test_cron_schedule_window_chicago(
@@ -165,34 +186,40 @@ def test_cron_schedule_window_chicago(
165186
Reports scheduler: Test cron schedule window for "America/Chicago"
166187
"""
167188

168-
with freeze_time(current_dttm, tz_offset=0):
169-
datetimes = cron_schedule_window(cron, "America/Chicago")
170-
assert (
171-
list(cron.strftime("%A, %d %B %Y, %H:%M:%S") for cron in datetimes)
172-
== expected
173-
)
189+
datetimes = cron_schedule_window(
190+
datetime.fromisoformat(current_dttm), cron, "America/Chicago"
191+
)
192+
assert (
193+
list(cron.strftime("%A, %d %B %Y, %H:%M:%S") for cron in datetimes) == expected
194+
)
174195

175196

176197
@pytest.mark.parametrize(
177198
"current_dttm, cron, expected",
178199
[
179-
("2020-07-01T05:59:01Z", "0 1 * * *", []),
200+
("2020-07-01T05:59:01+00:00", "0 1 * * *", []),
201+
("2020-07-01T05:59:02+00:00", "0 1 * * *", []),
202+
(
203+
"2020-07-01T05:59:59+00:00",
204+
"0 1 * * *",
205+
[FakeDatetime(2020, 7, 1, 6, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
206+
),
180207
(
181-
"2020-07-01T05:59:02Z",
208+
"2020-07-01T06:00:00+00:00",
182209
"0 1 * * *",
183210
[FakeDatetime(2020, 7, 1, 6, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
184211
),
185212
(
186-
"2020-07-01T05:59:59Z",
213+
"2020-07-01T06:00:01+00:00",
187214
"0 1 * * *",
188215
[FakeDatetime(2020, 7, 1, 6, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
189216
),
190217
(
191-
"2020-07-01T06:00:00",
218+
"2020-07-01T06:00:29+00:00",
192219
"0 1 * * *",
193220
[FakeDatetime(2020, 7, 1, 6, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
194221
),
195-
("2020-07-01T06:00:01Z", "0 1 * * *", []),
222+
("2020-07-01T06:00:30+00:00", "0 1 * * *", []),
196223
],
197224
)
198225
def test_cron_schedule_window_chicago_daylight(
@@ -202,9 +229,9 @@ def test_cron_schedule_window_chicago_daylight(
202229
Reports scheduler: Test cron schedule window for "America/Chicago" during daylight saving time
203230
"""
204231

205-
with freeze_time(current_dttm, tz_offset=0):
206-
datetimes = cron_schedule_window(cron, "America/Chicago")
207-
assert (
208-
list(cron.strftime("%A, %d %B %Y, %H:%M:%S") for cron in datetimes)
209-
== expected
210-
)
232+
datetimes = cron_schedule_window(
233+
datetime.fromisoformat(current_dttm), cron, "America/Chicago"
234+
)
235+
assert (
236+
list(cron.strftime("%A, %d %B %Y, %H:%M:%S") for cron in datetimes) == expected
237+
)

0 commit comments

Comments
 (0)