Commit 9f33873

Add the Python API for Flume

1 parent 4c5889e · commit 9f33873

File tree: 8 files changed, +427 −5 lines

docs/streaming-flume-integration.md

Lines changed: 15 additions & 0 deletions

@@ -58,6 +58,15 @@ configuring Flume agents.
 	See the [API docs](api/java/index.html?org/apache/spark/streaming/flume/FlumeUtils.html)
 	and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java).
 	</div>
+	<div data-lang="python" markdown="1">
+		from pyspark.streaming.flume import FlumeUtils
+
+		flumeStream = FlumeUtils.createStream(streamingContext, [chosen machine's hostname], [chosen port])
+
+	By default, the Python API will decode the Flume event body as UTF-8 encoded strings. You can specify a custom decoding function to decode the body byte arrays in Flume events to any arbitrary data type.
+	See the [API docs](api/python/pyspark.streaming.html#pyspark.streaming.flume.FlumeUtils)
+	and the [example]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/python/streaming/flume_wordcount.py).
+	</div>
 	</div>

 Note that the hostname should be the same as the one used by the resource manager in the

@@ -129,6 +138,12 @@ configuring Flume agents.
 	JavaReceiverInputDStream<SparkFlumeEvent> flumeStream =
 		FlumeUtils.createPollingStream(streamingContext, [sink machine hostname], [sink port]);
 	</div>
+	<div data-lang="python" markdown="1">
+		from pyspark.streaming.flume import FlumeUtils
+
+		addresses = [([sink machine hostname 1], [sink port 1]), ([sink machine hostname 2], [sink port 2])]
+		flumeStream = FlumeUtils.createPollingStream(streamingContext, addresses)
+	</div>
 	</div>

 See the Scala example [FlumePollingEventCount]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/examples/streaming/FlumePollingEventCount.scala).

docs/streaming-programming-guide.md

Lines changed: 1 addition & 1 deletion

@@ -683,7 +683,7 @@ for Java, and [StreamingContext](api/python/pyspark.streaming.html#pyspark.streaming.StreamingContext) for Python.
 {:.no_toc}

 <span class="badge" style="background-color: grey">Python API</span> As of Spark {{site.SPARK_VERSION_SHORT}},
-out of these sources, *only* Kafka is available in the Python API. We will add more advanced sources in the Python API in future.
+out of these sources, *only* Kafka and Flume are available in the Python API. We will add more advanced sources in the Python API in future.

 This category of sources require interfacing with external non-Spark libraries, some of them with
 complex dependencies (e.g., Kafka and Flume). Hence, to minimize issues related to version conflicts
examples/src/main/python/streaming/flume_wordcount.py

Lines changed: 55 additions & 0 deletions

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
 Counts words in UTF8 encoded, '\n' delimited text received from Flume every second.
 Usage: flume_wordcount.py <hostname> <port>

 To run this on your local machine, you need to set up Flume first, see
 https://flume.apache.org/documentation.html

 and then run the example
    `$ bin/spark-submit --jars external/flume-assembly/target/scala-*/\
      spark-streaming-flume-assembly-*.jar examples/src/main/python/streaming/flume_wordcount.py \
      localhost 12345`
"""
from __future__ import print_function

import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.flume import FlumeUtils

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: flume_wordcount.py <hostname> <port>", file=sys.stderr)
        exit(-1)

    sc = SparkContext(appName="PythonStreamingFlumeWordCount")
    ssc = StreamingContext(sc, 1)  # 1-second batch interval

    hostname, port = sys.argv[1:]
    # Each Flume event arrives as a (headers, body) pair; keep only the body.
    kvs = FlumeUtils.createStream(ssc, hostname, int(port))
    lines = kvs.map(lambda x: x[1])
    counts = lines.flatMap(lambda line: line.split(" ")) \
        .map(lambda word: (word, 1)) \
        .reduceByKey(lambda a, b: a+b)
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()
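The same word count can instead be pointed at the pull-based receiver added by this commit. A short sketch, assuming Flume Spark sinks are listening on the host/port pairs below (hostnames and ports are placeholders):

    # Each (hostname, port) pair names a Flume Spark sink configured as
    # described in the integration guide; createPollingStream takes the list.
    addresses = [("sink-host-1", 9988), ("sink-host-2", 9988)]
    kvs = FlumeUtils.createPollingStream(ssc, addresses)
    lines = kvs.map(lambda x: x[1])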

external/flume-assembly/pom.xml

Lines changed: 134 additions & 0 deletions

<?xml version="1.0" encoding="UTF-8"?>
<!--
  ~ Licensed to the Apache Software Foundation (ASF) under one or more
  ~ contributor license agreements.  See the NOTICE file distributed with
  ~ this work for additional information regarding copyright ownership.
  ~ The ASF licenses this file to You under the Apache License, Version 2.0
  ~ (the "License"); you may not use this file except in compliance with
  ~ the License.  You may obtain a copy of the License at
  ~
  ~    http://www.apache.org/licenses/LICENSE-2.0
  ~
  ~ Unless required by applicable law or agreed to in writing, software
  ~ distributed under the License is distributed on an "AS IS" BASIS,
  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  ~ See the License for the specific language governing permissions and
  ~ limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-parent_2.10</artifactId>
    <version>1.5.0-SNAPSHOT</version>
    <relativePath>../../pom.xml</relativePath>
  </parent>

  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-flume-assembly_2.10</artifactId>
  <packaging>jar</packaging>
  <name>Spark Project External Flume Assembly</name>
  <url>http://spark.apache.org/</url>

  <properties>
    <sbt.project.name>streaming-flume-assembly</sbt.project.name>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-flume_${scala.binary.version}</artifactId>
      <version>${project.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
      <version>${project.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro</artifactId>
      <version>${avro.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro-ipc</artifactId>
      <version>${avro.version}</version>
      <exclusions>
        <exclusion>
          <groupId>io.netty</groupId>
          <artifactId>netty</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.mortbay.jetty</groupId>
          <artifactId>jetty</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.mortbay.jetty</groupId>
          <artifactId>jetty-util</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.mortbay.jetty</groupId>
          <artifactId>servlet-api</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.apache.velocity</groupId>
          <artifactId>velocity</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
  </dependencies>

  <build>
    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <configuration>
          <shadedArtifactAttached>false</shadedArtifactAttached>
          <artifactSet>
            <includes>
              <include>*:*</include>
            </includes>
          </artifactSet>
          <filters>
            <filter>
              <artifact>*:*</artifact>
              <excludes>
                <exclude>META-INF/*.SF</exclude>
                <exclude>META-INF/*.DSA</exclude>
                <exclude>META-INF/*.RSA</exclude>
              </excludes>
            </filter>
          </filters>
        </configuration>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <transformers>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                  <resource>reference.conf</resource>
                </transformer>
                <transformer implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer">
                  <resource>log4j.properties</resource>
                </transformer>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer"/>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"/>
              </transformers>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>

external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala

Lines changed: 75 additions & 1 deletion

@@ -18,10 +18,16 @@
 package org.apache.spark.streaming.flume

 import java.net.InetSocketAddress
+import java.io.{DataOutputStream, ByteArrayOutputStream}
+import java.util.{List => JList, Map => JMap}

+import scala.collection.JavaConversions._
+
+import org.apache.spark.api.java.function.PairFunction
+import org.apache.spark.api.python.PythonRDD
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext}
+import org.apache.spark.streaming.api.java.{JavaPairDStream, JavaReceiverInputDStream, JavaStreamingContext}
 import org.apache.spark.streaming.dstream.ReceiverInputDStream

@@ -236,3 +242,71 @@ object FlumeUtils {
     createPollingStream(jssc.ssc, addresses, storageLevel, maxBatchSize, parallelism)
   }
 }
+
+/**
+ * This is a helper class that wraps the methods in FlumeUtils in a more Python-friendly class
+ * and functions so that they can be easily instantiated and called from Python's FlumeUtils.
+ */
+private class FlumeUtilsPythonHelper {
+
+  def createStream(
+      jssc: JavaStreamingContext,
+      hostname: String,
+      port: Int,
+      storageLevel: StorageLevel,
+      enableDecompression: Boolean
+    ): JavaPairDStream[Array[Byte], Array[Byte]] = {
+    val dstream = FlumeUtils.createStream(jssc, hostname, port, storageLevel, enableDecompression)
+    FlumeUtilsPythonHelper.toDStreamForPython(dstream)
+  }
+
+  def createPollingStream(
+      jssc: JavaStreamingContext,
+      hosts: JList[String],
+      ports: JList[Int],
+      storageLevel: StorageLevel,
+      maxBatchSize: Int,
+      parallelism: Int
+    ): JavaPairDStream[Array[Byte], Array[Byte]] = {
+    assert(hosts.length == ports.length)
+    val addresses = hosts.zip(ports).map {
+      case (host, port) => new InetSocketAddress(host, port)
+    }
+    val dstream = FlumeUtils.createPollingStream(
+      jssc.ssc, addresses, storageLevel, maxBatchSize, parallelism)
+    FlumeUtilsPythonHelper.toDStreamForPython(dstream)
+  }
+
+}
+
+private object FlumeUtilsPythonHelper {
+
+  private def stringMapToByteArray(map: JMap[CharSequence, CharSequence]): Array[Byte] = {
+    val byteStream = new ByteArrayOutputStream()
+    val output = new DataOutputStream(byteStream)
+    try {
+      output.writeInt(map.size)
+      map.foreach { kv =>
+        PythonRDD.writeUTF(kv._1.toString, output)
+        PythonRDD.writeUTF(kv._2.toString, output)
+      }
+      byteStream.toByteArray
+    } finally {
+      output.close()
+    }
+  }
+
+  private def toDStreamForPython(dstream: JavaReceiverInputDStream[SparkFlumeEvent]):
+      JavaPairDStream[Array[Byte], Array[Byte]] = {
+    dstream.mapToPair(new PairFunction[SparkFlumeEvent, Array[Byte], Array[Byte]] {
+      override def call(sparkEvent: SparkFlumeEvent): (Array[Byte], Array[Byte]) = {
+        val event = sparkEvent.event
+        val byteBuffer = event.getBody
+        val body = new Array[Byte](byteBuffer.remaining())
+        byteBuffer.get(body)
+        (stringMapToByteArray(event.getHeaders), body)
+      }
+    })
+  }
+}
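For reference, `stringMapToByteArray` above frames the event headers as a 4-byte big-endian entry count followed by alternating key/value strings, each written by `PythonRDD.writeUTF` as a 4-byte big-endian length plus UTF-8 bytes. A sketch of how the Python side could parse such a buffer back into a dict, assuming exactly that layout (`_read_utf` and `decode_headers` are illustrative names, not part of the commit):

    import struct

    def _read_utf(buf, offset):
        # One writeUTF-framed string: 4-byte big-endian length, then UTF-8 bytes.
        (n,) = struct.unpack_from(">i", buf, offset)
        value = buf[offset + 4:offset + 4 + n].decode("utf-8")
        return value, offset + 4 + n

    def decode_headers(buf):
        # Entry count, then that many key/value string pairs.
        (count,) = struct.unpack_from(">i", buf, 0)
        offset, headers = 4, {}
        for _ in range(count):
            key, offset = _read_utf(buf, offset)
            val, offset = _read_utf(buf, offset)
            headers[key] = val
        return headers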

pom.xml

Lines changed: 1 addition & 0 deletions

@@ -102,6 +102,7 @@
     <module>external/twitter</module>
     <module>external/flume</module>
     <module>external/flume-sink</module>
+    <module>external/flume-assembly</module>
     <module>external/mqtt</module>
     <module>external/zeromq</module>
     <module>examples</module>

project/SparkBuild.scala

Lines changed: 3 additions & 3 deletions

@@ -43,8 +43,8 @@ object BuildCommons {
     sparkKinesisAsl) = Seq("yarn", "yarn-stable", "java8-tests", "ganglia-lgpl",
     "kinesis-asl").map(ProjectRef(buildLocation, _))

-  val assemblyProjects@Seq(assembly, examples, networkYarn, streamingKafkaAssembly) =
-    Seq("assembly", "examples", "network-yarn", "streaming-kafka-assembly")
+  val assemblyProjects@Seq(assembly, examples, networkYarn, streamingFlumeAssembly, streamingKafkaAssembly) =
+    Seq("assembly", "examples", "network-yarn", "streaming-flume-assembly", "streaming-kafka-assembly")
       .map(ProjectRef(buildLocation, _))

   val tools = ProjectRef(buildLocation, "tools")

@@ -349,7 +349,7 @@ object Assembly {
       .getOrElse(SbtPomKeys.effectivePom.value.getProperties.get("hadoop.version").asInstanceOf[String])
     },
     jarName in assembly <<= (version, moduleName, hadoopVersion) map { (v, mName, hv) =>
-      if (mName.contains("streaming-kafka-assembly")) {
+      if (mName.contains("streaming-flume-assembly") || mName.contains("streaming-kafka-assembly")) {
         // This must match the same name used in maven (see external/kafka-assembly/pom.xml)
         s"${mName}-${v}.jar"
       } else {