File tree Expand file tree Collapse file tree 3 files changed +13
-1
lines changed
main/scala/org/apache/spark/streaming/mqtt
test/scala/org/apache/spark/streaming/mqtt Expand file tree Collapse file tree 3 files changed +13
-1
lines changed Original file line number Diff line number Diff line change @@ -19,7 +19,6 @@ package org.apache.spark.streaming.mqtt
1919
2020import scala .reflect .ClassTag
2121
22- import org .apache .spark .api .java .function .Function
2322import org .apache .spark .storage .StorageLevel
2423import org .apache .spark .streaming .StreamingContext
2524import org .apache .spark .streaming .api .java .{JavaReceiverInputDStream , JavaStreamingContext , JavaDStream }
Original file line number Diff line number Diff line change @@ -28,6 +28,7 @@ import org.eclipse.paho.client.mqttv3._
2828import org .eclipse .paho .client .mqttv3 .persist .MqttDefaultFilePersistence
2929
3030import org .apache .spark .streaming .StreamingContext
31+ import org .apache .spark .streaming .api .java .JavaStreamingContext
3132import org .apache .spark .streaming .scheduler .StreamingListener
3233import org .apache .spark .streaming .scheduler .StreamingListenerReceiverStarted
3334import org .apache .spark .util .Utils
@@ -121,4 +122,15 @@ private class MQTTTestUtils extends Logging {
121122
122123 assert(latch.await(10 , TimeUnit .SECONDS ), " Timeout waiting for receiver to start." )
123124 }
125+
126+ def waitForReceiverToStart (jssc : JavaStreamingContext ) : Unit = {
127+ val latch = new CountDownLatch (1 )
128+ jssc.addStreamingListener(new StreamingListener {
129+ override def onReceiverStarted (receiverStarted : StreamingListenerReceiverStarted ) {
130+ latch.countDown()
131+ }
132+ })
133+
134+ assert(latch.await(10 , TimeUnit .SECONDS ), " Timeout waiting for receiver to start." )
135+ }
124136}
Original file line number Diff line number Diff line change @@ -884,6 +884,7 @@ def test_mqtt_stream(self):
884884 sendData = "MQTT demo for spark streaming"
885885 topic = self ._randomTopic ()
886886 result = self ._startContext (topic )
887+ self ._MQTTTestUtils .waitForReceiverToStart (self .ssc ._jssc )
887888 self ._publishData (topic , sendData )
888889 self .wait_for (result , len (sendData ))
889890 self ._validateStreamResult (sendData , result )
You can’t perform that action at this time.
0 commit comments