Skip to content

Commit 522af69

Browse files
committed
Revert "[SPARK-45167][CONNECT][PYTHON][FOLLOW-UP] Use lighter threading Rlock, and use the existing eventually util function"
This reverts commit 2a9dd2b.
1 parent c5203ab commit 522af69

File tree

2 files changed

+21
-7
lines changed

2 files changed

+21
-7
lines changed

python/pyspark/sql/connect/client/reattach.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,12 @@
1818

1919
check_dependencies(__name__)
2020

21-
from threading import RLock
2221
import warnings
2322
import uuid
2423
from collections.abc import Generator
2524
from typing import Optional, Dict, Any, Iterator, Iterable, Tuple, Callable, cast, Type, ClassVar
25+
from multiprocessing import RLock
26+
from multiprocessing.synchronize import RLock as RLockBase
2627
from multiprocessing.pool import ThreadPool
2728
import os
2829

@@ -55,7 +56,7 @@ class ExecutePlanResponseReattachableIterator(Generator):
5556
"""
5657

5758
# Lock to manage the pool
58-
_lock: ClassVar[RLock] = RLock()
59+
_lock: ClassVar[RLockBase] = RLock()
5960
_release_thread_pool: Optional[ThreadPool] = ThreadPool(os.cpu_count() if os.cpu_count() else 8)
6061

6162
@classmethod

python/pyspark/sql/tests/connect/client/test_client.py

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
from pyspark.sql.connect.client import SparkConnectClient, ChannelBuilder
2626
import pyspark.sql.connect.proto as proto
2727
from pyspark.testing.connectutils import should_test_connect, connect_requirement_message
28-
from pyspark.testing.utils import eventually
2928

3029
from pyspark.sql.connect.client.core import Retrying
3130
from pyspark.sql.connect.client.reattach import (
@@ -153,6 +152,20 @@ def _stub_with(self, execute=None, attach=None):
153152
attach_ops=ResponseGenerator(attach) if attach is not None else None,
154153
)
155154

155+
def assertEventually(self, callable, timeout_ms=1000):
156+
"""Helper method that will continuously evaluate the callable to not raise an
157+
exception."""
158+
import time
159+
160+
limit = time.monotonic_ns() + timeout_ms * 1000 * 1000
161+
while time.monotonic_ns() < limit:
162+
try:
163+
callable()
164+
break
165+
except Exception:
166+
time.sleep(0.1)
167+
callable()
168+
156169
def test_basic_flow(self):
157170
stub = self._stub_with([self.response, self.finished])
158171
ite = ExecutePlanResponseReattachableIterator(self.request, stub, self.policy, [])
@@ -165,7 +178,7 @@ def check_all():
165178
self.assertEqual(1, stub.release_calls)
166179
self.assertEqual(1, stub.execute_calls)
167180

168-
eventually(timeout=1, catch_assertions=True)(check_all)()
181+
self.assertEventually(check_all, timeout_ms=1000)
169182

170183
def test_fail_during_execute(self):
171184
def fatal():
@@ -183,7 +196,7 @@ def check():
183196
self.assertEqual(1, stub.release_until_calls)
184197
self.assertEqual(1, stub.execute_calls)
185198

186-
eventually(timeout=1, catch_assertions=True)(check)()
199+
self.assertEventually(check, timeout_ms=1000)
187200

188201
def test_fail_and_retry_during_execute(self):
189202
def non_fatal():
@@ -202,7 +215,7 @@ def check():
202215
self.assertEqual(3, stub.release_until_calls)
203216
self.assertEqual(1, stub.execute_calls)
204217

205-
eventually(timeout=1, catch_assertions=True)(check)()
218+
self.assertEventually(check, timeout_ms=1000)
206219

207220
def test_fail_and_retry_during_reattach(self):
208221
count = 0
@@ -228,7 +241,7 @@ def check():
228241
self.assertEqual(1, stub.release_calls)
229242
self.assertEqual(1, stub.execute_calls)
230243

231-
eventually(timeout=1, catch_assertions=True)(check)()
244+
self.assertEventually(check, timeout_ms=1000)
232245

233246

234247
class TestException(grpc.RpcError, grpc.Call):

0 commit comments

Comments
 (0)