Skip to content

Sensors stuck in queued after being spot interrupted #40516

@vlieven

Description

@vlieven

Apache Airflow Provider(s)

cncf-kubernetes

Versions of Apache Airflow Providers

apache-airflow-providers-cncf-kubernetes==8.3.2
apache-airflow-providers-common-io==1.3.2
apache-airflow-providers-http==4.12.0
apache-airflow-providers-postgres==5.11.2

Apache Airflow version

2.9.2

Operating System

Debian GNU/Linux 12 (bookworm)

Deployment

Other Docker-based deployment

Deployment details

We deploy Docker containers based on the official Airflow images (apache/airflow:slim-2.9.2-python3.11) to an AWS EKS kubernetes cluster running Kubernetes 1.29.

What happened

When a sensor is stopped due to a spot interrupt, it can end up stuck in the queued state, never getting into the `running state. In the scheduler logs, this information is reported:

~(12 other attempts omitted)~
[2024-07-01 08:49:47,283] {base_executor.py:284} INFO - queued but still running; attempt=13 task=TaskInstanceKey(dag_id='test', task_id='python-sensor-1', run_id='scheduled__2024-06-30T00:00:00+00:00', try_number=1, map_index=-1)
[2024-07-01 08:49:48,316] {base_executor.py:287} ERROR - could not queue task TaskInstanceKey(dag_id='test', task_id='python-sensor-1', run_id='scheduled__2024-06-30T00:00:00+00:00', try_number=1, map_index=-1) (still running after 13 attempts)

Restarting the scheduler allows the sensors to recover and get into the running state again.

What you think should happen instead

Sensors should be able to recover from a spot interrupt without needing to restart the scheduler.

I have tested multiple versions of the apache-airflow-providers-cncf-kubernetes provider, and this used to work correctly up until version 8.0.1. Starting from version 8.1.0, this has been broken (I've tested this on 8.1.0, 8.2.0, 8.3.1 and 8.3.2). This seems to be a regression.

How to reproduce

I've used the following definition to test this behavior.
This will just start a number of Python sensors which will never succeed and keep running until timeout.

from airflow import DAG
from airflow.sensors.python import PythonSensor

with DAG(
    "test",
    schedule_interval="@daily",
    max_active_runs=1,
) as dag:
    for i in range(10):
        PythonSensor(
            task_id=f"python-sensor-{i}",
            mode="reschedule",
            timeout=timedelta(hours=6),
            poke_interval=timedelta(seconds=60),
            python_callable=lambda: time.sleep(1200)
        )

To trigger the behavior, one needs to spot interrupt the node where a sensor is running, but not the one where the scheduler is running. Restarting the scheduler allows the task to recover, so we want to only spot interrupt the task, while the scheduler stays healthy. You can look for a node matching these two constraints using k9s or kubectl, and then use the AWS console to initiate an interruption on the spot request for that node (can be done from the EC2 service).

Anything else

By following the reproduction steps as described, I can trigger this issue every time.

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

Metadata

Metadata

Assignees

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions