|
29 | 29 |
|
30 | 30 | from py4j.protocol import Py4JJavaError |
31 | 31 |
|
32 | | -from pyspark.testing.utils import ExecThread, ReusedPySparkTestCase, PySparkTestCase, QuietTest |
| 32 | +from pyspark.testing.utils import ReusedPySparkTestCase, PySparkTestCase, QuietTest |
33 | 33 |
|
34 | 34 | if sys.version_info[0] >= 3: |
35 | 35 | xrange = range |
@@ -152,26 +152,18 @@ def test_with_different_versions_of_python(self): |
152 | 152 | self.sc.pythonVer = version |
153 | 153 |
|
154 | 154 | def test_python_exception_non_hanging(self): |
155 | | - """ |
156 | | - SPARK-21045: exceptions with no ascii encoding shall not hanging PySpark. |
157 | | - """ |
158 | | - def f(): |
159 | | - raise Exception("exception with 中 and \xd6\xd0") |
| 155 | + # SPARK-21045: exceptions with no ascii encoding shall not hang PySpark.
| 156 | + try: |
| 157 | + def f(): |
| 158 | + raise Exception("exception with 中 and \xd6\xd0") |
160 | 159 |
|
161 | | - def run(): |
162 | 160 | self.sc.parallelize([1]).map(lambda x: f()).count() |
163 | | - |
164 | | - t = ExecThread(target=run) |
165 | | - t.daemon = True |
166 | | - t.start() |
167 | | - t.join(10) |
168 | | - self.assertFalse(t.isAlive(), "Spark should not be blocked") |
169 | | - self.assertIsInstance(t.exception, Py4JJavaError) |
170 | | - if sys.version_info.major < 3: |
171 | | - # we have to use unicode here to avoid UnicodeDecodeError |
172 | | - self.assertRegexpMatches(unicode(t.exception).encode("utf-8"), "exception with 中") |
173 | | - else: |
174 | | - self.assertRegexpMatches(str(t.exception), "exception with 中") |
| 161 | + except Py4JJavaError as e: |
| 162 | + if sys.version_info.major < 3: |
| 163 | + # we have to use unicode here to avoid UnicodeDecodeError |
| 164 | + self.assertRegexpMatches(unicode(e).encode("utf-8"), "exception with 中") |
| 165 | + else: |
| 166 | + self.assertRegexpMatches(str(e), "exception with 中") |
175 | 167 |
|
176 | 168 |
|
177 | 169 | class WorkerReuseTest(PySparkTestCase): |
|
0 commit comments