

Spark + ONS - Spark Developer Guide - E-MapReduce - Alibaba Cloud

Spark + ONS

Connecting Spark to ONS

The following example shows how Spark Streaming consumes data from ONS and counts how many times each word appears within each batch.

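  import com.aliyun.openservices.ons.api.Message
  import org.apache.spark.SparkConf
  import org.apache.spark.storage.StorageLevel
  import org.apache.spark.streaming.{Milliseconds, StreamingContext}
  import org.apache.spark.streaming.aliyun.ons.OnsUtils

  object OnsWordCount {
    def main(args: Array[String]): Unit = {
      // Arguments: consumer ID, topic, subscription expression (tag filter),
      // number of receiver streams, batch interval in milliseconds
      val Array(cId, topic, subExpression, parallelism, interval) = args
      // Replace with your own AccessKey pair
      val accessKeyId = "<accessKeyId>"
      val accessKeySecret = "<accessKeySecret>"
      val numStreams = parallelism.toInt
      val batchInterval = Milliseconds(interval.toInt)
      val conf = new SparkConf().setAppName("Test ONS Streaming")
      val ssc = new StreamingContext(conf, batchInterval)

      // Extract the raw body bytes from each ONS message
      def func: Message => Array[Byte] = msg => msg.getBody

      // Create one receiver-based input stream per unit of parallelism;
      // received data is kept in memory (spilling to disk) and replicated twice
      val onsStreams = (0 until numStreams).map { i =>
        println(s"starting stream $i")
        OnsUtils.createStream(
          ssc,
          cId,
          topic,
          subExpression,
          accessKeyId,
          accessKeySecret,
          StorageLevel.MEMORY_AND_DISK_2,
          func)
      }

      // Merge all receiver streams into a single DStream
      val unionStreams = ssc.union(onsStreams)

      // For each batch: decode bytes, split into words, count each word,
      // then collect the counts to the driver and print them
      unionStreams.foreachRDD { rdd =>
        rdd.map(bytes => new String(bytes))
          .flatMap(line => line.split(" "))
          .map(word => (word, 1))
          .reduceByKey(_ + _)
          .collect()
          .foreach(e => println(s"word: ${e._1}, cnt: ${e._2}"))
      }

      ssc.start()
      ssc.awaitTermination()
    }
  }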
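The two parts of the example that do not depend on ONS or a cluster can be checked locally. The sketch below covers the five positional arguments and the per-batch word count. `OnsWordCountLocal`, `JobArgs`, `parseArgs` and `countWords` are hypothetical names and are not part of the E-MapReduce SDK. `countWords` uses `groupBy` on an in-memory `Seq` in place of `reduceByKey` on an RDD. The sketch assumes message bodies are UTF-8 text, whereas the streaming example decodes them with the platform default charset.

```scala
import java.nio.charset.StandardCharsets
import scala.util.Try

// Hypothetical local helpers; not part of the E-MapReduce SDK.
object OnsWordCountLocal {
  // Same argument order as the streaming example:
  // consumer ID, topic, subscription expression, number of streams, batch interval (ms)
  final case class JobArgs(cId: String, topic: String, subExpression: String,
                           numStreams: Int, intervalMs: Int)

  // Returns Left(message) instead of throwing a MatchError or NumberFormatException
  def parseArgs(args: Array[String]): Either[String, JobArgs] = args match {
    case Array(cId, topic, subExpression, parallelism, interval) =>
      (Try(parallelism.toInt).toOption, Try(interval.toInt).toOption) match {
        case (Some(n), Some(ms)) if n > 0 && ms > 0 =>
          Right(JobArgs(cId, topic, subExpression, n, ms))
        case _ =>
          Left("parallelism and interval must be positive integers")
      }
    case _ =>
      Left("usage: <cId> <topic> <subExpression> <parallelism> <interval>")
  }

  // Same transformation as the foreachRDD body, applied to one in-memory batch
  // of message bodies: decode, split on single spaces, count each word.
  def countWords(batch: Seq[Array[Byte]]): Map[String, Int] =
    batch
      .map(bytes => new String(bytes, StandardCharsets.UTF_8))
      .flatMap(line => line.split(" "))
      .groupBy(identity)
      .map { case (word, occurrences) => word -> occurrences.size }
}
```

As in the original pipeline, splitting on a single space means that consecutive spaces produce empty-string "words". Splitting on `"\\s+"` avoids this if that matters for your data.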

Appendix

For the sample code, see:

Last updated: 2016-12-19 19:20:01
