@@ -38,7 +38,7 @@ import org.apache.spark.streaming.receiver.Receiver
 /**
  * Input stream that pulls messages from a Kafka Broker.
  *
- * @param kafkaParams Map of kafka configuration paramaters.
+ * @param kafkaParams Map of kafka configuration parameters.
  *                    See: http://kafka.apache.org/configuration.html
  * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
  * in its own thread.
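
For context, here is a minimal usage sketch (not part of this patch) showing how kafkaParams and topics are typically supplied. It assumes the standard KafkaUtils.createStream helper with StringDecoder and an existing StreamingContext named ssc; all values are illustrative:

    import kafka.serializer.StringDecoder
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.kafka.KafkaUtils

    val kafkaParams = Map(
      "zookeeper.connect" -> "zkhost:2181",  // ZooKeeper quorum the receiver connects to
      "group.id" -> "spark-consumer-group",  // consumer group used for offsets
      "auto.offset.reset" -> "smallest")     // triggers the ZK cleanup path below
    val topics = Map("events" -> 2)          // topic -> number of consumer threads

    val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2)
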
@@ -76,29 +76,31 @@ class KafkaReceiver[
   // Connection to Kafka
   var consumerConnector: ConsumerConnector = null

-  def onStop() { }
+  def onStop() {
+    if (consumerConnector != null) {
+      consumerConnector.shutdown()
+    }
+  }

   def onStart() {

-    // In case we are using multiple Threads to handle Kafka Messages
-    val executorPool = Executors.newFixedThreadPool(topics.values.reduce(_ + _))
-
     logInfo("Starting Kafka Consumer Stream with group: " + kafkaParams("group.id"))

     // Kafka connection properties
     val props = new Properties()
     kafkaParams.foreach(param => props.put(param._1, param._2))

+    val zkConnect = kafkaParams("zookeeper.connect")
     // Create the connection to the cluster
-    logInfo("Connecting to Zookeper: " + kafkaParams("zookeeper.connect"))
+    logInfo("Connecting to Zookeeper: " + zkConnect)
     val consumerConfig = new ConsumerConfig(props)
     consumerConnector = Consumer.create(consumerConfig)
-    logInfo("Connected to " + kafkaParams("zookeeper.connect"))
+    logInfo("Connected to " + zkConnect)

-    // When autooffset.reset is defined, it is our responsibility to try and whack the
+    // When auto.offset.reset is defined, it is our responsibility to try and whack the
     // consumer group zk node.
     if (kafkaParams.contains("auto.offset.reset")) {
-      tryZookeeperConsumerGroupCleanup(kafkaParams("zookeeper.connect"), kafkaParams("group.id"))
+      tryZookeeperConsumerGroupCleanup(zkConnect, kafkaParams("group.id"))
     }

     val keyDecoder = manifest[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
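
The decoder construction above leans on the convention that every kafka.serializer.Decoder implementation exposes a constructor taking VerifiableProperties. A standalone sketch of the same reflection pattern, assuming the Kafka 0.8 serializer API and using StringDecoder:

    import java.util.Properties
    import kafka.serializer.StringDecoder
    import kafka.utils.VerifiableProperties

    // Look up the (VerifiableProperties) constructor and instantiate the decoder,
    // exactly as the receiver does generically for U and T above
    val decoder = classOf[StringDecoder]
      .getConstructor(classOf[VerifiableProperties])
      .newInstance(new VerifiableProperties(new Properties()))
    decoder.fromBytes("hello".getBytes("UTF-8"))  // returns "hello"
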
@@ -112,42 +114,51 @@ class KafkaReceiver[
     val topicMessageStreams = consumerConnector.createMessageStreams(
       topics, keyDecoder, valueDecoder)

-
-    // Start the messages handler for each partition
-    topicMessageStreams.values.foreach { streams =>
-      streams.foreach { stream => executorPool.submit(new MessageHandler(stream)) }
+    val executorPool = Executors.newFixedThreadPool(topics.values.sum)
+    try {
+      // Start the messages handler for each partition
+      topicMessageStreams.values.foreach { streams =>
+        streams.foreach { stream => executorPool.submit(new MessageHandler(stream)) }
+      }
+    } finally {
+      executorPool.shutdown() // Just causes threads to terminate after work is done
     }
   }

   // Handles Kafka Messages
-  private class MessageHandler[K: ClassTag, V: ClassTag](stream: KafkaStream[K, V])
+  private class MessageHandler(stream: KafkaStream[K, V])
     extends Runnable {
     def run() {
       logInfo("Starting MessageHandler.")
-      for (msgAndMetadata <- stream) {
-        store((msgAndMetadata.key, msgAndMetadata.message))
+      try {
+        for (msgAndMetadata <- stream) {
+          store((msgAndMetadata.key, msgAndMetadata.message))
+        }
+      } catch {
+        case e: Throwable => logError("Error handling message; exiting", e)
       }
     }
   }

-  // It is our responsibility to delete the consumer group when specifying autooffset.reset. This
+  // It is our responsibility to delete the consumer group when specifying auto.offset.reset. This
   // is because Kafka 0.7.2 only honors this param when the group is not in zookeeper.
   //
   // The kafka high level consumer doesn't expose setting offsets currently, this is a trick copied
-  // from Kafkas' ConsoleConsumer. See code related to 'autooffset.reset' when it is set to
+  // from Kafka's ConsoleConsumer. See code related to 'auto.offset.reset' when it is set to
   // 'smallest'/'largest':
   // scalastyle:off
   // https://github.com/apache/kafka/blob/0.7.2/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
   // scalastyle:on
   private def tryZookeeperConsumerGroupCleanup(zkUrl: String, groupId: String) {
+    val dir = "/consumers/" + groupId
+    logInfo("Cleaning up temporary Zookeeper data under " + dir + ".")
+    val zk = new ZkClient(zkUrl, 30 * 1000, 30 * 1000, ZKStringSerializer)
     try {
-      val dir = "/consumers/" + groupId
-      logInfo("Cleaning up temporary zookeeper data under " + dir + ".")
-      val zk = new ZkClient(zkUrl, 30 * 1000, 30 * 1000, ZKStringSerializer)
       zk.deleteRecursive(dir)
-      zk.close()
     } catch {
-      case _: Throwable => // swallow
+      case e: Throwable => logWarning("Error cleaning up temporary Zookeeper data", e)
+    } finally {
+      zk.close()
     }
   }
 }
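
One subtlety in the thread-pool sizing above: topics.values.sum behaves the same as the old reduce(_ + _) for a non-empty map but is also total, so an empty topics map no longer throws. A quick illustration of plain Scala semantics:

    val topics = Map("logs" -> 2, "metrics" -> 3)
    topics.values.reduce(_ + _)  // 5, but throws UnsupportedOperationException on Map.empty
    topics.values.sum            // 5, and returns 0 on Map.empty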