
Partial of a KubernetesPodOperator does not allow for limit_cpu and limit_memory in the resources argument #23783

@kevin-woodward

Description

Apache Airflow version

2.3.0 (latest released)

What happened

When performing dynamic task mapping and passing Kubernetes limits via the resources argument of partial(), the DAG fails to import with the following error:

Broken DAG: [/opt/airflow/dags/bulk_image_processing.py] Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 287, in partial
    partial_kwargs["resources"] = coerce_resources(partial_kwargs["resources"])
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 133, in coerce_resources
    return Resources(**resources)
TypeError: __init__() got an unexpected keyword argument 'limit_cpu'

The offending code is:

KubernetesPodOperator.partial(
    get_logs=True,
    in_cluster=True,
    is_delete_operator_pod=True,
    namespace=settings.namespace,
    resources={'limit_cpu': settings.IMAGE_PROCESSING_OPERATOR_CPU, 'limit_memory': settings.IMAGE_PROCESSING_OPERATOR_MEMORY},
    service_account_name=settings.SERVICE_ACCOUNT_NAME,
    startup_timeout_seconds=600,
    **kwargs,
)

This is not specific to the snippet above: any DAG that calls KubernetesPodOperator.partial with a resources argument raises the same import error.
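What appears to be happening, judging from the traceback (this is an assumption, simplified from the coerce_resources helper in airflow/models/baseoperator.py): partial() funnels any resources dict through Airflow's core scheduler-level Resources class, which only understands keys like cpus and ram, so Kubernetes-style keys such as limit_cpu are rejected. A minimal sketch of the failure:

```python
class Resources:
    """Stand-in for airflow.utils.operator_resources.Resources
    (signature simplified; the real class reads defaults from config)."""

    def __init__(self, cpus=1, ram=512, disk=512, gpus=0):
        self.cpus, self.ram, self.disk, self.gpus = cpus, ram, disk, gpus


def coerce_resources(resources):
    # BaseOperator.partial() unconditionally coerces the dict through the
    # core class, even though KubernetesPodOperator expects k8s-style keys.
    return Resources(**resources)


try:
    coerce_resources({'limit_cpu': '2000m', 'limit_memory': '16Gi'})
except TypeError as exc:
    print(exc)  # ... got an unexpected keyword argument 'limit_cpu'
```

The coercion is appropriate for operators that use the scheduler-level resources dict, but it clobbers the Kubernetes-specific dict that KubernetesPodOperator accepts.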

What you think should happen instead

The resources argument should be taken at face value and applied to the OperatorPartial and subsequently the MappedOperator.
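To illustrate the expected behavior, here is a hypothetical sketch (the class and method names mirror Airflow's OperatorPartial/expand only loosely, and are not the real implementation): partial() should store the resources dict verbatim so that expand() can hand it unchanged to every mapped task.

```python
class OperatorPartial:
    """Toy stand-in showing pass-through of partial kwargs."""

    def __init__(self, **kwargs):
        # No coerce_resources() call: the dict is kept at face value.
        self.kwargs = kwargs

    def expand(self, arguments):
        # Each mapped task gets the shared kwargs plus its own arguments.
        return [dict(self.kwargs, arguments=args) for args in arguments]


partial = OperatorPartial(
    resources={'limit_cpu': '2000m', 'limit_memory': '16Gi'},
)
mapped = partial.expand([['--bucket-name', 'a'], ['--bucket-name', 'b']])
print(mapped[0]['resources'])  # {'limit_cpu': '2000m', 'limit_memory': '16Gi'}
```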

How to reproduce

Try to import this DAG using Airflow 2.3.0:

from datetime import datetime
from airflow import XComArg
from airflow.models import DAG
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator


def make_operator(
    **kwargs
):
    return KubernetesPodOperator(
        **{
            'get_logs': True,
            'in_cluster': True,
            'is_delete_operator_pod': True,
            'namespace': 'default',
            'startup_timeout_seconds': 600,
            **kwargs,
        }
    )


def make_partial_operator(
    **kwargs
):
    return KubernetesPodOperator.partial(
        **{
            'get_logs': True,
            'in_cluster': True,
            'is_delete_operator_pod': True,
            'namespace': 'default',
            'startup_timeout_seconds': 600,
            **kwargs,
        }
    )


with DAG(dag_id='bulk_image_processing',
         schedule_interval=None,
         start_date=datetime(2020, 1, 1),
         max_active_tasks=20) as dag:

    op1 = make_operator(
        arguments=['--bucket-name', f'{{{{ dag_run.conf.get("bucket", "some-fake-default") }}}}'],
        cmds=['python3', 'some_entrypoint'],
        image='some-image',
        name='airflow-private-image-pod-1',
        task_id='some-task',
        do_xcom_push=True
    )

    op2 = make_partial_operator(
        image='another-image',
        name='airflow-private-image-pod-2',
        resources={'limit_cpu': '2000m', 'limit_memory': '16Gi'},
        task_id='another-task',
        cmds=[
            'some',
            'set',
            'of',
            'cmds'
        ]
    ).expand(arguments=XComArg(op1))

Operating System

MacOS 11.6.5

Versions of Apache Airflow Providers

Relevant:

apache-airflow-providers-cncf-kubernetes==4.0.1

Full:

apache-airflow-providers-amazon==3.3.0
apache-airflow-providers-celery==2.1.4
apache-airflow-providers-cncf-kubernetes==4.0.1
apache-airflow-providers-docker==2.6.0
apache-airflow-providers-elasticsearch==3.0.3
apache-airflow-providers-ftp==2.1.2
apache-airflow-providers-google==6.8.0
apache-airflow-providers-grpc==2.0.4
apache-airflow-providers-hashicorp==2.2.0
apache-airflow-providers-http==2.1.2
apache-airflow-providers-imap==2.2.3
apache-airflow-providers-microsoft-azure==3.8.0
apache-airflow-providers-mysql==2.2.3
apache-airflow-providers-odbc==2.0.4
apache-airflow-providers-postgres==4.1.0
apache-airflow-providers-redis==2.0.4
apache-airflow-providers-sendgrid==2.0.4
apache-airflow-providers-sftp==2.6.0
apache-airflow-providers-slack==4.2.3
apache-airflow-providers-sqlite==2.1.3
apache-airflow-providers-ssh==2.4.3

Deployment

Official Apache Airflow Helm Chart

Deployment details

Docker (Docker Desktop)

  • Server Version: 20.10.13
  • API Version: 1.41
  • Builder: 2

Kubernetes (Docker Desktop)

  • Env: docker-desktop
  • Context: docker-desktop
  • Cluster Name: docker-desktop
  • Namespace: default
  • Container Runtime: docker
  • Version: v1.22.5

Helm:

  • version.BuildInfo{Version:"v3.6.3", GitCommit:"d506314abfb5d21419df8c7e7e68012379db2354", GitTreeState:"dirty", GoVersion:"go1.16.5"}

Anything else

You can work around this by creating the partial without calling expand on it, setting resources directly on its kwargs attribute, and only then calling expand. Example:

from datetime import datetime
from airflow import XComArg
from airflow.models import DAG
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator


def make_operator(
    **kwargs
):
    return KubernetesPodOperator(
        **{
            'get_logs': True,
            'in_cluster': True,
            'is_delete_operator_pod': True,
            'namespace': 'default',
            'startup_timeout_seconds': 600,
            **kwargs,
        }
    )


def make_partial_operator(
    **kwargs
):
    return KubernetesPodOperator.partial(
        **{
            'get_logs': True,
            'in_cluster': True,
            'is_delete_operator_pod': True,
            'namespace': 'default',
            'startup_timeout_seconds': 600,
            **kwargs,
        }
    )


with DAG(dag_id='bulk_image_processing',
         schedule_interval=None,
         start_date=datetime(2020, 1, 1),
         max_active_tasks=20) as dag:

    op1 = make_operator(
        arguments=['--bucket-name', f'{{{{ dag_run.conf.get("bucket", "some-fake-default") }}}}'],
        cmds=['python3', 'some_entrypoint'],
        image='some-image',
        name='airflow-private-image-pod-1',
        task_id='some-task',
        do_xcom_push=True
    )

    op2 = make_partial_operator(
        image='another-image',
        name='airflow-private-image-pod-2',
        task_id='another-task',
        cmds=[
            'some',
            'set',
            'of',
            'cmds'
        ]
    )
    
    op2.kwargs['resources'] = {'limit_cpu': '2000m', 'limit_memory': '16Gi'} 

    op2.expand(arguments=XComArg(op1))

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
