

Spark + HBase - Spark Developer Guide - E-MapReduce - Alibaba Cloud

Spark + HBase

Connecting Spark to HBase

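The following example shows how Spark writes data to HBase. Note that the compute cluster must be in the same security group as the HBase cluster; otherwise the two clusters cannot reach each other over the network. When creating a cluster in E-MapReduce, be sure to select the security group that the HBase cluster belongs to.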
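Before submitting the job, it may help to confirm that a node of the compute cluster can reach the HBase cluster at all. The sketch below is only an illustration: the object name `ZkReachability` and the hostnames `ecs1`, `ecs2`, `ecs3` are hypothetical, and it assumes ZooKeeper listens on its default client port 2181. It checks plain TCP reachability of each quorum member, not the health of HBase itself.

```scala
import java.io.IOException
import java.net.{InetSocketAddress, Socket}

object ZkReachability {
  // Returns true if a TCP connection to host:port can be opened within timeoutMs.
  // Unknown hosts, refused connections and timeouts all count as "not reachable".
  def reachable(host: String, port: Int, timeoutMs: Int = 3000): Boolean = {
    val socket = new Socket()
    try {
      socket.connect(new InetSocketAddress(host, port), timeoutMs)
      true
    } catch {
      case _: IOException => false
    } finally {
      socket.close()
    }
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical hostnames: replace with the HBase cluster's ZooKeeper quorum.
    val quorum = Seq("ecs1", "ecs2", "ecs3")
    // 2181 is ZooKeeper's default client port (assumption: the cluster uses the default).
    quorum.foreach { host =>
      println(s"$host:2181 reachable = ${reachable(host, 2181)}")
    }
  }
}
```

If any host reports `false` from the compute cluster, the security group setting described above is a likely first thing to check.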

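  import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants, TableName}
  import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put}
  import org.apache.hadoop.hbase.util.Bytes
  import scala.collection.JavaConverters._

  // One HBase connection per executor JVM: a Scala object is initialized once
  // per JVM, so all tasks running on that executor reuse the same connection.
  object ConnectionUtil extends Serializable {
    private val conf = HBaseConfiguration.create()
    // ZooKeeper quorum of the HBase cluster (replace with your own hosts)
    conf.set(HConstants.ZOOKEEPER_QUORUM, "ecs1,ecs2,ecs3")
    conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/hbase")
    private val connection = ConnectionFactory.createConnection(conf)
    def getDefaultConn: Connection = connection
  }

  // Create the input stream unionStreams (a DStream[Array[Byte]]) here.
  // tname, COLUMN_FAMILY_BYTES and COLUMN_QUALIFIER_BYTES are defined in the full sample.
  unionStreams.foreachRDD(rdd => {
    rdd.map(bytes => new String(bytes))
      .flatMap(line => line.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
      .foreachPartition { words =>
        val conn = ConnectionUtil.getDefaultConn
        val t = conn.getTable(TableName.valueOf(tname))
        try {
          // Send the Puts to HBase in batches of 100
          words.grouped(100).foreach { slice =>
            val puts = slice.map { case (word, count) =>
              println(s"word: $word, count: $count")
              // Row key: the word followed by the current timestamp
              val put = new Put(Bytes.toBytes(word + System.currentTimeMillis()))
              put.addColumn(COLUMN_FAMILY_BYTES, COLUMN_QUALIFIER_BYTES,
                System.currentTimeMillis(), Bytes.toBytes(count))
              put
            }
            t.put(puts.asJava) // Table.put expects a java.util.List[Put]
          }
        } finally {
          t.close() // close the Table; the shared Connection stays open
        }
      }
  })
  ssc.start()
  ssc.awaitTermination()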
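The per-partition logic of the example (count words, then write them in groups of 100 Puts) can be tried without a cluster. The following plain-Scala sketch is an illustration only: `WordBatchSketch`, `countWords`, `toBatches` and the `Row` case class (which stands in for an HBase `Put`) are hypothetical names, and the words are sorted only to make the result deterministic.

```scala
object WordBatchSketch {
  // Stand-in for an HBase Put: a row key plus the word's count.
  final case class Row(rowKey: String, count: Int)

  // Word count over a batch of lines, like the flatMap/map/reduceByKey chain.
  def countWords(lines: Seq[String]): Map[String, Int] =
    lines.flatMap(_.split(" ")).groupBy(identity).map { case (w, ws) => w -> ws.size }

  // Build one Row per word (row key = word + timestamp) and split the rows
  // into batches of at most batchSize, like words.grouped(100) above.
  def toBatches(counts: Map[String, Int], batchSize: Int, now: Long): List[List[Row]] =
    counts.toList.sortBy(_._1)
      .map { case (w, c) => Row(w + now, c) }
      .grouped(batchSize)
      .toList

  def main(args: Array[String]): Unit = {
    val batches = toBatches(countWords(Seq("a b a", "c a")), 2, 1000L)
    batches.zipWithIndex.foreach { case (b, i) => println(s"batch $i: $b") }
  }
}
```

For example, `toBatches(countWords(Seq("a b a", "c a")), 2, 1000L)` yields two batches: one with rows `a1000` (count 3) and `b1000` (count 1), and one with `c1000` (count 1). Batching keeps the number of round trips to HBase proportional to the number of words divided by the batch size rather than to the number of words.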

Appendix

For the complete sample code, see:

Last updated: 2016-12-19 19:38:30
