-
Notifications
You must be signed in to change notification settings - Fork 16.6k
Description
Apache Airflow Provider(s)
cncf-kubernetes
Versions of Apache Airflow Providers
apache-airflow-providers-cncf-kubernetes==8.3.2
apache-airflow-providers-common-io==1.3.2
apache-airflow-providers-http==4.12.0
apache-airflow-providers-postgres==5.11.2
Apache Airflow version
2.9.2
Operating System
Debian GNU/Linux 12 (bookworm)
Deployment
Other Docker-based deployment
Deployment details
We deploy Docker containers based on the official Airflow images (apache/airflow:slim-2.9.2-python3.11) to an AWS EKS kubernetes cluster running Kubernetes 1.29.
What happened
When a sensor is stopped due to a spot interrupt, it can end up stuck in the queued state, never getting into the `running state. In the scheduler logs, this information is reported:
~(12 other attempts omitted)~
[2024-07-01 08:49:47,283] {base_executor.py:284} INFO - queued but still running; attempt=13 task=TaskInstanceKey(dag_id='test', task_id='python-sensor-1', run_id='scheduled__2024-06-30T00:00:00+00:00', try_number=1, map_index=-1)
[2024-07-01 08:49:48,316] {base_executor.py:287} ERROR - could not queue task TaskInstanceKey(dag_id='test', task_id='python-sensor-1', run_id='scheduled__2024-06-30T00:00:00+00:00', try_number=1, map_index=-1) (still running after 13 attempts)
Restarting the scheduler allows the sensors to recover and get into the running state again.
What you think should happen instead
Sensors should be able to recover from a spot interrupt without needing to restart the scheduler.
I have tested multiple versions of the apache-airflow-providers-cncf-kubernetes provider, and this used to work correctly up until version 8.0.1. Starting from version 8.1.0, this has been broken (I've tested this on 8.1.0, 8.2.0, 8.3.1 and 8.3.2). This seems to be a regression.
How to reproduce
I've used the following definition to test this behavior.
This will just start a number of Python sensors which will never succeed and keep running until timeout.
from airflow import DAG
from airflow.sensors.python import PythonSensor
with DAG(
"test",
schedule_interval="@daily",
max_active_runs=1,
) as dag:
for i in range(10):
PythonSensor(
task_id=f"python-sensor-{i}",
mode="reschedule",
timeout=timedelta(hours=6),
poke_interval=timedelta(seconds=60),
python_callable=lambda: time.sleep(1200)
)
To trigger the behavior, one needs to spot interrupt the node where a sensor is running, but not the one where the scheduler is running. Restarting the scheduler allows the task to recover, so we want to only spot interrupt the task, while the scheduler stays healthy. You can look for a node matching these two constraints using k9s or kubectl, and then use the AWS console to initiate an interruption on the spot request for that node (can be done from the EC2 service).
Anything else
By following the reproduction steps as described, I can trigger this issue every time.
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct