473
阿裏雲
Spark + ONS__Spark_開發人員指南_E-MapReduce-阿裏雲
Spark + ONS
Spark 接入 ONS
下麵這個例子演示了 Spark Streaming 如何消費 ONS 中的數據,統計每個 batch 內的單詞個數。
val Array(cId, topic, subExpression, parallelism, interval) = args
val accessKeyId = "<accessKeyId>"
val accessKeySecret = "<accessKeySecret>"
val numStreams = parallelism.toInt
val batchInterval = Milliseconds(interval.toInt)
val conf = new SparkConf().setAppName("Test ONS Streaming")
val ssc = new StreamingContext(conf, batchInterval)
def func: Message => Array[Byte] = msg => msg.getBody
val onsStreams = (0 until numStreams).map { i =>
println(s"starting stream $i")
OnsUtils.createStream(
ssc,
cId,
topic,
subExpression,
accessKeyId,
accessKeySecret,
StorageLevel.MEMORY_AND_DISK_2,
func)
}
val unionStreams = ssc.union(onsStreams)
unionStreams.foreachRDD(rdd => {
rdd.map(bytes => new String(bytes)).flatMap(line => line.split(" "))
.map(word => (word, 1))
.reduceByKey(_ + _).collect().foreach(e => println(s"word: ${e._1}, cnt: ${e._2}"))
})
ssc.start()
ssc.awaitTermination()
附錄
示例代碼請看:
最後更新:2016-12-19 19:20:01
上一篇:
Spark + ODPS__Spark_開發人員指南_E-MapReduce-阿裏雲
下一篇:
Spark + Log Service__Spark_開發人員指南_E-MapReduce-阿裏雲
查詢返回碼__資源監控接口_API 手冊_CDN-阿裏雲
事務消息__最佳實踐_消息服務-阿裏雲
停止作業實例__執行計劃_API參考_E-MapReduce-阿裏雲
管理Bucket__Java-SDK_SDK 參考_對象存儲 OSS-阿裏雲
上傳SSL證書__域名相關接口_API_API 網關-阿裏雲
作業盒子、騰躍校長在線、音樂筆記等獲融資;新東方網與新東方擬出資參與設立霍東投資;阿裏雲發布互聯網大學
刪除簽名密鑰__後端簽名密鑰相關接口_API_API 網關-阿裏雲
服務器端加密編碼__安全管理_開發人員指南_對象存儲 OSS-阿裏雲
簡單下載示例__SDK示例_批量數據通道_大數據計算服務-阿裏雲
雲服務器 ECS實例規格族
相關內容
常見錯誤說明__附錄_大數據計算服務-阿裏雲
發送短信接口__API使用手冊_短信服務-阿裏雲
接口文檔__Android_安全組件教程_移動安全-阿裏雲
運營商錯誤碼(聯通)__常見問題_短信服務-阿裏雲
設置短信模板__使用手冊_短信服務-阿裏雲
OSS 權限問題及排查__常見錯誤及排除_最佳實踐_對象存儲 OSS-阿裏雲
消息通知__操作指南_批量計算-阿裏雲
設備端快速接入(MQTT)__快速開始_阿裏雲物聯網套件-阿裏雲
查詢API調用流量數據__API管理相關接口_API_API 網關-阿裏雲
使用STS訪問__JavaScript-SDK_SDK 參考_對象存儲 OSS-阿裏雲