LogHub-协同消费组__Getting-Started_日志服务-阿里云
协同消费组(ConsumerGroup)是实时消费数据高级模式,能够提供多个消费实例对日志库消费自动负载均衡。Spark Streaming、Storm都以ConsumerGroup作为基础模式。
通过控制查看消费进度
- 选择project进入协同消费功能页面,选择日志库(LogStore)后即可查看目前是否启用协同消费功能

- 选择指定ConsumerGroup之后,点击“消费状态”链接,即可查看当前每个Shard消费数据的进度

如上图所示,页面上展示该日志库包含5个Shard,对应5个消费者,其中每个消费者最近消费的数据时间如第二列显示。通过消费数据时间可以判断出目前数据处理是否能满足数据产生速度,如果已经严重落后于当前时间(即数据消费速率小于数据产生速率),可以考虑增加消费者数目。
通过API/SDK查看消费进度
以Java SDK作为例子,演示如何通过API获得消费状态:
package test;import java.util.ArrayList;import com.aliyun.openservices.log.Client;import com.aliyun.openservices.log.common.Consts.CursorMode;import com.aliyun.openservices.log.common.ConsumerGroup;import com.aliyun.openservices.log.common.ConsumerGroupShardCheckPoint;import com.aliyun.openservices.log.exception.LogException;public class ConsumerGroupTest {static String endpoint = "";static String project = "";static String logstore = "";static String accesskeyId = "";static String accesskey = "";public static void main(String[] args) throws LogException {Client client = new Client(endpoint, accesskeyId, accesskey);//获取这个logstore下的所有consumer group,可能不存在,此时consumerGroups的长度是0ArrayList<ConsumerGroup> consumerGroups;try{consumerGroups = client.ListConsumerGroup(project, logstore).GetConsumerGroups();}catch(LogException e){if(e.GetErrorCode() == "LogStoreNotExist")System.out.println("this logstore does not have any consumer group");else{//internal server error branch}return;}for(ConsumerGroup c: consumerGroups){//打印consumer group的属性,包括名称、心跳超时时间、是否按序消费System.out.println("名称: " + c.getConsumerGroupName());System.out.println("心跳超时时间: " + c.getTimeout());System.out.println("按序消费: " + c.isInOrder());for(ConsumerGroupShardCheckPoint cp: client.GetCheckPoint(project, logstore, c.getConsumerGroupName()).GetCheckPoints()){System.out.println("shard: " + cp.getShard());//请格式化下,这个时间返回精确到毫秒的时间,长整型System.out.println("最后一次消费数据的时间: " + cp.getUpdateTime());System.out.println("消费者名称: " + cp.getConsumer());String consumerPrg = "";if(cp.getCheckPoint().isEmpty())consumerPrg = "尚未开始消费";else{//unix时间戳,单位是秒,输出的时候请注意格式化try{int prg = client.GetPrevCursorTime(project, logstore, cp.getShard(), cp.getCheckPoint()).GetCursorTime();consumerPrg = "" + prg;}catch(LogException e){if(e.GetErrorCode() == "InvalidCursor")consumerPrg = "非法,前一次消费时刻已经超出了logstore中数据的生命周期";else{//internal server errorthrow e;}}}System.out.println("消费进度: " + consumerPrg);String endCursor = client.GetCursor(project, logstore, cp.getShard(), CursorMode.END).GetCursor();int endPrg = 0;try{endPrg = client.GetPrevCursorTime(project, logstore, cp.getShard(), endCursor).GetCursorTime();}catch(LogException e){//do nothing}//unix时间戳,单位是秒,输出的时候请注意格式化System.out.println("最后一条数据到达时刻: " + endPrg);}}}}
最后更新:2016-11-24 11:23:49
上一篇:
LogHub-预览数据__Getting-Started_日志服务-阿里云
下一篇:
LogHub-监控__Getting-Started_日志服务-阿里云
iOS SDK手册__SDK手册_HTTPDNS-阿里云
常见攻击类型及排查处理建议___安全问题_技术分享_云虚机主机-阿里云
Ali-Tomcat 安装__开发工具准备_开发者指南_企业级分布式应用服务 EDAS-阿里云
设置读策略__数据库管理_开放API_分布式关系型数据库 DRDS-阿里云
查询资源列表__资源相关接口_API 文档_资源编排-阿里云
数据订阅__功能特性_产品简介_数据传输-阿里云
获取域名 Whois 信息__域名管理接口_API文档_云解析-阿里云
批量消费消息__队列接口规范_API使用手册_消息服务-阿里云
4.2 逻辑表达式和特殊语法__第四章 DML_使用手册_分析型数据库-阿里云
基本介绍__运维中心手册_用户操作指南_大数据开发套件-阿里云
相关内容
常见错误说明__附录_大数据计算服务-阿里云
发送短信接口__API使用手册_短信服务-阿里云
接口文档__Android_安全组件教程_移动安全-阿里云
运营商错误码(联通)__常见问题_短信服务-阿里云
设置短信模板__使用手册_短信服务-阿里云
OSS 权限问题及排查__常见错误及排除_最佳实践_对象存储 OSS-阿里云
消息通知__操作指南_批量计算-阿里云
设备端快速接入(MQTT)__快速开始_阿里云物联网套件-阿里云
查询API调用流量数据__API管理相关接口_API_API 网关-阿里云
使用STS访问__JavaScript-SDK_SDK 参考_对象存储 OSS-阿里云