Spark與HBase的整合
前言之前因為僅僅是把HBase當成一個可橫向擴展並且具有持久化能力的KV數據庫,所以隻用在了指標存儲上,參看很早之前的一篇文章基於HBase做Storm 實時計算指標存儲。這次將HBase用在了用戶行為存儲上,因為Rowkey的過濾功能也很不錯,可以很方便的把按人或者內容的維度過濾出所有的行為。從某種意義上,HBase的是一個有且僅有一個多字段複合索引的存儲引擎。
雖然我比較推崇實時計算,不過補數據或者計算曆史數據啥的,批處理還是少不了的。對於曆史數據的計算,其實我是有兩個選擇的,一個是基於HBase的已經存儲好的行為數據進行計算,或者基於Hive的原始數據進行計算,最終選擇了前者,這就涉及到Spark(StreamingPro) 對HBase的批處理操作了。
整合過程
和Spark 整合,意味著最好能有Schema(Mapping),因為Dataframe 以及SQL API 都要求你有Schema。 遺憾的是HBase 有沒有Schema取決於使用者和場景。通常SparkOnHBase的庫都要求你定義一個Mapping(Schema),比如hortonworks的 SHC(https://github.com/hortonworks-spark/shc) 就要求你定義一個如下的配置:
{
"rowkey":"key",
"table":{"namespace":"default", "name":"pi_user_log", "tableCoder":"PrimitiveType"},
"columns":{"col0":{"cf":"rowkey", "col":"key", "type":"string"},
"col1":{"cf":"f","col":"col1", "type":"string"}
}
}
看上麵的定義已經還是很容易看出來的。對HBase的一個列族和列取一個名字,這樣就可以在Spark的DataSource API使用了,關於如何開發Spark DataSource API可以參考我的這篇文章利用 Spark DataSource API 實現Rest數據源中使用,SHC大體實現的就是這個API。現在你可以這麼用了:
val cat = "{\n\"rowkey\":\"key\",\"table\":{\"namespace\":\"default\", \"name\":\"pi_user_log\", \"tableCoder\":\"PrimitiveType\"},\n\"columns\":{\"col0\":{\"cf\":\"rowkey\", \"col\":\"key\", \"type\":\"string\"},\n\"28360592\":{\"cf\":\"f\",\"col\":\"28360592\", \"type\":\"string\"}\n}\n}"
val cc = sqlContext
.read
.options(Map(HBaseTableCatalog.tableCatalog -> cat))
.format("org.apache.spark.sql.execution.datasources.hbase")
.load()
不過當你有成千上萬個列,那麼這個就無解了,你不大可能一一定義,而且很多時候使用者也不知道會有哪些列,列名甚至可能是一個時間戳。我們現在好幾種情況都遇到了,所以都需要解決:- 自動獲取HBase裏所有的列形成Schema,這樣就不需要用戶配置了。
- 規定HBase隻有兩個列,一個rowkey,一個 content,content 是一個map,包含所有以列族+列名為key,對應內容為value。
先說說第二種方案(因為其實第一種方案也要依賴於第二種方案):
{
"name": "batch.sources",
"params": [
{
"inputTableName": "log1",
"format": "org.apache.spark.sql.execution.datasources.hbase.raw",
"path": "-",
"outputTable": "log1"
}
]
},
{
"name": "batch.sql",
"params": [
{
"sql": "select rowkey,json_value_collect(content) as actionList from log1",
"outputTableName":"finalTable"
}
]
},
首先我們配置了一個HBase的表,叫log1,當然,這裏是因為程序通過hbase-site.xml獲得HBase的鏈接,所以配置上你看不到HBase相關的信息。接著呢,在SQL 裏你就可以對content 做處理了。我這裏是把content 轉化成了JSON格式字符串。再之後你就可以自己寫一個UDF函數之類的做處理了,從而實現你複雜的業務邏輯。我們其實每個字段裏存儲的都是JSON,所以我其實不關心列名,隻要讓我拿到所有的列就好。而上麵的例子正好能夠滿足我這個需求了。而且實現這個HBase DataSource 也很簡單,核心邏輯大體如下:
case class HBaseRelation(
parameters: Map[String, String],
userSpecifiedschema: Option[StructType]
)(@transient val sqlContext: SQLContext)
extends BaseRelation with TableScan with Logging {
val hbaseConf = HBaseConfiguration.create()
def buildScan(): RDD[Row] = {
hbaseConf.set(TableInputFormat.INPUT_TABLE, parameters("inputTableName"))
val hBaseRDD = sqlContext.sparkContext.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
.map { line =>
val rowKey = Bytes.toString(line._2.getRow)
import net.liftweb.{json => SJSon}
implicit val formats = SJSon.Serialization.formats(SJSon.NoTypeHints)
val content = line._2.getMap.navigableKeySet().flatMap { f =>
line._2.getFamilyMap(f).map { c =>
(Bytes.toString(f) + ":" + Bytes.toString(c._1), Bytes.toString(c._2))
}
}.toMap
val contentStr = SJSon.Serialization.write(content)
Row.fromSeq(Seq(UTF8String.fromString(rowKey), UTF8String.fromString(contentStr)))
}
hBaseRDD
}
}
那麼我們回過頭來,如何讓Spark自動發現Schema呢?大體你還是需要過濾所有數據得到列的合集,然後形成Schema的,成本開銷很大。我們也可以先將我們的數據轉化為JSON格式,然後就可以利用Spark已經支持的JSON格式來自動推倒Schema的能力了。總體而言,其實並不太鼓勵大家使用Spark 對HBase進行批處理,因為這很容易讓HBase過載,比如內存溢出導致RegionServer 掛掉,最遺憾的地方是一旦RegionServer 掛掉了,會有一段時間讀寫不可用,而HBase 又很容易作為實時在線程序的存儲,所以影響很大。
最後更新:2017-04-01 17:13:52