阅读804 返回首页    go 小米路由器


多实例协同消费__loghub-消费_用户指南_日志服务-阿里云

LogHub Consumer Library使用说明

控制台查看消费进度

具体查看文档说明Spark StreamingStorm Spout默认已经使Consumer Library消费LogHub数据。

使用场景

LogHub Consumer Library是对LogHub消费者提供的高级模式,解决多个消费者同时消费logstore时自动分配shard问题。例如在storm、spark场景中多个消费者情况下,自动处理shard的负载均衡、消费者failover等逻辑。用户只需专注在自己业务逻辑上,而无需关心shard分配、CheckPoint、Failover等事宜。

举一个例子而言,用户需要通过storm进行流计算,启动了A、B、C 3个消费实例。在有10个shard情况下,系统会自动为A、B、C分配3、3、4个Shard进行消费。

  • 当消费实例A宕机情况下,系统会把A未消费的3个Shard中数据自动均衡B、C上,当A恢复后,会重新均衡。
  • 当添加实例D、E情况下,系统会自动进行均衡,每个实例消费2个Shard。
  • 当Shard有Merge/Split等情况下,会根据最新的Shard信息,重新均衡。
  • 当read only状态的shard消费完之后,剩余的shard会重新做负载均衡。

以上整个过程不会产生数据丢失、以及重复,用户只需在代码中做三件事情:

  1. 填写配置参数。
  2. 写处理日志的代码。
  3. 启动消费实例。

我们强烈建议使用loghub consumer library进行数据消费,这样您只需要关心怎么处理数据,而不需要关注复杂的负载均衡、消费断点保存、按序消费、消费异常处理等问题

术语简介

loghub consumer library中主要有4个概念,分别是consumer group、consumer、heartbeat和checkpoint,它们之间的关系如下:

  • consumer group

是logstore的子资源,拥有相同consumer group 名字的消费者共同消费同一个logstore的所有数据,这些消费者之间不会重复消费数据,一个logstore下面可以最多创建5个consumer group,不可以重名,同一个logstore下面的consumer group之间消费数据不会互相影响。consumer group有两个很重要的属性:

  1. {
  2. "order":boolean,
  3. "timeout": integer
  4. }

order属性表示是否按照写入时间顺序消费key相同的数据,timeout表示consumer group中消费者的超时时间,单位是秒,当一个消费者汇报心跳的时间间隔超过timeout,会被认为已经超时,服务端认为这个consumer此时已经下线了。

  • consumer

消费者,每个consumer上会被分配若干个shard,consumer的职责就是要消费这些shard上的数据,同一个consumer group中的consumer必须不重名。

  • heartbeat

消费者心跳,consumer需要定期向服务端汇报一个心跳包,用于表明自己还处于存活状态。

  • checkpoint

消费者定期将分配给自己的shard消费到的位置保存到服务端,这样当这个shard被分配给其它消费者时,从服务端可以获取shard的消费断点,接着从断点继续消费数据。

如何使用loghub consumer library

  • 实现loghub consumer library中的两个接口类:
    • ILogHubProcessor // 每个shard对应一个实例,每个实例只消费特定shard的数据。
    • ILogHubProcessorFactory // 负责生产实现ILogHubProcessor接口实例。
  • 填写参数配置。
  • 启动一个或多个client worker实例。

maven地址

  1. <dependency>
  2. <groupId>com.aliyun.openservices</groupId>
  3. <artifactId>loghub-client-lib</artifactId>
  4. <version>0.6.5</version>
  5. </dependency>

使用sample

main函数

  1. public static void main(String args[])
  2. {
  3. LogHubConfig config = new LogHubConfig(...);
  4. ClientWorker worker = new ClientWorker(new SampleLogHubProcessorFactory(), config);
  5. Thread thread = new Thread(worker);
  6. //thread运行之后,client worker会自动运行,ClientWorker扩展了Runnable接口。
  7. thread.start();
  8. Thread.sleep(60 * 60 * 1000);
  9. //调用worker的shutdown函数,退出消费实例,关联的线程也会自动停止。
  10. worker.shutdown();
  11. //ClientWorker运行过程中会生成多个异步的Task,shutdown之后最好等待还在执行的Task安全退出,建议30s。
  12. Thread.sleep(30 * 1000);
  13. }

ILogHubProcessor、ILogHubProcessorFactory 实现sample

  • 各个shard对应的消费实例类,实际开发过程中用户主要需要关注数据消费逻辑,同一个ClientWorker实例是串行消费数据的,只会产生一个ILogHubProcessor实例,ClientWorker退出的时候会调用ILogHubProcessor的shutdown函数。

    1. public class SampleLogHubProcessor implements ILogHubProcessor
    2. {
    3. private int mShardId;
    4. // 记录上次持久化check point的时间
    5. private long mLastCheckTime = 0;
    6. public void initialize(int shardId)
    7. {
    8. mShardId = shardId;
    9. }
    10. // 消费数据的主逻辑
    11. public String process(List<LogGroupData> logGroups,
    12. ILogHubCheckPointTracker checkPointTracker)
    13. {
    14. for(LogGroupData logGroup: logGroups)
    15. {
    16. LogGroup lg = logGroup.GetLogGroup();
    17. System.out.println("source ip:" + lg.getSource());
    18. System.out.println("topic: " + lg.getTopic());
    19. for(Log log: lg.getLogsList())
    20. {
    21. StringBuilder content = new StringBuilder();
    22. content.append(log.getTime() + "t");
    23. for(Content cont: log.getContentsList())
    24. {
    25. content.append(cont.getKey() + "=" + cont.getValue()+ "t");
    26. }
    27. System.out.println(content.toString());
    28. }
    29. }
    30. long curTime = System.currentTimeMillis();
    31. // 每隔60秒,写一次check point到服务端,如果60秒内,worker crash,
    32. // 新启动的worker会从上一个checkpoint其消费数据,有可能有重复数据
    33. if (curTime - mLastCheckTime > 60 * 1000)
    34. {
    35. try
    36. {
    37. //参数true表示立即将checkpoint更新到服务端,为false会将checkpoint缓存在本地,默认隔60s
    38. //后台会将checkpoint刷新到服务端。
    39. checkPointTracker.saveCheckPoint(true);
    40. }
    41. catch (LogHubCheckPointException e)
    42. {
    43. e.printStackTrace();
    44. }
    45. mLastCheckTime = curTime;
    46. }
    47. else
    48. {
    49. try
    50. {
    51. checkPointTracker.saveCheckPoint(false);
    52. }
    53. catch (LogHubCheckPointException e)
    54. {
    55. e.printStackTrace();
    56. }
    57. }
    58. // 返回空表示正常处理数据, 如果需要回滚到上个check point的点进行重试的话,可以return checkPointTracker.getCheckpoint()
    59. return null;
    60. }
    61. // 当worker退出的时候,会调用该函数,用户可以在此处做些清理工作。
    62. public void shutdown(ILogHubCheckPointTracker checkPointTracker)
    63. {
    64. //将消费断点保存到服务端。
    65. try {
    66. checkPointTracker.saveCheckPoint(true);
    67. } catch (LogHubCheckPointException e) {
    68. e.printStackTrace();
    69. }
    70. }
    71. }
  • 生成 ILogHubProcessor的工厂类 :
    1. public class SampleLogHubProcessorFactory implements ILogHubProcessorFactory
    2. {
    3. public ILogHubProcessor generatorProcessor()
    4. {
    5. // 生成一个消费实例
    6. return new SampleLogHubProcessor();
    7. }
    8. }

    配置说明:

  1. public class LogHubConfig
  2. {
  3. //worker默认的拉取数据的时间间隔
  4. public static final long DEFAULT_DATA_FETCH_INTERVAL_MS = 200;
  5. //consumer group的名字,不能为空,支持[a-z][0-9]和'_','-',长度在[3-63]字符,只能以小写字母和数字开头结尾
  6. private String mConsumerGroupName;
  7. //consumer的名字,必须确保同一个consumer group下面的各个consumer不重名
  8. private String mWorkerInstanceName;
  9. //loghub数据接口地址
  10. private String mLogHubEndPoint;
  11. //项目名称
  12. private String mProject;
  13. //日志库名称
  14. private String mLogStore;
  15. //云账号的access key id
  16. private String mAccessId;
  17. //云账号的access key
  18. private String mAccessKey;
  19. //用于指出在服务端没有记录shard的checkpoint的情况下应该从什么位置消费shard,如果服务端保存了有效的checkpoint信息,那么这些取值不起任何作用,mCursorPosition取值可以是[BEGIN_CURSOR, END_CURSOR, SPECIAL_TIMER_CURSOR]中的一个,BEGIN_CURSOR表示从shard中的第一条数据开始消费,END_CURSOR表示从shard中的当前时刻的最后一条数据开始消费,SPECIAL_TIMER_CURSOR和下面的mLoghubCursorStartTime配对使用,表示从特定的时刻开始消费数据。
  20. private LogHubCursorPosition mCursorPosition;
  21. //当mCursorPosition取值为SPECIAL_TIMER_CURSOR时,指定消费时间,单位是秒。
  22. private int mLoghubCursorStartTime = 0;
  23. // 轮询获取loghub数据的时间间隔,间隔越小,抓取越快,单位是毫秒,默认是DEFAULT_DATA_FETCH_INTERVAL_MS,建议时间间隔200ms以上。
  24. private long mDataFetchIntervalMillis;
  25. // worker向服务端汇报心跳的时间间隔,单位是毫秒,建议取值10000ms。
  26. private long mHeartBeatIntervalMillis;
  27. //是否按序消费
  28. private boolean mConsumeInOrder;
  29. }

常见问题&注意事项

  • LogHubConfig 中 consumerGroupName表一个消费组,consumerGroupName相同的consumer分摊消费logstore中的shard,同一个consumerGroupName中的consumer,通过workerInstance name进行区分。
    1. 假设logstore中有shard 0 ~ shard 3 这4个shard。
    2. 有3个worker,其consumerGroupName和workerinstance name分别是 :
    3. <consumer_group_name_1 , worker_A>,
    4. <consumer_group_name_1 , worker_B>,
    5. <consumer_group_name_2 , worker_C>
    6. 则,这些worker和shard的分配关系是:
    7. <consumer_group_name_1 , worker_A>: shard_0, shard_1
    8. <consumer_group_name_1 , worker_B>: shard_2, shard_3
    9. <consumer_group_name_2 , worker_C>: shard_0, shard_1, shard_2, shard_3 # group name不同的worker相互不影响
  • 确保实现的ILogHubProcessor process()接口每次都能顺利执行,并退出,这点很重要。
  • ILogHubCheckPointTracker的saveCheckPoint()接口,无论传递的参数是true,还是false,都表示当前处理的数据已经完成,参数为true,则立刻持久化至服务端,false则每隔60秒同步一次到服务端。
  • 如果LogHubConfig中配置的是子用户的accessKeyId、accessKey,需要在RAM中进行以下授权,详细内容请参考API文档
Action Resource
log:GetCursorOrData acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}
log:CreateConsumerGroup acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/*
log:ListConsumerGroup acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/*
log:ConsumerGroupUpdateCheckPoint acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName}
log:ConsumerGroupHeartBeat acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName}
log:GetConsumerGroupCheckPoint acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName}

最后更新:2016-12-16 11:55:42

  上一篇:go 消费日志__loghub-消费_用户指南_日志服务-阿里云
  下一篇:go 通过Storm消费__loghub-消费_用户指南_日志服务-阿里云