Skip to content

Commit 7bd165f

Browse files
authored
Remove RefreshConfiguration workaround for K8s token refreshing (#20759)
A workaround was added (#5731) to handle the refreshing of EKS tokens. It was necessary because of an upstream bug. It has since been fixed (kubernetes-client/python-base@70b78cd) and released in v21.7.0 (https://github.com/kubernetes-client/python/blob/master/CHANGELOG.md#v2170).
1 parent 6d1d53b commit 7bd165f

File tree

7 files changed

+26
-287
lines changed

7 files changed

+26
-287
lines changed

UPDATING.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,10 @@ https://developers.google.com/style/inclusive-documentation
8181
8282
-->
8383

84+
### Minimum kubernetes version bumped from 3.0.0 to 21.7.0
85+
86+
No change in behavior is expected. This was necessary in order to take advantage of a [bugfix](https://github.com/kubernetes-client/python-base/commit/70b78cd8488068c014b6d762a0c8d358273865b4) concerning refreshing of Kubernetes API tokens with EKS, which enabled the removal of some [workaround code](https://github.com/apache/airflow/pull/20759).
87+
8488
### Deprecation: `Connection.extra` must be JSON-encoded dict
8589

8690
#### TLDR

airflow/kubernetes/kube_client.py

Lines changed: 10 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -25,39 +25,10 @@
2525
try:
2626
from kubernetes import client, config
2727
from kubernetes.client import Configuration
28-
from kubernetes.client.api_client import ApiClient
2928
from kubernetes.client.rest import ApiException
3029

31-
from airflow.kubernetes.refresh_config import RefreshConfiguration, load_kube_config
32-
3330
has_kubernetes = True
3431

35-
def _get_kube_config(
36-
in_cluster: bool, cluster_context: Optional[str], config_file: Optional[str]
37-
) -> Optional[Configuration]:
38-
if in_cluster:
39-
# load_incluster_config set default configuration with config populated by k8s
40-
config.load_incluster_config()
41-
return None
42-
else:
43-
# this block can be replaced with just config.load_kube_config once
44-
# refresh_config module is replaced with upstream fix
45-
cfg = RefreshConfiguration()
46-
load_kube_config(client_configuration=cfg, config_file=config_file, context=cluster_context)
47-
return cfg
48-
49-
def _get_client_with_patched_configuration(cfg: Optional[Configuration]) -> client.CoreV1Api:
50-
"""
51-
This is a workaround for supporting api token refresh in k8s client.
52-
53-
The function can be replace with `return client.CoreV1Api()` once the
54-
upstream client supports token refresh.
55-
"""
56-
if cfg:
57-
return client.CoreV1Api(api_client=ApiClient(configuration=cfg))
58-
else:
59-
return client.CoreV1Api()
60-
6132
def _disable_verify_ssl() -> None:
6233
configuration = Configuration()
6334
configuration.verify_ssl = False
@@ -126,17 +97,19 @@ def get_kube_client(
12697
if not has_kubernetes:
12798
raise _import_err
12899

129-
if not in_cluster:
130-
if cluster_context is None:
131-
cluster_context = conf.get('kubernetes', 'cluster_context', fallback=None)
132-
if config_file is None:
133-
config_file = conf.get('kubernetes', 'config_file', fallback=None)
134-
135100
if conf.getboolean('kubernetes', 'enable_tcp_keepalive'):
136101
_enable_tcp_keepalive()
137102

138103
if not conf.getboolean('kubernetes', 'verify_ssl'):
139104
_disable_verify_ssl()
140105

141-
client_conf = _get_kube_config(in_cluster, cluster_context, config_file)
142-
return _get_client_with_patched_configuration(client_conf)
106+
if in_cluster:
107+
config.load_incluster_config()
108+
else:
109+
if cluster_context is None:
110+
cluster_context = conf.get('kubernetes', 'cluster_context', fallback=None)
111+
if config_file is None:
112+
config_file = conf.get('kubernetes', 'config_file', fallback=None)
113+
config.load_kube_config(config_file=config_file, context=cluster_context)
114+
115+
return client.CoreV1Api()

airflow/kubernetes/refresh_config.py

Lines changed: 0 additions & 124 deletions
This file was deleted.

airflow/providers/cncf/kubernetes/utils/pod_manager.py

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,7 @@
3838
from airflow.utils.log.logging_mixin import LoggingMixin
3939

4040
if TYPE_CHECKING:
41-
try:
42-
# Kube >= 19
43-
from kubernetes.client.models.core_v1_event_list import CoreV1EventList as V1EventList
44-
except ImportError:
45-
from kubernetes.client.models.v1_event_list import V1EventList
41+
from kubernetes.client.models.core_v1_event_list import CoreV1EventList
4642

4743

4844
class PodLaunchFailedException(AirflowException):
@@ -298,7 +294,7 @@ def read_pod_logs(
298294
raise
299295

300296
@tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True)
301-
def read_pod_events(self, pod: V1Pod) -> "V1EventList":
297+
def read_pod_events(self, pod: V1Pod) -> "CoreV1EventList":
302298
"""Reads events from the POD"""
303299
try:
304300
return self._client.list_namespaced_event(

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -414,7 +414,7 @@ def write_version(filename: str = os.path.join(*[my_dir, "airflow", "git_version
414414
]
415415
kubernetes = [
416416
'cryptography>=2.0.0',
417-
'kubernetes>=3.0.0',
417+
'kubernetes>=21.7.0',
418418
]
419419
kylin = ['kylinpy>=2.6']
420420
ldap = [

tests/kubernetes/test_client.py

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -22,25 +22,21 @@
2222
from kubernetes.client import Configuration
2323
from urllib3.connection import HTTPConnection, HTTPSConnection
2424

25-
from airflow.kubernetes.kube_client import (
26-
RefreshConfiguration,
27-
_disable_verify_ssl,
28-
_enable_tcp_keepalive,
29-
get_kube_client,
30-
)
25+
from airflow.kubernetes.kube_client import _disable_verify_ssl, _enable_tcp_keepalive, get_kube_client
3126

3227

3328
class TestClient(unittest.TestCase):
3429
@mock.patch('airflow.kubernetes.kube_client.config')
35-
def test_load_cluster_config(self, _):
36-
client = get_kube_client(in_cluster=True)
37-
assert not isinstance(client.api_client.configuration, RefreshConfiguration)
30+
def test_load_cluster_config(self, config):
31+
get_kube_client(in_cluster=True)
32+
assert config.load_incluster_config.called
33+
assert config.load_kube_config.not_called
3834

3935
@mock.patch('airflow.kubernetes.kube_client.config')
40-
@mock.patch('airflow.kubernetes.refresh_config._get_kube_config_loader_for_yaml_file')
41-
def test_load_file_config(self, _, _2):
42-
client = get_kube_client(in_cluster=False)
43-
assert isinstance(client.api_client.configuration, RefreshConfiguration)
36+
def test_load_file_config(self, config):
37+
get_kube_client(in_cluster=False)
38+
assert config.load_incluster_config.not_called
39+
assert config.load_kube_config.called
4440

4541
def test_enable_tcp_keepalive(self):
4642
socket_options = [

tests/kubernetes/test_refresh_config.py

Lines changed: 0 additions & 106 deletions
This file was deleted.

0 commit comments

Comments
 (0)