Apache Airflow version
2.2.0
Operating System
Debian GNU/Linux 10 (buster)
Versions of Apache Airflow Providers
No response
Deployment
Docker-Compose
Deployment details
```
./breeze start-airflow --python 3.7 --backend postgres
```
What happened
When a task is resumed after being deferred, its start_date is not the original start_date, but the timestamp at which the task resumed.
If a task has an execution_timeout set and runs longer than that, it might not raise a timeout error, because technically a brand-new task instance starts after the deferral.
I know it's expected that this is a brand-new task instance, but the documentation describes the behaviour with execution_timeout set differently (see "What you expected to happen" below).
This is especially noticeable if an Operator needs to be deferred multiple times: every time it resumes after a deferral, the clock starts counting from zero again.
Some task instance details after an example task has completed:
| Attribute | Value |
|---|---|
| execution_date | 2021-11-03, 14:45:29 |
| trigger_timeout | 2021-11-03, 14:46:30 |
| start_date | 2021-11-03, 14:46:32 |
| end_date | 2021-11-03, 14:47:02 |
| execution_timeout | 0:01:00 |
| duration | 30.140004 |
| state | success |
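To make the numbers above concrete, here is a small self-contained check of the timings against the 60-second execution_timeout (using execution_date as an approximation for the first try's start time, which isn't shown in the table):
```python
from datetime import datetime, timedelta

execution_timeout = timedelta(seconds=60)

execution_date = datetime(2021, 11, 3, 14, 45, 29)      # logical start of the run
resumed_start_date = datetime(2021, 11, 3, 14, 46, 32)  # start_date after the deferral
end_date = datetime(2021, 11, 3, 14, 47, 2)

# Duration as currently measured: from the post-deferral start_date only
print(end_date - resumed_start_date)                      # 0:00:30
print(end_date - resumed_start_date > execution_timeout)  # False -> no timeout raised

# Total runtime as the documentation describes it
print(end_date - execution_date)                          # 0:01:33
print(end_date - execution_date > execution_timeout)      # True -> should have failed
```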
What you expected to happen
A task failure with a timeout exception.
Documentation says:
- Note that `execution_timeout` on Operators is considered over the total runtime, not individual executions in-between deferrals - this means that if `execution_timeout` is set, an Operator may fail while it's deferred or while it's running after a deferral, even if it's only been resumed for a few seconds.
Also, I see the following code trying to check the timeout value after a task comes back from the deferred state:
```python
# If we are coming in with a next_method (i.e. from a deferral),
# calculate the timeout from our start_date.
if self.next_method:
    timeout_seconds = (
        task_copy.execution_timeout - (timezone.utcnow() - self.start_date)
    ).total_seconds()
```
But the issue is that `self.start_date` isn't equal to the original task's `start_date`.
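To illustrate, here is a minimal sketch of what that calculation yields with the resumed start_date versus the original one (the first try's start isn't in the table, so the value below is an assumption based on execution_date):
```python
from datetime import datetime, timedelta

execution_timeout = timedelta(seconds=60)

# Assumption: the first try started at roughly the execution_date
original_start_date = datetime(2021, 11, 3, 14, 45, 29)
resumed_start_date = datetime(2021, 11, 3, 14, 46, 32)  # what self.start_date holds after the deferral
now = resumed_start_date  # the moment the timeout is recalculated

# What the code above computes today: a fresh 60-second window
print((execution_timeout - (now - resumed_start_date)).total_seconds())   # 60.0

# What it would compute if self.start_date were the original start: already expired
print((execution_timeout - (now - original_start_date)).total_seconds())  # -3.0
```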
How to reproduce
DAG:
```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.sensors.time_delta import TimeDeltaSensorAsync

with DAG(
    dag_id='time_delta_async_bug',
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
    catchup=False,
) as dag:
    time_delta_async_sensor = TimeDeltaSensorAsync(
        task_id='time_delta_task_id',
        delta=timedelta(seconds=60),
        execution_timeout=timedelta(seconds=60),
    )
```
Since there aren't many async Operators at the moment, I slightly modified `TimeDeltaSensorAsync` to simulate task work after the deferral.
Here is the full code of the `TimeDeltaSensorAsync` class I used to reproduce the issue; the only difference is the `time.sleep(30)` line, which simulates post-processing after the trigger has completed.
```python
import time

from airflow.sensors.time_delta import TimeDeltaSensor
from airflow.triggers.temporal import DateTimeTrigger


class TimeDeltaSensorAsync(TimeDeltaSensor):
    """
    A drop-in replacement for TimeDeltaSensor that defers itself to avoid
    taking up a worker slot while it is waiting.

    :param delta: time length to wait after the data interval before succeeding.
    :type delta: datetime.timedelta
    """

    def execute(self, context):
        target_dttm = context['data_interval_end']
        target_dttm += self.delta
        self.defer(trigger=DateTimeTrigger(moment=target_dttm), method_name="execute_complete")

    def execute_complete(self, context, event=None):  # pylint: disable=unused-argument
        """Callback for when the trigger fires - returns immediately."""
        time.sleep(30)  # Simulate processing the event after the trigger completed
        return None
```
Anything else
I've checked the "I'm willing to submit a PR" box, but I'm not sure where to start. I'd be happy if someone could give me guidance on which direction to look.
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct