1515# specific language governing permissions and limitations
1616# under the License.
1717import logging
18+ import time
19+ from typing import Generator
1820from unittest import mock
1921from unittest .mock import MagicMock
2022
@@ -312,7 +314,7 @@ def test_fetch_container_since_time(self, container_running, mock_now):
312314 args , kwargs = self .mock_kube_client .read_namespaced_pod_log .call_args_list [0 ]
313315 assert kwargs ['since_seconds' ] == 5
314316
315- @pytest .mark .parametrize ('follow, is_running_calls, exp_running' , [(True , 3 , False ), (False , 1 , True )])
317+ @pytest .mark .parametrize ('follow, is_running_calls, exp_running' , [(True , 4 , False ), (False , 1 , True )])
316318 @mock .patch ('airflow.providers.cncf.kubernetes.utils.pod_manager.container_is_running' )
317319 def test_fetch_container_running_follow (
318320 self , container_running_mock , follow , is_running_calls , exp_running
@@ -322,13 +324,35 @@ def test_fetch_container_running_follow(
322324 When called with follow=False, should return immediately even though still running.
323325 """
324326 mock_pod = MagicMock ()
325- container_running_mock .side_effect = [True , True , False ] # only will be called once
327+ container_running_mock .side_effect = [True , False , False , False ] # called once when follow=False
326328 self .mock_kube_client .read_namespaced_pod_log .return_value = [b'2021-01-01 hi' ]
327329 ret = self .pod_manager .fetch_container_logs (pod = mock_pod , container_name = 'base' , follow = follow )
328330 assert len (container_running_mock .call_args_list ) == is_running_calls
329331 assert ret .last_log_time == DateTime (2021 , 1 , 1 , tzinfo = Timezone ('UTC' ))
330332 assert ret .running is exp_running
331333
334+ @pytest .mark .parametrize ('follow, is_running_calls, exp_running' , [(True , 3 , False )])
335+ @mock .patch ('airflow.providers.cncf.kubernetes.utils.pod_manager.container_is_running' )
336+ def test_fetch_container_running_follow_when_kube_api_hangs (
337+ self , container_running_mock , follow , is_running_calls , exp_running
338+ ):
339+ """
340+ When called with follow, should keep looping even after disconnections, if pod still running.
341+ """
342+ mock_pod = MagicMock ()
343+ container_running_mock .side_effect = [True , False , False ]
344+
345+ def stream_logs () -> Generator :
346+ while True :
347+ time .sleep (1 ) # this is intentional: urllib3.response.stream() is not async
348+ yield b'2021-01-01 hi'
349+
350+ self .mock_kube_client .read_namespaced_pod_log .return_value = stream_logs ()
351+ ret = self .pod_manager .fetch_container_logs (pod = mock_pod , container_name = 'base' , follow = follow )
352+ assert len (container_running_mock .call_args_list ) == is_running_calls
353+ assert ret .running is exp_running
354+ assert ret .last_log_time is None
355+
332356
333357def params_for_test_container_is_running ():
334358 """The `container_is_running` method is designed to handle an assortment of bad objects
0 commit comments