File tree Expand file tree Collapse file tree 1 file changed +3
-2
lines changed
core/src/test/scala/org/apache/spark/scheduler Expand file tree Collapse file tree 1 file changed +3
-2
lines changed Original file line number Diff line number Diff line change @@ -50,9 +50,9 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
5050 (1 to 5 ).foreach { _ => bus.post(SparkListenerJobEnd (0 , JobSucceeded )) }
5151 assert(counter.count === 0 )
5252
53- // Starting listener bus should flush all buffered events (asynchronously, hence the sleep)
53+ // Starting listener bus should flush all buffered events
5454 bus.start()
55- Thread .sleep( 1000 )
55+ assert(bus.waitUntilEmpty( WAIT_TIMEOUT_MILLIS ) )
5656 assert(counter.count === 5 )
5757
5858 // After listener bus has stopped, posting events should not increment counter
@@ -177,6 +177,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
177177 listener.stageInfos.clear()
178178
179179 rdd3.count()
180+ assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS ))
180181 listener.stageInfos.size should be {2 } // Shuffle map stage + result stage
181182 val stageInfo3 = listener.stageInfos.keys.find(_.stageId == 2 ).get
182183 stageInfo3.rddInfos.size should be {2 } // ShuffledRDD, MapPartitionsRDD
You can’t perform that action at this time.
0 commit comments