Skip to content

Commit 7ef6354

Browse files
dimbermankaxil
authored andcommitted
Fix issue with empty Resources in executor_config (apache#12633)
Fixes an issue where if a user specifies a request but not a limit in resources for the executor_config, backwards compat can not parse (cherry picked from commit 84eecf94bab1a8c66b5161f03c6631448fb4850e) (cherry picked from commit 74e02477bdd83880c2bc2b57ded36ef730dcacbb)
1 parent 56e4468 commit 7ef6354

File tree

2 files changed

+152
-92
lines changed

2 files changed

+152
-92
lines changed

airflow/contrib/kubernetes/pod.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -250,8 +250,8 @@ def _extract_ports(ports):
250250

251251
def _extract_resources(resources):
252252
if isinstance(resources, k8s.V1ResourceRequirements):
253-
requests = resources.requests
254-
limits = resources.limits
253+
requests = resources.requests or {}
254+
limits = resources.limits or {}
255255
return Resources(
256256
request_memory=requests.get('memory', None),
257257
request_cpu=requests.get('cpu', None),

tests/kubernetes/models/test_pod.py

Lines changed: 150 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -19,71 +19,97 @@
1919
from kubernetes.client import ApiClient
2020
import kubernetes.client.models as k8s
2121
from airflow.kubernetes.pod import Port
22+
from airflow.kubernetes.pod import Resources
23+
from airflow.contrib.kubernetes.pod import _extract_resources
2224
from airflow.kubernetes.pod_generator import PodGenerator
2325
from airflow.kubernetes.k8s_model import append_to_pod
2426

2527

2628
class TestPod(unittest.TestCase):
29+
def test_extract_resources(self):
30+
res = _extract_resources(k8s.V1ResourceRequirements())
31+
self.assertEqual(
32+
res.to_k8s_client_obj().to_dict(), Resources().to_k8s_client_obj().to_dict()
33+
)
34+
res = _extract_resources(k8s.V1ResourceRequirements(limits={"memory": "1G"}))
35+
self.assertEqual(
36+
res.to_k8s_client_obj().to_dict(),
37+
Resources(limit_memory="1G").to_k8s_client_obj().to_dict(),
38+
)
39+
res = _extract_resources(k8s.V1ResourceRequirements(requests={"memory": "1G"}))
40+
self.assertEqual(
41+
res.to_k8s_client_obj().to_dict(),
42+
Resources(request_memory="1G").to_k8s_client_obj().to_dict(),
43+
)
44+
res = _extract_resources(
45+
k8s.V1ResourceRequirements(
46+
limits={"memory": "1G"}, requests={"memory": "1G"}
47+
)
48+
)
49+
self.assertEqual(
50+
res.to_k8s_client_obj().to_dict(),
51+
Resources(limit_memory="1G", request_memory="1G")
52+
.to_k8s_client_obj()
53+
.to_dict(),
54+
)
2755

2856
def test_port_to_k8s_client_obj(self):
29-
port = Port('http', 80)
57+
port = Port("http", 80)
3058
self.assertEqual(
3159
port.to_k8s_client_obj(),
32-
k8s.V1ContainerPort(
33-
name='http',
34-
container_port=80
35-
)
60+
k8s.V1ContainerPort(name="http", container_port=80),
3661
)
3762

38-
@mock.patch('uuid.uuid4')
63+
@mock.patch("uuid.uuid4")
3964
def test_port_attach_to_pod(self, mock_uuid):
4065
import uuid
41-
static_uuid = uuid.UUID('cf4a56d2-8101-4217-b027-2af6216feb48')
66+
67+
static_uuid = uuid.UUID("cf4a56d2-8101-4217-b027-2af6216feb48")
4268
mock_uuid.return_value = static_uuid
43-
pod = PodGenerator(image='airflow-worker:latest', name='base').gen_pod()
44-
ports = [
45-
Port('https', 443),
46-
Port('http', 80)
47-
]
69+
pod = PodGenerator(image="airflow-worker:latest", name="base").gen_pod()
70+
ports = [Port("https", 443), Port("http", 80)]
4871
k8s_client = ApiClient()
4972
result = append_to_pod(pod, ports)
5073
result = k8s_client.sanitize_for_serialization(result)
51-
self.assertEqual({
52-
'apiVersion': 'v1',
53-
'kind': 'Pod',
54-
'metadata': {'name': 'base-' + static_uuid.hex},
55-
'spec': {
56-
'containers': [{
57-
'args': [],
58-
'command': [],
59-
'env': [],
60-
'envFrom': [],
61-
'image': 'airflow-worker:latest',
62-
'name': 'base',
63-
'ports': [{
64-
'name': 'https',
65-
'containerPort': 443
66-
}, {
67-
'name': 'http',
68-
'containerPort': 80
69-
}],
70-
'volumeMounts': [],
71-
}],
72-
'hostNetwork': False,
73-
'imagePullSecrets': [],
74-
'volumes': []
75-
}
76-
}, result)
74+
self.assertEqual(
75+
{
76+
"apiVersion": "v1",
77+
"kind": "Pod",
78+
"metadata": {"name": "base-" + static_uuid.hex},
79+
"spec": {
80+
"containers": [
81+
{
82+
"args": [],
83+
"command": [],
84+
"env": [],
85+
"envFrom": [],
86+
"image": "airflow-worker:latest",
87+
"name": "base",
88+
"ports": [
89+
{"name": "https", "containerPort": 443},
90+
{"name": "http", "containerPort": 80},
91+
],
92+
"volumeMounts": [],
93+
}
94+
],
95+
"hostNetwork": False,
96+
"imagePullSecrets": [],
97+
"volumes": [],
98+
},
99+
},
100+
result,
101+
)
77102

78-
@mock.patch('uuid.uuid4')
103+
@mock.patch("uuid.uuid4")
79104
def test_to_v1_pod(self, mock_uuid):
80105
from airflow.contrib.kubernetes.pod import Pod as DeprecatedPod
81106
from airflow.kubernetes.volume import Volume
82107
from airflow.kubernetes.volume_mount import VolumeMount
83108
from airflow.kubernetes.secret import Secret
84109
from airflow.kubernetes.pod import Resources
85110
import uuid
86-
static_uuid = uuid.UUID('cf4a56d2-8101-4217-b027-2af6216feb48')
111+
112+
static_uuid = uuid.UUID("cf4a56d2-8101-4217-b027-2af6216feb48")
87113
mock_uuid.return_value = static_uuid
88114

89115
pod = DeprecatedPod(
@@ -94,72 +120,106 @@ def test_to_v1_pod(self, mock_uuid):
94120
envs={"test_key": "test_value"},
95121
cmds=["airflow"],
96122
resources=Resources(
97-
request_memory="1G",
98-
request_cpu="100Mi",
99-
limit_gpu="100G"
123+
request_memory="1G", request_cpu="100Mi", limit_gpu="100G"
100124
),
101125
init_containers=k8s.V1Container(
102126
name="test-container",
103-
volume_mounts=k8s.V1VolumeMount(mount_path="/foo/bar", name="init-volume-secret")
127+
volume_mounts=k8s.V1VolumeMount(
128+
mount_path="/foo/bar", name="init-volume-secret"
129+
),
104130
),
105131
volumes=[
106132
Volume(name="foo", configs={}),
107-
{"name": "bar", 'secret': {'secretName': 'volume-secret'}}
133+
{"name": "bar", "secret": {"secretName": "volume-secret"}},
108134
],
109135
secrets=[
110136
Secret("volume", None, "init-volume-secret"),
111-
Secret('env', "AIRFLOW_SECRET", 'secret_name', "airflow_config"),
112-
Secret("volume", "/opt/airflow", "volume-secret", "secret-key")
137+
Secret("env", "AIRFLOW_SECRET", "secret_name", "airflow_config"),
138+
Secret("volume", "/opt/airflow", "volume-secret", "secret-key"),
139+
],
140+
volume_mounts=[
141+
VolumeMount(name="foo", mount_path="/mnt", sub_path="/", read_only=True)
113142
],
114-
volume_mounts=[VolumeMount(name="foo", mount_path="/mnt", sub_path="/", read_only=True)]
115143
)
116144

117145
k8s_client = ApiClient()
118146

119147
result = pod.to_v1_kubernetes_pod()
120148
result = k8s_client.sanitize_for_serialization(result)
121149

122-
expected = \
123-
{'metadata': {'annotations': {},
124-
'labels': {},
125-
'name': 'bar',
126-
'namespace': 'baz'},
127-
'spec': {'affinity': {},
128-
'containers': [{'args': [],
129-
'command': ['airflow'],
130-
'env': [{'name': 'test_key', 'value': 'test_value'},
131-
{'name': 'AIRFLOW_SECRET',
132-
'valueFrom': {'secretKeyRef': {'key': 'airflow_config',
133-
'name': 'secret_name'}}}],
134-
'envFrom': [],
135-
'image': 'foo',
136-
'imagePullPolicy': 'Never',
137-
'name': 'base',
138-
'resources': {'limits': {'nvidia.com/gpu': '100G'},
139-
'requests': {'cpu': '100Mi',
140-
'memory': '1G'}},
141-
'volumeMounts': [{'mountPath': '/mnt',
142-
'name': 'foo',
143-
'readOnly': True,
144-
'subPath': '/'},
145-
{'mountPath': '/opt/airflow',
146-
'name': 'secretvol' + str(static_uuid),
147-
'readOnly': True}]}],
148-
'hostNetwork': False,
149-
'imagePullSecrets': [],
150-
'initContainers': {'name': 'test-container',
151-
'volumeMounts': {'mountPath': '/foo/bar',
152-
'name': 'init-volume-secret'}},
153-
'nodeSelector': {},
154-
'securityContext': {},
155-
'tolerations': [],
156-
'volumes': [{'name': 'foo'},
157-
{'name': 'bar',
158-
'secret': {'secretName': 'volume-secret'}},
159-
{'name': 'secretvol' + str(static_uuid),
160-
'secret': {'secretName': 'init-volume-secret'}},
161-
{'name': 'secretvol' + str(static_uuid),
162-
'secret': {'secretName': 'volume-secret'}}
163-
]}}
150+
expected = {
151+
"metadata": {
152+
"annotations": {},
153+
"labels": {},
154+
"name": "bar",
155+
"namespace": "baz",
156+
},
157+
"spec": {
158+
"affinity": {},
159+
"containers": [
160+
{
161+
"args": [],
162+
"command": ["airflow"],
163+
"env": [
164+
{"name": "test_key", "value": "test_value"},
165+
{
166+
"name": "AIRFLOW_SECRET",
167+
"valueFrom": {
168+
"secretKeyRef": {
169+
"key": "airflow_config",
170+
"name": "secret_name",
171+
}
172+
},
173+
},
174+
],
175+
"envFrom": [],
176+
"image": "foo",
177+
"imagePullPolicy": "Never",
178+
"name": "base",
179+
"resources": {
180+
"limits": {"nvidia.com/gpu": "100G"},
181+
"requests": {"cpu": "100Mi", "memory": "1G"},
182+
},
183+
"volumeMounts": [
184+
{
185+
"mountPath": "/mnt",
186+
"name": "foo",
187+
"readOnly": True,
188+
"subPath": "/",
189+
},
190+
{
191+
"mountPath": "/opt/airflow",
192+
"name": "secretvol" + str(static_uuid),
193+
"readOnly": True,
194+
},
195+
],
196+
}
197+
],
198+
"hostNetwork": False,
199+
"imagePullSecrets": [],
200+
"initContainers": {
201+
"name": "test-container",
202+
"volumeMounts": {
203+
"mountPath": "/foo/bar",
204+
"name": "init-volume-secret",
205+
},
206+
},
207+
"nodeSelector": {},
208+
"securityContext": {},
209+
"tolerations": [],
210+
"volumes": [
211+
{"name": "foo"},
212+
{"name": "bar", "secret": {"secretName": "volume-secret"}},
213+
{
214+
"name": "secretvol" + str(static_uuid),
215+
"secret": {"secretName": "init-volume-secret"},
216+
},
217+
{
218+
"name": "secretvol" + str(static_uuid),
219+
"secret": {"secretName": "volume-secret"},
220+
},
221+
],
222+
},
223+
}
164224
self.maxDiff = None
165225
self.assertEqual(expected, result)

0 commit comments

Comments
 (0)