Skip to content

Commit ee342b8

Browse files
authored
Fix k8s pod.execute randomly stuck indefinitely by logs consumption (#23497) (#23618)
1 parent e16eca2 commit ee342b8

File tree

2 files changed

+68
-7
lines changed

2 files changed

+68
-7
lines changed

airflow/providers/cncf/kubernetes/utils/pod_manager.py

Lines changed: 42 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
# specific language governing permissions and limitations
1616
# under the License.
1717
"""Launches PODs"""
18+
import asyncio
19+
import concurrent
1820
import json
1921
import math
2022
import time
@@ -193,6 +195,40 @@ def follow_container_logs(self, pod: V1Pod, container_name: str) -> PodLoggingSt
193195
)
194196
return self.fetch_container_logs(pod=pod, container_name=container_name, follow=True)
195197

198+
def log_iterable(self, logs: Iterable[bytes]) -> Optional[DateTime]:
199+
timestamp = None
200+
for line in logs:
201+
timestamp, message = self.parse_log_line(line.decode('utf-8', errors="backslashreplace"))
202+
self.log.info(message)
203+
return timestamp
204+
205+
def consume_container_logs_stream(
206+
self, pod: V1Pod, container_name: str, stream: Iterable[bytes]
207+
) -> Optional[DateTime]:
208+
async def async_await_container_completion() -> None:
209+
await asyncio.sleep(1)
210+
while self.container_is_running(pod=pod, container_name=container_name):
211+
await asyncio.sleep(1)
212+
213+
loop = asyncio.get_event_loop()
214+
await_container_completion = loop.create_task(async_await_container_completion())
215+
log_stream = asyncio.ensure_future(loop.run_in_executor(None, self.log_iterable, stream))
216+
tasks: Iterable[asyncio.Task] = {await_container_completion, log_stream}
217+
loop.run_until_complete(asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED))
218+
if log_stream.done():
219+
return log_stream.result()
220+
221+
log_stream.cancel()
222+
try:
223+
loop.run_until_complete(log_stream)
224+
except concurrent.futures.CancelledError:
225+
self.log.warning(
226+
"Container %s log read was interrupted at some point caused by log rotation "
227+
"see https://github.com/apache/airflow/issues/23497 for reference.",
228+
container_name,
229+
)
230+
return None
231+
196232
def fetch_container_logs(
197233
self, pod: V1Pod, container_name: str, *, follow=False, since_time: Optional[DateTime] = None
198234
) -> PodLoggingStatus:
@@ -220,10 +256,11 @@ def consume_logs(*, since_time: Optional[DateTime] = None, follow: bool = True)
220256
),
221257
follow=follow,
222258
)
223-
for raw_line in logs:
224-
line = raw_line.decode('utf-8', errors="backslashreplace")
225-
timestamp, message = self.parse_log_line(line)
226-
self.log.info(message)
259+
if follow:
260+
timestamp = self.consume_container_logs_stream(pod, container_name, logs)
261+
else:
262+
timestamp = self.log_iterable(logs)
263+
227264
except BaseHTTPError as e:
228265
self.log.warning(
229266
"Reading of logs interrupted with error %r; will retry. "
@@ -256,7 +293,7 @@ def consume_logs(*, since_time: Optional[DateTime] = None, follow: bool = True)
256293
time.sleep(1)
257294

258295
def await_container_completion(self, pod: V1Pod, container_name: str) -> None:
259-
while not self.container_is_running(pod=pod, container_name=container_name):
296+
while self.container_is_running(pod=pod, container_name=container_name):
260297
time.sleep(1)
261298

262299
def await_pod_completion(self, pod: V1Pod) -> V1Pod:

tests/providers/cncf/kubernetes/utils/test_pod_manager.py

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
# specific language governing permissions and limitations
1616
# under the License.
1717
import logging
18+
import time
19+
from typing import Generator
1820
from unittest import mock
1921
from unittest.mock import MagicMock
2022

@@ -312,7 +314,7 @@ def test_fetch_container_since_time(self, container_running, mock_now):
312314
args, kwargs = self.mock_kube_client.read_namespaced_pod_log.call_args_list[0]
313315
assert kwargs['since_seconds'] == 5
314316

315-
@pytest.mark.parametrize('follow, is_running_calls, exp_running', [(True, 3, False), (False, 1, True)])
317+
@pytest.mark.parametrize('follow, is_running_calls, exp_running', [(True, 4, False), (False, 1, True)])
316318
@mock.patch('airflow.providers.cncf.kubernetes.utils.pod_manager.container_is_running')
317319
def test_fetch_container_running_follow(
318320
self, container_running_mock, follow, is_running_calls, exp_running
@@ -322,13 +324,35 @@ def test_fetch_container_running_follow(
322324
When called with follow=False, should return immediately even though still running.
323325
"""
324326
mock_pod = MagicMock()
325-
container_running_mock.side_effect = [True, True, False] # only will be called once
327+
container_running_mock.side_effect = [True, False, False, False] # called once when follow=False
326328
self.mock_kube_client.read_namespaced_pod_log.return_value = [b'2021-01-01 hi']
327329
ret = self.pod_manager.fetch_container_logs(pod=mock_pod, container_name='base', follow=follow)
328330
assert len(container_running_mock.call_args_list) == is_running_calls
329331
assert ret.last_log_time == DateTime(2021, 1, 1, tzinfo=Timezone('UTC'))
330332
assert ret.running is exp_running
331333

334+
@pytest.mark.parametrize('follow, is_running_calls, exp_running', [(True, 3, False)])
335+
@mock.patch('airflow.providers.cncf.kubernetes.utils.pod_manager.container_is_running')
336+
def test_fetch_container_running_follow_when_kube_api_hangs(
337+
self, container_running_mock, follow, is_running_calls, exp_running
338+
):
339+
"""
340+
When called with follow, should keep looping even after disconnections, if pod still running.
341+
"""
342+
mock_pod = MagicMock()
343+
container_running_mock.side_effect = [True, False, False]
344+
345+
def stream_logs() -> Generator:
346+
while True:
347+
time.sleep(1) # this is intentional: urllib3.response.stream() is not async
348+
yield b'2021-01-01 hi'
349+
350+
self.mock_kube_client.read_namespaced_pod_log.return_value = stream_logs()
351+
ret = self.pod_manager.fetch_container_logs(pod=mock_pod, container_name='base', follow=follow)
352+
assert len(container_running_mock.call_args_list) == is_running_calls
353+
assert ret.running is exp_running
354+
assert ret.last_log_time is None
355+
332356

333357
def params_for_test_container_is_running():
334358
"""The `container_is_running` method is designed to handle an assortment of bad objects

0 commit comments

Comments
 (0)