I will introduce three ways to load data into HBase with Spark: a normal load using Put, a bulk load using HFiles, and a direct bulk load without HFiles.
1. Normal Load using org.apache.hadoop.hbase.client.Put (for HBase and MapRDB)
This approach uses a Put object to load data row by row, so it is not as efficient as bulk loading.

import org.apache.spark._
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.HColumnDescriptor
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.client.HTable

val conf = HBaseConfiguration.create()
val tableName = "/t1"
conf.set(TableInputFormat.INPUT_TABLE, tableName)

val myTable = new HTable(conf, tableName)
val p = new Put("row999".getBytes())
p.add("cf".getBytes(), "column_name".getBytes(), "value999".getBytes())
myTable.put(p)
myTable.flushCommits()
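The snippet above writes a single row from the driver. If the data is in an RDD, a common pattern is to open the table inside each partition and write the Puts from the executors. This is a minimal sketch, not from the original post; it reuses the table name "/t1" and the column names from above:

val data = sc.parallelize(1 to 10)
data.foreachPartition { rows =>
  // HTable is not serializable, so create it inside the partition.
  val conf = HBaseConfiguration.create()
  val table = new HTable(conf, "/t1")
  rows.foreach { i =>
    val p = new Put(("row" + i).getBytes())
    p.add("cf".getBytes(), "column_name".getBytes(), ("value" + i).getBytes())
    table.put(p)
  }
  table.flushCommits()
  table.close()
}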
2. Bulk Load using HFiles (for HBase only)
This approach has two steps: first generate the HFiles, then use org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles to load the HFiles into HBase. It only works for HBase tables, not for MapRDB tables, because MapRDB does not support bulk loading with HFiles.
import org.apache.spark._
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.HColumnDescriptor
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.hbase.KeyValue
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles

val conf = HBaseConfiguration.create()
val tableName = "hao"
val table = new HTable(conf, tableName)

conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
val job = Job.getInstance(conf)
job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
job.setMapOutputValueClass(classOf[KeyValue])
HFileOutputFormat.configureIncrementalLoad(job, table)

// Generate 10 sample rows
val num = sc.parallelize(1 to 10)
val rdd = num.map(x => {
  val kv: KeyValue = new KeyValue(Bytes.toBytes(x), "cf".getBytes(), "c1".getBytes(), "value_xxx".getBytes())
  (new ImmutableBytesWritable(Bytes.toBytes(x)), kv)
})

// Save the HFiles on HDFS
rdd.saveAsNewAPIHadoopFile("/tmp/xxxx19", classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat], conf)

// Bulk load the HFiles into HBase
val bulkLoader = new LoadIncrementalHFiles(conf)
bulkLoader.doBulkLoad(new Path("/tmp/xxxx19"), table)

After that, 10 rows are inserted:
hbase(main):020:0> scan 'hao'
ROW                COLUMN+CELL
 \x00\x00\x00\x01  column=cf:c1, timestamp=1425128075586, value=value_xxx
 \x00\x00\x00\x02  column=cf:c1, timestamp=1425128075586, value=value_xxx
 \x00\x00\x00\x03  column=cf:c1, timestamp=1425128075586, value=value_xxx
 \x00\x00\x00\x04  column=cf:c1, timestamp=1425128075586, value=value_xxx
 \x00\x00\x00\x05  column=cf:c1, timestamp=1425128075586, value=value_xxx
 \x00\x00\x00\x06  column=cf:c1, timestamp=1425128075675, value=value_xxx
 \x00\x00\x00\x07  column=cf:c1, timestamp=1425128075675, value=value_xxx
 \x00\x00\x00\x08  column=cf:c1, timestamp=1425128075675, value=value_xxx
 \x00\x00\x00\x09  column=cf:c1, timestamp=1425128075675, value=value_xxx
 \x00\x00\x00\x0A  column=cf:c1, timestamp=1425128075675, value=value_xxx
3. Direct Bulk Load without HFiles (for HBase and MapRDB)
This approach does not need to create HFiles on HDFS; it saves to the HBase/MapRDB table directly. There is only a minor difference compared with the example above.
Change from:
rdd.saveAsNewAPIHadoopFile("/tmp/xxxx19", classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat], conf)
To:
rdd.saveAsNewAPIHadoopFile("/tmp/xxxx19", classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat], job.getConfiguration())
Here is a complete example:

import org.apache.spark._
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.HColumnDescriptor
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.hbase.KeyValue
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles

val conf = HBaseConfiguration.create()
val tableName = "hao"
val table = new HTable(conf, tableName)

conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
val job = Job.getInstance(conf)
job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
job.setMapOutputValueClass(classOf[KeyValue])
HFileOutputFormat.configureIncrementalLoad(job, table)

// Generate 10 sample rows
val num = sc.parallelize(1 to 10)
val rdd = num.map(x => {
  val kv: KeyValue = new KeyValue(Bytes.toBytes(x), "cf".getBytes(), "c1".getBytes(), "value_xxx".getBytes())
  (new ImmutableBytesWritable(Bytes.toBytes(x)), kv)
})

// Directly bulk load into the HBase/MapRDB table
rdd.saveAsNewAPIHadoopFile("/tmp/xxxx19", classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat], job.getConfiguration())
Note: In the above example, I am using "saveAsNewAPIHadoopFile" to save the HFiles on HDFS.
You can also use "saveAsNewAPIHadoopDataset" to achieve the same goal.
For example, just change the code below from:
rdd.saveAsNewAPIHadoopFile("/tmp/xxxx19", classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat], job.getConfiguration())
To:
job.getConfiguration.set("mapred.output.dir", "/tmp/xxxx19")
rdd.saveAsNewAPIHadoopDataset(job.getConfiguration)
Hi, I have multiple column families and multiple qualifiers for each column family. Can you please guide me on how to represent those as key-value pairs? You have mentioned the following:
val rdd = num.map(x=>{
val kv: KeyValue = new KeyValue(Bytes.toBytes(x), "cf".getBytes(), "c1".getBytes(), "value_xxx".getBytes() )
(new ImmutableBytesWritable(Bytes.toBytes(x)), kv)
})
but I am unable to change the above line to accumulate multiple column families and qualifiers. Please help me with this.
You can create another RDD, for example rdd2, for column family 2,
and then use "union":
val totalrdd = rdd union rdd2
See http://www.openkb.info/2015/01/scala-on-spark-cheatsheet.html for more details on each transformation and action.
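For example, a minimal sketch of that union approach, reusing the setup from the bulk-load example in the post (the family names "cf1"/"cf2" and the values are placeholders), might look like this:

val num = sc.parallelize(1 to 10)
val rdd = num.map(x => {
  val kv = new KeyValue(Bytes.toBytes(x), "cf1".getBytes(), "c1".getBytes(), "value_cf1".getBytes())
  (new ImmutableBytesWritable(Bytes.toBytes(x)), kv)
})
val rdd2 = num.map(x => {
  val kv = new KeyValue(Bytes.toBytes(x), "cf2".getBytes(), "c1".getBytes(), "value_cf2".getBytes())
  (new ImmutableBytesWritable(Bytes.toBytes(x)), kv)
})
// HFileOutputFormat expects cells in sorted order, so sort after the union.
val totalrdd = (rdd union rdd2).sortByKey(true)

totalrdd can then be passed to saveAsNewAPIHadoopFile exactly as rdd is in the examples above.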
Would you please suggest how to construct the KeyValue pairs for multiple column qualifiers?
e.g. I want to insert into HBase like:
rowkey, column_family, column_qualifier, value
John, cf, age, 29
John, cf, sex, male
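One possible sketch (not from the post; it assumes the same imports and Spark shell session as the bulk-load example) is to emit one KeyValue per qualifier with flatMap, keeping the qualifiers of each row in lexical order ("age" sorts before "sex"):

val people = sc.parallelize(Seq(("John", Seq(("age", "29"), ("sex", "male")))))
val rddMulti = people.flatMap { case (rowKey, cols) =>
  // Within one row, qualifiers must be emitted in lexical order.
  cols.sortBy(_._1).map { case (qualifier, value) =>
    val kv = new KeyValue(Bytes.toBytes(rowKey), "cf".getBytes(), qualifier.getBytes(), value.getBytes())
    (new ImmutableBytesWritable(Bytes.toBytes(rowKey)), kv)
  }
}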
Hi,
I've been trying to do the same thing with rows that have multiple columns. I transform my RDD so that it is of type (ImmutableBytesWritable, KeyValue); however, because rows have multiple columns, some of the pairs in the RDD have the same row key, e.g.
(rowKey1, [rowKey1, cf, col1, val1])
(rowKey1, [rowKey1, cf, col2, val2])
(rowKey1, [rowKey1, cf, col3, val3])
When I run this code though, I get an exception that looks like this:
Added a key not lexically larger than previous. Current cell = 155964:1/variant:155967/1461259975844/Put/vlen=577/seqid=0, lastCell = 155964:1/variant:coords/1461259975824/Put/vlen=29/seqid=0
Is there any way around this?
I saw the same error when using multiple columns. I tried .sortByKey(true), but that did not help; I am still seeing "not lexically larger than previous".
Column qualifiers (CQs) need to be ordered.
Not working, even if the entire RDD is ordered by rowkey, cf, and cq. And it should not work, because HFileOutputFormat clearly says every cell needs to be in order. How would that work with a distributed RDD? It's just not possible. Unless someone claims it is working for a truly distributed RDD, I think this post is misleading.
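For what it's worth, the workaround that is usually suggested (a sketch only, not verified on every HBase/MapRDB release) is to sort by the full cell coordinates rather than the row key alone, so that each output partition hands its cells to HFileOutputFormat in the order it checks. The RDD cells, the sample values, and the path /tmp/hfiles_sorted below are placeholders; job is the one configured in the post:

val cells = sc.parallelize(Seq(
  ("row1", "cf", "c1", "v1"),
  ("row1", "cf", "c2", "v2"),
  ("row2", "cf", "c1", "v3")))
val sortedRdd = cells
  .map { case (row, cf, cq, v) => ((row, cf, cq), v) } // key by (row, family, qualifier)
  .sortByKey()                                         // global sort: each partition is internally ordered
  .map { case ((row, cf, cq), v) =>
    val kv = new KeyValue(Bytes.toBytes(row), Bytes.toBytes(cf), Bytes.toBytes(cq), Bytes.toBytes(v))
    (new ImmutableBytesWritable(Bytes.toBytes(row)), kv)
  }
sortedRdd.saveAsNewAPIHadoopFile("/tmp/hfiles_sorted", classOf[ImmutableBytesWritable],
  classOf[KeyValue], classOf[HFileOutputFormat], job.getConfiguration())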
Hi all, I have a task where I want to read XML data from HDFS and store it into HBase using Spark and Scala. Please help me with this.
Spark Scala code to load data into HBase:
//HIVE Connectivity
import java.lang.String
import org.apache.spark.sql.functions._
import org.apache.spark._
import org.apache.spark.SparkContext._
//HBASE Connectivity
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, HColumnDescriptor}
import org.apache.hadoop.hbase.client.{HBaseAdmin, HTable, Put, Get}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.fs.Path
//Set Hive Context
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
import hiveContext.implicits._ // needed for the $"col" syntax used in the filter below
hiveContext.setConf("hive.exec.dynamic.partition", "true")
hiveContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
//Load Data from Hive Table to DataFrame
val df = hiveContext.sql("select t.* from eagle_abhijeet.abhi t order by t.id")
//GroupBy Logic for Generic Columns
val gencol_df = df.select("id", "sal")
val agg_df = gencol_df.groupBy("id").agg(sum("sal"))
//Complete GroupBy for Distribution
val dist_df = df.select("id", "at", "dt")
val agg_dist_df = dist_df.groupBy("id", "at").agg(sum("dt"))
//Connect to HBASE
val config = HBaseConfiguration.create()
config.clear();
config.set("hbase.zookeeper.quorum", "10.10.127.7");
config.set("hbase.zookeeper.property.clientPort","2181");
//Connect to Table
val tableName = "aggTable"
config.set(TableInputFormat.INPUT_TABLE, tableName)
val myTable = new HTable(config, tableName);
//Store values in HBASE
//Iterate through the Main DF and then append Dist DF
agg_df.collect().foreach(elem => {
var rowKey = elem.getString(0);
var sal = elem.getDouble(1).toString;
var p = new Put(rowKey.getBytes()); //Store RowKey
p.add("CF".getBytes(), "sal".getBytes(), sal.getBytes()); //Store Sal
//Iterate through Distribution
agg_dist_df.filter($"id" === rowKey).collect().foreach(innerElem => {
var acctTypCD = innerElem.get(1).toString;
var acctTypDistribution = innerElem.getDouble(2).toString;
p.add("CF".getBytes(), "DistByAcctType:".concat(acctTypCD).getBytes(), acctTypDistribution.getBytes())
})
//Commit to DB
myTable.put(p);
myTable.flushCommits();
})
//Exit the Code
exit
Can you share the pom.xml for your application?
There is no pom.xml. The above code was executed in the Spark shell. But since it is a pretty old version, some of the APIs may have changed, so feel free to correct any code for the latest release of Spark.
Does this work with multiple columns in a column family?
Can you share an example of the same?
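It should: with the Put API used above, multiple columns in one family are just multiple add() calls on the same Put before table.put. A rough sketch against the code above (the column names and values here are made up):

val p = new Put("row1".getBytes())
p.add("CF".getBytes(), "sal".getBytes(), "1000.0".getBytes())
p.add("CF".getBytes(), "dept".getBytes(), "sales".getBytes())
p.add("CF".getBytes(), "age".getBytes(), "29".getBytes())
myTable.put(p)
myTable.flushCommits()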