Apache Airflow version
2.3.0 (latest released)
What happened
When performing dynamic task mapping and providing Kubernetes limits to the resources argument, the DAG raises an import error:
Broken DAG: [/opt/airflow/dags/bulk_image_processing.py] Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 287, in partial
partial_kwargs["resources"] = coerce_resources(partial_kwargs["resources"])
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 133, in coerce_resources
return Resources(**resources)
TypeError: __init__() got an unexpected keyword argument 'limit_cpu'
The offending code is:
KubernetesPodOperator.partial(
    get_logs=True,
    in_cluster=True,
    is_delete_operator_pod=True,
    namespace=settings.namespace,
    resources={'limit_cpu': settings.IMAGE_PROCESSING_OPERATOR_CPU, 'limit_memory': settings.IMAGE_PROCESSING_OPERATOR_MEMORY},
    service_account_name=settings.SERVICE_ACCOUNT_NAME,
    startup_timeout_seconds=600,
    **kwargs,
)
The same error occurs in any DAG that calls KubernetesPodOperator.partial with a resources argument.
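The failure can be reproduced without a cluster. Per the traceback, `coerce_resources` in `baseoperator.py` unpacks whatever dict is passed as `resources` into Airflow core's `Resources` class, which models scheduler-level resources (keys like `cpus` and `ram`), not Kubernetes-style `limit_*` keys. A minimal sketch of the mismatch, where `CoreResources` is a simplified stand-in and not the real Airflow class:

```python
# Simplified stand-in for Airflow core's Resources class, which models
# scheduler-level resources (cpus, ram, ...) rather than Kubernetes limits.
class CoreResources:
    def __init__(self, cpus=1, ram=512, disk=512, gpus=0):
        self.cpus, self.ram, self.disk, self.gpus = cpus, ram, disk, gpus

def coerce_resources(resources):
    # Mirrors the shape of baseoperator.coerce_resources: blind dict unpacking.
    return CoreResources(**resources)

# Kubernetes-style keys are not in CoreResources.__init__'s signature:
try:
    coerce_resources({'limit_cpu': '2000m', 'limit_memory': '16Gi'})
except TypeError as exc:
    print(exc)  # e.g. __init__() got an unexpected keyword argument 'limit_cpu'
```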
What you think should happen instead
The resources argument should be taken at face value and applied to the OperatorPartial and subsequently the MappedOperator.
How to reproduce
Try to import this DAG using Airflow 2.3.0:
from datetime import datetime

from airflow import XComArg
from airflow.models import DAG
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator


def make_operator(**kwargs):
    return KubernetesPodOperator(
        **{
            'get_logs': True,
            'in_cluster': True,
            'is_delete_operator_pod': True,
            'namespace': 'default',
            'startup_timeout_seconds': 600,
            **kwargs,
        }
    )


def make_partial_operator(**kwargs):
    return KubernetesPodOperator.partial(
        **{
            'get_logs': True,
            'in_cluster': True,
            'is_delete_operator_pod': True,
            'namespace': 'default',
            'startup_timeout_seconds': 600,
            **kwargs,
        }
    )


with DAG(
    dag_id='bulk_image_processing',
    schedule_interval=None,
    start_date=datetime(2020, 1, 1),
    max_active_tasks=20,
) as dag:
    op1 = make_operator(
        arguments=['--bucket-name', '{{ dag_run.conf.get("bucket", "some-fake-default") }}'],
        cmds=['python3', 'some_entrypoint'],
        image='some-image',
        name='airflow-private-image-pod-1',
        task_id='some-task',
        do_xcom_push=True,
    )
    op2 = make_partial_operator(
        image='another-image',
        name='airflow-private-image-pod-2',
        resources={'limit_cpu': '2000m', 'limit_memory': '16Gi'},
        task_id='another-task',
        cmds=['some', 'set', 'of', 'cmds'],
    ).expand(arguments=XComArg(op1))
Operating System
MacOS 11.6.5
Versions of Apache Airflow Providers
Relevant:
apache-airflow-providers-cncf-kubernetes==4.0.1
Full:
apache-airflow-providers-amazon==3.3.0
apache-airflow-providers-celery==2.1.4
apache-airflow-providers-cncf-kubernetes==4.0.1
apache-airflow-providers-docker==2.6.0
apache-airflow-providers-elasticsearch==3.0.3
apache-airflow-providers-ftp==2.1.2
apache-airflow-providers-google==6.8.0
apache-airflow-providers-grpc==2.0.4
apache-airflow-providers-hashicorp==2.2.0
apache-airflow-providers-http==2.1.2
apache-airflow-providers-imap==2.2.3
apache-airflow-providers-microsoft-azure==3.8.0
apache-airflow-providers-mysql==2.2.3
apache-airflow-providers-odbc==2.0.4
apache-airflow-providers-postgres==4.1.0
apache-airflow-providers-redis==2.0.4
apache-airflow-providers-sendgrid==2.0.4
apache-airflow-providers-sftp==2.6.0
apache-airflow-providers-slack==4.2.3
apache-airflow-providers-sqlite==2.1.3
apache-airflow-providers-ssh==2.4.3
Deployment
Official Apache Airflow Helm Chart
Deployment details
Docker (Docker Desktop)
- Server Version: 20.10.13
- API Version: 1.41
- Builder: 2
Kubernetes (Docker Desktop)
- Env: docker-desktop
- Context: docker-desktop
- Cluster Name: docker-desktop
- Namespace: default
- Container Runtime: docker
- Version: v1.22.5
Helm:
- version.BuildInfo{Version:"v3.6.3", GitCommit:"d506314abfb5d21419df8c7e7e68012379db2354", GitTreeState:"dirty", GoVersion:"go1.16.5"}
Anything else
You can work around this by creating the partial without calling expand on it, setting resources directly on the OperatorPartial's kwargs dict (which skips the coerce_resources check that partial() runs), and then calling expand. Example:
from datetime import datetime

from airflow import XComArg
from airflow.models import DAG
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator


def make_operator(**kwargs):
    return KubernetesPodOperator(
        **{
            'get_logs': True,
            'in_cluster': True,
            'is_delete_operator_pod': True,
            'namespace': 'default',
            'startup_timeout_seconds': 600,
            **kwargs,
        }
    )


def make_partial_operator(**kwargs):
    return KubernetesPodOperator.partial(
        **{
            'get_logs': True,
            'in_cluster': True,
            'is_delete_operator_pod': True,
            'namespace': 'default',
            'startup_timeout_seconds': 600,
            **kwargs,
        }
    )


with DAG(
    dag_id='bulk_image_processing',
    schedule_interval=None,
    start_date=datetime(2020, 1, 1),
    max_active_tasks=20,
) as dag:
    op1 = make_operator(
        arguments=['--bucket-name', '{{ dag_run.conf.get("bucket", "some-fake-default") }}'],
        cmds=['python3', 'some_entrypoint'],
        image='some-image',
        name='airflow-private-image-pod-1',
        task_id='some-task',
        do_xcom_push=True,
    )
    op2 = make_partial_operator(
        image='another-image',
        name='airflow-private-image-pod-2',
        task_id='another-task',
        cmds=['some', 'set', 'of', 'cmds'],
    )
    op2.kwargs['resources'] = {'limit_cpu': '2000m', 'limit_memory': '16Gi'}
    op2.expand(arguments=XComArg(op1))
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct