
Deferrable Operators don't respect execution_timeout after being deferred #19382

@eskarimov

Description


Apache Airflow version

2.2.0

Operating System

Debian GNU/Linux 10 (buster)

Versions of Apache Airflow Providers

No response

Deployment

Docker-Compose

Deployment details

./breeze start-airflow --python 3.7 --backend postgres

What happened

When a task resumes after being deferred, its start_date is not the original start_date but the timestamp at which the task resumed.

If a task has execution_timeout set and its total runtime exceeds it, the task may still not raise a timeout error, because technically it starts as a brand-new execution after being deferred.
I understand that resuming is expected to start a new execution, but the documentation describes the execution_timeout behaviour differently (see "What you expected to happen" below).

This is especially noticeable when an Operator has to defer multiple times: every time it resumes after a deferral, the timeout clock starts again from zero.
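To make the difference concrete, here is a small pure-Python simulation (not Airflow code) of one task that defers twice. The timestamps are made up for illustration: each resumed segment is short, but the total runtime since the first start, including the time spent deferred, exceeds a 60-second execution_timeout.

```python
from datetime import datetime, timedelta

# Hypothetical timeline of one task that defers twice.
# Each tuple is (started_or_resumed_at, deferred_or_finished_at) for one execution segment.
segments = [
    (datetime(2021, 11, 3, 12, 0, 0), datetime(2021, 11, 3, 12, 0, 5)),
    (datetime(2021, 11, 3, 12, 0, 40), datetime(2021, 11, 3, 12, 0, 45)),
    (datetime(2021, 11, 3, 12, 1, 20), datetime(2021, 11, 3, 12, 1, 30)),
]
execution_timeout = timedelta(seconds=60)

# Observed behaviour: the clock restarts on every resume, so only single segments are measured.
per_segment_exceeded = any(end - start > execution_timeout for start, end in segments)

# Documented behaviour: total runtime since the first start, deferred time included.
total_runtime = segments[-1][1] - segments[0][0]
total_exceeded = total_runtime > execution_timeout

print(per_segment_exceeded, total_runtime, total_exceeded)  # False 0:01:30 True
```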

Task instance details for the example task (see "How to reproduce"), after it completed:

Attribute          Value
execution_date     2021-11-03, 14:45:29
trigger_timeout    2021-11-03, 14:46:30
start_date         2021-11-03, 14:46:32
end_date           2021-11-03, 14:47:02
execution_timeout  0:01:00
duration           30.140004 s
state              success
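Working through the numbers above with a plain datetime calculation (not Airflow code): the resumed run lasted only about 30 seconds, but measured from the start of the first try the task ran well past the 60-second execution_timeout. The first try's start_date is overwritten after the deferral, so this sketch assumes it was roughly the execution_date, which is reasonable here because the DAG was triggered manually.

```python
from datetime import datetime, timedelta

execution_timeout = timedelta(seconds=60)

# Assumption: the first try started at about the execution_date (manual trigger).
original_start = datetime(2021, 11, 3, 14, 45, 29)
# start_date as recorded after the task resumed from the deferral.
resumed_start = datetime(2021, 11, 3, 14, 46, 32)
end = datetime(2021, 11, 3, 14, 47, 2)

measured_from_resume = end - resumed_start      # roughly what the current check sees
measured_from_original = end - original_start   # total runtime, as the docs describe

print(measured_from_resume, measured_from_resume > execution_timeout)      # 0:00:30 False
print(measured_from_original, measured_from_original > execution_timeout)  # 0:01:33 True
```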

What you expected to happen

The task should fail with a timeout exception.
The documentation says:

  • Note that execution_timeout on Operators is considered over the total runtime, not individual executions in-between deferrals - this means that if execution_timeout is set, an Operator may fail while it's deferred or while it's running after a deferral, even if it's only been resumed for a few seconds.

I also found the following code, which tries to recompute the remaining timeout when a task comes back from the deferred state:

            # If we are coming in with a next_method (i.e. from a deferral),
            # calculate the timeout from our start_date.
            if self.next_method:
                timeout_seconds = (
                    task_copy.execution_timeout - (timezone.utcnow() - self.start_date)
                ).total_seconds()

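However, at that point self.start_date is not the original task's start_date: it has been reset to the time the task resumed, so the remaining timeout is computed from the wrong starting point.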
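One possible direction, sketched as plain Python rather than an actual patch: compute the remaining budget from the time the task first started, and fail immediately if the budget is already used up. The helper remaining_timeout_seconds, the TaskTimeout exception and the first_start argument are all hypothetical; where Airflow would keep the first start time across deferrals is an open question.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


class TaskTimeout(Exception):
    """Hypothetical stand-in for Airflow's AirflowTaskTimeout in this sketch."""


def remaining_timeout_seconds(
    execution_timeout: timedelta,
    first_start: datetime,
    now: Optional[datetime] = None,
) -> float:
    """Hypothetical helper: seconds left in the execution_timeout budget.

    The budget is measured from the task's first start (before any deferral),
    not from the start_date of the current resumption.
    """
    if now is None:
        now = datetime.now(timezone.utc)
    remaining = (execution_timeout - (now - first_start)).total_seconds()
    if remaining <= 0:
        # Budget already used up (e.g. while deferred): fail fast.
        raise TaskTimeout(f"execution_timeout exceeded by {-remaining:.0f}s")
    return remaining


# Example: first try assumed to start at 14:45:30 UTC, task resumes at 14:46:32 UTC.
first_start = datetime(2021, 11, 3, 14, 45, 30, tzinfo=timezone.utc)
resumed_at = datetime(2021, 11, 3, 14, 46, 32, tzinfo=timezone.utc)
try:
    remaining_timeout_seconds(timedelta(seconds=60), first_start, resumed_at)
except TaskTimeout as exc:
    print(exc)  # execution_timeout exceeded by 2s
```

Assuming the first try started about one minute before the trigger_timeout shown in the table, the budget is already exhausted when the task resumes, so the check would raise instead of letting execute_complete run for another 30 seconds.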

How to reproduce

DAG:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.sensors.time_delta import TimeDeltaSensorAsync

with DAG(
    dag_id='time_delta_async_bug',
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
    catchup=False,
) as dag:
    time_delta_async_sensor = TimeDeltaSensorAsync(
        task_id='time_delta_task_id',
        delta=timedelta(seconds=60),
        execution_timeout=timedelta(seconds=60),
    )

Since there are not many async Operators yet, I slightly modified TimeDeltaSensorAsync to simulate work done after the deferral.
Here is the full TimeDeltaSensorAsync class I used to reproduce the issue. The only change is the time.sleep(30) line, which simulates post-processing after the trigger has fired.

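import time

from airflow.sensors.time_delta import TimeDeltaSensor
from airflow.triggers.temporal import DateTimeTrigger

class TimeDeltaSensorAsync(TimeDeltaSensor):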
    """
    A drop-in replacement for TimeDeltaSensor that defers itself to avoid
    taking up a worker slot while it is waiting.

    :param delta: time length to wait after the data interval before succeeding.
    :type delta: datetime.timedelta
    """

    def execute(self, context):
        target_dttm = context['data_interval_end']
        target_dttm += self.delta
        self.defer(trigger=DateTimeTrigger(moment=target_dttm), method_name="execute_complete")

    def execute_complete(self, context, event=None):  # pylint: disable=unused-argument
        """Callback for when the trigger fires - returns immediately."""
        time.sleep(30)  # Simulate processing the event after the trigger has fired
        return None

Anything else

I ticked the "I'm willing to submit a PR" box, but I'm not sure where to start. I'd appreciate it if someone could point me in the right direction.

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
