閱讀91 返回首頁    go 阿裏雲 go 技術社區[雲棲]


LogHub-協同消費組__Getting-Started_日誌服務-阿裏雲

協同消費組(ConsumerGroup)是實時消費數據高級模式,能夠提供多個消費實例對日誌庫消費自動負載均衡。Spark StreamingStorm都以ConsumerGroup作為基礎模式。

通過控製查看消費進度

  1. 選擇project進入協同消費功能頁麵,選擇日誌庫(LogStore)後即可查看目前是否啟用協同消費功能 consumer
  2. 選擇指定ConsumerGroup之後,點擊“消費狀態”鏈接,即可查看當前每個Shard消費數據的進度 status

如上圖所示,頁麵上展示該日誌庫包含5個Shard,對應5個消費者,其中每個消費者最近消費的數據時間如第二列顯示。通過消費數據時間可以判斷出目前數據處理是否能滿足數據產生速度,如果已經嚴重落後於當前時間(即數據消費速率小於數據產生速率),可以考慮增加消費者數目。

通過API/SDK查看消費進度

以Java SDK作為例子,演示如何通過API獲得消費狀態:

  1. package test;
  2. import java.util.ArrayList;
  3. import com.aliyun.openservices.log.Client;
  4. import com.aliyun.openservices.log.common.Consts.CursorMode;
  5. import com.aliyun.openservices.log.common.ConsumerGroup;
  6. import com.aliyun.openservices.log.common.ConsumerGroupShardCheckPoint;
  7. import com.aliyun.openservices.log.exception.LogException;
  8. public class ConsumerGroupTest {
  9. static String endpoint = "";
  10. static String project = "";
  11. static String logstore = "";
  12. static String accesskeyId = "";
  13. static String accesskey = "";
  14. public static void main(String[] args) throws LogException {
  15. Client client = new Client(endpoint, accesskeyId, accesskey);
  16. //獲取這個logstore下的所有consumer group,可能不存在,此時consumerGroups的長度是0
  17. ArrayList<ConsumerGroup> consumerGroups;
  18. try{
  19. consumerGroups = client.ListConsumerGroup(project, logstore).GetConsumerGroups();
  20. }
  21. catch(LogException e){
  22. if(e.GetErrorCode() == "LogStoreNotExist")
  23. System.out.println("this logstore does not have any consumer group");
  24. else{
  25. //internal server error branch
  26. }
  27. return;
  28. }
  29. for(ConsumerGroup c: consumerGroups){
  30. //打印consumer group的屬性,包括名稱、心跳超時時間、是否按序消費
  31. System.out.println("名稱: " + c.getConsumerGroupName());
  32. System.out.println("心跳超時時間: " + c.getTimeout());
  33. System.out.println("按序消費: " + c.isInOrder());
  34. for(ConsumerGroupShardCheckPoint cp: client.GetCheckPoint(project, logstore, c.getConsumerGroupName()).GetCheckPoints()){
  35. System.out.println("shard: " + cp.getShard());
  36. //請格式化下,這個時間返回精確到毫秒的時間,長整型
  37. System.out.println("最後一次消費數據的時間: " + cp.getUpdateTime());
  38. System.out.println("消費者名稱: " + cp.getConsumer());
  39. String consumerPrg = "";
  40. if(cp.getCheckPoint().isEmpty())
  41. consumerPrg = "尚未開始消費";
  42. else{
  43. //unix時間戳,單位是秒,輸出的時候請注意格式化
  44. try{
  45. int prg = client.GetPrevCursorTime(project, logstore, cp.getShard(), cp.getCheckPoint()).GetCursorTime();
  46. consumerPrg = "" + prg;
  47. }
  48. catch(LogException e){
  49. if(e.GetErrorCode() == "InvalidCursor")
  50. consumerPrg = "非法,前一次消費時刻已經超出了logstore中數據的生命周期";
  51. else{
  52. //internal server error
  53. throw e;
  54. }
  55. }
  56. }
  57. System.out.println("消費進度: " + consumerPrg);
  58. String endCursor = client.GetCursor(project, logstore, cp.getShard(), CursorMode.END).GetCursor();
  59. int endPrg = 0;
  60. try{
  61. endPrg = client.GetPrevCursorTime(project, logstore, cp.getShard(), endCursor).GetCursorTime();
  62. }
  63. catch(LogException e){
  64. //do nothing
  65. }
  66. //unix時間戳,單位是秒,輸出的時候請注意格式化
  67. System.out.println("最後一條數據到達時刻: " + endPrg);
  68. }
  69. }
  70. }
  71. }

最後更新:2016-11-24 11:23:49

  上一篇:go LogHub-預覽數據__Getting-Started_日誌服務-阿裏雲
  下一篇:go LogHub-監控__Getting-Started_日誌服務-阿裏雲