
Commit 5525f10

Fix flaky issue of Kafka real unit test
1 parent 4559310 commit 5525f10

3 files changed: +70 / −31 lines


external/kafka/pom.xml

Lines changed: 1 addition & 1 deletion
@@ -73,7 +73,7 @@
     <dependency>
       <groupId>net.sf.jopt-simple</groupId>
       <artifactId>jopt-simple</artifactId>
-      <version>4.5</version>
+      <version>3.2</version>
       <scope>test</scope>
     </dependency>
     <dependency>

external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java

Lines changed: 8 additions & 5 deletions
@@ -61,10 +61,11 @@ public void tearDown() {
   }
 
   @Test
-  public void testKafkaStream() {
+  public void testKafkaStream() throws InterruptedException {
     String topic = "topic1";
     HashMap<String, Integer> topics = new HashMap<String, Integer>();
     topics.put(topic, 1);
+    testSuite.createTopic(topic);
 
     HashMap<String, Integer> sent = new HashMap<String, Integer>();
     sent.put("a", 5);
@@ -107,13 +108,15 @@ public Void call(JavaPairRDD<String, Long> rdd) throws Exception {
 
     ssc.start();
 
+    // Sleep to let Receiver start first
+    Thread.sleep(3000);
+
     HashMap<String, Object> tmp = new HashMap<String, Object>(sent);
-    testSuite.produceAndSendTestMessage(topic,
+    testSuite.produceAndSendMessage(topic,
       JavaConverters.asScalaMapConverter(tmp).asScala().toMap(
-        Predef.<Tuple2<String, Object>>conforms()
-      ));
+        Predef.<Tuple2<String, Object>>conforms()));
 
-    ssc.awaitTermination(10000);
+    ssc.awaitTermination(3000);
 
     Assert.assertEquals(sent.size(), result.size());
     for (String k : sent.keySet()) {
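The new comment in the second hunk ("Sleep to let Receiver start first") names the race this commit works around: with a receiver-based stream, messages produced before the receiver has registered can be missed entirely, so the test now starts the streaming context, waits, and only then produces. The fixed three-second sleep is a stopgap; the general cure is an explicit readiness signal with a bounded wait. Below is a minimal, Kafka-free Scala sketch of that pattern; every name in it is illustrative, not part of this commit.

import java.util.concurrent.{ConcurrentLinkedQueue, CountDownLatch, TimeUnit}

object ReceiverReadySketch {
  def main(args: Array[String]): Unit = {
    val inbox = new ConcurrentLinkedQueue[String]()
    val ready = new CountDownLatch(1)

    // Toy "receiver": signals readiness, then drains the queue until interrupted.
    val receiver = new Thread(new Runnable {
      def run(): Unit = {
        ready.countDown()
        try {
          while (true) {
            val msg = inbox.poll()
            if (msg != null) println(s"received: $msg") else Thread.sleep(10)
          }
        } catch { case _: InterruptedException => () } // clean shutdown
      }
    })
    receiver.start()

    // Produce only after the receiver signals readiness (with a bounded wait),
    // instead of hoping a fixed sleep was long enough.
    assert(ready.await(3, TimeUnit.SECONDS), "receiver never became ready")
    Seq("a", "b", "c").foreach(inbox.add)

    Thread.sleep(200) // give the messages time to drain
    receiver.interrupt()
    receiver.join()
  }
}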

external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala

Lines changed: 61 additions & 25 deletions
@@ -27,26 +27,32 @@ import kafka.admin.CreateTopicCommand
 import kafka.common.TopicAndPartition
 import kafka.producer.{KeyedMessage, ProducerConfig, Producer}
 import kafka.utils.ZKStringSerializer
-import kafka.serializer.StringEncoder
+import kafka.serializer.{StringDecoder, StringEncoder}
 import kafka.server.{KafkaConfig, KafkaServer}
 
-import org.apache.spark.streaming.{StreamingContext, TestSuiteBase}
+import org.I0Itec.zkclient.ZkClient
+
 import org.apache.zookeeper.server.ZooKeeperServer
 import org.apache.zookeeper.server.NIOServerCnxnFactory
 
-import org.I0Itec.zkclient.ZkClient
+import org.apache.spark.streaming.{StreamingContext, TestSuiteBase}
+import org.apache.spark.storage.StorageLevel
 
 class KafkaStreamSuite extends TestSuiteBase {
+  import KafkaStreamSuite._
+
   val zkConnect = "localhost:2181"
-  var zookeeper: EmbeddedZookeeper = _
-  var zkClient: ZkClient = _
   val zkConnectionTimeout = 6000
   val zkSessionTimeout = 6000
 
   val brokerPort = 9092
   val brokerProps = getBrokerConfig(brokerPort)
   val brokerConf = new KafkaConfig(brokerProps)
-  var server: KafkaServer = _
+
+  protected var zookeeper: EmbeddedZookeeper = _
+  protected var zkClient: ZkClient = _
+  protected var server: KafkaServer = _
+  protected var producer: Producer[String, String] = _
 
   override def useManualClock = false
 
@@ -68,21 +74,32 @@ class KafkaStreamSuite extends TestSuiteBase {
   }
 
   override def afterFunction() {
+    producer.close()
     server.shutdown()
-    brokerConf.logDirs.foreach { f => KafkaStreamSuite.deleteDir(new File(f)) }
+    brokerConf.logDirs.foreach { f => deleteDir(new File(f)) }
 
     zkClient.close()
     zookeeper.shutdown()
 
     super.afterFunction()
   }
 
-  test("kafka input stream") {
+  test("Kafka input stream") {
     val ssc = new StreamingContext(master, framework, batchDuration)
     val topic = "topic1"
     val sent = Map("a" -> 5, "b" -> 3, "c" -> 10)
-
-    val stream = KafkaUtils.createStream(ssc, zkConnect, "group", Map(topic -> 1))
+    createTopic(topic)
+    produceAndSendMessage(topic, sent)
+
+    val kafkaParams = Map("zookeeper.connect" -> zkConnect,
+      "group.id" -> s"test-consumer-${random.nextInt(10000)}",
+      "auto.offset.reset" -> "smallest")
+
+    val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
+      ssc,
+      kafkaParams,
+      Map(topic -> 1),
+      StorageLevel.MEMORY_ONLY)
     val result = new mutable.HashMap[String, Long]()
     stream.map { case (k, v) => v }
       .countByValue()
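A note on the ordering change visible in this hunk: the topic is created and the test messages are produced before ssc.start(). That appears safe here only because each run uses a fresh, randomized group.id and sets "auto.offset.reset" to "smallest", so the high-level consumer, finding no committed offsets, replays the topic from its earliest offset instead of seeing only messages that arrive after the receiver registers.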
@@ -94,8 +111,7 @@ class KafkaStreamSuite extends TestSuiteBase {
       }
     }
     ssc.start()
-    produceAndSendTestMessage(topic, sent)
-    ssc.awaitTermination(10000)
+    ssc.awaitTermination(3000)
 
     assert(sent.size === result.size)
     sent.keys.foreach { k => assert(sent(k) === result(k).toInt) }
@@ -108,7 +124,7 @@ class KafkaStreamSuite extends TestSuiteBase {
     props.put("broker.id", "0")
     props.put("host.name", "localhost")
     props.put("port", port.toString)
-    props.put("log.dir", KafkaStreamSuite.tmpDir().getAbsolutePath)
+    props.put("log.dir", createTmpDir().getAbsolutePath)
     props.put("zookeeper.connect", zkConnect)
     props.put("log.flush.interval.messages", "1")
     props.put("replica.socket.timeout.ms", "1500")
@@ -130,28 +146,27 @@ class KafkaStreamSuite extends TestSuiteBase {
     messages.toSeq
   }
 
-  def produceAndSendTestMessage(topic: String, sent: Map[String, Int]) {
-    val brokerAddr = brokerConf.hostName + ":" + brokerConf.port
-    val producer = new Producer[String, String](new ProducerConfig(getProducerConfig(brokerAddr)))
+  def createTopic(topic: String) {
     CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "0")
     logInfo("==================== 5 ====================")
     // wait until metadata is propagated
-    Thread.sleep(1000)
-    assert(server.apis.leaderCache.keySet.contains(TopicAndPartition(topic, 0)))
-    producer.send(createTestMessage(topic, sent): _*)
-    Thread.sleep(1000)
+    waitUntilMetadataIsPropagated(Seq(server), topic, 0, 1000)
+  }
 
+  def produceAndSendMessage(topic: String, sent: Map[String, Int]) {
+    val brokerAddr = brokerConf.hostName + ":" + brokerConf.port
+    producer = new Producer[String, String](new ProducerConfig(getProducerConfig(brokerAddr)))
+    producer.send(createTestMessage(topic, sent): _*)
     logInfo("==================== 6 ====================")
-    producer.close()
   }
 }
 
 object KafkaStreamSuite {
   val random = new Random()
 
-  def tmpDir(): File = {
+  def createTmpDir(): File = {
     val tmp = System.getProperty("java.io.tmpdir")
-    val f = new File(tmp, "spark-kafka-" + random.nextInt(1000))
+    val f = new File(tmp, "spark-kafka-" + random.nextInt(10000))
     f.mkdirs()
     f
   }
@@ -166,12 +181,33 @@ object KafkaStreamSuite {
       file.delete()
     }
   }
+
+  def waitUntilTrue(condition: () => Boolean, waitTime: Long): Boolean = {
+    val startTime = System.currentTimeMillis()
+    while (true) {
+      if (condition())
+        return true
+      if (System.currentTimeMillis() > startTime + waitTime)
+        return false
+      Thread.sleep(waitTime.min(100L))
+    }
+    // Should never go to here
+    throw new RuntimeException("unexpected error")
+  }
+
+  def waitUntilMetadataIsPropagated(servers: Seq[KafkaServer], topic: String, partition: Int,
+      timeout: Long) {
+    assert(waitUntilTrue(() =>
+      servers.foldLeft(true)(_ && _.apis.leaderCache.keySet.contains(
+        TopicAndPartition(topic, partition))), timeout),
+      s"Partition [$topic, $partition] metadata not propagated after timeout")
+  }
 }
 
 class EmbeddedZookeeper(val zkConnect: String) {
   val random = new Random()
-  val snapshotDir = KafkaStreamSuite.tmpDir()
-  val logDir = KafkaStreamSuite.tmpDir()
+  val snapshotDir = KafkaStreamSuite.createTmpDir()
+  val logDir = KafkaStreamSuite.createTmpDir()
 
   val zookeeper = new ZooKeeperServer(snapshotDir, logDir, 500)
   val (ip, port) = {
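The waitUntilTrue / waitUntilMetadataIsPropagated helpers added above replace the old fixed Thread.sleep(1000) before the leader-cache assertion: the suite now polls the condition roughly every 100 ms until it holds or the deadline passes, which is both faster on the happy path and more tolerant of slow CI machines. A self-contained sketch of the same idiom (names are mine, not the suite's):

object PollUntilSketch {
  // Re-check `condition` at most every 100 ms until it holds or `waitTimeMs` elapses.
  def waitUntilTrue(condition: () => Boolean, waitTimeMs: Long): Boolean = {
    val deadline = System.currentTimeMillis() + waitTimeMs
    while (System.currentTimeMillis() <= deadline) {
      if (condition()) return true
      Thread.sleep(math.min(waitTimeMs, 100L))
    }
    condition() // one final check at the deadline
  }

  def main(args: Array[String]): Unit = {
    val start = System.currentTimeMillis()
    // Toy stand-in for "topic metadata has propagated": becomes true after ~250 ms.
    val ok = waitUntilTrue(() => System.currentTimeMillis() - start > 250, 2000L)
    assert(ok, "condition not met within timeout")
    println("condition met; the test can proceed")
  }
}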
