Skip to content

Commit fb72447

Browse files
committed
remove ExecThread and rdd operations happens in main thread.
1 parent 90559c0 commit fb72447

File tree

2 files changed

+11
-34
lines changed

2 files changed

+11
-34
lines changed

python/pyspark/testing/utils.py

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -128,18 +128,3 @@ def search_jar(project_relative_path, sbt_jar_name_prefix, mvn_jar_name_prefix):
128128
raise Exception("Found multiple JARs: %s; please remove all but one" % (", ".join(jars)))
129129
else:
130130
return jars[0]
131-
132-
133-
class ExecThread(threading.Thread):
134-
""" A wrapper thread which stores exception info if any occurred.
135-
"""
136-
def __init__(self, target):
137-
self.target = target
138-
self.exception = None
139-
threading.Thread.__init__(self)
140-
141-
def run(self):
142-
try:
143-
self.target()
144-
except Exception as e: # captures any exceptions
145-
self.exception = e

python/pyspark/tests/test_worker.py

Lines changed: 11 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929

3030
from py4j.protocol import Py4JJavaError
3131

32-
from pyspark.testing.utils import ExecThread, ReusedPySparkTestCase, PySparkTestCase, QuietTest
32+
from pyspark.testing.utils import ReusedPySparkTestCase, PySparkTestCase, QuietTest
3333

3434
if sys.version_info[0] >= 3:
3535
xrange = range
@@ -152,26 +152,18 @@ def test_with_different_versions_of_python(self):
152152
self.sc.pythonVer = version
153153

154154
def test_python_exception_non_hanging(self):
155-
"""
156-
SPARK-21045: exceptions with no ascii encoding shall not hanging PySpark.
157-
"""
158-
def f():
159-
raise Exception("exception with 中 and \xd6\xd0")
155+
# SPARK-21045: exceptions with no ascii encoding shall not hanging PySpark.
156+
try:
157+
def f():
158+
raise Exception("exception with 中 and \xd6\xd0")
160159

161-
def run():
162160
self.sc.parallelize([1]).map(lambda x: f()).count()
163-
164-
t = ExecThread(target=run)
165-
t.daemon = True
166-
t.start()
167-
t.join(10)
168-
self.assertFalse(t.isAlive(), "Spark should not be blocked")
169-
self.assertIsInstance(t.exception, Py4JJavaError)
170-
if sys.version_info.major < 3:
171-
# we have to use unicode here to avoid UnicodeDecodeError
172-
self.assertRegexpMatches(unicode(t.exception).encode("utf-8"), "exception with 中")
173-
else:
174-
self.assertRegexpMatches(str(t.exception), "exception with 中")
161+
except Py4JJavaError as e:
162+
if sys.version_info.major < 3:
163+
# we have to use unicode here to avoid UnicodeDecodeError
164+
self.assertRegexpMatches(unicode(e).encode("utf-8"), "exception with 中")
165+
else:
166+
self.assertRegexpMatches(str(e), "exception with 中")
175167

176168

177169
class WorkerReuseTest(PySparkTestCase):

0 commit comments

Comments
 (0)