Skip to content

Commit 0367981

Browse files
committed
Merge pull request apache#482 from tdas/streaming-example-fix
Added StreamingContext.awaitTermination to streaming examples. StreamingContext.start() currently starts a non-daemon thread which prevents termination of a Spark Streaming program even if the main function has exited. Since the expected behavior of a streaming program is to run until explicitly killed, this was sort of fine when Spark Streaming applications were launched from the command line. However, when launched in Yarn-standalone mode, this did not work, as the driver effectively got terminated when the main function exited. So the Spark Streaming examples did not work on Yarn. This addition to the examples ensures that the examples work on Yarn, and also ensures that everyone learns that StreamingContext.awaitTermination() is necessary for Spark Streaming programs to keep running. The true bug fix of making sure all threads started by Spark Streaming are daemon threads is left for post-0.9.
2 parents 7373ffb + 2e95174 commit 0367981

17 files changed

+17
-0
lines changed

examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,5 +70,6 @@ public String call(Long in) {
7070
}).print();
7171

7272
ssc.start();
73+
ssc.awaitTermination();
7374
}
7475
}

examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,5 +104,6 @@ public Integer call(Integer i1, Integer i2) {
104104

105105
wordCounts.print();
106106
jssc.start();
107+
jssc.awaitTermination();
107108
}
108109
}

examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,5 +84,6 @@ public Integer call(Integer i1, Integer i2) {
8484

8585
wordCounts.print();
8686
ssc.start();
87+
ssc.awaitTermination();
8788
}
8889
}

examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,5 +80,6 @@ public Integer call(Integer i1, Integer i2) {
8080

8181
reducedStream.print();
8282
ssc.start();
83+
ssc.awaitTermination();
8384
}
8485
}

examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,5 +171,6 @@ object ActorWordCount {
171171
lines.flatMap(_.split("\\s+")).map(x => (x, 1)).reduceByKey(_ + _).print()
172172

173173
ssc.start()
174+
ssc.awaitTermination()
174175
}
175176
}

examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,5 +60,6 @@ object FlumeEventCount {
6060
stream.count().map(cnt => "Received " + cnt + " flume events." ).print()
6161

6262
ssc.start()
63+
ssc.awaitTermination()
6364
}
6465
}

examples/src/main/scala/org/apache/spark/streaming/examples/HdfsWordCount.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ object HdfsWordCount {
5050
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
5151
wordCounts.print()
5252
ssc.start()
53+
ssc.awaitTermination()
5354
}
5455
}
5556

examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ object KafkaWordCount {
6161
wordCounts.print()
6262

6363
ssc.start()
64+
ssc.awaitTermination()
6465
}
6566
}
6667

examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,5 +101,6 @@ object MQTTWordCount {
101101
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
102102
wordCounts.print()
103103
ssc.start()
104+
ssc.awaitTermination()
104105
}
105106
}

examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,5 +54,6 @@ object NetworkWordCount {
5454
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
5555
wordCounts.print()
5656
ssc.start()
57+
ssc.awaitTermination()
5758
}
5859
}

0 commit comments

Comments
 (0)