閱讀438 返回首頁    go 阿裏雲


Spark + Log Service__Spark_開發人員指南_E-MapReduce-阿裏雲

Spark + Log Service

Spark 接入 Log Service

下麵這個例子演示了 Spark Streaming 如何消費 Log Service 中的日誌數據,統計日誌條數。

  1. if (args.length < 6) {
  2. System.err.println(
  3. """Usage: TestLoghub <sls project> <sls logstore> <loghub group name>
  4. | <sls endpoint> <receiver number> <batch interval seconds>
  5. """.stripMargin)
  6. System.exit(1)
  7. }
  8. val logserviceProject = args(0) // Log Service 中 project 名
  9. val logStoreName = args(1) // Log Service 中 logstore 名
  10. val loghubGroupName = args(2) // loghubGroupName 相同的作業將共同消費 logstore 的數據
  11. val loghubEndpoint = args(3) // 阿裏雲日誌服務數據類 API Endpoint
  12. val accessKeyId = "<accessKeyId>" // 訪問日誌服務的 AccessKeyId
  13. val accessKeySecret = "<accessKeySecret>" // 訪問日誌服務的 AccessKeySecret
  14. val numReceivers = args(4).toInt // 啟動多少個 Receiver 來讀取 logstore 中的數據
  15. val batchInterval = Milliseconds(args(5).toInt * 1000) // Spark Streaming 中每次處理批次時間間隔
  16. val conf = new SparkConf().setAppName("Test Loghub Streaming")
  17. val ssc = new StreamingContext(conf, batchInterval)
  18. val loghubStream = LoghubUtils.createStream(
  19. ssc,
  20. loghubProject,
  21. logStream,
  22. loghubGroupName,
  23. endpoint,
  24. numReceivers,
  25. accessKeyId,
  26. accessKeySecret,
  27. StorageLevel.MEMORY_AND_DISK)
  28. loghubStream.foreachRDD(rdd => println(rdd.count()))
  29. ssc.start()
  30. ssc.awaitTermination()

說明

  • E-MapReduce SDK支持LogService的三種消費模式,即“BEGIN_CURSOR”,“END_CURSOR”和“SPECIAL_TIMER_CURSOR”,默認是“END_CURSOR”。
    • BEGIN_CURSOR:從日誌頭開始消費,如果有checkpoint記錄,則從checkpoint處開始消費。
    • END_CURSOR:從日誌尾開始消費,如果有checkpoint記錄,則從checkpoint處開始消費。
    • SPECIAL_TIMER_CURSOR:從指定時間點開始消費,如果有checkpoint記錄,則從checkpoint處開始消費。單位為秒。
    • 以上三種消費模式都收到checkpoint記錄的影響,如果存在checkpoint記錄,則從checkpoint處開始消費,不管指定的是什麼消費模式。E-MapReduce SDK基於“SPECIAL_TIMER_CURSOR”模式支持用戶強製在指定時間點開始消費:在LoghubUtils#createStream接口中,以下參數需要組合使用:
      • cursorPosition:LogHubCursorPosition.SPECIAL_TIMER_CURSOR
      • forceSpecial:true
  • E-MapReduce 的機器(除了 Master 節點)無法連接公網。配置 Log Service endpoint 時,請注意使用 Log Service 提供的內網 endpoint,否則無法請求到 Log Service。
  • 了解更多關於 Log Service,請查看相關文檔

附錄

完整示例代碼請看:

最後更新:2016-12-19 19:29:39

  上一篇:go Spark + ONS__Spark_開發人員指南_E-MapReduce-阿裏雲
  下一篇:go Spark + MNS__Spark_開發人員指南_E-MapReduce-阿裏雲