

Spark + ONS (Spark Developer Guide, E-MapReduce, Alibaba Cloud)

Spark + ONS

Connecting Spark to ONS

The following example shows how a Spark Streaming job consumes data from ONS and counts the occurrences of each word within every batch.

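  // OnsUtils comes from the E-MapReduce SDK and Message from the ONS client;
  // these package paths are assumed and may differ between SDK versions
  import com.aliyun.openservices.ons.api.Message
  import org.apache.spark.SparkConf
  import org.apache.spark.storage.StorageLevel
  import org.apache.spark.streaming.{Milliseconds, StreamingContext}
  import org.apache.spark.streaming.aliyun.ons.OnsUtils

  // Arguments: consumer ID, topic, subscription expression,
  // number of input streams, and batch interval in milliseconds
  val Array(cId, topic, subExpression, parallelism, interval) = args
  val accessKeyId = "<accessKeyId>"
  val accessKeySecret = "<accessKeySecret>"
  val numStreams = parallelism.toInt
  val batchInterval = Milliseconds(interval.toInt)

  val conf = new SparkConf().setAppName("Test ONS Streaming")
  val ssc = new StreamingContext(conf, batchInterval)

  // Extracts the raw payload from each ONS message
  def func: Message => Array[Byte] = msg => msg.getBody

  // Create one input stream per unit of parallelism
  val onsStreams = (0 until numStreams).map { i =>
    println(s"starting stream $i")
    OnsUtils.createStream(
      ssc,
      cId,
      topic,
      subExpression,
      accessKeyId,
      accessKeySecret,
      StorageLevel.MEMORY_AND_DISK_2,
      func)
  }

  // Merge all input streams and count words within each batch
  val unionStreams = ssc.union(onsStreams)
  unionStreams.foreachRDD { rdd =>
    rdd.map(bytes => new String(bytes, "UTF-8")) // decode explicitly instead of relying on the platform charset
      .flatMap(line => line.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
      .collect()
      .foreach(e => println(s"word: ${e._1}, cnt: ${e._2}"))
  }

  ssc.start()
  ssc.awaitTermination()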
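To make the per-batch computation easier to follow, the sketch below reproduces the same decode, split, and count steps on plain Scala collections, so it runs without Spark or ONS. The object name `BatchWordCountSketch` and the sample messages are hypothetical, and `groupBy` stands in for `reduceByKey` because there is no RDD here.

```scala
import java.nio.charset.StandardCharsets

object BatchWordCountSketch {
  // Mirrors the foreachRDD body: decode each message body as UTF-8,
  // split on single spaces, and count how often each word occurs.
  def countWords(bodies: Seq[Array[Byte]]): Map[String, Int] =
    bodies
      .map(bytes => new String(bytes, StandardCharsets.UTF_8))
      .flatMap(line => line.split(" "))
      .groupBy(identity)
      .map { case (word, occurrences) => (word, occurrences.size) }

  def main(args: Array[String]): Unit = {
    // Two hypothetical message bodies standing in for one batch
    val batch = Seq("hello ons", "hello spark").map(_.getBytes(StandardCharsets.UTF_8))
    countWords(batch).toSeq.sortBy(_._1).foreach { case (w, c) =>
      println(s"word: $w, cnt: $c")
    }
  }
}
```

Running `main` prints `word: hello, cnt: 2`, `word: ons, cnt: 1`, and `word: spark, cnt: 1`. In the streaming job, the same counts are computed in a distributed way by `reduceByKey` and only the final result is collected to the driver.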

Appendix

For the complete sample code, see:

Last updated: 2016-12-19 19:20:01
