@@ -35,28 +35,28 @@ class MQTTStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfter
3535 private val topic = " def"
3636
3737 private var ssc : StreamingContext = _
38- private var MQTTTestUtils : MQTTTestUtils = _
38+ private var mqttTestUtils : MQTTTestUtils = _
3939
4040 before {
4141 ssc = new StreamingContext (master, framework, batchDuration)
42- MQTTTestUtils = new MQTTTestUtils
43- MQTTTestUtils .setup()
42+ mqttTestUtils = new MQTTTestUtils
43+ mqttTestUtils .setup()
4444 }
4545
4646 after {
4747 if (ssc != null ) {
4848 ssc.stop()
4949 ssc = null
5050 }
51- if (MQTTTestUtils != null ) {
52- MQTTTestUtils .teardown()
53- MQTTTestUtils = null
51+ if (mqttTestUtils != null ) {
52+ mqttTestUtils .teardown()
53+ mqttTestUtils = null
5454 }
5555 }
5656
5757 test(" mqtt input stream" ) {
5858 val sendMessage = " MQTT demo for spark streaming"
59- val receiveStream = MQTTUtils .createStream(ssc, " tcp://" + MQTTTestUtils .brokerUri, topic,
59+ val receiveStream = MQTTUtils .createStream(ssc, " tcp://" + mqttTestUtils .brokerUri, topic,
6060 StorageLevel .MEMORY_ONLY )
6161
6262 @ volatile var receiveMessage : List [String ] = List ()
@@ -71,7 +71,7 @@ class MQTTStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfter
7171
7272 // Retry it because we don't know when the receiver will start.
7373 eventually(timeout(10000 milliseconds), interval(100 milliseconds)) {
74- MQTTTestUtils .publishData(topic, sendMessage)
74+ mqttTestUtils .publishData(topic, sendMessage)
7575 assert(sendMessage.equals(receiveMessage(0 )))
7676 }
7777 ssc.stop()
0 commit comments