@@ -27,26 +27,32 @@ import kafka.admin.CreateTopicCommand
 import kafka.common.TopicAndPartition
 import kafka.producer.{KeyedMessage, ProducerConfig, Producer}
 import kafka.utils.ZKStringSerializer
-import kafka.serializer.StringEncoder
+import kafka.serializer.{StringDecoder, StringEncoder}
 import kafka.server.{KafkaConfig, KafkaServer}

-import org.apache.spark.streaming.{StreamingContext, TestSuiteBase}
+import org.I0Itec.zkclient.ZkClient
+
 import org.apache.zookeeper.server.ZooKeeperServer
 import org.apache.zookeeper.server.NIOServerCnxnFactory

-import org.I0Itec.zkclient.ZkClient
+import org.apache.spark.streaming.{StreamingContext, TestSuiteBase}
+import org.apache.spark.storage.StorageLevel

 class KafkaStreamSuite extends TestSuiteBase {
+  import KafkaStreamSuite._
+
   val zkConnect = "localhost:2181"
-  var zookeeper: EmbeddedZookeeper = _
-  var zkClient: ZkClient = _
   val zkConnectionTimeout = 6000
   val zkSessionTimeout = 6000

   val brokerPort = 9092
   val brokerProps = getBrokerConfig(brokerPort)
   val brokerConf = new KafkaConfig(brokerProps)
-  var server: KafkaServer = _
+
+  protected var zookeeper: EmbeddedZookeeper = _
+  protected var zkClient: ZkClient = _
+  protected var server: KafkaServer = _
+  protected var producer: Producer[String, String] = _

   override def useManualClock = false

@@ -68,21 +74,32 @@ class KafkaStreamSuite extends TestSuiteBase {
   }

   override def afterFunction() {
+    producer.close()
     server.shutdown()
-    brokerConf.logDirs.foreach { f => KafkaStreamSuite.deleteDir(new File(f)) }
+    brokerConf.logDirs.foreach { f => deleteDir(new File(f)) }

     zkClient.close()
     zookeeper.shutdown()

     super.afterFunction()
   }

-  test("kafka input stream") {
+  test("Kafka input stream") {
     val ssc = new StreamingContext(master, framework, batchDuration)
     val topic = "topic1"
     val sent = Map("a" -> 5, "b" -> 3, "c" -> 10)
-
-    val stream = KafkaUtils.createStream(ssc, zkConnect, "group", Map(topic -> 1))
+    createTopic(topic)
+    produceAndSendMessage(topic, sent)
+
+    val kafkaParams = Map("zookeeper.connect" -> zkConnect,
+      "group.id" -> s"test-consumer-${random.nextInt(10000)}",
+      "auto.offset.reset" -> "smallest")
+
+    val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
+      ssc,
+      kafkaParams,
+      Map(topic -> 1),
+      StorageLevel.MEMORY_ONLY)
     val result = new mutable.HashMap[String, Long]()
     stream.map { case (k, v) => v }
       .countByValue()
@@ -94,8 +111,7 @@ class KafkaStreamSuite extends TestSuiteBase {
         }
       }
     ssc.start()
-    produceAndSendTestMessage(topic, sent)
-    ssc.awaitTermination(10000)
+    ssc.awaitTermination(3000)

     assert(sent.size === result.size)
     sent.keys.foreach { k => assert(sent(k) === result(k).toInt) }
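
For context on the hunk above: the test switches to the typed `createStream` overload, which takes an explicit `kafkaParams` map instead of the shorthand `(zkConnect, groupId)` arguments, and `"auto.offset.reset" -> "smallest"` lets the consumer pick up messages published before the receiver attaches, which is why the send can now happen before `ssc.start()`. A minimal standalone sketch of the same call, assuming the Spark Streaming Kafka 0.8 API of this era (app name, topic, and addresses below are illustrative, not from the commit):

import kafka.serializer.StringDecoder

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object TypedKafkaWordCount {
  def main(args: Array[String]) {
    // local[2]: one thread for the receiver, one for processing
    val conf = new SparkConf().setMaster("local[2]").setAppName("TypedKafkaWordCount")
    val ssc = new StreamingContext(conf, Seconds(2))

    // Explicit consumer properties; "smallest" replays messages already in the topic.
    val kafkaParams = Map(
      "zookeeper.connect" -> "localhost:2181",
      "group.id" -> "example-consumer",
      "auto.offset.reset" -> "smallest")

    // Typed overload: key/value types and their decoders are spelled out.
    val lines = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Map("topic1" -> 1), StorageLevel.MEMORY_ONLY)

    lines.map(_._2).countByValue().print()

    ssc.start()
    ssc.awaitTermination()
  }
}
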
@@ -108,7 +124,7 @@ class KafkaStreamSuite extends TestSuiteBase {
     props.put("broker.id", "0")
     props.put("host.name", "localhost")
     props.put("port", port.toString)
-    props.put("log.dir", KafkaStreamSuite.tmpDir().getAbsolutePath)
+    props.put("log.dir", createTmpDir().getAbsolutePath)
     props.put("zookeeper.connect", zkConnect)
     props.put("log.flush.interval.messages", "1")
     props.put("replica.socket.timeout.ms", "1500")
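
These properties are what the suite feeds into the embedded broker via `getBrokerConfig`. As a rough sketch of how they are consumed, assuming the Kafka 0.8 server API this suite builds on (the log path here is a placeholder, and a ZooKeeper instance must already be listening on the given address):

import java.util.Properties

import kafka.server.{KafkaConfig, KafkaServer}

object EmbeddedBrokerSketch {
  def main(args: Array[String]) {
    val props = new Properties()
    props.put("broker.id", "0")
    props.put("host.name", "localhost")
    props.put("port", "9092")
    props.put("log.dir", "/tmp/embedded-kafka-logs")  // hypothetical path
    props.put("zookeeper.connect", "localhost:2181")

    val server = new KafkaServer(new KafkaConfig(props))
    server.startup()   // boots the broker in-process and registers it in ZooKeeper
    // ... run tests against localhost:9092 ...
    server.shutdown()
    server.awaitShutdown()
  }
}
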
@@ -130,28 +146,27 @@ class KafkaStreamSuite extends TestSuiteBase {
     messages.toSeq
   }

-  def produceAndSendTestMessage(topic: String, sent: Map[String, Int]) {
-    val brokerAddr = brokerConf.hostName + ":" + brokerConf.port
-    val producer = new Producer[String, String](new ProducerConfig(getProducerConfig(brokerAddr)))
+  def createTopic(topic: String) {
     CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "0")
     logInfo("==================== 5 ====================")
     // wait until metadata is propagated
-    Thread.sleep(1000)
-    assert(server.apis.leaderCache.keySet.contains(TopicAndPartition(topic, 0)))
-    producer.send(createTestMessage(topic, sent): _*)
-    Thread.sleep(1000)
+    waitUntilMetadataIsPropagated(Seq(server), topic, 0, 1000)
+  }

+  def produceAndSendMessage(topic: String, sent: Map[String, Int]) {
+    val brokerAddr = brokerConf.hostName + ":" + brokerConf.port
+    producer = new Producer[String, String](new ProducerConfig(getProducerConfig(brokerAddr)))
+    producer.send(createTestMessage(topic, sent): _*)
     logInfo("==================== 6 ====================")
-    producer.close()
   }
 }

 object KafkaStreamSuite {
   val random = new Random()

-  def tmpDir(): File = {
+  def createTmpDir(): File = {
     val tmp = System.getProperty("java.io.tmpdir")
-    val f = new File(tmp, "spark-kafka-" + random.nextInt(1000))
+    val f = new File(tmp, "spark-kafka-" + random.nextInt(10000))
     f.mkdirs()
     f
   }
@@ -166,12 +181,33 @@ object KafkaStreamSuite {
       file.delete()
     }
   }
+
+  def waitUntilTrue(condition: () => Boolean, waitTime: Long): Boolean = {
+    val startTime = System.currentTimeMillis()
+    while (true) {
+      if (condition())
+        return true
+      if (System.currentTimeMillis() > startTime + waitTime)
+        return false
+      Thread.sleep(waitTime.min(100L))
+    }
+    // Should never reach here
+    throw new RuntimeException("unexpected error")
+  }
+
+  def waitUntilMetadataIsPropagated(servers: Seq[KafkaServer], topic: String, partition: Int,
+      timeout: Long) {
+    assert(waitUntilTrue(() =>
+      servers.foldLeft(true)(_ && _.apis.leaderCache.keySet.contains(
+        TopicAndPartition(topic, partition))), timeout),
+      s"Partition [$topic, $partition] metadata not propagated after timeout")
+  }
 }

 class EmbeddedZookeeper(val zkConnect: String) {
   val random = new Random()
-  val snapshotDir = KafkaStreamSuite.tmpDir()
-  val logDir = KafkaStreamSuite.tmpDir()
+  val snapshotDir = KafkaStreamSuite.createTmpDir()
+  val logDir = KafkaStreamSuite.createTmpDir()

   val zookeeper = new ZooKeeperServer(snapshotDir, logDir, 500)
   val (ip, port) = {
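
The heart of the fix is the last hunk: fixed `Thread.sleep` calls give way to bounded polling through `waitUntilTrue`. A self-contained sketch of that pattern with a simpler deadline-based loop (the object, field, and condition below are this sketch's own, not part of the commit):

object PollingSketch {
  @volatile private var ready = false

  // Poll `condition` until it holds or `waitTime` ms elapse; never sleep more
  // than 100 ms at a stretch so short timeouts stay responsive.
  def waitUntilTrue(condition: () => Boolean, waitTime: Long): Boolean = {
    val deadline = System.currentTimeMillis() + waitTime
    while (System.currentTimeMillis() < deadline) {
      if (condition()) return true
      Thread.sleep(waitTime.min(100L))
    }
    condition()  // one final check at the deadline
  }

  def main(args: Array[String]) {
    new Thread(new Runnable {
      def run() { Thread.sleep(300); ready = true }
    }).start()
    assert(waitUntilTrue(() => ready, 2000), "condition not met within timeout")
    println("condition became true before the timeout")
  }
}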