LogHub-協同消費組__Getting-Started_日誌服務-阿裏雲
協同消費組(ConsumerGroup)是實時消費數據高級模式,能夠提供多個消費實例對日誌庫消費自動負載均衡。Spark Streaming、Storm都以ConsumerGroup作為基礎模式。
通過控製查看消費進度
- 選擇project進入協同消費功能頁麵,選擇日誌庫(LogStore)後即可查看目前是否啟用協同消費功能
- 選擇指定ConsumerGroup之後,點擊“消費狀態”鏈接,即可查看當前每個Shard消費數據的進度
如上圖所示,頁麵上展示該日誌庫包含5個Shard,對應5個消費者,其中每個消費者最近消費的數據時間如第二列顯示。通過消費數據時間可以判斷出目前數據處理是否能滿足數據產生速度,如果已經嚴重落後於當前時間(即數據消費速率小於數據產生速率),可以考慮增加消費者數目。
通過API/SDK查看消費進度
以Java SDK作為例子,演示如何通過API獲得消費狀態:
package test;
import java.util.ArrayList;
import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.Consts.CursorMode;
import com.aliyun.openservices.log.common.ConsumerGroup;
import com.aliyun.openservices.log.common.ConsumerGroupShardCheckPoint;
import com.aliyun.openservices.log.exception.LogException;
public class ConsumerGroupTest {
static String endpoint = "";
static String project = "";
static String logstore = "";
static String accesskeyId = "";
static String accesskey = "";
public static void main(String[] args) throws LogException {
Client client = new Client(endpoint, accesskeyId, accesskey);
//獲取這個logstore下的所有consumer group,可能不存在,此時consumerGroups的長度是0
ArrayList<ConsumerGroup> consumerGroups;
try{
consumerGroups = client.ListConsumerGroup(project, logstore).GetConsumerGroups();
}
catch(LogException e){
if(e.GetErrorCode() == "LogStoreNotExist")
System.out.println("this logstore does not have any consumer group");
else{
//internal server error branch
}
return;
}
for(ConsumerGroup c: consumerGroups){
//打印consumer group的屬性,包括名稱、心跳超時時間、是否按序消費
System.out.println("名稱: " + c.getConsumerGroupName());
System.out.println("心跳超時時間: " + c.getTimeout());
System.out.println("按序消費: " + c.isInOrder());
for(ConsumerGroupShardCheckPoint cp: client.GetCheckPoint(project, logstore, c.getConsumerGroupName()).GetCheckPoints()){
System.out.println("shard: " + cp.getShard());
//請格式化下,這個時間返回精確到毫秒的時間,長整型
System.out.println("最後一次消費數據的時間: " + cp.getUpdateTime());
System.out.println("消費者名稱: " + cp.getConsumer());
String consumerPrg = "";
if(cp.getCheckPoint().isEmpty())
consumerPrg = "尚未開始消費";
else{
//unix時間戳,單位是秒,輸出的時候請注意格式化
try{
int prg = client.GetPrevCursorTime(project, logstore, cp.getShard(), cp.getCheckPoint()).GetCursorTime();
consumerPrg = "" + prg;
}
catch(LogException e){
if(e.GetErrorCode() == "InvalidCursor")
consumerPrg = "非法,前一次消費時刻已經超出了logstore中數據的生命周期";
else{
//internal server error
throw e;
}
}
}
System.out.println("消費進度: " + consumerPrg);
String endCursor = client.GetCursor(project, logstore, cp.getShard(), CursorMode.END).GetCursor();
int endPrg = 0;
try{
endPrg = client.GetPrevCursorTime(project, logstore, cp.getShard(), endCursor).GetCursorTime();
}
catch(LogException e){
//do nothing
}
//unix時間戳,單位是秒,輸出的時候請注意格式化
System.out.println("最後一條數據到達時刻: " + endPrg);
}
}
}
}
最後更新:2016-11-24 11:23:49
上一篇:
LogHub-預覽數據__Getting-Started_日誌服務-阿裏雲
下一篇:
LogHub-監控__Getting-Started_日誌服務-阿裏雲
iOS SDK手冊__SDK手冊_HTTPDNS-阿裏雲
常見攻擊類型及排查處理建議___安全問題_技術分享_雲虛機主機-阿裏雲
Ali-Tomcat 安裝__開發工具準備_開發者指南_企業級分布式應用服務 EDAS-阿裏雲
設置讀策略__數據庫管理_開放API_分布式關係型數據庫 DRDS-阿裏雲
查詢資源列表__資源相關接口_API 文檔_資源編排-阿裏雲
數據訂閱__功能特性_產品簡介_數據傳輸-阿裏雲
獲取域名 Whois 信息__域名管理接口_API文檔_雲解析-阿裏雲
批量消費消息__隊列接口規範_API使用手冊_消息服務-阿裏雲
4.2 邏輯表達式和特殊語法__第四章 DML_使用手冊_分析型數據庫-阿裏雲
基本介紹__運維中心手冊_用戶操作指南_大數據開發套件-阿裏雲
相關內容
常見錯誤說明__附錄_大數據計算服務-阿裏雲
發送短信接口__API使用手冊_短信服務-阿裏雲
接口文檔__Android_安全組件教程_移動安全-阿裏雲
運營商錯誤碼(聯通)__常見問題_短信服務-阿裏雲
設置短信模板__使用手冊_短信服務-阿裏雲
OSS 權限問題及排查__常見錯誤及排除_最佳實踐_對象存儲 OSS-阿裏雲
消息通知__操作指南_批量計算-阿裏雲
設備端快速接入(MQTT)__快速開始_阿裏雲物聯網套件-阿裏雲
查詢API調用流量數據__API管理相關接口_API_API 網關-阿裏雲
使用STS訪問__JavaScript-SDK_SDK 參考_對象存儲 OSS-阿裏雲