Skip to content

Commit dee05b2

Browse files
authored
Prevent KubernetesJobWatcher getting stuck on resource too old (#23521)
* Prevent KubernetesJobWatcher getting stuck on resource too old If the watch fails because "resource too old" the KubernetesJobWatcher should not retry with the same resource version as that will end up in loop where there is no progress. * Reset ResourceVersion().resource_version to 0
1 parent cfa95af commit dee05b2

File tree

2 files changed

+37
-0
lines changed

2 files changed

+37
-0
lines changed

airflow/executors/kubernetes_executor.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,8 @@ def run(self) -> None:
109109
time.sleep(1)
110110
except Exception:
111111
self.log.exception('Unknown error in KubernetesJobWatcher. Failing')
112+
self.resource_version = "0"
113+
ResourceVersion().resource_version = "0"
112114
raise
113115
else:
114116
self.log.warning(
@@ -288,6 +290,7 @@ def _health_check_kube_watcher(self):
288290
self.log.error(
289291
'Error while health checking kube watcher process. Process died for unknown reasons'
290292
)
293+
ResourceVersion().resource_version = "0"
291294
self.kube_watcher = self._make_kube_watcher()
292295

293296
def run_next(self, next_job: KubernetesJobType) -> None:

tests/executors/test_kubernetes_executor.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
AirflowKubernetesScheduler,
4040
KubernetesExecutor,
4141
KubernetesJobWatcher,
42+
ResourceVersion,
4243
create_pod_id,
4344
get_base_pod_from_template,
4445
)
@@ -957,3 +958,36 @@ def test_process_error_event_for_raise_if_not_410(self):
957958
f"Kubernetes failure for {raw_object['reason']} "
958959
f"with code {raw_object['code']} and message: {raw_object['message']}"
959960
)
961+
962+
def test_recover_from_resource_too_old(self):
963+
# too old resource
964+
mock_underscore_run = mock.MagicMock()
965+
966+
def effect():
967+
yield '500'
968+
while True:
969+
yield Exception('sentinel')
970+
971+
mock_underscore_run.side_effect = effect()
972+
973+
self.watcher._run = mock_underscore_run
974+
975+
with mock.patch('airflow.executors.kubernetes_executor.get_kube_client'):
976+
try:
977+
# self.watcher._run() is mocked and return "500" as last resource_version
978+
self.watcher.run()
979+
except Exception as e:
980+
assert e.args == ('sentinel',)
981+
982+
# both resource_version should be 0 after _run raises and exception
983+
assert self.watcher.resource_version == '0'
984+
assert ResourceVersion().resource_version == '0'
985+
986+
# check that in the next run, _run is invoked with resource_version = 0
987+
mock_underscore_run.reset_mock()
988+
try:
989+
self.watcher.run()
990+
except Exception as e:
991+
assert e.args == ('sentinel',)
992+
993+
mock_underscore_run.assert_called_once_with(mock.ANY, '0', mock.ANY, mock.ANY)

0 commit comments

Comments
 (0)