Commit 43f5290

Made functions that create input streams return InputDStream and NetworkInputDStream, for both Scala and Java.
1 parent 2c94579 commit 43f5290
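
The practical effect at a call site, shown as a minimal Scala sketch (illustrative app name, host and port; this snippet is not part of the commit): the connector helpers now expose the receiver-backed stream type, and since NetworkInputDStream is still a DStream, existing transformations keep working unchanged.

    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.dstream.NetworkInputDStream
    import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumeEvent}

    val ssc = new StreamingContext("local[2]", "FlumeExample", Seconds(1))
    // Previously the static type here was DStream[SparkFlumeEvent];
    // the same call now yields the more specific NetworkInputDStream.
    val events: NetworkInputDStream[SparkFlumeEvent] =
      FlumeUtils.createStream(ssc, "localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER_2)
    events.count().print()  // any DStream operation still applies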

File tree

24 files changed, +289 -92 lines changed

external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala

Lines changed: 5 additions & 5 deletions

@@ -19,8 +19,8 @@ package org.apache.spark.streaming.flume

 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaDStream}
-import org.apache.spark.streaming.dstream.DStream
+import org.apache.spark.streaming.api.java.{JavaNetworkInputDStream, JavaInputDStream, JavaStreamingContext, JavaDStream}
+import org.apache.spark.streaming.dstream.{NetworkInputDStream, DStream}

 object FlumeUtils {
   /**
@@ -35,7 +35,7 @@ object FlumeUtils {
       hostname: String,
       port: Int,
       storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
-    ): DStream[SparkFlumeEvent] = {
+    ): NetworkInputDStream[SparkFlumeEvent] = {
     val inputStream = new FlumeInputDStream[SparkFlumeEvent](ssc, hostname, port, storageLevel)
     inputStream
   }
@@ -50,7 +50,7 @@ object FlumeUtils {
       jssc: JavaStreamingContext,
       hostname: String,
       port: Int
-    ): JavaDStream[SparkFlumeEvent] = {
+    ): JavaNetworkInputDStream[SparkFlumeEvent] = {
     createStream(jssc.ssc, hostname, port)
   }

@@ -65,7 +65,7 @@ object FlumeUtils {
       hostname: String,
       port: Int,
       storageLevel: StorageLevel
-    ): JavaDStream[SparkFlumeEvent] = {
+    ): JavaNetworkInputDStream[SparkFlumeEvent] = {
     createStream(jssc.ssc, hostname, port, storageLevel)
   }
 }

external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java

Lines changed: 3 additions & 2 deletions

@@ -21,14 +21,15 @@
 import org.apache.spark.streaming.LocalJavaStreamingContext;
 import org.apache.spark.streaming.api.java.JavaDStream;

+import org.apache.spark.streaming.api.java.JavaNetworkInputDStream;
 import org.junit.Test;

 public class JavaFlumeStreamSuite extends LocalJavaStreamingContext {
   @Test
   public void testFlumeStream() {
     // tests the API, does not actually test data receiving
-    JavaDStream<SparkFlumeEvent> test1 = FlumeUtils.createStream(ssc, "localhost", 12345);
-    JavaDStream<SparkFlumeEvent> test2 = FlumeUtils.createStream(ssc, "localhost", 12345,
+    JavaNetworkInputDStream<SparkFlumeEvent> test1 = FlumeUtils.createStream(ssc, "localhost", 12345);
+    JavaNetworkInputDStream<SparkFlumeEvent> test2 = FlumeUtils.createStream(ssc, "localhost", 12345,
       StorageLevel.MEMORY_AND_DISK_SER_2());
   }
 }

external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala

Lines changed: 4 additions & 2 deletions

@@ -31,6 +31,7 @@ import org.apache.flume.source.avro.{AvroFlumeEvent, AvroSourceProtocol}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.{TestOutputStream, StreamingContext, TestSuiteBase}
 import org.apache.spark.streaming.util.ManualClock
+import org.apache.spark.streaming.api.java.JavaNetworkInputDStream

 class FlumeStreamSuite extends TestSuiteBase {

@@ -39,10 +40,11 @@ class FlumeStreamSuite extends TestSuiteBase {
   test("flume input stream") {
     // Set up the streaming context and input streams
     val ssc = new StreamingContext(conf, batchDuration)
-    val flumeStream = FlumeUtils.createStream(ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK)
+    val flumeStream: JavaNetworkInputDStream[SparkFlumeEvent] =
+      FlumeUtils.createStream(ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK)
     val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
       with SynchronizedBuffer[Seq[SparkFlumeEvent]]
-    val outputStream = new TestOutputStream(flumeStream, outputBuffer)
+    val outputStream = new TestOutputStream(flumeStream.networkInputDStream, outputBuffer)
     outputStream.register()
     ssc.start()

external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala

Lines changed: 7 additions & 7 deletions

@@ -27,8 +27,8 @@ import kafka.serializer.{Decoder, StringDecoder}

 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaPairDStream}
-import org.apache.spark.streaming.dstream.DStream
+import org.apache.spark.streaming.api.java.{JavaPairNetworkInputDStream, JavaStreamingContext, JavaPairDStream}
+import org.apache.spark.streaming.dstream.{NetworkInputDStream, DStream}


 object KafkaUtils {
@@ -48,7 +48,7 @@ object KafkaUtils {
       groupId: String,
       topics: Map[String, Int],
       storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
-    ): DStream[(String, String)] = {
+    ): NetworkInputDStream[(String, String)] = {
     val kafkaParams = Map[String, String](
       "zookeeper.connect" -> zkQuorum, "group.id" -> groupId,
       "zookeeper.connection.timeout.ms" -> "10000")
@@ -70,7 +70,7 @@ object KafkaUtils {
       kafkaParams: Map[String, String],
       topics: Map[String, Int],
       storageLevel: StorageLevel
-    ): DStream[(K, V)] = {
+    ): NetworkInputDStream[(K, V)] = {
     new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, storageLevel)
   }

@@ -88,7 +88,7 @@ object KafkaUtils {
       zkQuorum: String,
       groupId: String,
       topics: JMap[String, JInt]
-    ): JavaPairDStream[String, String] = {
+    ): JavaPairNetworkInputDStream[String, String] = {
     implicit val cmt: ClassTag[String] =
       implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
     createStream(jssc.ssc, zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*))
@@ -110,7 +110,7 @@ object KafkaUtils {
       groupId: String,
       topics: JMap[String, JInt],
       storageLevel: StorageLevel
-    ): JavaPairDStream[String, String] = {
+    ): JavaPairNetworkInputDStream[String, String] = {
     implicit val cmt: ClassTag[String] =
       implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
     createStream(jssc.ssc, zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*),
@@ -139,7 +139,7 @@ object KafkaUtils {
       kafkaParams: JMap[String, String],
       topics: JMap[String, JInt],
       storageLevel: StorageLevel
-    ): JavaPairDStream[K, V] = {
+    ): JavaPairNetworkInputDStream[K, V] = {
     implicit val keyCmt: ClassTag[K] =
       implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
     implicit val valueCmt: ClassTag[V] =
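
The same pattern applies to the Kafka pair streams; a short Scala sketch under made-up connection details (zookeeper address, group id and topic map are illustrative, not from this commit):

    import kafka.serializer.StringDecoder
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.dstream.NetworkInputDStream
    import org.apache.spark.streaming.kafka.KafkaUtils

    val ssc = new StreamingContext("local[2]", "KafkaExample", Seconds(1))
    val kafkaParams = Map(
      "zookeeper.connect" -> "localhost:2181",
      "group.id" -> "example-group")
    // The explicitly typed createStream variant now returns the
    // receiver-backed pair stream instead of a plain DStream[(K, V)].
    val messages: NetworkInputDStream[(String, String)] =
      KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams, Map("my-topic" -> 1), StorageLevel.MEMORY_AND_DISK_SER_2)
    messages.map(_._2).count().print()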

external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java

Lines changed: 6 additions & 3 deletions

@@ -18,6 +18,8 @@
 package org.apache.spark.streaming.kafka;

 import java.util.HashMap;
+
+import org.apache.spark.streaming.api.java.JavaPairNetworkInputDStream;
 import org.junit.Test;
 import com.google.common.collect.Maps;
 import kafka.serializer.StringDecoder;
@@ -31,14 +33,15 @@ public void testKafkaStream() {
     HashMap<String, Integer> topics = Maps.newHashMap();

     // tests the API, does not actually test data receiving
-    JavaPairDStream<String, String> test1 = KafkaUtils.createStream(ssc, "localhost:12345", "group", topics);
-    JavaPairDStream<String, String> test2 = KafkaUtils.createStream(ssc, "localhost:12345", "group", topics,
+    JavaPairNetworkInputDStream<String, String> test1 =
+      KafkaUtils.createStream(ssc, "localhost:12345", "group", topics);
+    JavaPairNetworkInputDStream<String, String> test2 = KafkaUtils.createStream(ssc, "localhost:12345", "group", topics,
       StorageLevel.MEMORY_AND_DISK_SER_2());

     HashMap<String, String> kafkaParams = Maps.newHashMap();
     kafkaParams.put("zookeeper.connect", "localhost:12345");
     kafkaParams.put("group.id","consumer-group");
-    JavaPairDStream<String, String> test3 = KafkaUtils.createStream(ssc,
+    JavaPairNetworkInputDStream<String, String> test3 = KafkaUtils.createStream(ssc,
       String.class, String.class, StringDecoder.class, StringDecoder.class,
       kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2());
   }

external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala

Lines changed: 8 additions & 3 deletions

@@ -20,6 +20,7 @@ package org.apache.spark.streaming.kafka
 import kafka.serializer.StringDecoder
 import org.apache.spark.streaming.{StreamingContext, TestSuiteBase}
 import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.dstream.NetworkInputDStream

 class KafkaStreamSuite extends TestSuiteBase {

@@ -28,11 +29,15 @@ class KafkaStreamSuite extends TestSuiteBase {
     val topics = Map("my-topic" -> 1)

     // tests the API, does not actually test data receiving
-    val test1 = KafkaUtils.createStream(ssc, "localhost:1234", "group", topics)
-    val test2 = KafkaUtils.createStream(ssc, "localhost:12345", "group", topics, StorageLevel.MEMORY_AND_DISK_SER_2)
+    val test1: NetworkInputDStream[(String, String)] =
+      KafkaUtils.createStream(ssc, "localhost:1234", "group", topics)
+    val test2: NetworkInputDStream[(String, String)] =
+      KafkaUtils.createStream(ssc, "localhost:12345", "group", topics, StorageLevel.MEMORY_AND_DISK_SER_2)
     val kafkaParams = Map("zookeeper.connect"->"localhost:12345","group.id"->"consumer-group")
-    val test3 = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
+    val test3: NetworkInputDStream[(String, String)] =
+      KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
       ssc, kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2)
+    assert(test1.isInstanceOf[NetworkInputDStream[(String, String)]])

     // TODO: Actually test receiving data
     ssc.stop()

external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala

Lines changed: 5 additions & 5 deletions

@@ -19,9 +19,9 @@ package org.apache.spark.streaming.mqtt

 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaDStream}
+import org.apache.spark.streaming.api.java.{JavaNetworkInputDStream, JavaStreamingContext, JavaDStream}
 import scala.reflect.ClassTag
-import org.apache.spark.streaming.dstream.DStream
+import org.apache.spark.streaming.dstream.{NetworkInputDStream, DStream}

 object MQTTUtils {
   /**
@@ -36,7 +36,7 @@ object MQTTUtils {
       brokerUrl: String,
       topic: String,
       storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
-    ): DStream[String] = {
+    ): NetworkInputDStream[String] = {
     new MQTTInputDStream(ssc, brokerUrl, topic, storageLevel)
   }

@@ -51,7 +51,7 @@ object MQTTUtils {
       jssc: JavaStreamingContext,
       brokerUrl: String,
       topic: String
-    ): JavaDStream[String] = {
+    ): JavaNetworkInputDStream[String] = {
     implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
     createStream(jssc.ssc, brokerUrl, topic)
   }
@@ -68,7 +68,7 @@ object MQTTUtils {
       brokerUrl: String,
       topic: String,
       storageLevel: StorageLevel
-    ): JavaDStream[String] = {
+    ): JavaNetworkInputDStream[String] = {
     implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
     createStream(jssc.ssc, brokerUrl, topic, storageLevel)
   }

external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java

Lines changed: 4 additions & 2 deletions

@@ -19,6 +19,8 @@

 import org.apache.spark.storage.StorageLevel;
 import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaNetworkInputDStream;
+import org.apache.spark.streaming.api.java.JavaPairNetworkInputDStream;
 import org.junit.Test;

 import org.apache.spark.streaming.LocalJavaStreamingContext;
@@ -30,8 +32,8 @@ public void testMQTTStream() {
     String topic = "def";

     // tests the API, does not actually test data receiving
-    JavaDStream<String> test1 = MQTTUtils.createStream(ssc, brokerUrl, topic);
-    JavaDStream<String> test2 = MQTTUtils.createStream(ssc, brokerUrl, topic,
+    JavaNetworkInputDStream<String> test1 = MQTTUtils.createStream(ssc, brokerUrl, topic);
+    JavaNetworkInputDStream<String> test2 = MQTTUtils.createStream(ssc, brokerUrl, topic,
       StorageLevel.MEMORY_AND_DISK_SER_2());
   }
 }

external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala

Lines changed: 4 additions & 2 deletions

@@ -19,6 +19,7 @@ package org.apache.spark.streaming.mqtt

 import org.apache.spark.streaming.{StreamingContext, TestSuiteBase}
 import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.dstream.NetworkInputDStream

 class MQTTStreamSuite extends TestSuiteBase {

@@ -28,8 +29,9 @@ class MQTTStreamSuite extends TestSuiteBase {
     val topic = "def"

     // tests the API, does not actually test data receiving
-    val test1 = MQTTUtils.createStream(ssc, brokerUrl, topic)
-    val test2 = MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2)
+    val test1: NetworkInputDStream[String] = MQTTUtils.createStream(ssc, brokerUrl, topic)
+    val test2: NetworkInputDStream[String] =
+      MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2)

     // TODO: Actually test receiving data
     ssc.stop()

external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala

Lines changed: 11 additions & 9 deletions

@@ -21,8 +21,8 @@ import twitter4j.Status
 import twitter4j.auth.Authorization
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext}
-import org.apache.spark.streaming.dstream.DStream
+import org.apache.spark.streaming.api.java.{JavaNetworkInputDStream, JavaDStream, JavaStreamingContext}
+import org.apache.spark.streaming.dstream.{NetworkInputDStream, DStream}

 object TwitterUtils {
   /**
@@ -40,7 +40,7 @@ object TwitterUtils {
       twitterAuth: Option[Authorization],
       filters: Seq[String] = Nil,
       storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
-    ): DStream[Status] = {
+    ): NetworkInputDStream[Status] = {
     new TwitterInputDStream(ssc, twitterAuth, filters, storageLevel)
   }

@@ -52,7 +52,7 @@ object TwitterUtils {
   * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
   * @param jssc JavaStreamingContext object
   */
-  def createStream(jssc: JavaStreamingContext): JavaDStream[Status] = {
+  def createStream(jssc: JavaStreamingContext): JavaNetworkInputDStream[Status] = {
     createStream(jssc.ssc, None)
   }

@@ -65,7 +65,8 @@ object TwitterUtils {
   * @param jssc JavaStreamingContext object
   * @param filters Set of filter strings to get only those tweets that match them
   */
-  def createStream(jssc: JavaStreamingContext, filters: Array[String]): JavaDStream[Status] = {
+  def createStream(jssc: JavaStreamingContext, filters: Array[String]
+    ): JavaNetworkInputDStream[Status] = {
     createStream(jssc.ssc, None, filters)
   }

@@ -82,7 +83,7 @@ object TwitterUtils {
       jssc: JavaStreamingContext,
       filters: Array[String],
       storageLevel: StorageLevel
-    ): JavaDStream[Status] = {
+    ): JavaNetworkInputDStream[Status] = {
     createStream(jssc.ssc, None, filters, storageLevel)
   }

@@ -92,7 +93,8 @@ object TwitterUtils {
   * @param jssc JavaStreamingContext object
   * @param twitterAuth Twitter4J Authorization
   */
-  def createStream(jssc: JavaStreamingContext, twitterAuth: Authorization): JavaDStream[Status] = {
+  def createStream(jssc: JavaStreamingContext, twitterAuth: Authorization
+    ): JavaNetworkInputDStream[Status] = {
     createStream(jssc.ssc, Some(twitterAuth))
   }

@@ -107,7 +109,7 @@ object TwitterUtils {
       jssc: JavaStreamingContext,
       twitterAuth: Authorization,
       filters: Array[String]
-    ): JavaDStream[Status] = {
+    ): JavaNetworkInputDStream[Status] = {
     createStream(jssc.ssc, Some(twitterAuth), filters)
   }

@@ -123,7 +125,7 @@ object TwitterUtils {
       twitterAuth: Authorization,
       filters: Array[String],
       storageLevel: StorageLevel
-    ): JavaDStream[Status] = {
+    ): JavaNetworkInputDStream[Status] = {
     createStream(jssc.ssc, Some(twitterAuth), filters, storageLevel)
   }
 }
