Spark + HBase__Spark_开发人员指南_E-MapReduce-阿里云
Spark + Hbase
Spark 接入 Hbase
下面这个例子演示了 Spark 如何向 Hbase 写数据。需要指出的是,计算集群需要和 Hbase 集群处于一个安全组内,否则网络无法打通。在 E-Mapreduce 创建集群时,请注意选择 Hbase 集群所处的安全组。
object ConnectionUtil extends Serializable {
private val conf = HBaseConfiguration.create()
conf.set(HConstants.ZOOKEEPER_QUORUM,"ecs1,ecs1,ecs3")
conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/hbase")
private val connection = ConnectionFactory.createConnection(conf)
def getDefaultConn: Connection = connection
}
//创建数据流 unionStreams
unionStreams.foreachRDD(rdd => {
rdd.map(bytes => new String(bytes))
.flatMap(line => line.split(" "))
.map(word => (word, 1))
.reduceByKey(_ + _)
.mapPartitions {words => {
val conn = ConnectionUtil.getDefaultConn
val tableName = TableName.valueOf(tname)
val t = conn.getTable(tableName)
try {
words.sliding(100, 100).foreach(slice => {
val puts = slice.map(word => {
println(s"word: $word")
val put = new Put(Bytes.toBytes(word._1 + System.currentTimeMillis()))
put.addColumn(COLUMN_FAMILY_BYTES, COLUMN_QUALIFIER_BYTES,
System.currentTimeMillis(), Bytes.toBytes(word._2))
put
}).toList
t.put(puts)
})
} finally {
t.close()
}
Iterator.empty
}}.count()
})
ssc.start()
ssc.awaitTermination()
附录
完整示例代码请看:
最后更新:2016-12-19 19:38:30
上一篇:
Spark + MNS__Spark_开发人员指南_E-MapReduce-阿里云
下一篇:
spark-submit 参数设置说明__Spark_开发人员指南_E-MapReduce-阿里云
DeleteObject__关于Object操作_API 参考_对象存储 OSS-阿里云
切换主备实例__实例管理_用户指南_云数据库 RDS 版-阿里云
动作列表__RAM子用户访问_API-Reference_日志服务-阿里云
修改共享带宽包-增加公网IP__NAT网关相关接口_API 参考_云服务器 ECS-阿里云
创建主题__主题操作_快速入门_消息服务-阿里云
云服务器 ECS 加强云服务器安全
405错误___排错手册_Web 应用防火墙-阿里云
删除水印模板__水印模板接口_API使用手册_媒体转码-阿里云
RAM子用户使用__Getting-Started_日志服务-阿里云
iOS 推流SDK使用说明__SDK文档及下载_API及SDK_视频直播-阿里云
相关内容
常见错误说明__附录_大数据计算服务-阿里云
发送短信接口__API使用手册_短信服务-阿里云
接口文档__Android_安全组件教程_移动安全-阿里云
运营商错误码(联通)__常见问题_短信服务-阿里云
设置短信模板__使用手册_短信服务-阿里云
OSS 权限问题及排查__常见错误及排除_最佳实践_对象存储 OSS-阿里云
消息通知__操作指南_批量计算-阿里云
设备端快速接入(MQTT)__快速开始_阿里云物联网套件-阿里云
查询API调用流量数据__API管理相关接口_API_API 网关-阿里云
使用STS访问__JavaScript-SDK_SDK 参考_对象存储 OSS-阿里云