Spark + ONS__Spark_开发人员指南_E-MapReduce-阿里云
Spark + ONS
Spark 接入 ONS
下面这个例子演示了 Spark Streaming 如何消费 ONS 中的数据,统计每个 batch 内的单词个数。
val Array(cId, topic, subExpression, parallelism, interval) = args
val accessKeyId = "<accessKeyId>"
val accessKeySecret = "<accessKeySecret>"
val numStreams = parallelism.toInt
val batchInterval = Milliseconds(interval.toInt)
val conf = new SparkConf().setAppName("Test ONS Streaming")
val ssc = new StreamingContext(conf, batchInterval)
def func: Message => Array[Byte] = msg => msg.getBody
val onsStreams = (0 until numStreams).map { i =>
println(s"starting stream $i")
OnsUtils.createStream(
ssc,
cId,
topic,
subExpression,
accessKeyId,
accessKeySecret,
StorageLevel.MEMORY_AND_DISK_2,
func)
}
val unionStreams = ssc.union(onsStreams)
unionStreams.foreachRDD(rdd => {
rdd.map(bytes => new String(bytes)).flatMap(line => line.split(" "))
.map(word => (word, 1))
.reduceByKey(_ + _).collect().foreach(e => println(s"word: ${e._1}, cnt: ${e._2}"))
})
ssc.start()
ssc.awaitTermination()
附录
示例代码请看:
最后更新:2016-12-19 19:20:01
上一篇:
Spark + ODPS__Spark_开发人员指南_E-MapReduce-阿里云
下一篇:
Spark + Log Service__Spark_开发人员指南_E-MapReduce-阿里云
查询返回码__资源监控接口_API 手册_CDN-阿里云
事务消息__最佳实践_消息服务-阿里云
停止作业实例__执行计划_API参考_E-MapReduce-阿里云
管理Bucket__Java-SDK_SDK 参考_对象存储 OSS-阿里云
上传SSL证书__域名相关接口_API_API 网关-阿里云
作业盒子、腾跃校长在线、音乐笔记等获融资;新东方网与新东方拟出资参与设立霍东投资;阿里云发布互联网大学
删除签名密钥__后端签名密钥相关接口_API_API 网关-阿里云
服务器端加密编码__安全管理_开发人员指南_对象存储 OSS-阿里云
简单下载示例__SDK示例_批量数据通道_大数据计算服务-阿里云
云服务器 ECS实例规格族
相关内容
常见错误说明__附录_大数据计算服务-阿里云
发送短信接口__API使用手册_短信服务-阿里云
接口文档__Android_安全组件教程_移动安全-阿里云
运营商错误码(联通)__常见问题_短信服务-阿里云
设置短信模板__使用手册_短信服务-阿里云
OSS 权限问题及排查__常见错误及排除_最佳实践_对象存储 OSS-阿里云
消息通知__操作指南_批量计算-阿里云
设备端快速接入(MQTT)__快速开始_阿里云物联网套件-阿里云
查询API调用流量数据__API管理相关接口_API_API 网关-阿里云
使用STS访问__JavaScript-SDK_SDK 参考_对象存储 OSS-阿里云