2525from pyspark .sql .connect .client import SparkConnectClient , ChannelBuilder
2626import pyspark .sql .connect .proto as proto
2727from pyspark .testing .connectutils import should_test_connect , connect_requirement_message
28- from pyspark .testing .utils import eventually
2928
3029from pyspark .sql .connect .client .core import Retrying
3130from pyspark .sql .connect .client .reattach import (
@@ -153,6 +152,20 @@ def _stub_with(self, execute=None, attach=None):
153152 attach_ops = ResponseGenerator (attach ) if attach is not None else None ,
154153 )
155154
155+ def assertEventually (self , callable , timeout_ms = 1000 ):
156+ """Helper method that will continuously evaluate the callable to not raise an
157+ exception."""
158+ import time
159+
160+ limit = time .monotonic_ns () + timeout_ms * 1000 * 1000
161+ while time .monotonic_ns () < limit :
162+ try :
163+ callable ()
164+ break
165+ except Exception :
166+ time .sleep (0.1 )
167+ callable ()
168+
156169 def test_basic_flow (self ):
157170 stub = self ._stub_with ([self .response , self .finished ])
158171 ite = ExecutePlanResponseReattachableIterator (self .request , stub , self .policy , [])
@@ -165,7 +178,7 @@ def check_all():
165178 self .assertEqual (1 , stub .release_calls )
166179 self .assertEqual (1 , stub .execute_calls )
167180
168- eventually ( timeout = 1 , catch_assertions = True )( check_all )( )
181+ self . assertEventually ( check_all , timeout_ms = 1000 )
169182
170183 def test_fail_during_execute (self ):
171184 def fatal ():
@@ -183,7 +196,7 @@ def check():
183196 self .assertEqual (1 , stub .release_until_calls )
184197 self .assertEqual (1 , stub .execute_calls )
185198
186- eventually ( timeout = 1 , catch_assertions = True )( check )( )
199+ self . assertEventually ( check , timeout_ms = 1000 )
187200
188201 def test_fail_and_retry_during_execute (self ):
189202 def non_fatal ():
@@ -202,7 +215,7 @@ def check():
202215 self .assertEqual (3 , stub .release_until_calls )
203216 self .assertEqual (1 , stub .execute_calls )
204217
205- eventually ( timeout = 1 , catch_assertions = True )( check )( )
218+ self . assertEventually ( check , timeout_ms = 1000 )
206219
207220 def test_fail_and_retry_during_reattach (self ):
208221 count = 0
@@ -228,7 +241,7 @@ def check():
228241 self .assertEqual (1 , stub .release_calls )
229242 self .assertEqual (1 , stub .execute_calls )
230243
231- eventually ( timeout = 1 , catch_assertions = True )( check )( )
244+ self . assertEventually ( check , timeout_ms = 1000 )
232245
233246
234247class TestException (grpc .RpcError , grpc .Call ):
0 commit comments