Skip to content

Commit 19480b7

Browse files
committed
Example for cassandra CQL read/write from spark
1 parent cda381f commit 19480b7

File tree

1 file changed

+124
-0
lines changed

1 file changed

+124
-0
lines changed
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.examples
19+
20+
import org.apache.spark.SparkContext
21+
import org.apache.spark.SparkContext._
22+
import org.apache.hadoop.mapreduce.Job
23+
import org.apache.cassandra.hadoop.ConfigHelper
24+
import org.apache.cassandra.utils.ByteBufferUtil
25+
import org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat
26+
import org.apache.cassandra.db.IColumn
27+
import org.apache.cassandra.hadoop.cql3.CqlConfigHelper
28+
import org.apache.cassandra.hadoop.cql3.CqlOutputFormat
29+
import scala.collection.JavaConversions._
30+
import java.nio.ByteBuffer
31+
import scala.collection.mutable.ListBuffer
32+
import scala.collection.immutable.Map
33+
34+
/*
  Create the following keyspace and column families in Cassandra before running this example.
  Start the CQL shell using ./bin/cqlsh and execute the following commands:

  CREATE KEYSPACE retail WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
  use retail;

  CREATE TABLE salecount (product_id text, sale_count int, PRIMARY KEY (product_id));
  CREATE TABLE ordercf (user_id text,
                        time timestamp,
                        product_id text,
                        quantity int,
                        PRIMARY KEY (user_id, time));
  INSERT INTO ordercf (user_id, time, product_id, quantity) VALUES ('bob', 1385983646000, 'iphone', 1);
  INSERT INTO ordercf (user_id, time, product_id, quantity) VALUES ('tom', 1385983647000, 'samsung', 4);
  INSERT INTO ordercf (user_id, time, product_id, quantity) VALUES ('dora', 1385983648000, 'nokia', 2);
  INSERT INTO ordercf (user_id, time, product_id, quantity) VALUES ('charlie', 1385983649000, 'iphone', 2);
*/

/**
 * This example demonstrates how to read from and write to a Cassandra column family created
 * using CQL3, from Spark.
 *
 * Parameters: <spark_master> <cassandra_node> <cassandra_port>
 * Usage: ./bin/run-example org.apache.spark.examples.CassandraCQLTest local[2] localhost 9160
 */
object CassandraCQLTest {

  /**
   * Entry point.
   *
   * Expects exactly three arguments:
   *   args(0) - Spark master URL (e.g. local[2])
   *   args(1) - Cassandra node host
   *   args(2) - Cassandra RPC port (e.g. 9160)
   *
   * Reads (user_id, time) -> (product_id, quantity) orders from the "ordercf" table,
   * aggregates quantity per product, and writes the totals back to "salecount".
   */
  def main(args: Array[String]) {
    // Fail fast with a usage message instead of an ArrayIndexOutOfBoundsException.
    if (args.length < 3) {
      System.err.println(
        "Usage: CassandraCQLTest <spark_master> <cassandra_node> <cassandra_port>")
      System.exit(1)
    }

    val sc = new SparkContext(args(0), "CQLTestApp", System.getenv("SPARK_HOME"),
      SparkContext.jarOfClass(this.getClass))
    val cHost: String = args(1)
    val cPort: String = args(2)
    val KeySpace = "retail"
    val InputColumnFamily = "ordercf"
    val OutputColumnFamily = "salecount"

    // Hadoop Job object carries the configuration for both the CQL input and output formats.
    val job = new Job()
    job.setInputFormatClass(classOf[CqlPagingInputFormat])
    ConfigHelper.setInputInitialAddress(job.getConfiguration(), cHost)
    ConfigHelper.setInputRpcPort(job.getConfiguration(), cPort)
    ConfigHelper.setInputColumnFamily(job.getConfiguration(), KeySpace, InputColumnFamily)
    ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner")
    // Page through the input 3 CQL rows at a time (deliberately small for this demo).
    CqlConfigHelper.setInputCQLPageRowSize(job.getConfiguration(), "3")

    // CqlConfigHelper.setInputWhereClauses(job.getConfiguration(), "user_id='bob'")

    // An UPDATE writes one or more columns to a record in a Cassandra column family.
    // The partition key is supplied separately via the output key map below.
    val query: String = "UPDATE " + KeySpace + "." + OutputColumnFamily + " SET sale_count = ? "
    CqlConfigHelper.setOutputCql(job.getConfiguration(), query)

    job.setOutputFormatClass(classOf[CqlOutputFormat])
    ConfigHelper.setOutputColumnFamily(job.getConfiguration(), KeySpace, OutputColumnFamily)
    ConfigHelper.setOutputInitialAddress(job.getConfiguration(), cHost)
    ConfigHelper.setOutputRpcPort(job.getConfiguration(), cPort)
    ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner")

    // Each record from CqlPagingInputFormat is (partition-key columns, regular columns),
    // both as column-name -> ByteBuffer maps.
    val casRdd = sc.newAPIHadoopRDD(job.getConfiguration(),
      classOf[CqlPagingInputFormat],
      classOf[java.util.Map[String, ByteBuffer]],
      classOf[java.util.Map[String, ByteBuffer]])

    println("Count: " + casRdd.count)

    // Decode each order row to (product_id, quantity) and total the quantity per product.
    val productSaleRDD = casRdd.map {
      case (key, value) =>
        (ByteBufferUtil.string(value.get("product_id")), ByteBufferUtil.toInt(value.get("quantity")))
    }
    val aggregatedRDD = productSaleRDD.reduceByKey(_ + _)
    aggregatedRDD.collect().foreach {
      case (productId, saleCount) => println(productId + ":" + saleCount)
    }

    // Shape each aggregate as (partition-key map, bound-variable list) — the pair form
    // CqlOutputFormat expects: the map names the key columns, the list binds the "?" in
    // the UPDATE statement above.
    val casoutputCF = aggregatedRDD.map {
      case (productId, saleCount) =>
        val outColFamKey = Map("product_id" -> ByteBufferUtil.bytes(productId))
        val outKey: java.util.Map[String, ByteBuffer] = outColFamKey
        val outColFamVal = new ListBuffer[ByteBuffer]
        outColFamVal += ByteBufferUtil.bytes(saleCount)
        val outVal: java.util.List[ByteBuffer] = outColFamVal
        (outKey, outVal)
    }

    casoutputCF.saveAsNewAPIHadoopFile(
      KeySpace,
      classOf[java.util.Map[String, ByteBuffer]],
      classOf[java.util.List[ByteBuffer]],
      classOf[CqlOutputFormat],
      job.getConfiguration()
    )

    // Shut the SparkContext down cleanly so the app terminates instead of leaking the context.
    sc.stop()
  }
}

0 commit comments

Comments
 (0)