
HBase Examples

The document contains Scala code for integrating Spark with HBase, focusing on inserting employee data into an HBase table. It checks for the existence of the 'employee' table and creates it if necessary, then adds a sample employee record. Additionally, it demonstrates how to write a DataFrame to HBase using Spark's DataFrame API.


package com.zaloni.mgohain.sparkHbaseIntegration.services

import org.apache.hadoop.hbase.client.{HBaseAdmin, HTable, Put}
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor}

object Employee {
  def main(args: Array[String]) {
    if (args.length != 1) {
      System.err.println("Incorrect number of arguments: " + args.length)
      System.out.println("Please provide correct arguments.")
      System.exit(1)
    }
    // HBase connection configuration
    val hbaseConf = HBaseConfiguration.create()
    val tableName = "employee"
    hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
    hbaseConf.set("hbase.zookeeper.quorum", "quickstart.cloudera")
    hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
    val admin = new HBaseAdmin(hbaseConf)
    // Two column families: professional_data and personal_data
    val cfProfessionalData = Bytes.toBytes("professional_data")
    val cfPersonalData = Bytes.toBytes("personal_data")
    // Create the 'employee' table if it does not exist yet
    if (!admin.isTableAvailable(tableName)) {
      val tableDesc = new HTableDescriptor(tableName)
      tableDesc.addFamily(new HColumnDescriptor(cfProfessionalData))
      tableDesc.addFamily(new HColumnDescriptor(cfPersonalData))
      admin.createTable(tableDesc)
    }
    val hTable = new HTable(hbaseConf, tableName)
    //val records = sc.textFile(args(0))
    // Insert a single sample employee record under row key "e_1"
    val put = new Put(Bytes.toBytes("e_1"))
    val eId = Bytes.toBytes("Emp_id")
    val name = Bytes.toBytes("Name")
    val dsgtn = Bytes.toBytes("Designation")
    val doj = Bytes.toBytes("DOJ")
    val addr = Bytes.toBytes("Address")
    val phn = Bytes.toBytes("Phone")
    val dob = Bytes.toBytes("DOB")
    put.add(cfProfessionalData, eId, Bytes.toBytes(1))
    put.add(cfProfessionalData, name, Bytes.toBytes("Mridul Gohain"))
    put.add(cfProfessionalData, dsgtn, Bytes.toBytes("SE"))
    put.add(cfProfessionalData, doj, Bytes.toBytes("15-07-2015"))
    put.add(cfPersonalData, addr, Bytes.toBytes("Chabua"))
    put.add(cfPersonalData, phn, Bytes.toBytes("9859559606"))
    put.add(cfPersonalData, dob, Bytes.toBytes("04-10-1991"))
    hTable.put(put)
    hTable.close()
  }
}
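
-- Reading the record back (sketch, not part of the original upload):

The Put above can be verified with a Get against the same table. This is a minimal sketch, assuming the hbaseConf and employee table created above; it is an illustrative addition, not the author's code.

    import org.apache.hadoop.hbase.client.{Get, HTable}
    import org.apache.hadoop.hbase.util.Bytes

    // Open the table again and fetch the sample row by its row key "e_1"
    val readTable = new HTable(hbaseConf, "employee")
    val result = readTable.get(new Get(Bytes.toBytes("e_1")))
    val employeeName = Bytes.toString(
      result.getValue(Bytes.toBytes("professional_data"), Bytes.toBytes("Name")))
    println("Name: " + employeeName)
    readTable.close()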

-- DataFrame to HBase:

yourDataFrame.write
  .options(Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5"))
  .format("org.apache.hadoop.hbase.spark")
  .save()
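
The catalog referenced above is not defined anywhere in this document. As a rough sketch, it could look like the following for the employee table, assuming the JSON catalog format used by HBaseTableCatalog in the hbase-spark / SHC connectors; the exact column mapping is an assumption, not part of the original.

    // Assumed catalog mapping the DataFrame columns onto the employee table (illustrative only)
    val catalog =
      s"""{
         |  "table": {"namespace": "default", "name": "employee"},
         |  "rowkey": "key",
         |  "columns": {
         |    "key":  {"cf": "rowkey",            "col": "key",     "type": "string"},
         |    "name": {"cf": "professional_data", "col": "Name",    "type": "string"},
         |    "addr": {"cf": "personal_data",     "col": "Address", "type": "string"}
         |  }
         |}""".stripMargin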
-- Another approach, writing a DataFrame with saveAsNewAPIHadoopDataset:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableOutputFormat}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

def main(args: Array[String]): Unit = {

  val spark = SparkSession.builder().appName("sparkToHive").enableHiveSupport().getOrCreate()
  import spark.implicits._

  // HBase connection settings
  val config = HBaseConfiguration.create()
  config.set("hbase.zookeeper.quorum", "ip's")
  config.set("hbase.zookeeper.property.clientPort", "2181")
  config.set(TableInputFormat.INPUT_TABLE, "tableName")

  // Configure the output table for the new Hadoop API
  val newAPIJobConfiguration1 = Job.getInstance(config)
  newAPIJobConfiguration1.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, "tableName")
  newAPIJobConfiguration1.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

  // Sample DataFrame to write
  val df: DataFrame = Seq(("foo", "1", "foo1"), ("bar", "2", "bar1")).toDF("key", "value1", "value2")

  // Convert each row into an HBase Put keyed by the first column
  val hbasePuts = df.rdd.map((row: Row) => {
    val put = new Put(Bytes.toBytes(row.getString(0)))
    put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("value1"), Bytes.toBytes(row.getString(1)))
    put.addColumn(Bytes.toBytes("cf2"), Bytes.toBytes("value2"), Bytes.toBytes(row.getString(2)))
    (new ImmutableBytesWritable(), put)
  })

  // Write the Puts to HBase through the new Hadoop API output format
  hbasePuts.saveAsNewAPIHadoopDataset(newAPIJobConfiguration1.getConfiguration())
}
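
-- Reading back as an RDD (sketch, not in the original):

The config above sets TableInputFormat.INPUT_TABLE but the example never reads from it. A minimal sketch of scanning the same table into an RDD with newAPIHadoopRDD follows; it assumes the spark session and config defined above and is an illustrative addition.

    // Scan the table configured via TableInputFormat.INPUT_TABLE into (row key, Result) pairs
    import org.apache.hadoop.hbase.client.Result

    val hbaseRDD = spark.sparkContext.newAPIHadoopRDD(
      config,
      classOf[TableInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[Result])

    // Print the cf1:value1 cell of the first few rows
    hbaseRDD.map { case (_, result) =>
      Bytes.toString(result.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("value1")))
    }.take(5).foreach(println)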
