Spark + MNS (Spark Developer Guide, E-MapReduce, Alibaba Cloud)

Spark + MNS

Connecting Spark to MNS

The following example shows how Spark Streaming consumes data from an MNS queue and counts the occurrences of each word within each batch.

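    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    // Package name as used by the E-MapReduce SDK; adjust it if your SDK version differs.
    import org.apache.spark.streaming.aliyun.mns.MnsUtils

    val conf = new SparkConf().setAppName("Test MNS Streaming")
    // Each batch covers 10 seconds of messages.
    val batchInterval = Seconds(10)
    val ssc = new StreamingContext(conf, batchInterval)
    val queuename = "queuename"
    val accessKeyId = "<accessKeyId>"
    val accessKeySecret = "<accessKeySecret>"
    val endpoint = "https://xxx.yyy.zzzz/abc"
    // Pull messages from the MNS queue as raw byte arrays.
    val mnsStream = MnsUtils.createPullingStreamAsRawBytes(ssc, queuename, accessKeyId, accessKeySecret, endpoint,
      StorageLevel.MEMORY_ONLY)
    // For every batch: decode each message (platform default charset), split it
    // into words, count each word, and print the counts on the driver.
    mnsStream.foreachRDD( rdd => {
      rdd.map(bytes => new String(bytes)).flatMap(line => line.split(" "))
        .map(word => (word, 1))
        .reduceByKey(_ + _).collect().foreach(e => println(s"word: ${e._1}, cnt: ${e._2}"))
    })
    ssc.start()
    ssc.awaitTermination()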
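
The per-batch counting logic can be tried out without a cluster or an MNS queue. The following is a minimal sketch, not part of the E-MapReduce SDK: the object `WordCountSketch` and the helper `countWords` are hypothetical names, and the sketch assumes message bodies are UTF-8 text. It applies the same decode, split, and count steps to an in-memory batch using plain Scala collections, with `groupBy` standing in for `reduceByKey`.

```scala
import java.nio.charset.StandardCharsets

object WordCountSketch {
  // Hypothetical helper: counts words in one batch of raw message bodies.
  // Assumes every body is UTF-8 encoded text.
  def countWords(batch: Seq[Array[Byte]]): Map[String, Int] =
    batch
      .map(bytes => new String(bytes, StandardCharsets.UTF_8)) // decode each message
      .flatMap(_.split("\\s+"))                                // split on whitespace
      .filter(_.nonEmpty)                                      // drop empty tokens
      .groupBy(identity)                                       // local stand-in for reduceByKey
      .map { case (word, occurrences) => (word, occurrences.size) }

  def main(args: Array[String]): Unit = {
    // Two sample message bodies, encoded the way a producer might send them.
    val batch = Seq("hello mns", "hello spark").map(_.getBytes(StandardCharsets.UTF_8))
    countWords(batch).toSeq.sortBy(_._1).foreach { case (word, cnt) =>
      println(s"word: $word, cnt: $cnt")
    }
  }
}
```

Unlike the streaming example, this sketch splits on any run of whitespace and discards empty tokens, so consecutive spaces in a message do not produce an empty "word".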

Appendix

For the complete sample code, see:

Last updated: 2016-12-19 19:37:55
