Spark + MNS - Spark Developer Guide - E-MapReduce - Alibaba Cloud

Spark + MNS

Connecting Spark to MNS

The following example shows how a Spark Streaming job consumes messages from an MNS queue and counts the occurrences of each word within every batch.

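  import java.nio.charset.StandardCharsets
  import org.apache.spark.SparkConf
  import org.apache.spark.storage.StorageLevel
  import org.apache.spark.streaming.{Seconds, StreamingContext}
  // MnsUtils comes from the E-MapReduce SDK (emr-mns module); the package path may differ between SDK versions.
  import org.apache.spark.streaming.aliyun.mns.MnsUtils

  object TestMnsStreaming {
    def main(args: Array[String]): Unit = {
      val conf = new SparkConf().setAppName("Test MNS Streaming")
      val batchInterval = Seconds(10)
      val ssc = new StreamingContext(conf, batchInterval)
      // Replace with your own queue name, AccessKey pair, and MNS endpoint.
      val queuename = "queuename"
      val accessKeyId = "<accessKeyId>"
      val accessKeySecret = "<accessKeySecret>"
      val endpoint = "https://xxx.yyy.zzzz/abc"
      // Pull messages from the queue; each record is the raw message body as Array[Byte].
      val mnsStream = MnsUtils.createPullingStreamAsRawBytes(ssc, queuename, accessKeyId, accessKeySecret, endpoint,
        StorageLevel.MEMORY_ONLY)
      // For every batch: decode as UTF-8, split into words, count them, and print the counts on the driver.
      mnsStream.foreachRDD { rdd =>
        rdd.map(bytes => new String(bytes, StandardCharsets.UTF_8))
          .flatMap(line => line.split(" "))
          .map(word => (word, 1))
          .reduceByKey(_ + _).collect()
          .foreach(e => println(s"word: ${e._1}, cnt: ${e._2}"))
      }
      ssc.start()
      ssc.awaitTermination()
    }
  }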
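To check the per-batch logic without an MNS queue or a Spark cluster, the transformation inside `foreachRDD` can be mirrored over an ordinary Scala collection. This is a minimal sketch for illustration only, not part of the E-MapReduce SDK: `WordCountSketch` and `countWords` are hypothetical names, messages are assumed to be UTF-8 text, and `groupBy` on a local collection stands in for Spark's `reduceByKey`.

```scala
import java.nio.charset.StandardCharsets

// Hypothetical helper: mirrors the per-batch word count from the streaming example
// on a local collection, so the logic can be tried without Spark or MNS.
object WordCountSketch {
  // Decode each raw message as UTF-8, split on single spaces, and count each word.
  def countWords(messages: Seq[Array[Byte]]): Map[String, Int] =
    messages
      .map(bytes => new String(bytes, StandardCharsets.UTF_8))
      .flatMap(line => line.split(" "))
      .groupBy(identity)
      .map { case (word, occurrences) => (word, occurrences.size) }

  def main(args: Array[String]): Unit = {
    // Two fake message bodies standing in for one batch pulled from the queue.
    val batch = Seq("hello mns", "hello spark").map(_.getBytes(StandardCharsets.UTF_8))
    // Sort by word so the printed order is deterministic.
    countWords(batch).toSeq.sortBy(_._1).foreach { case (w, c) =>
      println(s"word: $w, cnt: $c")
    }
  }
}
```

Running `main` prints `word: hello, cnt: 2`, `word: mns, cnt: 1`, and `word: spark, cnt: 1`, the same format the streaming job prints for each batch.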

Appendix

For the complete sample code, see:

Last updated: 2016-12-19 19:37:55