675
阿里云
Spark + MNS__Spark_开发人员指南_E-MapReduce-阿里云
Spark + MNS
Spark 接入 MNS
下面这个例子演示了 Spark Streaming 如何消费 MNS 中的数据,统计每个 batch 内的单词个数。
val conf = new SparkConf().setAppName("Test MNS Streaming")
val batchInterval = Seconds(10)
val ssc = new StreamingContext(conf, batchInterval)
val queuename = "queuename"
val accessKeyId = "<accessKeyId>"
val accessKeySecret = "<accessKeySecret>"
val endpoint = "https://xxx.yyy.zzzz/abc"
val mnsStream = MnsUtils.createPullingStreamAsRawBytes(ssc, queuename, accessKeyId, accessKeySecret, endpoint,
StorageLevel.MEMORY_ONLY)
mnsStream.foreachRDD( rdd => {
rdd.map(bytes => new String(bytes)).flatMap(line => line.split(" "))
.map(word => (word, 1))
.reduceByKey(_ + _).collect().foreach(e => println(s"word: ${e._1}, cnt: ${e._2}"))
})
ssc.start()
ssc.awaitTermination()
附录
完整示例代码请看:
最后更新:2016-12-19 19:37:55
上一篇:
Spark + Log Service__Spark_开发人员指南_E-MapReduce-阿里云
下一篇:
Spark + HBase__Spark_开发人员指南_E-MapReduce-阿里云
样例代码__Python SDK_SDK参考_E-MapReduce-阿里云
错误__Go-SDK_SDK 参考_对象存储 OSS-阿里云
共享型虚拟主机收到被Ddos攻击关停邮件的处理方法___安全问题_技术分享_云虚机主机-阿里云
云服务器 ECS 镜像复制FAQ
如何提交作业__操作指南_批量计算-阿里云
企业信息安全整体解决方案 阿里云栖大会,我们来了!
查询所有区域下未创建任何 VSwitch 的 VPC 列表__脚本使用示例_用户指南_命令行工具 CLI-阿里云
创建伸缩配置__API快速入门_快速入门_弹性伸缩-阿里云
实时修正算法开发手册__算法规范_开发者指南_推荐引擎-阿里云
开通 OSS 服务__快速入门_对象存储 OSS-阿里云
相关内容
常见错误说明__附录_大数据计算服务-阿里云
发送短信接口__API使用手册_短信服务-阿里云
接口文档__Android_安全组件教程_移动安全-阿里云
运营商错误码(联通)__常见问题_短信服务-阿里云
设置短信模板__使用手册_短信服务-阿里云
OSS 权限问题及排查__常见错误及排除_最佳实践_对象存储 OSS-阿里云
消息通知__操作指南_批量计算-阿里云
设备端快速接入(MQTT)__快速开始_阿里云物联网套件-阿里云
查询API调用流量数据__API管理相关接口_API_API 网关-阿里云
使用STS访问__JavaScript-SDK_SDK 参考_对象存储 OSS-阿里云