Commit 2d579a8

committed
Close ConsumerConnector in onStop; call shutdown() on the locally created Executor so that its threads stop once their work is done; close the ZooKeeper client even on exception; fix a few typos; log exceptions that would otherwise vanish
1 parent b77c19b commit 2d579a8
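
All three fixes apply the same resource-safety pattern: acquire the resource before the try block, do the risky work inside it, log failures instead of swallowing them, and release the resource in a finally so cleanup happens even on exception. A minimal standalone sketch of the pattern (FlakyResource and the main harness are illustrative stand-ins, not part of the receiver):

import java.io.Closeable

// Illustrative stand-in for a client such as ZkClient; any Closeable behaves the same way.
class FlakyResource extends Closeable {
  def deleteRecursive(path: String): Boolean = throw new RuntimeException("connection lost")
  override def close(): Unit = println("resource closed")
}

object CleanupPattern {
  def main(args: Array[String]): Unit = {
    val resource = new FlakyResource  // acquire outside the try so close() is always reachable
    try {
      resource.deleteRecursive("/consumers/some-group")  // the risky work
    } catch {
      case e: Throwable => println("Error cleaning up: " + e)  // log it rather than swallowing silently
    } finally {
      resource.close()  // runs whether or not deleteRecursive threw
    }
  }
}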

1 file changed (+34, -23 lines)


external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala

Lines changed: 34 additions & 23 deletions
@@ -38,7 +38,7 @@ import org.apache.spark.streaming.receiver.Receiver
 /**
  * Input stream that pulls messages from a Kafka Broker.
  *
- * @param kafkaParams Map of kafka configuration paramaters.
+ * @param kafkaParams Map of kafka configuration parameters.
  * See: http://kafka.apache.org/configuration.html
  * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
  * in its own thread.
@@ -76,29 +76,31 @@ class KafkaReceiver[
   // Connection to Kafka
   var consumerConnector : ConsumerConnector = null

-  def onStop() { }
+  def onStop() {
+    if (consumerConnector != null) {
+      consumerConnector.shutdown()
+    }
+  }

   def onStart() {

-    // In case we are using multiple Threads to handle Kafka Messages
-    val executorPool = Executors.newFixedThreadPool(topics.values.reduce(_ + _))
-
     logInfo("Starting Kafka Consumer Stream with group: " + kafkaParams("group.id"))

     // Kafka connection properties
     val props = new Properties()
     kafkaParams.foreach(param => props.put(param._1, param._2))

+    val zkConnect = kafkaParams("zookeeper.connect")
     // Create the connection to the cluster
-    logInfo("Connecting to Zookeper: " + kafkaParams("zookeeper.connect"))
+    logInfo("Connecting to Zookeeper: " + zkConnect)
     val consumerConfig = new ConsumerConfig(props)
     consumerConnector = Consumer.create(consumerConfig)
-    logInfo("Connected to " + kafkaParams("zookeeper.connect"))
+    logInfo("Connected to " + zkConnect)

-    // When autooffset.reset is defined, it is our responsibility to try and whack the
+    // When auto.offset.reset is defined, it is our responsibility to try and whack the
     // consumer group zk node.
     if (kafkaParams.contains("auto.offset.reset")) {
-      tryZookeeperConsumerGroupCleanup(kafkaParams("zookeeper.connect"), kafkaParams("group.id"))
+      tryZookeeperConsumerGroupCleanup(zkConnect, kafkaParams("group.id"))
     }

     val keyDecoder = manifest[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
@@ -112,42 +114,51 @@ class KafkaReceiver[
     val topicMessageStreams = consumerConnector.createMessageStreams(
       topics, keyDecoder, valueDecoder)

-
-    // Start the messages handler for each partition
-    topicMessageStreams.values.foreach { streams =>
-      streams.foreach { stream => executorPool.submit(new MessageHandler(stream)) }
+    val executorPool = Executors.newFixedThreadPool(topics.values.sum)
+    try {
+      // Start the messages handler for each partition
+      topicMessageStreams.values.foreach { streams =>
+        streams.foreach { stream => executorPool.submit(new MessageHandler(stream)) }
+      }
+    } finally {
+      executorPool.shutdown() // Just causes threads to terminate after work is done
     }
   }

   // Handles Kafka Messages
-  private class MessageHandler[K: ClassTag, V: ClassTag](stream: KafkaStream[K, V])
+  private class MessageHandler(stream: KafkaStream[K, V])
     extends Runnable {
     def run() {
       logInfo("Starting MessageHandler.")
-      for (msgAndMetadata <- stream) {
-        store((msgAndMetadata.key, msgAndMetadata.message))
+      try {
+        for (msgAndMetadata <- stream) {
+          store((msgAndMetadata.key, msgAndMetadata.message))
+        }
+      } catch {
+        case e: Throwable => logError("Error handling message; exiting", e)
       }
     }
   }

-  // It is our responsibility to delete the consumer group when specifying autooffset.reset. This
+  // It is our responsibility to delete the consumer group when specifying auto.offset.reset. This
   // is because Kafka 0.7.2 only honors this param when the group is not in zookeeper.
   //
   // The kafka high level consumer doesn't expose setting offsets currently, this is a trick copied
-  // from Kafkas' ConsoleConsumer. See code related to 'autooffset.reset' when it is set to
+  // from Kafka's ConsoleConsumer. See code related to 'auto.offset.reset' when it is set to
   // 'smallest'/'largest':
   // scalastyle:off
   // https://github.com/apache/kafka/blob/0.7.2/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
   // scalastyle:on
   private def tryZookeeperConsumerGroupCleanup(zkUrl: String, groupId: String) {
+    val dir = "/consumers/" + groupId
+    logInfo("Cleaning up temporary Zookeeper data under " + dir + ".")
+    val zk = new ZkClient(zkUrl, 30*1000, 30*1000, ZKStringSerializer)
     try {
-      val dir = "/consumers/" + groupId
-      logInfo("Cleaning up temporary zookeeper data under " + dir + ".")
-      val zk = new ZkClient(zkUrl, 30*1000, 30*1000, ZKStringSerializer)
       zk.deleteRecursive(dir)
-      zk.close()
     } catch {
-      case _ : Throwable => // swallow
+      case e: Throwable => logWarning("Error cleaning up temporary Zookeeper data", e)
+    } finally {
+      zk.close()
     }
   }
 }
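
A note on the relocated executorPool.shutdown(): shutdown() does not interrupt running tasks; it only stops the pool from accepting new work and lets already-submitted tasks run to completion, which is why calling it in the finally block right after submitting the handlers is safe. A small JDK-only sketch of that behavior (ShutdownSemantics and the dummy tasks are illustrative):

import java.util.concurrent.{Executors, TimeUnit}

object ShutdownSemantics {
  def main(args: Array[String]): Unit = {
    val pool = Executors.newFixedThreadPool(2)
    for (i <- 1 to 4) {
      pool.submit(new Runnable {
        def run(): Unit = { Thread.sleep(200); println("task " + i + " done") }
      })
    }
    pool.shutdown()  // stop accepting new tasks; the four already submitted still finish
    pool.awaitTermination(5, TimeUnit.SECONDS)
    println("terminated cleanly: " + pool.isTerminated)  // true once all tasks have completed
  }
}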
