you should read the previous part about HBase dependencies and Spark classpaths first: http://www.abcn.net/2014/07/lighting-spark-with-hbase-full-edition.html
and you should also read this post for some background on combining HBase and Spark: http://www.vidyasource.com/blog/Programming/Scala/Java/Data/Hadoop/Analytics/2014/01/25/lighting-a-spark-with-hbase
this post aims to provide some additional, more complicated real-world examples building on the posts above.
first, you can put your hbase-site.xml into Spark's conf folder; otherwise you have to load it by its absolute path in your code.
ln -s /etc/hbase/conf/hbase-site.xml $SPARK_HOME/conf/
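alternatively, if you do not want to touch Spark's conf folder, here is a minimal sketch of loading the file by its absolute path in your code (the path /etc/hbase/conf/hbase-site.xml is just the location used above, adjust it to your cluster):

import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.HBaseConfiguration

val conf = HBaseConfiguration.create()
// explicitly add the HBase client config instead of relying on the classpath
conf.addResource(new Path("/etc/hbase/conf/hbase-site.xml"))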
now, to warm up, we use a very simple HBase table with string rowkeys and string values.
table contents:
hbase(main):001:0> scan 'tmp'
ROW      COLUMN+CELL
 abc     column=cf:test, timestamp=1401466636075, value=789
 abc     column=cf:val, timestamp=1401466435722, value=789
 bar     column=cf:val, timestamp=1396648974135, value=bb
 sku_2   column=cf:val, timestamp=1401464467396, value=999
 test    column=cf:val, timestamp=1396649021478, value=bb
 tmp     column=cf:val, timestamp=1401466616160, value=test
in the post from vidyasource.com we can find how to get values out of an HBase Result, but not the row keys.
the following code shows how to create an RDD of key-value pairs, RDD[(key, value)], from HBase Results:
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.spark.rdd.NewHadoopRDD
import scala.collection.JavaConverters._

val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE, "tmp")

var hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
  classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
  classOf[org.apache.hadoop.hbase.client.Result])

// keep the Result, extract the rowkey and the cells of cf:val,
// then pick the most recent cell and keep its raw value bytes
hBaseRDD.map(tuple => tuple._2).map(result => (result.getRow, result.getColumn("cf".getBytes(), "val".getBytes()))).map(row => {
  (
    row._1.map(_.toChar).mkString,
    row._2.asScala.reduceLeft {
      (a, b) => if (a.getTimestamp > b.getTimestamp) a else b
    }.getValue
  )
}).take(10)

you will get:
Array[(String, Array[Byte])] = Array((abc,Array(55, 56, 57)), (bar,Array(98, 98)), (sku_2,Array(57, 57, 57)), (test,Array(98, 98)), (tmp,Array(116, 101, 115, 116)))
in scala, we can use map(_.toChar).mkString to convert an Array[Byte] to a String (because, as we said, in this warm-up example the HBase table has only string values).
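(as a side note, HBase's Bytes utility does the same conversion, if you prefer it over the manual map; a tiny REPL sketch under the same string-values assumption:)

import org.apache.hadoop.hbase.util.Bytes

// "abc".getBytes is Array(97, 98, 99); both expressions decode it back to "abc"
"abc".getBytes.map(_.toChar).mkString   // String = abc
Bytes.toString("abc".getBytes)          // String = abc

applying the same conversion to the value as well as the key: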
hBaseRDD.map(tuple => tuple._2).map(result => (result.getRow, result.getColumn("cf".getBytes(), "val".getBytes()))).map(row => {
  (
    row._1.map(_.toChar).mkString,
    row._2.asScala.reduceLeft {
      (a, b) => if (a.getTimestamp > b.getTimestamp) a else b
    }.getValue.map(_.toChar).mkString
  )
}).take(10)

then we get:
Array[(String, String)] = Array((abc,789), (bar,bb), (sku_2,999), (test,bb), (tmp,test))
=======================================================================
after the warm-up, let us take a more complicated HBase table example:
this table stores the UUID/cookie (or whatever identifier) of a user's different devices; you can imagine it as part of some kind of platform used for cross-device user tracking and/or analyzing user behavior across devices.
the rowkey is a userid, a string (such as some kind of hashed value)
the column family is lf
the column qualifiers are the names or ids of devices (such as internal ids of User-Agent strings; in this example we use simple strings like app1, app2 for mobile apps and pc1, ios2 for browsers on different devices)
the value of each cell is a long stored as 8 bytes (a byte array of length 8)
it looks like this:
hbase(main):001:0> scan 'test1'
ROW      COLUMN+CELL
 user1   column=lf:app1, timestamp=1401645690042, value=\x00\x00\x00\x00\x00\x00\x00\x0F
 user1   column=lf:app2, timestamp=1401645690093, value=\x00\x00\x00\x00\x00\x00\x00\x10
 user2   column=lf:app1, timestamp=1401645690142, value=\x00\x00\x00\x00\x00\x00\x00\x11
 user2   column=lf:pc1, timestamp=1401645690170, value=\x00\x00\x00\x00\x00\x00\x00\x12
 user3   column=lf:ios2, timestamp=1401645690180, value=\x00\x00\x00\x00\x00\x00\x00\x02
to create such a table, you can run puts like these in the hbase shell:
put 'test1', 'user1', 'lf:app1', "\x00\x00\x00\x00\x00\x00\x00\x0F"
put 'test1', 'user1', 'lf:app2', "\x00\x00\x00\x00\x00\x00\x00\x10"
put 'test1', 'user2', 'lf:app1', "\x00\x00\x00\x00\x00\x00\x00\x11"
put 'test1', 'user2', 'lf:pc1', "\x00\x00\x00\x00\x00\x00\x00\x12"
put 'test1', 'user3', 'lf:ios2', "\x00\x00\x00\x00\x00\x00\x00\x02"
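if you would rather create these rows from code, here is a rough sketch using the plain HBase client API (class and method names as of the 0.94/0.98-era API this post targets); note that Bytes.toBytes(15L) produces exactly the 8-byte value \x00\x00\x00\x00\x00\x00\x00\x0F shown above:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{HTable, Put}
import org.apache.hadoop.hbase.util.Bytes

val conf = HBaseConfiguration.create()
val table = new HTable(conf, "test1")

// store the long 15 for rowkey user1, column lf:app1, as an 8-byte value
val put = new Put(Bytes.toBytes("user1"))
put.add(Bytes.toBytes("lf"), Bytes.toBytes("app1"), Bytes.toBytes(15L))
table.put(put)

table.close()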
ok, then, how can we read/scan this table from spark?
let us see this code:
conf.set(TableInputFormat.INPUT_TABLE, "test1") var hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result]) hBaseRDD.map(tuple => tuple._2).map(result => (result.getRow, result.getColumn("lf".getBytes(), "app1".getBytes()))).map(row => if (row._2.size > 0) { ( row._1.map(_.toChar).mkString, row._2.asScala.reduceLeft { (a, b) => if (a.getTimestamp > b.getTimestamp) a else b }.getValue.map(_.toInt).mkString ) }).take(10)
why map(_.toInt) this time? because in this Array[Byte] the bytes represent numbers, not characters.
but we get
Array((user1,000000015), (user2,000000017), ())

what? 000000015 ?... yes, because _.toInt converts each element of the Array[Byte] to its own Int and mkString simply concatenates them; to read the 8 bytes as one real Long we can use java.nio.ByteBuffer instead.
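a quick REPL check on the literal 8-byte value stored for user1 makes the difference clear:

import java.nio.ByteBuffer

val bytes = Array[Byte](0, 0, 0, 0, 0, 0, 0, 15)   // \x00\x00\x00\x00\x00\x00\x00\x0F

bytes.map(_.toInt).mkString      // "000000015" -- every byte rendered as its own Int
ByteBuffer.wrap(bytes).getLong   // 15           -- all 8 bytes read as one big-endian Long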
this code should be changed to
import java.nio.ByteBuffer

hBaseRDD.map(tuple => tuple._2).map(result => (result.getRow, result.getColumn("lf".getBytes(), "app1".getBytes()))).map(row => if (row._2.size > 0) {
  (
    row._1.map(_.toChar).mkString,
    ByteBuffer.wrap(row._2.asScala.reduceLeft {
      (a, b) => if (a.getTimestamp > b.getTimestamp) a else b
    }.getValue).getLong
  )
}).take(10)

then we get:
Array((user1,15), (user2,17), ())

finally this looks better, but what is that last () ?!...
it is because the row with key user3 has no value in column lf:app1, so the if without an else returns () (Unit) for it. again, we can do better: in the HBase configuration we can set TableInputFormat.SCAN_COLUMNS to restrict the scan to one particular column, so we change the code to the FINAL EDITION...
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.spark.rdd.NewHadoopRDD
import java.nio.ByteBuffer

val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE, "test1")
conf.set(TableInputFormat.SCAN_COLUMNS, "lf:app1")

var hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
  classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
  classOf[org.apache.hadoop.hbase.client.Result])

hBaseRDD.map(tuple => tuple._2).map(result => {
  (
    result.getRow.map(_.toChar).mkString,
    ByteBuffer.wrap(result.value).getLong
  )
}).take(10)
and now, finally we get:
Array[(String, Long)] = Array((user1,15), (user2,17))
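(the same decoding can also be done with HBase's Bytes helpers instead of ByteBuffer, if you find that more readable; an equivalent sketch:)

import org.apache.hadoop.hbase.util.Bytes

hBaseRDD.map(tuple => tuple._2)
  .map(result => (Bytes.toString(result.getRow), Bytes.toLong(result.value)))
  .take(10)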
=======================================================================
FINAL FULL EDITION
now, if you want to get all key-value pairs of an HBase table (values from all column qualifiers, together with their versions; note that by default a scan returns only the newest version of each cell, and TableInputFormat.SCAN_MAXVERSIONS controls how many versions come back),
you can try this code (for the string-valued table "tmp"):
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.spark.rdd.NewHadoopRDD
import scala.collection.JavaConverters._
import java.nio.ByteBuffer

// full nested structure of a row: column family -> qualifier -> timestamp -> value
type HBaseRow = java.util.NavigableMap[Array[Byte], java.util.NavigableMap[Array[Byte], java.util.NavigableMap[java.lang.Long, Array[Byte]]]]
type CFTimeseriesRow = Map[Array[Byte], Map[Array[Byte], Map[Long, Array[Byte]]]]

def navMapToMap(navMap: HBaseRow): CFTimeseriesRow =
  navMap.asScala.toMap.map(cf =>
    (cf._1, cf._2.asScala.toMap.map(col =>
      (col._1, col._2.asScala.toMap.map(elem => (elem._1.toLong, elem._2))))))

// string view of the same structure, for the string-valued table "tmp"
type CFTimeseriesRowStr = Map[String, Map[String, Map[Long, String]]]

def rowToStrMap(navMap: CFTimeseriesRow): CFTimeseriesRowStr =
  navMap.map(cf =>
    (cf._1.map(_.toChar).mkString, cf._2.map(col =>
      (col._1.map(_.toChar).mkString, col._2.map(elem => (elem._1, elem._2.map(_.toChar).mkString))))))

val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE, "tmp")

val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
  classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
  classOf[org.apache.hadoop.hbase.client.Result])

hBaseRDD.map(kv => (kv._1.get(), navMapToMap(kv._2.getMap)))
  .map(kv => (kv._1.map(_.toChar).mkString, rowToStrMap(kv._2)))
  .take(10)
for the long-valued column family "lf" in table "test1", you can redefine CFTimeseriesRowStr and rowToStrMap as follows:
type CFTimeseriesRowStr = Map[String, Map[String, Map[Long, Long]]]

def rowToStrMap(navMap: CFTimeseriesRow): CFTimeseriesRowStr =
  navMap.map(cf =>
    (cf._1.map(_.toChar).mkString, cf._2.map(col =>
      (col._1.map(_.toChar).mkString, col._2.map(elem => (elem._1, ByteBuffer.wrap(elem._2).getLong))))))
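with these two definitions swapped in, the rest of the pipeline stays the same, just pointed at "test1"; roughly:

conf.set(TableInputFormat.INPUT_TABLE, "test1")

val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
  classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
  classOf[org.apache.hadoop.hbase.client.Result])

hBaseRDD.map(kv => (kv._1.get(), navMapToMap(kv._2.getMap)))
  .map(kv => (kv._1.map(_.toChar).mkString, rowToStrMap(kv._2)))
  .take(10)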
=======================================================================
beyond all of this code, there are more particulars you should think about when querying an HBase table, such as scanner caching, whether to enable the block cache for the scan, and whether to use bloom filters.
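for example, several of these scan settings can be passed through the same configuration object using constants defined on TableInputFormat; a rough sketch (the values here are placeholders you would tune for your own tables):

// scanner caching: how many rows each RPC to the region server fetches
conf.set(TableInputFormat.SCAN_CACHEDROWS, "500")
// avoid flushing the region server's block cache with a one-off full scan
conf.set(TableInputFormat.SCAN_CACHEBLOCKS, "false")
// how many versions of each cell the scan should return
conf.set(TableInputFormat.SCAN_MAXVERSIONS, "3")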
and most important: Spark still uses org.apache.hadoop.hbase.mapreduce.TableInputFormat to read from HBase, just like a MapReduce program or a Hive HBase table mapping, so there is a big problem: your job will fail when one of the regions of the target HBase table is splitting, because the original region is taken offline during the split.
so if your HBase regions must be splittable, you should be careful about using Spark or Hive to read from the HBase table; maybe you should write a coprocessor instead of using the hbase.mapreduce API.
if not, you should disable automatic region splitting. the following slides summarize the HBase config properties that control region splitting.
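for example, a common low-tech way to keep a single table from splitting is to raise its MAX_FILESIZE far above any realistic region size in the hbase shell (the 100 GB value below is only an illustration):

hbase(main):002:0> alter 'test1', MAX_FILESIZE => '107374182400'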