Commit de237dc

mgorbov authored and rsotn-mapr committed

Kafka streaming producer added. (apache#66)

Signed-off-by: Rostyslav Sotnychenko <[email protected]>

1 parent adb91d4 commit de237dc

18 files changed: +1112 -0 lines

build/dev-build.sh

Lines changed: 16 additions & 0 deletions

@@ -0,0 +1,16 @@
#!/usr/bin/env bash

PROFILES="-Pyarn -Phadoop-provided -Pscala-2.11 -Phive -Phive-thriftserver -DskipTests"

if [[ -z "$1" ]]; then
  ./build/mvn ${PROFILES} clean install
else
  ./build/mvn ${PROFILES} -pl :$1 clean install
  if [ $? -ne 0 ]; then exit 1; fi
  ./build/mvn ${PROFILES} -pl :spark-assembly_2.11 clean package
fi

if [ $? -ne 0 ]; then exit 1; fi

scp -r assembly/target/scala-2.11/jars mapr@node1:/opt/mapr/spark/spark-2.0.1/jars
if [ $? -ne 0 ]; then exit 1; fi
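Usage note: the script's optional argument is a Maven artifactId handed to -pl :$1, so a single module can be rebuilt and redeployed. For example, ./build/dev-build.sh spark-streaming-kafka-producer_2.11 rebuilds only the producer module introduced by this commit, repackages the assembly, and copies the resulting jars to the cluster node (node1 and the install path are hard-coded above).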

examples/pom.xml

Lines changed: 6 additions & 0 deletions

@@ -121,6 +121,12 @@
       <artifactId>parquet-hadoop-bundle</artifactId>
       <scope>provided</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-streaming-kafka-producer_${scala.binary.version}</artifactId>
+      <scope>provided</scope>
+      <version>${project.version}</version>
+    </dependency>
   </dependencies>

   <build>
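Note: because the new dependency is declared with provided scope, the examples module compiles against the producer API but does not bundle it; at run time the spark-streaming-kafka-producer jar must already be on the classpath, for instance in the jars directory that build/dev-build.sh copies to the cluster.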
examples/src/main/scala/org/apache/spark/examples/streaming/KafkaProducerExample.scala

Lines changed: 93 additions & 0 deletions

@@ -0,0 +1,93 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.examples.streaming

import java.util.{Map => JMap}

import org.apache.kafka.common.serialization.Serializer

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{ConstantInputDStream, DStream}

class ItemJsonSerializer extends Serializer[Item] {
  override def configure(configs: JMap[String, _], isKey: Boolean): Unit = { /* NOP */ }

  override def serialize(topic: String, data: Item): Array[Byte] = data.toString.getBytes

  override def close(): Unit = { /* NOP */ }
}

case class Item(id: Int, value: Int) {
  override def toString: String = s"""{"id":"$id","value":"$value"}"""
}

/**
 * Produces messages to Kafka.
 * Usage: KafkaProducerExample <kafkaBrokers> <topics> <numMessages>
 *   <kafkaBrokers> is a list of one or more kafka brokers
 *   <topics> is a list of one or more kafka topics
 *   <numMessages> is the number of messages that the kafka producer should send
 *
 * Example:
 *    `$ bin/run-example \
 *      org.apache.spark.examples.streaming.KafkaProducerExample broker1,broker2 \
 *      topic1,topic2 10`
 */

// scalastyle:off println
object KafkaProducerExample extends App {
  import org.apache.spark.streaming.kafka.producer._

  if (args.length < 3) {
    System.err.println(s"""
      |Usage: KafkaProducerExample <kafkaBrokers> <topics> <numMessages>
      |  <kafkaBrokers> is a list of one or more kafka brokers
      |  <topics> is a list of one or more kafka topics
      |  <numMessages> is the number of messages that the kafka producer
      |                should send
      """.stripMargin)
    System.exit(1)
  }

  val Array(kafkaBrokers, topics, numMessages) = args

  val batchTime = Seconds(2)

  val sparkConf = new SparkConf()
    .set("spark.executor.memory", "1g")
    .set("spark.driver.memory", "1g")
    .setAppName(getClass.getCanonicalName)
  val ssc = new StreamingContext(sparkConf, batchTime)

  val producerConf = new ProducerConf(
    bootstrapServers = kafkaBrokers.split(",").toList)

  val items = (0 until numMessages.toInt).map(i => Item(i, i))
  val defaultRDD: RDD[Item] = ssc.sparkContext.parallelize(items)
  val dStream: DStream[Item] = new ConstantInputDStream[Item](ssc, defaultRDD)

  dStream.sendToKafka[ItemJsonSerializer](topics, producerConf)
  dStream.count().print()

  ssc.start()
  ssc.awaitTermination()

  ssc.stop(stopSparkContext = true, stopGracefully = true)
}
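The sendToKafka call above is an extension method pulled in by the org.apache.spark.streaming.kafka.producer._ import; the file defining it is not among those shown in this view. Below is a minimal sketch of how such an extension could bridge a DStream to the KafkaRDDWriter added later in this commit; the wrapper name, the byte-array key type, and the runJob wiring are illustrative assumptions, not the committed implementation.

import scala.reflect.ClassTag

import org.apache.kafka.common.serialization.{ByteArraySerializer, Serializer}

import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.producer.{KafkaRDDWriter, ProducerConf}

// Hypothetical wrapper: for each micro-batch RDD, run KafkaRDDWriter.sendV
// once per partition on the executors.
object KafkaDStreamSketch {
  implicit class KafkaWriterOps[V: ClassTag](dStream: DStream[V]) {
    def sendToKafka[VS <: Serializer[V] : ClassTag](topic: String, conf: ProducerConf): Unit = {
      dStream.foreachRDD { rdd =>
        val writer = new KafkaRDDWriter[Array[Byte], V, ByteArraySerializer, VS](topic, conf)
        // sendV has the (TaskContext, Iterator[V]) => Unit shape that runJob expects.
        rdd.sparkContext.runJob(rdd, writer.sendV _)
      }
    }
  }
}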

external/kafka-producer/pom.xml

Lines changed: 139 additions & 0 deletions

@@ -0,0 +1,139 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
  ~ Licensed to the Apache Software Foundation (ASF) under one or more
  ~ contributor license agreements.  See the NOTICE file distributed with
  ~ this work for additional information regarding copyright ownership.
  ~ The ASF licenses this file to You under the Apache License, Version 2.0
  ~ (the "License"); you may not use this file except in compliance with
  ~ the License.  You may obtain a copy of the License at
  ~
  ~    http://www.apache.org/licenses/LICENSE-2.0
  ~
  ~ Unless required by applicable law or agreed to in writing, software
  ~ distributed under the License is distributed on an "AS IS" BASIS,
  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  ~ See the License for the specific language governing permissions and
  ~ limitations under the License.
  -->

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-parent_2.11</artifactId>
    <version>2.1.0-mapr-SNAPSHOT</version>
    <relativePath>../../pom.xml</relativePath>
  </parent>

  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka-producer_2.11</artifactId>
  <properties>
    <sbt.project.name>streaming-kafka-producer</sbt.project.name>
  </properties>
  <packaging>jar</packaging>
  <name>Spark Project External Kafka Producer v09</name>
  <url>http://spark.apache.org/</url>

  <dependencies>
    <dependency>
      <groupId>com.101tec</groupId>
      <artifactId>zkclient</artifactId>
      <version>0.6</version>
    </dependency>
    <dependency>
      <groupId>com.yammer.metrics</groupId>
      <artifactId>metrics-core</artifactId>
      <version>2.2.0</version>
    </dependency>

    <dependency>
      <groupId>org.eclipse.jetty</groupId>
      <artifactId>jetty-util</artifactId>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
      <version>${project.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_${scala.binary.version}</artifactId>
      <version>${project.version}</version>
      <type>test-jar</type>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_${scala.binary.version}</artifactId>
      <version>0.9.0.0</version>
      <exclusions>
        <exclusion>
          <groupId>com.sun.jmx</groupId>
          <artifactId>jmxri</artifactId>
        </exclusion>
        <exclusion>
          <groupId>com.sun.jdmk</groupId>
          <artifactId>jmxtools</artifactId>
        </exclusion>
        <exclusion>
          <groupId>net.sf.jopt-simple</groupId>
          <artifactId>jopt-simple</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-simple</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.apache.zookeeper</groupId>
          <artifactId>zookeeper</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>0.9.0.0-mapr-1607</version>
      <exclusions>
        <exclusion>
          <groupId>com.sun.jmx</groupId>
          <artifactId>jmxri</artifactId>
        </exclusion>
        <exclusion>
          <groupId>com.sun.jdmk</groupId>
          <artifactId>jmxtools</artifactId>
        </exclusion>
        <exclusion>
          <groupId>net.sf.jopt-simple</groupId>
          <artifactId>jopt-simple</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-simple</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.apache.zookeeper</groupId>
          <artifactId>zookeeper</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>net.sf.jopt-simple</groupId>
      <artifactId>jopt-simple</artifactId>
      <version>3.2</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.scalacheck</groupId>
      <artifactId>scalacheck_${scala.binary.version}</artifactId>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <build>
    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
  </build>
</project>
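Note: kafka-clients is pinned to 0.9.0.0-mapr-1607, MapR's build of the 0.9 client, while the kafka_${scala.binary.version} broker dependency stays on vanilla 0.9.0.0; the zookeeper, slf4j-simple, and jopt-simple exclusions on both presumably keep these transitive dependencies from clashing with the versions Spark already ships.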
Lines changed: 5 additions & 0 deletions

@@ -0,0 +1,5 @@
log4j.rootCategory=WARN, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
external/kafka-producer/src/main/scala/org/apache/spark/streaming/kafka/producer/KafkaRDDWriter.scala

Lines changed: 103 additions & 0 deletions

@@ -0,0 +1,103 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.streaming.kafka.producer

import scala.reflect.{classTag, ClassTag}
import scala.util.{Failure, Success, Try}

import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}
import org.apache.kafka.common.KafkaException
import org.apache.kafka.common.serialization.Serializer

import org.apache.spark.{SparkException, TaskContext}
import org.apache.spark.internal.Logging

class KafkaRDDWriter[
    K: ClassTag,
    V: ClassTag,
    KS <: Serializer[K] : ClassTag,
    VS <: Serializer[V] : ClassTag](
    val topic: String,
    val producerConf: ProducerConf) extends Serializable with Logging {

  private var producer: KafkaProducer[K, V] = null

  private val callback: KafkaCallback = new KafkaCallback()

  private def initializeProducer(producerConf: ProducerConf): KafkaProducer[K, V] = {
    val conf = producerConf
      .withKeySerializer(classTag[KS].runtimeClass.getName)
      .withValueSerializer(classTag[VS].runtimeClass.getName)
    Try(new KafkaProducer[K, V](conf.asJMap())) match {
      case Success(_producer) => _producer
      case Failure(kafkaException) =>
        throw new SparkException("Failed to instantiate Kafka producer due to: ", kafkaException)
    }
  }

  def sendV(taskContext: TaskContext, records: Iterator[V]): Unit = {
    sendRecords[V](taskContext, records, toValueRecord)
  }

  def sendKV(taskContext: TaskContext, records: Iterator[(K, V)]): Unit = {
    sendRecords[(K, V)](taskContext, records, kv => toKeyValueRecord(kv._1, kv._2))
  }

  private def sendRecords[R](
      taskContext: TaskContext,
      records: Iterator[R],
      recordMapper: R => ProducerRecord[K, V]): Unit = {
    if (records.isEmpty) {
      logDebug(s"No data to send: rdd-partition=${taskContext.partitionId()}")
      return
    }
    try {
      producer = initializeProducer(producerConf)

      taskContext.addTaskCompletionListener((taskContext: TaskContext) => {
        log.debug(s"Task completed: topic=$topic, rdd-partition=${taskContext.partitionId()}." +
          s" Closing producer")
        Option(producer).foreach(_.close())
      })

      log.debug(s"Sending data: topic=$topic, rdd-partition=${taskContext.partitionId()}")
      records.map(recordMapper).foreach(producer.send(_, callback))
      producer.flush()
    } catch {
      case kex: KafkaException => throw new SparkException(kex.getMessage)
    } finally {
      Option(producer).foreach(_.close())
    }
  }

  private def toValueRecord(value: V): ProducerRecord[K, V] = {
    new ProducerRecord[K, V](topic, value)
  }

  private def toKeyValueRecord(key: K, value: V): ProducerRecord[K, V] = {
    new ProducerRecord[K, V](topic, key, value)
  }
}

class KafkaCallback extends Callback with Serializable {
  override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
    if (exception != null) {
      throw new SparkException("Failed to send data due to: ", exception)
    }
  }
}
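The sendV/sendKV signatures above also allow driving the writer directly over a batch RDD, without a StreamingContext. Below is a minimal sketch of the key/value path, reusing Item and ItemJsonSerializer from the example file earlier in this commit; the broker address, topic name, and the use of SparkContext.runJob here are illustrative assumptions.

import org.apache.kafka.common.serialization.StringSerializer

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.examples.streaming.{Item, ItemJsonSerializer}
import org.apache.spark.streaming.kafka.producer.{KafkaRDDWriter, ProducerConf}

object DirectWriterSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("direct-writer-sketch"))

    // Keyed records: the String key chooses the Kafka partition.
    val records = sc.parallelize(Seq("a" -> Item(1, 1), "b" -> Item(2, 2)))

    val producerConf = new ProducerConf(bootstrapServers = List("broker1:9092"))
    val writer = new KafkaRDDWriter[String, Item, StringSerializer, ItemJsonSerializer](
      "topic1", producerConf)

    // sendKV matches runJob's (TaskContext, Iterator[T]) => U shape, so each
    // partition opens its own producer on an executor, sends, flushes, and closes it.
    sc.runJob(records, writer.sendKV _)
    sc.stop()
  }
}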
