nailo2c commented Nov 29, 2025

Closes: #41706

Why

When a Kubernetes connection such as k8s-conn-id is defined only through an environment variable (e.g., AIRFLOW_CONN_...) and KubernetesPodOperator runs with deferrable=True, the Triggerer raises the following exception:

AirflowNotFoundException: The conn_id `k8s-conn-id` isn't defined

This happens because the environment variable defining the connection exists in the Worker (where the task starts) but is missing in the Triggerer service (where the task is deferred). The Triggerer attempts to look up the connection ID but fails to find it.
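
The failure mode can be reproduced without Airflow. The sketch below is a simplified stand-in, not Airflow's actual lookup code: `lookup_conn_uri` and `ConnectionNotFound` are hypothetical names, the URI is a placeholder, and the two dictionaries play the role of the Worker's and the Triggerer's process environments.

```python
class ConnectionNotFound(Exception):
    """Hypothetical stand-in for Airflow's AirflowNotFoundException."""


def lookup_conn_uri(conn_id: str, environ: dict) -> str:
    """Simplified env-var lookup: AIRFLOW_CONN_ + upper-cased conn_id."""
    key = f"AIRFLOW_CONN_{conn_id.upper()}"
    try:
        return environ[key]
    except KeyError:
        raise ConnectionNotFound(f"The conn_id `{conn_id}` isn't defined") from None


# Each service runs in its own process with its own environment.
# The URI value is a placeholder, not what Connection.get_uri() would emit.
worker_env = {"AIRFLOW_CONN_K8S-CONN-ID": "kubernetes://placeholder"}
triggerer_env = {}  # the variable was never exported here

# The Worker can resolve the connection...
worker_uri = lookup_conn_uri("k8s-conn-id", worker_env)

# ...but the same lookup fails once the task is deferred to the Triggerer.
try:
    lookup_conn_uri("k8s-conn-id", triggerer_env)
    triggerer_error = None
except ConnectionNotFound as exc:
    triggerer_error = str(exc)

print(worker_uri)
print(triggerer_error)
```

The point of the sketch is that the lookup itself is correct in both places; only the environment it runs against differs.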

How

This PR fixes the issue by looking up the connection details in the Worker before deferral and passing them explicitly to the Triggerer.

  1. Operator: KubernetesPodOperator now resolves the connection object during the execute phase (in the Worker) and extracts the connection extras.
  2. Trigger: KubernetesPodTrigger gains a connection_extras parameter that carries the resolved connection information.
  3. Hook: AsyncKubernetesHook now accepts connection_extras. If provided, the hook uses these extras directly instead of looking up the connection ID in the database or environment variables.
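
The three steps above can be sketched in plain Python. This is an illustration of the data flow only, not the provider's code: `SketchTrigger`, `SketchAsyncHook`, `worker_execute`, and `triggerer_run` are hypothetical names, and only the `connection_extras` parameter name comes from this PR. Falling back to `None` when the Worker cannot resolve the connection is an assumption about how the missing-connection case might be handled.

```python
from __future__ import annotations

import json
from dataclasses import dataclass
from typing import Any, Callable

# Hypothetical lookup type: maps a conn_id to its extras or raises KeyError.
Lookup = Callable[[str], dict]


@dataclass
class SketchTrigger:
    """Stand-in for KubernetesPodTrigger: holds only serializable kwargs."""

    conn_id: str
    connection_extras: dict[str, Any] | None = None

    def serialize(self) -> str:
        return json.dumps(
            {"conn_id": self.conn_id, "connection_extras": self.connection_extras}
        )


class SketchAsyncHook:
    """Stand-in for AsyncKubernetesHook."""

    def __init__(self, conn_id: str, lookup: Lookup, connection_extras=None):
        self.conn_id = conn_id
        self.lookup = lookup
        self.connection_extras = connection_extras

    def get_extras(self) -> dict:
        # Step 3: prefer extras handed over by the Worker; look up otherwise.
        if self.connection_extras is not None:
            return self.connection_extras
        return self.lookup(self.conn_id)


def worker_execute(conn_id: str, worker_lookup: Lookup) -> str:
    """Steps 1 and 2: resolve extras in the Worker and put them in the trigger."""
    try:
        extras = worker_lookup(conn_id)
    except KeyError:
        extras = None  # assumption: defer anyway and let the Triggerer try
    return SketchTrigger(conn_id, extras).serialize()


def triggerer_run(payload: str, triggerer_lookup: Lookup) -> dict:
    """Rebuild the hook in the Triggerer from the serialized trigger kwargs."""
    kwargs = json.loads(payload)
    hook = SketchAsyncHook(
        kwargs["conn_id"], triggerer_lookup, kwargs["connection_extras"]
    )
    return hook.get_extras()


# The connection is known to the Worker only.
worker_conns = {"k8s-conn-id": {"in_cluster": True, "namespace": "airflow"}}
triggerer_conns: dict = {}

payload = worker_execute("k8s-conn-id", worker_conns.__getitem__)
resolved = triggerer_run(payload, triggerer_conns.__getitem__)
print(resolved)
```

Because the extras travel inside the serialized trigger, the Triggerer never has to perform its own lookup in this flow.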

What

I used the following DAG to reproduce the issue and verify the fix in the breeze k8s environment:

import os
from datetime import datetime
from airflow import DAG
from airflow.models.connection import Connection
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from airflow.providers.standard.operators.bash import BashOperator

def _set_k8s_conn_from_env() -> None:
    extra = {
        "in_cluster": True,
        "namespace": "airflow",
    }

    conn = Connection(
        conn_id="k8s-conn-id",
        conn_type="kubernetes",
        description="K8s connection defined only via AIRFLOW_CONN_* env var",
        extra=extra,
    )

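    # The variable is set in the process that imports this DAG file, so a
    # separate Triggerer process does not inherit it.
    os.environ[f"AIRFLOW_CONN_{conn.conn_id.upper()}"] = conn.get_uri()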


_set_k8s_conn_from_env()


with DAG(
    dag_id="k8s_env_deferrable_repro_41706",
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
    tags=["k8s", "deferrable", "env-conn", "issue-41706"],
) as dag:
    k8s_pod = KubernetesPodOperator(
        task_id="k8s_pod_env_conn_deferrable",
        kubernetes_conn_id="k8s-conn-id",
        namespace="airflow",
        name="k8s-env-deferrable-pod",
        image="alpine",
        cmds=["sh", "-c"],
        arguments=["mkdir -p /airflow/xcom/; echo '[1,2,3,4]' > /airflow/xcom/return.json"],
        do_xcom_push=True,
        get_logs=True,
        deferrable=True,
        on_finish_action="keep_pod",
        startup_timeout_seconds=600,
    )

    print_xcom = BashOperator(
        task_id="print_xcom_result",
        bash_command="echo \"XCOM from KPO: {{ ti.xcom_pull('k8s_pod_env_conn_deferrable')[0] }}\"",
    )

    k8s_pod >> print_xcom

Before the fix, we can see the following error in the Airflow logs:

[2025-11-23 10:48:10] DEBUG - xcom result: [1,2,3,4] source=airflow.task.operators.airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator loc=pod.py:654 
[2025-11-23 10:48:10] ERROR - Trigger emitted an error event, failing the task: The conn_id `k8s-conn-id` isn't defined source=airflow.task.operators.airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator loc=pod.py:929 
[2025-11-23 10:48:11] INFO - Pod k8s-env-deferrable-pod-8k5u7y2d has phase Running source=airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager loc=pod_manager.py:763

After the fix, the new log line added by this PR appears in the Triggerer logs, and the error above no longer occurs.

[Screenshot: defer_example]

boring-cyborg bot added the area:providers and provider:cncf-kubernetes labels Nov 29, 2025

Nataneljpwd left a comment

Minor nitpicks; looks very good.

nailo2c commented Dec 12, 2025

All the conflicts are resolved. If there is anything that can be improved, please don't hesitate to let me know :D

romsharon98 merged commit d28c8fe into apache:main Dec 16, 2025
101 checks passed
FoxHelms pushed a commit to FoxHelms/airflow that referenced this pull request Dec 17, 2025
…on (apache#41706) (apache#58841)

* Fix AsyncKubernetesHook when Kubernetes connection is missing (apache#41706)

* fix CI error

* fix CI error apache#2

* Rename `conn_extras` to `connection_extras` for consistency across Kubernetes hooks and operators

* Handle AirflowNotFoundException when resolving connection extras in KubernetesPodOperator
Lohith625 pushed a commit to Lohith625/airflow that referenced this pull request Dec 19, 2025
TempestShaw pushed a commit to TempestShaw/airflow that referenced this pull request Dec 24, 2025