閱讀24 返回首頁    go 阿裏雲 go 技術社區[雲棲]


Spark On Hbase

一、前言

MapReduce早已經對接了HBase,以HBase作為數據源,完成批量數據的讀寫。如今繼MapReduce之後的Spark在大數據領域有著舉足輕重的地位,無論跑批,流處理,甚至圖計算等都有它的用武之地。Spark對接HBase成為不少用戶的需求。

二、Spark On HBASE

1.可以解決的問題

Spark和HBASE無縫對接意味著我們不再需要關心安全和RDD與HBase交互的細節。更方便應用Spark帶來的批處理,流處理等能力。比如以下常見的應用場景:
1. 以HBase作為存儲,通過Spark對流式數據處理。
2. 以HBase作為存儲,完成大規模的圖或者DAG的計算。
3. 通過Spark對HBase做BulkLoad操作
4. 同Spark SQL對HBase數據做交互式分析

2.社區相關的工作

目前已經有多種Spark對接HBase的實現,這裏我們選取三個有代表的工作進行分析:

2.1 華為: Spark-SQL-on-HBase

特點
對接Spark SQL,擴展了其SQL的parse功能來對接HBase。通過coprocessor和自定義filter來提升讀寫性能。

優點

  • 擴展了對應的cli功能,支持scala shell和python shell
  • 多種性能優化方式,甚至支持sub plan到coprocessor實現partial aggregation.
  • 支持java和Python API
  • 支持row key組合
  • 支持常用DDL和DML(包括bulkload,但不支持update)

缺點

  • 不支持支持基於時間戳和版本的查詢
  • 不支持安全
  • Row key支持原始類型或者String,不支持複雜數據類型

使用示例
a. 在HBase中創建表,並寫入數據

$HBase_Home/bin/hbase shell
create 'hbase_numbers', 'f'
for i in '1'..'100' do for j in '1'..'2' do put 'hbase_numbers', "row#{i}", "f:c#{j}", "#{i}#{j}" end end

b. 使用sparksql創建表並與HBase表建立映射

$SPARK_HBASE_Home/bin/hbase-sql
CREATE TABLE numbers
(rowkey STRING, a STRING, b STRING, PRIMARY KEY (rowkey))MAPPED BY (hbase_numbers, COLS=[a=f.c1, b=f.c2]);

c. 查詢

select a, b from numbers where b > "980"

2.2 Hortonworks: Apache HBase Connector

特點
以簡單的方式實現了標準的Spark Datasource API,使用Spark Catalyst引擎做查詢優化。同時通過scratch來構建RDD,也實現了許多常見的查詢優化。

當Spark和HBase部署在同一個集群時,Spark executor和HBase RegionServer在相同的節點,實現數據的本地化,來加快數據的讀寫速度。架構圖如下:
age

優點

  • native avro支持
  • 謂詞下推
  • 分區裁剪
  • 數據本地化
  • 支持row key組合
  • 支持安全
  • 數據本地化

缺點

  • SQL語法不夠豐富,隻支持spark sql原有的語法
  • 隻支持java原始類型
  • 不支持多語言API

使用示例
a. 定義 HBase Catalog

def catalog = s"""{
        |"table":{"namespace":"default", "name":"table1"},
        |"rowkey":"key",
        |"columns":{
          |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
          |"col1":{"cf":"cf1", "col":"col1", "type":"boolean"},
          |"col2":{"cf":"cf2", "col":"col2", "type":"double"},
          |"col3":{"cf":"cf3", "col":"col3", "type":"float"},
          |"col4":{"cf":"cf4", "col":"col4", "type":"int"},
          |"col5":{"cf":"cf5", "col":"col5", "type":"bigint"},
          |"col6":{"cf":"cf6", "col":"col6", "type":"smallint"},
          |"col7":{"cf":"cf7", "col":"col7", "type":"string"},
          |"col8":{"cf":"cf8", "col":"col8", "type":"tinyint"}
        |}
      |}""".stripMargin

b. 使用SQL查詢

// Load the dataframe
val df = withCatalog(catalog)
//SQL example
df.createOrReplaceTempView("table")
sqlContext.sql("select count(col1) from table").show

2.3 Cloudrea: SparkOnHBase

特點
通過簡單的接口實現鏈接Spark與HBASE, 支持常用的bulk讀寫。架構圖如下:

Cloudera_Spark_2015_01

優點

  • 支持安全
  • 通過get或者scan直接生成rdd, 並可以使用API完成更高級的功能
  • 支持組合rowkey
  • 支持多種bulk操作
  • 為spark和 spark streaming提供相似的API

缺點

  • 不支持複雜數據類型
  • SQL語法不夠豐富,隻支持spark sql原有的語法
  • 隻支持java原始類型
  • 不支持多語言API
  • 性能優化工作相對較少

使用示例
a. 直接使用scan創建一個RDD

val sc = new SparkContext(sparkConf)

val conf = HBaseConfiguration.create()
conf.addResource(new Path("/etc/hbase/conf/core-site.xml"))
conf.addResource(new Path("/etc/hbase/conf/hbase-site.xml"))

val hbaseContext = new HBaseContext(sc, conf)

var scan = new Scan()
scan.setCaching(100)

var getRdd = hbaseContext.hbaseRDD(tableName, scan)

b. 創建一個RDD並把RDD的內容寫入HBase

// Nothing to see here just creating a SparkContext like you normally would
val sparkConf = new SparkConf().setAppName("HBaseBulkPutExample " + tableName + " " + columnFamily)
val sc = new SparkContext(sparkConf)

//This is making a RDD of
//(RowKey, columnFamily, columnQualifier, value)
val rdd = sc.parallelize(Array(
      (Bytes.toBytes("1"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("1")))),
      (Bytes.toBytes("2"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("2")))),
      (Bytes.toBytes("3"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("3")))),
      (Bytes.toBytes("4"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("4")))),
      (Bytes.toBytes("5"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("5"))))
     )
    )

//Create the HBase config like you normally would  then
//Pass the HBase configs and SparkContext to the HBaseContext
val conf = HBaseConfiguration.create();
    conf.addResource(new Path("/etc/hbase/conf/core-site.xml"));
    conf.addResource(new Path("/etc/hbase/conf/hbase-site.xml"));
val hbaseContext = new HBaseContext(sc, conf);

//Now give the rdd, table name, and a function that will convert a RDD record to a put, and finally
// A flag if you want the puts to be batched
hbaseContext.bulkPut[(Array[Byte], Array[(Array[Byte], Array[Byte], Array[Byte])])](rdd,
    tableName,
    //This function is really important because it allows our source RDD to have data of any type
    // Also because puts are not serializable
    (putRecord) > {
      val put = new Put(putRecord._1)
      putRecord._2.foreach((putValue) > put.add(putValue._1, putValue._2, putValue._3))
       put
    },
    true);

3. 最後

社區中有不少Spark on HBase的工作,出發點都是為了提供更易用,更高效的接口。其中Cloudrea的SparkOnHbase更加靈活簡單,在2015年8月被提交到HBase的主幹(trunk)上,模塊名為HBase-Spark Module,目前準備在HBASE 2.0 正式Release, 相信這個feature一定是HBase新版本的一個亮點。

如若文章中有不準確的描述,請多多指正,謝謝!

4. 參考

https://hortonworks.com/blog/spark-hbase-dataframe-based-hbase-connector/
https://blog.cloudera.com/blog/2014/12/new-in-cloudera-labs-sparkonhbase/
https://issues.apache.org/jira/browse/HBASE-13992
https://blog.madhukaraphatak.com/introduction-to-spark-two-part-6/
https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-sql-catalyst.htm

最後更新:2017-09-30 08:04:35

  上一篇:go  阿裏雲服務器價格表-阿裏雲服務器收費標準
  下一篇:go  阿裏雲ECS全球啟用秒級計費