閱讀804 返回首頁    go 阿裏雲


多實例協同消費__loghub-消費_用戶指南_日誌服務-阿裏雲

LogHub Consumer Library使用說明

控製台查看消費進度

具體查看文檔說明Spark StreamingStorm Spout默認已經使Consumer Library消費LogHub數據。

使用場景

LogHub Consumer Library是對LogHub消費者提供的高級模式,解決多個消費者同時消費logstore時自動分配shard問題。例如在storm、spark場景中多個消費者情況下,自動處理shard的負載均衡、消費者failover等邏輯。用戶隻需專注在自己業務邏輯上,而無需關心shard分配、CheckPoint、Failover等事宜。

舉一個例子而言,用戶需要通過storm進行流計算,啟動了A、B、C 3個消費實例。在有10個shard情況下,係統會自動為A、B、C分配3、3、4個Shard進行消費。

  • 當消費實例A宕機情況下,係統會把A未消費的3個Shard中數據自動均衡B、C上,當A恢複後,會重新均衡。
  • 當添加實例D、E情況下,係統會自動進行均衡,每個實例消費2個Shard。
  • 當Shard有Merge/Split等情況下,會根據最新的Shard信息,重新均衡。
  • 當read only狀態的shard消費完之後,剩餘的shard會重新做負載均衡。

以上整個過程不會產生數據丟失、以及重複,用戶隻需在代碼中做三件事情:

  1. 填寫配置參數。
  2. 寫處理日誌的代碼。
  3. 啟動消費實例。

我們強烈建議使用loghub consumer library進行數據消費,這樣您隻需要關心怎麼處理數據,而不需要關注複雜的負載均衡、消費斷點保存、按序消費、消費異常處理等問題

術語簡介

loghub consumer library中主要有4個概念,分別是consumer group、consumer、heartbeat和checkpoint,它們之間的關係如下:

  • consumer group

是logstore的子資源,擁有相同consumer group 名字的消費者共同消費同一個logstore的所有數據,這些消費者之間不會重複消費數據,一個logstore下麵可以最多創建5個consumer group,不可以重名,同一個logstore下麵的consumer group之間消費數據不會互相影響。consumer group有兩個很重要的屬性:

  1. {
  2. "order":boolean,
  3. "timeout": integer
  4. }

order屬性表示是否按照寫入時間順序消費key相同的數據,timeout表示consumer group中消費者的超時時間,單位是秒,當一個消費者匯報心跳的時間間隔超過timeout,會被認為已經超時,服務端認為這個consumer此時已經下線了。

  • consumer

消費者,每個consumer上會被分配若幹個shard,consumer的職責就是要消費這些shard上的數據,同一個consumer group中的consumer必須不重名。

  • heartbeat

消費者心跳,consumer需要定期向服務端匯報一個心跳包,用於表明自己還處於存活狀態。

  • checkpoint

消費者定期將分配給自己的shard消費到的位置保存到服務端,這樣當這個shard被分配給其它消費者時,從服務端可以獲取shard的消費斷點,接著從斷點繼續消費數據。

如何使用loghub consumer library

  • 實現loghub consumer library中的兩個接口類:
    • ILogHubProcessor // 每個shard對應一個實例,每個實例隻消費特定shard的數據。
    • ILogHubProcessorFactory // 負責生產實現ILogHubProcessor接口實例。
  • 填寫參數配置。
  • 啟動一個或多個client worker實例。

maven地址

  1. <dependency>
  2. <groupId>com.aliyun.openservices</groupId>
  3. <artifactId>loghub-client-lib</artifactId>
  4. <version>0.6.5</version>
  5. </dependency>

使用sample

main函數

  1. public static void main(String args[])
  2. {
  3. LogHubConfig config = new LogHubConfig(...);
  4. ClientWorker worker = new ClientWorker(new SampleLogHubProcessorFactory(), config);
  5. Thread thread = new Thread(worker);
  6. //thread運行之後,client worker會自動運行,ClientWorker擴展了Runnable接口。
  7. thread.start();
  8. Thread.sleep(60 * 60 * 1000);
  9. //調用worker的shutdown函數,退出消費實例,關聯的線程也會自動停止。
  10. worker.shutdown();
  11. //ClientWorker運行過程中會生成多個異步的Task,shutdown之後最好等待還在執行的Task安全退出,建議30s。
  12. Thread.sleep(30 * 1000);
  13. }

ILogHubProcessor、ILogHubProcessorFactory 實現sample

  • 各個shard對應的消費實例類,實際開發過程中用戶主要需要關注數據消費邏輯,同一個ClientWorker實例是串行消費數據的,隻會產生一個ILogHubProcessor實例,ClientWorker退出的時候會調用ILogHubProcessor的shutdown函數。

    1. public class SampleLogHubProcessor implements ILogHubProcessor
    2. {
    3. private int mShardId;
    4. // 記錄上次持久化check point的時間
    5. private long mLastCheckTime = 0;
    6. public void initialize(int shardId)
    7. {
    8. mShardId = shardId;
    9. }
    10. // 消費數據的主邏輯
    11. public String process(List<LogGroupData> logGroups,
    12. ILogHubCheckPointTracker checkPointTracker)
    13. {
    14. for(LogGroupData logGroup: logGroups)
    15. {
    16. LogGroup lg = logGroup.GetLogGroup();
    17. System.out.println("source ip:" + lg.getSource());
    18. System.out.println("topic: " + lg.getTopic());
    19. for(Log log: lg.getLogsList())
    20. {
    21. StringBuilder content = new StringBuilder();
    22. content.append(log.getTime() + "t");
    23. for(Content cont: log.getContentsList())
    24. {
    25. content.append(cont.getKey() + "=" + cont.getValue()+ "t");
    26. }
    27. System.out.println(content.toString());
    28. }
    29. }
    30. long curTime = System.currentTimeMillis();
    31. // 每隔60秒,寫一次check point到服務端,如果60秒內,worker crash,
    32. // 新啟動的worker會從上一個checkpoint其消費數據,有可能有重複數據
    33. if (curTime - mLastCheckTime > 60 * 1000)
    34. {
    35. try
    36. {
    37. //參數true表示立即將checkpoint更新到服務端,為false會將checkpoint緩存在本地,默認隔60s
    38. //後台會將checkpoint刷新到服務端。
    39. checkPointTracker.saveCheckPoint(true);
    40. }
    41. catch (LogHubCheckPointException e)
    42. {
    43. e.printStackTrace();
    44. }
    45. mLastCheckTime = curTime;
    46. }
    47. else
    48. {
    49. try
    50. {
    51. checkPointTracker.saveCheckPoint(false);
    52. }
    53. catch (LogHubCheckPointException e)
    54. {
    55. e.printStackTrace();
    56. }
    57. }
    58. // 返回空表示正常處理數據, 如果需要回滾到上個check point的點進行重試的話,可以return checkPointTracker.getCheckpoint()
    59. return null;
    60. }
    61. // 當worker退出的時候,會調用該函數,用戶可以在此處做些清理工作。
    62. public void shutdown(ILogHubCheckPointTracker checkPointTracker)
    63. {
    64. //將消費斷點保存到服務端。
    65. try {
    66. checkPointTracker.saveCheckPoint(true);
    67. } catch (LogHubCheckPointException e) {
    68. e.printStackTrace();
    69. }
    70. }
    71. }
  • 生成 ILogHubProcessor的工廠類 :
    1. public class SampleLogHubProcessorFactory implements ILogHubProcessorFactory
    2. {
    3. public ILogHubProcessor generatorProcessor()
    4. {
    5. // 生成一個消費實例
    6. return new SampleLogHubProcessor();
    7. }
    8. }

    配置說明:

  1. public class LogHubConfig
  2. {
  3. //worker默認的拉取數據的時間間隔
  4. public static final long DEFAULT_DATA_FETCH_INTERVAL_MS = 200;
  5. //consumer group的名字,不能為空,支持[a-z][0-9]和'_','-',長度在[3-63]字符,隻能以小寫字母和數字開頭結尾
  6. private String mConsumerGroupName;
  7. //consumer的名字,必須確保同一個consumer group下麵的各個consumer不重名
  8. private String mWorkerInstanceName;
  9. //loghub數據接口地址
  10. private String mLogHubEndPoint;
  11. //項目名稱
  12. private String mProject;
  13. //日誌庫名稱
  14. private String mLogStore;
  15. //雲賬號的access key id
  16. private String mAccessId;
  17. //雲賬號的access key
  18. private String mAccessKey;
  19. //用於指出在服務端沒有記錄shard的checkpoint的情況下應該從什麼位置消費shard,如果服務端保存了有效的checkpoint信息,那麼這些取值不起任何作用,mCursorPosition取值可以是[BEGIN_CURSOR, END_CURSOR, SPECIAL_TIMER_CURSOR]中的一個,BEGIN_CURSOR表示從shard中的第一條數據開始消費,END_CURSOR表示從shard中的當前時刻的最後一條數據開始消費,SPECIAL_TIMER_CURSOR和下麵的mLoghubCursorStartTime配對使用,表示從特定的時刻開始消費數據。
  20. private LogHubCursorPosition mCursorPosition;
  21. //當mCursorPosition取值為SPECIAL_TIMER_CURSOR時,指定消費時間,單位是秒。
  22. private int mLoghubCursorStartTime = 0;
  23. // 輪詢獲取loghub數據的時間間隔,間隔越小,抓取越快,單位是毫秒,默認是DEFAULT_DATA_FETCH_INTERVAL_MS,建議時間間隔200ms以上。
  24. private long mDataFetchIntervalMillis;
  25. // worker向服務端匯報心跳的時間間隔,單位是毫秒,建議取值10000ms。
  26. private long mHeartBeatIntervalMillis;
  27. //是否按序消費
  28. private boolean mConsumeInOrder;
  29. }

常見問題&注意事項

  • LogHubConfig 中 consumerGroupName表一個消費組,consumerGroupName相同的consumer分攤消費logstore中的shard,同一個consumerGroupName中的consumer,通過workerInstance name進行區分。
    1. 假設logstore中有shard 0 ~ shard 3 這4個shard。
    2. 有3個worker,其consumerGroupName和workerinstance name分別是 :
    3. <consumer_group_name_1 , worker_A>,
    4. <consumer_group_name_1 , worker_B>,
    5. <consumer_group_name_2 , worker_C>
    6. 則,這些worker和shard的分配關係是:
    7. <consumer_group_name_1 , worker_A>: shard_0, shard_1
    8. <consumer_group_name_1 , worker_B>: shard_2, shard_3
    9. <consumer_group_name_2 , worker_C>: shard_0, shard_1, shard_2, shard_3 # group name不同的worker相互不影響
  • 確保實現的ILogHubProcessor process()接口每次都能順利執行,並退出,這點很重要。
  • ILogHubCheckPointTracker的saveCheckPoint()接口,無論傳遞的參數是true,還是false,都表示當前處理的數據已經完成,參數為true,則立刻持久化至服務端,false則每隔60秒同步一次到服務端。
  • 如果LogHubConfig中配置的是子用戶的accessKeyId、accessKey,需要在RAM中進行以下授權,詳細內容請參考API文檔
Action Resource
log:GetCursorOrData acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}
log:CreateConsumerGroup acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/*
log:ListConsumerGroup acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/*
log:ConsumerGroupUpdateCheckPoint acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName}
log:ConsumerGroupHeartBeat acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName}
log:GetConsumerGroupCheckPoint acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName}

最後更新:2016-12-16 11:55:42

  上一篇:go 消費日誌__loghub-消費_用戶指南_日誌服務-阿裏雲
  下一篇:go 通過Storm消費__loghub-消費_用戶指南_日誌服務-阿裏雲