Spark On Hbase
一、前言
MapReduce早已经对接了HBase,以HBase作为数据源,完成批量数据的读写。如今继MapReduce之后的Spark在大数据领域有着举足轻重的地位,无论跑批,流处理,甚至图计算等都有它的用武之地。Spark对接HBase成为不少用户的需求。
二、Spark On HBASE
1.可以解决的问题
Spark和HBASE无缝对接意味着我们不再需要关心安全和RDD与HBase交互的细节。更方便应用Spark带来的批处理,流处理等能力。比如以下常见的应用场景:
1. 以HBase作为存储,通过Spark对流式数据处理。
2. 以HBase作为存储,完成大规模的图或者DAG的计算。
3. 通过Spark对HBase做BulkLoad操作
4. 同Spark SQL对HBase数据做交互式分析
2.社区相关的工作
目前已经有多种Spark对接HBase的实现,这里我们选取三个有代表的工作进行分析:
2.1 华为: Spark-SQL-on-HBase
特点
对接Spark SQL,扩展了其SQL的parse功能来对接HBase。通过coprocessor和自定义filter来提升读写性能。
优点
- 扩展了对应的cli功能,支持scala shell和python shell
- 多种性能优化方式,甚至支持sub plan到coprocessor实现partial aggregation.
- 支持java和Python API
- 支持row key组合
- 支持常用DDL和DML(包括bulkload,但不支持update)
缺点
- 不支持支持基于时间戳和版本的查询
- 不支持安全
- Row key支持原始类型或者String,不支持复杂数据类型
使用示例
a. 在HBase中创建表,并写入数据
$HBase_Home/bin/hbase shell
create 'hbase_numbers', 'f'
for i in '1'..'100' do for j in '1'..'2' do put 'hbase_numbers', "row#{i}", "f:c#{j}", "#{i}#{j}" end end
b. 使用sparksql创建表并与HBase表建立映射
$SPARK_HBASE_Home/bin/hbase-sql
CREATE TABLE numbers
(rowkey STRING, a STRING, b STRING, PRIMARY KEY (rowkey))MAPPED BY (hbase_numbers, COLS=[a=f.c1, b=f.c2]);
c. 查询
select a, b from numbers where b > "980"
2.2 Hortonworks: Apache HBase Connector
特点
以简单的方式实现了标准的Spark Datasource API,使用Spark Catalyst引擎做查询优化。同时通过scratch来构建RDD,也实现了许多常见的查询优化。
当Spark和HBase部署在同一个集群时,Spark executor和HBase RegionServer在相同的节点,实现数据的本地化,来加快数据的读写速度。架构图如下:
优点
- native avro支持
- 谓词下推
- 分区裁剪
- 数据本地化
- 支持row key组合
- 支持安全
- 数据本地化
缺点
- SQL语法不够丰富,只支持spark sql原有的语法
- 只支持java原始类型
- 不支持多语言API
使用示例
a. 定义 HBase Catalog
def catalog = s"""{
|"table":{"namespace":"default", "name":"table1"},
|"rowkey":"key",
|"columns":{
|"col0":{"cf":"rowkey", "col":"key", "type":"string"},
|"col1":{"cf":"cf1", "col":"col1", "type":"boolean"},
|"col2":{"cf":"cf2", "col":"col2", "type":"double"},
|"col3":{"cf":"cf3", "col":"col3", "type":"float"},
|"col4":{"cf":"cf4", "col":"col4", "type":"int"},
|"col5":{"cf":"cf5", "col":"col5", "type":"bigint"},
|"col6":{"cf":"cf6", "col":"col6", "type":"smallint"},
|"col7":{"cf":"cf7", "col":"col7", "type":"string"},
|"col8":{"cf":"cf8", "col":"col8", "type":"tinyint"}
|}
|}""".stripMargin
b. 使用SQL查询
// Load the dataframe
val df = withCatalog(catalog)
//SQL example
df.createOrReplaceTempView("table")
sqlContext.sql("select count(col1) from table").show
2.3 Cloudrea: SparkOnHBase
特点
通过简单的接口实现链接Spark与HBASE, 支持常用的bulk读写。架构图如下:
优点
- 支持安全
- 通过get或者scan直接生成rdd, 并可以使用API完成更高级的功能
- 支持组合rowkey
- 支持多种bulk操作
- 为spark和 spark streaming提供相似的API
缺点
- 不支持复杂数据类型
- SQL语法不够丰富,只支持spark sql原有的语法
- 只支持java原始类型
- 不支持多语言API
- 性能优化工作相对较少
使用示例
a. 直接使用scan创建一个RDD
val sc = new SparkContext(sparkConf)
val conf = HBaseConfiguration.create()
conf.addResource(new Path("/etc/hbase/conf/core-site.xml"))
conf.addResource(new Path("/etc/hbase/conf/hbase-site.xml"))
val hbaseContext = new HBaseContext(sc, conf)
var scan = new Scan()
scan.setCaching(100)
var getRdd = hbaseContext.hbaseRDD(tableName, scan)
b. 创建一个RDD并把RDD的内容写入HBase
// Nothing to see here just creating a SparkContext like you normally would
val sparkConf = new SparkConf().setAppName("HBaseBulkPutExample " + tableName + " " + columnFamily)
val sc = new SparkContext(sparkConf)
//This is making a RDD of
//(RowKey, columnFamily, columnQualifier, value)
val rdd = sc.parallelize(Array(
(Bytes.toBytes("1"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("1")))),
(Bytes.toBytes("2"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("2")))),
(Bytes.toBytes("3"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("3")))),
(Bytes.toBytes("4"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("4")))),
(Bytes.toBytes("5"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("5"))))
)
)
//Create the HBase config like you normally would then
//Pass the HBase configs and SparkContext to the HBaseContext
val conf = HBaseConfiguration.create();
conf.addResource(new Path("/etc/hbase/conf/core-site.xml"));
conf.addResource(new Path("/etc/hbase/conf/hbase-site.xml"));
val hbaseContext = new HBaseContext(sc, conf);
//Now give the rdd, table name, and a function that will convert a RDD record to a put, and finally
// A flag if you want the puts to be batched
hbaseContext.bulkPut[(Array[Byte], Array[(Array[Byte], Array[Byte], Array[Byte])])](rdd,
tableName,
//This function is really important because it allows our source RDD to have data of any type
// Also because puts are not serializable
(putRecord) > {
val put = new Put(putRecord._1)
putRecord._2.foreach((putValue) > put.add(putValue._1, putValue._2, putValue._3))
put
},
true);
3. 最后
社区中有不少Spark on HBase的工作,出发点都是为了提供更易用,更高效的接口。其中Cloudrea的SparkOnHbase更加灵活简单,在2015年8月被提交到HBase的主干(trunk)上,模块名为HBase-Spark Module,目前准备在HBASE 2.0 正式Release, 相信这个feature一定是HBase新版本的一个亮点。
如若文章中有不准确的描述,请多多指正,谢谢!
4. 参考
https://hortonworks.com/blog/spark-hbase-dataframe-based-hbase-connector/
https://blog.cloudera.com/blog/2014/12/new-in-cloudera-labs-sparkonhbase/
https://issues.apache.org/jira/browse/HBASE-13992
https://blog.madhukaraphatak.com/introduction-to-spark-two-part-6/
https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-sql-catalyst.htm
最后更新:2017-09-30 08:04:35