Description
Apache Airflow version
2.10.0
If "Other Airflow 2 version" selected, which one?
No response
What happened?
I am running a KubernetesPodOperator (cncf-kubernetes provider v8.4.0) with deferrable=True and do_xcom_push=True (the latter was also a problem in earlier versions, but no longer is). I define my Kubernetes cluster connection in code using airflow.models.connection.Connection and supply it via an environment variable.
After all containers (init container, sidecar and main container) have completed successfully, I get:
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 767, in _execute_task
result = _execute_callable(context=context, **execute_callable_kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 733, in _execute_callable
return ExecutionCallableRunner(
^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/operator_helpers.py", line 252, in run
return self.func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/baseoperator.py", line 1792, in resume_execution
return execute_callable(context)
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 773, in trigger_reentry
raise AirflowException(message)
airflow.exceptions.AirflowException: Traceback (most recent call last):
File "/home/***/.local/lib/python3.12/site-packages/***/providers/cncf/kubernetes/triggers/pod.py", line 162, in run
state = await self._wait_for_pod_start()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/***/.local/lib/python3.12/site-packages/***/providers/cncf/kubernetes/triggers/pod.py", line 223, in _wait_for_pod_start
pod = await self.hook.get_pod(self.pod_name, self.pod_namespace)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/***/.local/lib/python3.12/site-packages/***/providers/cncf/kubernetes/hooks/kubernetes.py", line 747, in get_pod
async with self.get_conn() as connection:
File "/usr/local/lib/python3.12/contextlib.py", line 210, in __aenter__
return await anext(self.gen)
^^^^^^^^^^^^^^^^^^^^^
File "/home/***/.local/lib/python3.12/site-packages/***/providers/cncf/kubernetes/hooks/kubernetes.py", line 734, in get_conn
kube_client = await self._load_config() or async_client.ApiClient()
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/***/.local/lib/python3.12/site-packages/***/providers/cncf/kubernetes/hooks/kubernetes.py", line 664, in _load_config
in_cluster = self._coalesce_param(self.in_cluster, await self._get_field("in_cluster"))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/***/.local/lib/python3.12/site-packages/***/providers/cncf/kubernetes/hooks/kubernetes.py", line 724, in _get_field
extras = await self.get_conn_extras()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/***/.local/lib/python3.12/site-packages/***/providers/cncf/kubernetes/hooks/kubernetes.py", line 712, in get_conn_extras
connection = await sync_to_async(self.get_connection)(self.conn_id)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/***/.local/lib/python3.12/site-packages/asgiref/sync.py", line 468, in __call__
ret = await asyncio.shield(exec_coro)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/concurrent/futures/thread.py", line 58, in run
result = self.fn(*self.args, **self.kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/***/.local/lib/python3.12/site-packages/asgiref/sync.py", line 522, in thread_handler
return func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
File "/home/***/.local/lib/python3.12/site-packages/***/providers/cncf/kubernetes/hooks/kubernetes.py", line 169, in get_connection
return super().get_connection(conn_id)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/***/.local/lib/python3.12/site-packages/***/hooks/base.py", line 83, in get_connection
conn = Connection.get_connection_from_secrets(conn_id)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/***/.local/lib/python3.12/site-packages/***/models/connection.py", line 537, in get_connection_from_secrets
raise AirflowNotFoundException(f"The conn_id `{conn_id}` isn't defined")
***.exceptions.AirflowNotFoundException: The conn_id `k8s-conn-id` isn't defined
Now if I:
- set deferrable=False, it works with the environment variable connection
- make an explicit k8s cluster connection in the UI, then deferrable=True works
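One possible explanation, which is an assumption and not confirmed in this report: with deferrable=True the pod is watched from the triggerer (the inner traceback comes from triggers/pod.py), and in the official docker-compose setup the triggerer is a separate service. A variable written to os.environ while a DAG file is parsed only exists in that process and the children it spawns afterwards. The stdlib-only sketch below simulates this; `baseline_env` stands in for the environment of a separately started service, and the URI value is a placeholder.

```python
import os
import subprocess
import sys

# Same variable name the reproduction builds from conn_id "k8s-conn-id".
ENV_NAME = "AIRFLOW_CONN_K8S-CONN-ID"


def is_set_in_new_process(env: dict) -> bool:
    """Start a fresh Python process with `env` and report whether ENV_NAME is set there."""
    code = f"import os; print({ENV_NAME!r} in os.environ)"
    result = subprocess.run(
        [sys.executable, "-c", code],
        env=env,
        capture_output=True,
        text=True,
        check=True,
    )
    return result.stdout.strip() == "True"


# Environment of a service that was started independently of the DAG file
# (simulated here by a snapshot taken before the variable is written).
baseline_env = dict(os.environ)
baseline_env.pop(ENV_NAME, None)

# What the DAG file does at parse time (placeholder value, not a real URI).
os.environ[ENV_NAME] = "kubernetes://"

visible_here = ENV_NAME in os.environ
visible_in_child = is_set_in_new_process(dict(os.environ))
visible_in_separate_service = is_set_in_new_process(baseline_env)

print(visible_here, visible_in_child, visible_in_separate_service)
```

If this is what happens, it would also be consistent with the UI-defined connection working, since that one is stored in the metadata database that every service can read.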
What you think should happen instead?
Supplying a connection via an environment variable, as described in the docs, should work exactly like a connection created in the UI.
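For reference, Airflow looks up environment-variable connections under the name AIRFLOW_CONN_ followed by the upper-cased conn_id, and the value can be a URI or a JSON document. The sketch below builds such a name/value pair with the standard library only; the helper names and the "my-namespace" value are illustrative assumptions, and the kube_config entry is left out.

```python
import json


def conn_env_name(conn_id: str) -> str:
    """Name of the environment variable Airflow checks for `conn_id`."""
    return f"AIRFLOW_CONN_{conn_id.upper()}"


def conn_env_value(conn_type: str, extra: dict) -> str:
    """JSON-serialized connection, as an alternative to the URI form."""
    return json.dumps({"conn_type": conn_type, "extra": extra})


name = conn_env_name("k8s-conn-id")
value = conn_env_value(
    "kubernetes",
    {"namespace": "my-namespace", "in_cluster": False},  # hypothetical values
)

# A line like this could go into the environment shared by all Airflow services.
print(f"{name}={value}")
```

Setting the variable in the environment of every Airflow service (for example in the docker-compose file) rather than from inside a DAG file is one way to make it visible to all components; whether that resolves this particular report is untested here.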
How to reproduce
Make a Kubernetes cluster connection:
# create connection
import os

from airflow import DAG
from airflow.models.connection import Connection
from airflow.operators.bash import BashOperator
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)

extra = {
    "kube_config": <kube_config to your cluster>,
    "namespace": <your namespace>,
    "in_cluster": False,
}
conn = Connection(conn_id="k8s-conn-id", conn_type="kubernetes", description="k8s connection", extra=extra)
# expose the connection as AIRFLOW_CONN_<CONN_ID> in this process
os.environ[f"AIRFLOW_CONN_{conn.conn_id.upper()}"] = conn.get_uri()

# define minimal DAG with deferrable=True and do_xcom_push=True
with DAG(dag_id="k8s-example") as dag:
    k = KubernetesPodOperator(
        kubernetes_conn_id="k8s-conn-id",
        name="k8s-pod",
        cmds=["bash", "-cx"],
        arguments=["mkdir -p /airflow/xcom/;cat /tmp/mnt/hello.txt > /airflow/xcom/return.json"],
        task_id="k8s-pod",
        startup_timeout_seconds=1000,
        do_xcom_push=True,
        pod_template_file="k8s_tempalte.yaml",
        on_finish_action="keep_pod",
        deferrable=True,
    )
    # just testing XCOM
    b = BashOperator(
        bash_command="echo \"{{ task_instance.xcom_pull('k8s-pod')[0] }}\"",
        task_id="pod_task_xcom_result",
    )
    k >> b
and the manifest
apiVersion: v1
kind: Pod
spec:
  initContainers:
    - name: init-container
      image: "ubuntu:latest"
      command: ["bash", "-cx"]
      args: ["echo '[1,2,3,4]' > /tmp/mnt/hello.txt"]
      resources:
        limits:
          cpu: 500m
          memory: 1Gi
        requests:
          cpu: 100m
          memory: 1Gi
      volumeMounts:
        - name: shared-volume
          mountPath: "/tmp/mnt"
  containers:
    - name: base
      image: "ubuntu:latest"
      imagePullPolicy: IfNotPresent
      ports: []
      resources:
        limits:
          cpu: 500m
          memory: 1Gi
        requests:
          cpu: 100m
          memory: 1Gi
      volumeMounts:
        - name: shared-volume
          mountPath: "/tmp/mnt"
  restartPolicy: Never
  volumes:
    - name: shared-volume
      emptyDir: {}
Operating System
docker
Versions of Apache Airflow Providers
apache-airflow-providers-cncf-kubernetes==8.4.0
apache-airflow-providers-microsoft-azure==10.3.0
Deployment
Docker-Compose
Deployment details
I am running Airflow locally in Docker with the official docker-compose file and deploying pods to Azure Kubernetes Service (AKS).
Anything else?
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct