804
阿裏雲
多實例協同消費__loghub-消費_用戶指南_日誌服務-阿裏雲
LogHub Consumer Library使用說明
控製台查看消費進度
具體查看文檔說明。Spark Streaming,Storm Spout默認已經使Consumer Library消費LogHub數據。
使用場景
LogHub Consumer Library是對LogHub消費者提供的高級模式,解決多個消費者同時消費logstore時自動分配shard問題。例如在storm、spark場景中多個消費者情況下,自動處理shard的負載均衡、消費者failover等邏輯。用戶隻需專注在自己業務邏輯上,而無需關心shard分配、CheckPoint、Failover等事宜。
舉一個例子而言,用戶需要通過storm進行流計算,啟動了A、B、C 3個消費實例。在有10個shard情況下,係統會自動為A、B、C分配3、3、4個Shard進行消費。
- 當消費實例A宕機情況下,係統會把A未消費的3個Shard中數據自動均衡B、C上,當A恢複後,會重新均衡。
- 當添加實例D、E情況下,係統會自動進行均衡,每個實例消費2個Shard。
- 當Shard有Merge/Split等情況下,會根據最新的Shard信息,重新均衡。
- 當read only狀態的shard消費完之後,剩餘的shard會重新做負載均衡。
以上整個過程不會產生數據丟失、以及重複,用戶隻需在代碼中做三件事情:
- 填寫配置參數。
- 寫處理日誌的代碼。
- 啟動消費實例。
我們強烈建議使用loghub consumer library進行數據消費,這樣您隻需要關心怎麼處理數據,而不需要關注複雜的負載均衡、消費斷點保存、按序消費、消費異常處理等問題。
術語簡介
loghub consumer library中主要有4個概念,分別是consumer group、consumer、heartbeat和checkpoint,它們之間的關係如下:
- consumer group
是logstore的子資源,擁有相同consumer group 名字的消費者共同消費同一個logstore的所有數據,這些消費者之間不會重複消費數據,一個logstore下麵可以最多創建5個consumer group,不可以重名,同一個logstore下麵的consumer group之間消費數據不會互相影響。consumer group有兩個很重要的屬性:
{
"order":boolean,
"timeout": integer
}
order屬性表示是否按照寫入時間順序消費key相同的數據,timeout表示consumer group中消費者的超時時間,單位是秒,當一個消費者匯報心跳的時間間隔超過timeout,會被認為已經超時,服務端認為這個consumer此時已經下線了。
- consumer
消費者,每個consumer上會被分配若幹個shard,consumer的職責就是要消費這些shard上的數據,同一個consumer group中的consumer必須不重名。
- heartbeat
消費者心跳,consumer需要定期向服務端匯報一個心跳包,用於表明自己還處於存活狀態。
- checkpoint
消費者定期將分配給自己的shard消費到的位置保存到服務端,這樣當這個shard被分配給其它消費者時,從服務端可以獲取shard的消費斷點,接著從斷點繼續消費數據。
如何使用loghub consumer library
- 實現loghub consumer library中的兩個接口類:
- ILogHubProcessor // 每個shard對應一個實例,每個實例隻消費特定shard的數據。
- ILogHubProcessorFactory // 負責生產實現ILogHubProcessor接口實例。
- 填寫參數配置。
- 啟動一個或多個client worker實例。
maven地址
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>loghub-client-lib</artifactId>
<version>0.6.5</version>
</dependency>
使用sample
main函數
public static void main(String args[])
{
LogHubConfig config = new LogHubConfig(...);
ClientWorker worker = new ClientWorker(new SampleLogHubProcessorFactory(), config);
Thread thread = new Thread(worker);
//thread運行之後,client worker會自動運行,ClientWorker擴展了Runnable接口。
thread.start();
Thread.sleep(60 * 60 * 1000);
//調用worker的shutdown函數,退出消費實例,關聯的線程也會自動停止。
worker.shutdown();
//ClientWorker運行過程中會生成多個異步的Task,shutdown之後最好等待還在執行的Task安全退出,建議30s。
Thread.sleep(30 * 1000);
}
ILogHubProcessor、ILogHubProcessorFactory 實現sample
各個shard對應的消費實例類,實際開發過程中用戶主要需要關注數據消費邏輯,同一個ClientWorker實例是串行消費數據的,隻會產生一個ILogHubProcessor實例,ClientWorker退出的時候會調用ILogHubProcessor的shutdown函數。
public class SampleLogHubProcessor implements ILogHubProcessor
{
private int mShardId;
// 記錄上次持久化check point的時間
private long mLastCheckTime = 0;
public void initialize(int shardId)
{
mShardId = shardId;
}
// 消費數據的主邏輯
public String process(List<LogGroupData> logGroups,
ILogHubCheckPointTracker checkPointTracker)
{
for(LogGroupData logGroup: logGroups)
{
LogGroup lg = logGroup.GetLogGroup();
System.out.println("source ip:" + lg.getSource());
System.out.println("topic: " + lg.getTopic());
for(Log log: lg.getLogsList())
{
StringBuilder content = new StringBuilder();
content.append(log.getTime() + "t");
for(Content cont: log.getContentsList())
{
content.append(cont.getKey() + "=" + cont.getValue()+ "t");
}
System.out.println(content.toString());
}
}
long curTime = System.currentTimeMillis();
// 每隔60秒,寫一次check point到服務端,如果60秒內,worker crash,
// 新啟動的worker會從上一個checkpoint其消費數據,有可能有重複數據
if (curTime - mLastCheckTime > 60 * 1000)
{
try
{
//參數true表示立即將checkpoint更新到服務端,為false會將checkpoint緩存在本地,默認隔60s
//後台會將checkpoint刷新到服務端。
checkPointTracker.saveCheckPoint(true);
}
catch (LogHubCheckPointException e)
{
e.printStackTrace();
}
mLastCheckTime = curTime;
}
else
{
try
{
checkPointTracker.saveCheckPoint(false);
}
catch (LogHubCheckPointException e)
{
e.printStackTrace();
}
}
// 返回空表示正常處理數據, 如果需要回滾到上個check point的點進行重試的話,可以return checkPointTracker.getCheckpoint()
return null;
}
// 當worker退出的時候,會調用該函數,用戶可以在此處做些清理工作。
public void shutdown(ILogHubCheckPointTracker checkPointTracker)
{
//將消費斷點保存到服務端。
try {
checkPointTracker.saveCheckPoint(true);
} catch (LogHubCheckPointException e) {
e.printStackTrace();
}
}
}
- 生成 ILogHubProcessor的工廠類 :
public class SampleLogHubProcessorFactory implements ILogHubProcessorFactory
{
public ILogHubProcessor generatorProcessor()
{
// 生成一個消費實例
return new SampleLogHubProcessor();
}
}
配置說明:
public class LogHubConfig
{
//worker默認的拉取數據的時間間隔
public static final long DEFAULT_DATA_FETCH_INTERVAL_MS = 200;
//consumer group的名字,不能為空,支持[a-z][0-9]和'_','-',長度在[3-63]字符,隻能以小寫字母和數字開頭結尾
private String mConsumerGroupName;
//consumer的名字,必須確保同一個consumer group下麵的各個consumer不重名
private String mWorkerInstanceName;
//loghub數據接口地址
private String mLogHubEndPoint;
//項目名稱
private String mProject;
//日誌庫名稱
private String mLogStore;
//雲賬號的access key id
private String mAccessId;
//雲賬號的access key
private String mAccessKey;
//用於指出在服務端沒有記錄shard的checkpoint的情況下應該從什麼位置消費shard,如果服務端保存了有效的checkpoint信息,那麼這些取值不起任何作用,mCursorPosition取值可以是[BEGIN_CURSOR, END_CURSOR, SPECIAL_TIMER_CURSOR]中的一個,BEGIN_CURSOR表示從shard中的第一條數據開始消費,END_CURSOR表示從shard中的當前時刻的最後一條數據開始消費,SPECIAL_TIMER_CURSOR和下麵的mLoghubCursorStartTime配對使用,表示從特定的時刻開始消費數據。
private LogHubCursorPosition mCursorPosition;
//當mCursorPosition取值為SPECIAL_TIMER_CURSOR時,指定消費時間,單位是秒。
private int mLoghubCursorStartTime = 0;
// 輪詢獲取loghub數據的時間間隔,間隔越小,抓取越快,單位是毫秒,默認是DEFAULT_DATA_FETCH_INTERVAL_MS,建議時間間隔200ms以上。
private long mDataFetchIntervalMillis;
// worker向服務端匯報心跳的時間間隔,單位是毫秒,建議取值10000ms。
private long mHeartBeatIntervalMillis;
//是否按序消費
private boolean mConsumeInOrder;
}
常見問題&注意事項
- LogHubConfig 中 consumerGroupName表一個消費組,consumerGroupName相同的consumer分攤消費logstore中的shard,同一個consumerGroupName中的consumer,通過workerInstance name進行區分。
假設logstore中有shard 0 ~ shard 3 這4個shard。
有3個worker,其consumerGroupName和workerinstance name分別是 :
<consumer_group_name_1 , worker_A>,
<consumer_group_name_1 , worker_B>,
<consumer_group_name_2 , worker_C>
則,這些worker和shard的分配關係是:
<consumer_group_name_1 , worker_A>: shard_0, shard_1
<consumer_group_name_1 , worker_B>: shard_2, shard_3
<consumer_group_name_2 , worker_C>: shard_0, shard_1, shard_2, shard_3 # group name不同的worker相互不影響
- 確保實現的ILogHubProcessor process()接口每次都能順利執行,並退出,這點很重要。
- ILogHubCheckPointTracker的saveCheckPoint()接口,無論傳遞的參數是true,還是false,都表示當前處理的數據已經完成,參數為true,則立刻持久化至服務端,false則每隔60秒同步一次到服務端。
- 如果LogHubConfig中配置的是子用戶的accessKeyId、accessKey,需要在RAM中進行以下授權,詳細內容請參考API文檔:
Action | Resource |
---|---|
log:GetCursorOrData | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName} |
log:CreateConsumerGroup | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/* |
log:ListConsumerGroup | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/* |
log:ConsumerGroupUpdateCheckPoint | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName} |
log:ConsumerGroupHeartBeat | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName} |
log:GetConsumerGroupCheckPoint | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName} |
最後更新:2016-12-16 11:55:42
上一篇:
消費日誌__loghub-消費_用戶指南_日誌服務-阿裏雲
下一篇:
通過Storm消費__loghub-消費_用戶指南_日誌服務-阿裏雲
任務管理__運維中心手冊_用戶操作指南_大數據開發套件-阿裏雲
遷移 RDS for PostgreSQL 數據到本地 PostgreSQL__數據遷移_用戶指南_雲數據庫 RDS 版-阿裏雲
CreateLoadBalancerUDPListener__Listener相關API_API 參考_負載均衡-阿裏雲
包年包月__計費方式_購買指南_對象存儲 OSS-阿裏雲
查詢UV數據__資源監控接口_API 手冊_CDN-阿裏雲
跟著Demo快速體驗移動推送__快速開始_移動推送-阿裏雲
雲服務器 ECS複製鏡像
采集-日誌管理__最佳實踐_日誌服務-阿裏雲
日誌服務監控指標__常見問題_日誌服務-阿裏雲
線上培訓課程介紹__線上培訓課程_上雲培訓-阿裏雲
相關內容
常見錯誤說明__附錄_大數據計算服務-阿裏雲
發送短信接口__API使用手冊_短信服務-阿裏雲
接口文檔__Android_安全組件教程_移動安全-阿裏雲
運營商錯誤碼(聯通)__常見問題_短信服務-阿裏雲
設置短信模板__使用手冊_短信服務-阿裏雲
OSS 權限問題及排查__常見錯誤及排除_最佳實踐_對象存儲 OSS-阿裏雲
消息通知__操作指南_批量計算-阿裏雲
設備端快速接入(MQTT)__快速開始_阿裏雲物聯網套件-阿裏雲
查詢API調用流量數據__API管理相關接口_API_API 網關-阿裏雲
使用STS訪問__JavaScript-SDK_SDK 參考_對象存儲 OSS-阿裏雲