KubernetesExecutor does not spawn Kubernetes Pod Operator pods #17629

@SimonOsipov

Description

Apache Airflow version: 2.1.2 (it also happened on 2.0.2)

OS: debian

Apache Airflow Provider versions:

apache-airflow-providers-celery==2.0.0
apache-airflow-providers-cncf-kubernetes==2.0.1
apache-airflow-providers-docker==2.1.0
apache-airflow-providers-elasticsearch==2.0.2
apache-airflow-providers-ftp==2.0.0
apache-airflow-providers-google==5.0.0
apache-airflow-providers-hashicorp==2.0.0
apache-airflow-providers-http==2.0.0
apache-airflow-providers-imap==2.0.0
apache-airflow-providers-mysql==2.1.0
apache-airflow-providers-postgres==2.0.0
apache-airflow-providers-redis==2.0.0
apache-airflow-providers-slack==4.0.0
apache-airflow-providers-sqlite==2.0.0
apache-airflow-providers-ssh==2.1.0

Deployment:

GKE

Client Version: version.Info{Major:"1", Minor:"20", GitVersion:"v1.20.7", GitCommit:"132a687512d7fb058d0f5890f07d4121b3f0a2e2", GitTreeState:"clean", BuildDate:"2021-05-12T12:40:09Z", GoVersion:"go1.15.12", Compiler:"gc", Platform:"darwin/amd64"}
Server Version: version.Info{Major:"1", Minor:"20+", GitVersion:"v1.20.8-gke.900", GitCommit:"28ab8501be88ea42e897ca8514d7cd0b436253d9", GitTreeState:"clean", BuildDate:"2021-06-30T09:23:36Z", GoVersion:"go1.15.13b5", Compiler:"gc", Platform:"linux/amd64"}

What happened:
Airflow was working fine, but out of the blue it stopped working properly.
The DAG starts and its first task fails, even though everything ran fine a week ago. The KubernetesExecutor stopped spawning KubernetesPodOperator pods, and there are no logs or errors. If I apply the template I use for the Operator directly (kubectl apply -f), it runs successfully.
Airflow uses the same service account for the Operator and the Executor, so if the Executor pod is created successfully, the service account is not the problem.
Also, since the DAG syncs successfully, git-sync is not the problem either.
kubectl get events does not show any attempt to create a pod for the task.
dag_processor_manager.log is also free of any errors.

So the Executor pod is created successfully and proceeds to Completed, but no Pod Operator pod is ever created.
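
A quick way to double-check the service-account assumption from inside the Executor pod (this is my own sketch, not part of the original setup; it assumes the kubernetes Python client is available in the Airflow image and that everything runs in the default namespace, so adjust the namespace if needed):

# Hedged sketch: ask the API server whether the mounted service account
# is allowed to create pods in this namespace.
from kubernetes import client, config

config.load_incluster_config()  # uses the token mounted via automountServiceAccountToken

review = client.V1SelfSubjectAccessReview(
    spec=client.V1SelfSubjectAccessReviewSpec(
        resource_attributes=client.V1ResourceAttributes(
            namespace="default",  # assumption: replace with the Airflow namespace
            verb="create",
            resource="pods",
        )
    )
)
result = client.AuthorizationV1Api().create_self_subject_access_review(review)
print("can create pods:", result.status.allowed)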

Executor

apiVersion: v1
kind: Pod
metadata:
  ...
spec:
  restartPolicy: Never
  serviceAccountName: airflow # this account has rights to create pods
  automountServiceAccountToken: true
  volumes:
    - name: dags
      emptyDir: {}
    - name: logs
      emptyDir: {}
    - name: airflow-git-sync-configmap
      configMap:
        name: airflow-git-sync-configmap
  initContainers:
    - name: git-sync-clone
      securityContext:
        runAsUser: 65533
        runAsGroup: 65533
      image: k8s.gcr.io/git-sync/git-sync:v3.3.1
      imagePullPolicy: Always
      volumeMounts:
        - mountPath: /tmp/git
          name: dags
      resources:
         ...
      args: ["--one-time"]
      envFrom:
      - configMapRef:
          name: airflow-git-sync-configmap
      - secretRef:
          name: airflow-git-sync-secret
  containers:
    - name: base
      image: <artifactory_url>/airflow:latest
      volumeMounts:
        - name: dags
          mountPath: /opt/airflow/dags
        - name: logs
          mountPath: /opt/airflow/logs
      imagePullPolicy: Always

Pod template

apiVersion: v1
kind: Pod
metadata:
   ....
spec:
  serviceAccountName: airflow
  automountServiceAccountToken: true
  volumes:
    - name: sql
      emptyDir: {}
  initContainers:
    - name: git-sync
      image: k8s.gcr.io/git-sync/git-sync:v3.3.1
      imagePullPolicy: Always
      args: ["--one-time"]
      volumeMounts:
        - name: sql
          mountPath: /tmp/git/
      resources:
        requests:
          memory: 300Mi
          cpu: 500m
        limits:
          memory: 600Mi
          cpu: 1000m
      envFrom:
      - configMapRef:
          name: git-sync-configmap
      - secretRef:
          name: airflow-git-sync-secret
  containers:
    - name: base
      imagePullPolicy: Always
      image: <artifactory_url>/clickhouse-client-gcloud:20.6.4.44
      volumeMounts:
        - name: sql
          mountPath: /opt/sql
      resources:
         ....
      env:
        - name: GS_SERVICE_ACCOUNT
          valueFrom:
            secretKeyRef:
              name: gs-service-account
              key: service_account.json
        - name: DB_CREDENTIALS
          valueFrom:
            secretKeyRef:
              name: estimation-db-secret
              key: db_cred.json

DAG

from textwrap import dedent

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

TEMPLATE_PATH = "/opt/airflow/dags/airflow-dags.git/pod_templates"

args = {
    ...
}


def create_pipeline(dag_: DAG):

    task_startup_client = KubernetesPodOperator(
        name="clickhouse-client",
        task_id="clickhouse-client",
        labels={"application": "clickhouse-client-gsutil"},
        pod_template_file=f"{TEMPLATE_PATH}/template.yaml",
        cmds=["sleep", "60000"],
        reattach_on_restart=True,
        is_delete_operator_pod=False,
        get_logs=True,
        log_events_on_failure=True,
        dag=dag_,
    )
  
    task_startup_client


with DAG(
    dag_id="MANUAL-GKE-clickhouse-client",
    default_args=args,
    schedule_interval=None,
    max_active_runs=1,
    start_date=days_ago(2),
    tags=["utility"],
) as dag:

    create_pipeline(dag)
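
Side note (my addition, not part of the DAG above): the airflow.contrib import is the old 1.10-style path, which Airflow 2 only keeps as a deprecated shim. With the provider versions listed above, the current import would be something like:

# assumes apache-airflow-providers-cncf-kubernetes==2.0.1 as listed above
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

This does not change the behaviour described here; it is just the non-deprecated path.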

In the scheduler logs:

...
Event: manualgkeclickhouseclientaticlickhouseclient.9959fa1fd13a4b6fbdaf40549a09d2f9 Succeeded
...

Executor logs:

[2021-08-15 18:40:27,045] {settings.py:208} DEBUG - Setting up DB connection pool (PID 1)
[2021-08-15 18:40:27,046] {settings.py:276} DEBUG - settings.prepare_engine_args(): Using pool settings. pool_size=5, max_overflow=10, pool_recycle=1800, pid=1
[2021-08-15 18:40:27,095] {cli_action_loggers.py:40} DEBUG - Adding <function default_action_log at 0x7f0556c5e280> to pre execution callback
[2021-08-15 18:40:28,070] {cli_action_loggers.py:66} DEBUG - Calling callbacks: [<function default_action_log at 0x7f0556c5e280>]
[2021-08-15 18:40:28,106] {settings.py:208} DEBUG - Setting up DB connection pool (PID 1)
[2021-08-15 18:40:28,107] {settings.py:244} DEBUG - settings.prepare_engine_args(): Using NullPool
[2021-08-15 18:40:28,109] {dagbag.py:496} INFO - Filling up the DagBag from /opt/airflow/dags/ati-airflow-dags.git/dag_clickhouse-client.py
[2021-08-15 18:40:28,110] {dagbag.py:311} DEBUG - Importing /opt/airflow/dags/ati-airflow-dags.git/dag_clickhouse-client.py
/usr/local/lib/python3.9/site-packages/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py:26 DeprecationWarning: This module is deprecated. Please use `kubernetes.client.models.V1Volume`.
/usr/local/lib/python3.9/site-packages/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py:27 DeprecationWarning: This module is deprecated. Please use `kubernetes.client.models.V1VolumeMount`.
[2021-08-15 18:40:28,135] {dagbag.py:461} DEBUG - Loaded DAG <DAG: MANUAL-GKE-clickhouse-client>
[2021-08-15 18:40:28,176] {plugins_manager.py:281} DEBUG - Loading plugins
[2021-08-15 18:40:28,176] {plugins_manager.py:225} DEBUG - Loading plugins from directory: /opt/airflow/plugins
[2021-08-15 18:40:28,177] {plugins_manager.py:205} DEBUG - Loading plugins from entrypoints
[2021-08-15 18:40:28,238] {plugins_manager.py:418} DEBUG - Integrate DAG plugins
Running <TaskInstance: MANUAL-GKE-clickhouse-client.clickhouse-client 2021-08-15T18:39:38.150950+00:00 [queued]> on host manualgkeclickhouseclientclickhouseclient.9959fa1fd13a4b6fbd
[2021-08-15 18:40:28,670] {cli_action_loggers.py:84} DEBUG - Calling callbacks: []
[2021-08-15 18:40:28,670] {settings.py:302} DEBUG - Disposing DB connection pool (PID 1)
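
Since nothing errors anywhere, the only other thing I can think of checking is what state the task instance ends up in in the metadata DB. A minimal sketch of that check (my addition; it assumes it is run somewhere with Airflow installed and the same sql_alchemy_conn configured):

# Hedged sketch: print the recorded state of the task instances for this DAG.
from airflow.models import TaskInstance
from airflow.utils.session import create_session

with create_session() as session:
    tis = (
        session.query(TaskInstance)
        .filter(
            TaskInstance.dag_id == "MANUAL-GKE-clickhouse-client",
            TaskInstance.task_id == "clickhouse-client",
        )
        .all()
    )
    for ti in tis:
        print(ti.execution_date, ti.state, ti.hostname)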

What you expected to happen:
I expected the Executor to create a pod for the KubernetesPodOperator, or at least to raise some kind of error.
