阅读91 返回首页    go 阿里云


LogHub-协同消费组__Getting-Started_日志服务-阿里云

协同消费组(ConsumerGroup)是实时消费数据高级模式,能够提供多个消费实例对日志库消费自动负载均衡。Spark StreamingStorm都以ConsumerGroup作为基础模式。

通过控制查看消费进度

  1. 选择project进入协同消费功能页面,选择日志库(LogStore)后即可查看目前是否启用协同消费功能 consumer
  2. 选择指定ConsumerGroup之后,点击“消费状态”链接,即可查看当前每个Shard消费数据的进度 status

如上图所示,页面上展示该日志库包含5个Shard,对应5个消费者,其中每个消费者最近消费的数据时间如第二列显示。通过消费数据时间可以判断出目前数据处理是否能满足数据产生速度,如果已经严重落后于当前时间(即数据消费速率小于数据产生速率),可以考虑增加消费者数目。

通过API/SDK查看消费进度

以Java SDK作为例子,演示如何通过API获得消费状态:

  1. package test;
  2. import java.util.ArrayList;
  3. import com.aliyun.openservices.log.Client;
  4. import com.aliyun.openservices.log.common.Consts.CursorMode;
  5. import com.aliyun.openservices.log.common.ConsumerGroup;
  6. import com.aliyun.openservices.log.common.ConsumerGroupShardCheckPoint;
  7. import com.aliyun.openservices.log.exception.LogException;
  8. public class ConsumerGroupTest {
  9. static String endpoint = "";
  10. static String project = "";
  11. static String logstore = "";
  12. static String accesskeyId = "";
  13. static String accesskey = "";
  14. public static void main(String[] args) throws LogException {
  15. Client client = new Client(endpoint, accesskeyId, accesskey);
  16. //获取这个logstore下的所有consumer group,可能不存在,此时consumerGroups的长度是0
  17. ArrayList<ConsumerGroup> consumerGroups;
  18. try{
  19. consumerGroups = client.ListConsumerGroup(project, logstore).GetConsumerGroups();
  20. }
  21. catch(LogException e){
  22. if(e.GetErrorCode() == "LogStoreNotExist")
  23. System.out.println("this logstore does not have any consumer group");
  24. else{
  25. //internal server error branch
  26. }
  27. return;
  28. }
  29. for(ConsumerGroup c: consumerGroups){
  30. //打印consumer group的属性,包括名称、心跳超时时间、是否按序消费
  31. System.out.println("名称: " + c.getConsumerGroupName());
  32. System.out.println("心跳超时时间: " + c.getTimeout());
  33. System.out.println("按序消费: " + c.isInOrder());
  34. for(ConsumerGroupShardCheckPoint cp: client.GetCheckPoint(project, logstore, c.getConsumerGroupName()).GetCheckPoints()){
  35. System.out.println("shard: " + cp.getShard());
  36. //请格式化下,这个时间返回精确到毫秒的时间,长整型
  37. System.out.println("最后一次消费数据的时间: " + cp.getUpdateTime());
  38. System.out.println("消费者名称: " + cp.getConsumer());
  39. String consumerPrg = "";
  40. if(cp.getCheckPoint().isEmpty())
  41. consumerPrg = "尚未开始消费";
  42. else{
  43. //unix时间戳,单位是秒,输出的时候请注意格式化
  44. try{
  45. int prg = client.GetPrevCursorTime(project, logstore, cp.getShard(), cp.getCheckPoint()).GetCursorTime();
  46. consumerPrg = "" + prg;
  47. }
  48. catch(LogException e){
  49. if(e.GetErrorCode() == "InvalidCursor")
  50. consumerPrg = "非法,前一次消费时刻已经超出了logstore中数据的生命周期";
  51. else{
  52. //internal server error
  53. throw e;
  54. }
  55. }
  56. }
  57. System.out.println("消费进度: " + consumerPrg);
  58. String endCursor = client.GetCursor(project, logstore, cp.getShard(), CursorMode.END).GetCursor();
  59. int endPrg = 0;
  60. try{
  61. endPrg = client.GetPrevCursorTime(project, logstore, cp.getShard(), endCursor).GetCursorTime();
  62. }
  63. catch(LogException e){
  64. //do nothing
  65. }
  66. //unix时间戳,单位是秒,输出的时候请注意格式化
  67. System.out.println("最后一条数据到达时刻: " + endPrg);
  68. }
  69. }
  70. }
  71. }

最后更新:2016-11-24 11:23:49

  上一篇:go LogHub-预览数据__Getting-Started_日志服务-阿里云
  下一篇:go LogHub-监控__Getting-Started_日志服务-阿里云