Skip to content

Commit b4b6100

Browse files
committed
Add/replace missing waitUntilEmpty() calls to listener bus
1 parent dd1b7a6 commit b4b6100

File tree

1 file changed

+3
-2
lines changed

1 file changed

+3
-2
lines changed

core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,9 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
5050
(1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, JobSucceeded)) }
5151
assert(counter.count === 0)
5252

53-
// Starting listener bus should flush all buffered events (asynchronously, hence the sleep)
53+
// Starting listener bus should flush all buffered events
5454
bus.start()
55-
Thread.sleep(1000)
55+
assert(bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
5656
assert(counter.count === 5)
5757

5858
// After listener bus has stopped, posting events should not increment counter
@@ -177,6 +177,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
177177
listener.stageInfos.clear()
178178

179179
rdd3.count()
180+
assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
180181
listener.stageInfos.size should be {2} // Shuffle map stage + result stage
181182
val stageInfo3 = listener.stageInfos.keys.find(_.stageId == 2).get
182183
stageInfo3.rddInfos.size should be {2} // ShuffledRDD, MapPartitionsRDD

0 commit comments

Comments
 (0)