
kubernetes connection defined via env variable does not work with deferrable=True #41706

@mayorblock

Description


Apache Airflow version

2.10.0

If "Other Airflow 2 version" selected, which one?

No response

What happened?

I am trying to run KubernetesPodOperator (cncf.kubernetes provider v8.4.0) with deferrable=True (and do_xcom_push=True, which was also an issue in earlier versions but no longer is). I define my Kubernetes cluster connection in code using airflow.models.connection.Connection and supply it via an environment variable.
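For reference, Airflow's environment-variable secrets backend looks a connection up under `AIRFLOW_CONN_` plus the upper-cased conn_id, reading `os.environ` of the process that performs the lookup. The sketch below mimics only that naming and lookup in plain Python. `conn_env_var_name` and `lookup_conn_uri` are hypothetical helpers, not Airflow APIs. Note that the hyphenated conn_id yields a hyphenated variable name, which Python can set even though a shell cannot export it.

```python
import os
from typing import Optional

CONN_ENV_PREFIX = "AIRFLOW_CONN_"  # prefix Airflow uses for connection env vars


def conn_env_var_name(conn_id: str) -> str:
    """Hypothetical helper: build the env var name Airflow checks for a conn_id."""
    return CONN_ENV_PREFIX + conn_id.upper()


def lookup_conn_uri(conn_id: str) -> Optional[str]:
    """Hypothetical helper: read the connection from THIS process's environment only."""
    return os.environ.get(conn_env_var_name(conn_id))


if __name__ == "__main__":
    name = conn_env_var_name("k8s-conn-id")
    print(name)  # prints AIRFLOW_CONN_K8S-CONN-ID
    os.environ[name] = "kubernetes://"  # visible only in this process and its future children
    print(lookup_conn_uri("k8s-conn-id"))
```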

After all containers (init container, XCom sidecar and main container) have completed successfully, I get:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 767, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 733, in _execute_callable
    return ExecutionCallableRunner(
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/operator_helpers.py", line 252, in run
    return self.func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/baseoperator.py", line 1792, in resume_execution
    return execute_callable(context)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 773, in trigger_reentry
    raise AirflowException(message)
airflow.exceptions.AirflowException: Traceback (most recent call last):
  File "/home/***/.local/lib/python3.12/site-packages/***/providers/cncf/kubernetes/triggers/pod.py", line 162, in run
    state = await self._wait_for_pod_start()
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/***/.local/lib/python3.12/site-packages/***/providers/cncf/kubernetes/triggers/pod.py", line 223, in _wait_for_pod_start
    pod = await self.hook.get_pod(self.pod_name, self.pod_namespace)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/***/.local/lib/python3.12/site-packages/***/providers/cncf/kubernetes/hooks/kubernetes.py", line 747, in get_pod
    async with self.get_conn() as connection:
  File "/usr/local/lib/python3.12/contextlib.py", line 210, in __aenter__
    return await anext(self.gen)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/home/***/.local/lib/python3.12/site-packages/***/providers/cncf/kubernetes/hooks/kubernetes.py", line 734, in get_conn
    kube_client = await self._load_config() or async_client.ApiClient()
                  ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/***/.local/lib/python3.12/site-packages/***/providers/cncf/kubernetes/hooks/kubernetes.py", line 664, in _load_config
    in_cluster = self._coalesce_param(self.in_cluster, await self._get_field("in_cluster"))
                                                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/***/.local/lib/python3.12/site-packages/***/providers/cncf/kubernetes/hooks/kubernetes.py", line 724, in _get_field
    extras = await self.get_conn_extras()
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/***/.local/lib/python3.12/site-packages/***/providers/cncf/kubernetes/hooks/kubernetes.py", line 712, in get_conn_extras
    connection = await sync_to_async(self.get_connection)(self.conn_id)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/***/.local/lib/python3.12/site-packages/asgiref/sync.py", line 468, in __call__
    ret = await asyncio.shield(exec_coro)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/concurrent/futures/thread.py", line 58, in run
    result = self.fn(*self.args, **self.kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/***/.local/lib/python3.12/site-packages/asgiref/sync.py", line 522, in thread_handler
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/home/***/.local/lib/python3.12/site-packages/***/providers/cncf/kubernetes/hooks/kubernetes.py", line 169, in get_connection
    return super().get_connection(conn_id)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/***/.local/lib/python3.12/site-packages/***/hooks/base.py", line 83, in get_connection
    conn = Connection.get_connection_from_secrets(conn_id)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/***/.local/lib/python3.12/site-packages/***/models/connection.py", line 537, in get_connection_from_secrets
    raise AirflowNotFoundException(f"The conn_id `{conn_id}` isn't defined")
***.exceptions.AirflowNotFoundException: The conn_id `k8s-conn-id` isn't defined

It does work, however, if I either:

  • set deferrable=False (the environment-variable connection is then found), or
  • create the Kubernetes cluster connection explicitly in the UI (then deferrable=True works).
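With no custom secrets backend configured, Airflow 2.x checks environment variables first and the metadata database second, and raises `AirflowNotFoundException` when neither has the conn_id. The simplified model below uses hypothetical stand-in classes, not Airflow code, to show why a UI-created connection (stored in the shared database) can be found by any component. It also shows how a connection placed only in a DAG file's `os.environ` can be missed by a process that never executed that file. That the triggerer is such a process is an assumption on our part, not something confirmed in this report.

```python
from typing import Dict, List, Optional


class ConnNotFound(Exception):
    """Stand-in for airflow.exceptions.AirflowNotFoundException."""


class EnvBackend:
    """Stand-in for the env-var backend: reads a process-local mapping."""

    def __init__(self, environ: Dict[str, str]):
        self.environ = environ

    def get(self, conn_id: str) -> Optional[str]:
        return self.environ.get("AIRFLOW_CONN_" + conn_id.upper())


class DbBackend:
    """Stand-in for the metastore backend: shared by every component."""

    def __init__(self, rows: Dict[str, str]):
        self.rows = rows

    def get(self, conn_id: str) -> Optional[str]:
        return self.rows.get(conn_id)


def get_connection(conn_id: str, backends: List) -> str:
    # Try each backend in order; the first one that knows the conn_id wins.
    for backend in backends:
        value = backend.get(conn_id)
        if value is not None:
            return value
    raise ConnNotFound(f"The conn_id `{conn_id}` isn't defined")


shared_db = DbBackend({})  # no connection created in the UI
# Process that executed the DAG file, so os.environ was populated there.
parser_env = {"AIRFLOW_CONN_K8S-CONN-ID": "kubernetes://"}
# Process that never executed the DAG file (assumed: the triggerer).
trigger_env: Dict[str, str] = {}

print(get_connection("k8s-conn-id", [EnvBackend(parser_env), shared_db]))
try:
    get_connection("k8s-conn-id", [EnvBackend(trigger_env), shared_db])
except ConnNotFound as exc:
    print(exc)
```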

What you think should happen instead?

Supplying a connection via an environment variable, as described in the docs, should work exactly like a connection created in the UI.
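A possible workaround, suggested here rather than confirmed in this report, is to set the variable in the deployment itself (for example the `environment:` section of docker-compose) so that every Airflow component inherits it, instead of setting it inside DAG code. Airflow 2.3+ accepts a JSON-serialized connection as the value of `AIRFLOW_CONN_*`, and the sketch below only builds that string. `build_conn_env` is a hypothetical helper. The extras are placeholders. It uses the underscore conn_id `k8s_conn_id` because hyphens are not valid in shell variable names, so the operator's `kubernetes_conn_id` would have to change to match.

```python
import json
from typing import Any, Dict, Tuple


def build_conn_env(conn_id: str, conn_type: str, extra: Dict[str, Any]) -> Tuple[str, str]:
    """Hypothetical helper: return (env var name, JSON value) for a connection."""
    name = "AIRFLOW_CONN_" + conn_id.upper()
    value = json.dumps({"conn_type": conn_type, "extra": extra})
    return name, value


name, value = build_conn_env(
    "k8s_conn_id",
    "kubernetes",
    {"namespace": "default", "in_cluster": False},  # placeholder extras; add kube_config as needed
)
# Line to paste into an env file or the docker-compose environment section.
print(f"{name}='{value}'")
```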

How to reproduce

Create a Kubernetes cluster connection in code and a minimal DAG that uses it:

# create connection
import os
from datetime import datetime

from airflow import DAG
from airflow.models.connection import Connection
from airflow.operators.bash import BashOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

extra = {
    "kube_config": "<kube_config for your cluster>",  # placeholder
    "namespace": "<your namespace>",  # placeholder
    "in_cluster": False,
}
conn = Connection(conn_id="k8s-conn-id", conn_type="kubernetes", description="k8s connection", extra=extra)
# Expose the connection to Airflow's environment-variable secrets backend
os.environ[f"AIRFLOW_CONN_{conn.conn_id.upper()}"] = conn.get_uri()

# define a minimal DAG with deferrable=True and do_xcom_push=True
with DAG(dag_id="k8s-example", start_date=datetime(2024, 1, 1), schedule=None) as dag:
    k = KubernetesPodOperator(
        kubernetes_conn_id="k8s-conn-id",
        name="k8s-pod",
        cmds=["bash", "-cx"],
        arguments=["mkdir -p /airflow/xcom/;cat /tmp/mnt/hello.txt > /airflow/xcom/return.json"],
        task_id="k8s-pod",
        startup_timeout_seconds=1000,
        do_xcom_push=True,
        pod_template_file="k8s_template.yaml",
        on_finish_action="keep_pod",
        deferrable=True,
    )

    # just testing XCom
    b = BashOperator(
        bash_command="echo \"{{ task_instance.xcom_pull('k8s-pod')[0] }}\"",
        task_id="pod_task_xcom_result",
    )

    k >> b
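The XCom pushed by the pod is the content of `/airflow/xcom/return.json`, which the operator parses as JSON (our understanding of the sidecar mechanism; treat it as an assumption). Because the init container writes `[1,2,3,4]`, the template `xcom_pull('k8s-pod')[0]` should render `1`. A plain-Python check of just that parsing step:

```python
import json

# What the init container writes (echo appends a newline) and the main container copies to return.json.
return_json = "[1,2,3,4]\n"

xcom_value = json.loads(return_json)  # parsed XCom value: a list of ints
first = xcom_value[0]                 # what xcom_pull('k8s-pod')[0] selects
print(first)  # prints 1
```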

and the pod template manifest:

apiVersion: v1
kind: Pod
spec:
  initContainers:
    - name: init-container
      image: "ubuntu:latest"
      command: ["bash", "-cx"]
      args: ["echo '[1,2,3,4]' > /tmp/mnt/hello.txt"]
      resources:
        limits:
          cpu: 500m
          memory: 1Gi
        requests:
          cpu: 100m
          memory: 1Gi
      volumeMounts:
        - name: shared-volume
          mountPath: "/tmp/mnt"
  containers:
    - name: base
      image: "ubuntu:latest"
      imagePullPolicy: IfNotPresent
      ports: []
      resources:
        limits:
          cpu: 500m
          memory: 1Gi
        requests:
          cpu: 100m
          memory: 1Gi
      volumeMounts:
        - name: shared-volume
          mountPath: "/tmp/mnt"
  restartPolicy: Never
  volumes:
  - name: shared-volume
    emptyDir: {}

Operating System

docker

Versions of Apache Airflow Providers

apache-airflow-providers-cncf-kubernetes==8.4.0
apache-airflow-providers-microsoft-azure==10.3.0

Deployment

Docker-Compose

Deployment details

I am running Airflow locally in Docker with the official docker-compose file and deploying pods to Azure Kubernetes Service (AKS).

Anything else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
