多实例协同消费__loghub-消费_用户指南_日志服务-阿里云
LogHub Consumer Library使用说明
控制台查看消费进度
具体查看文档说明。Spark Streaming,Storm Spout默认已经使Consumer Library消费LogHub数据。
使用场景
LogHub Consumer Library是对LogHub消费者提供的高级模式,解决多个消费者同时消费logstore时自动分配shard问题。例如在storm、spark场景中多个消费者情况下,自动处理shard的负载均衡、消费者failover等逻辑。用户只需专注在自己业务逻辑上,而无需关心shard分配、CheckPoint、Failover等事宜。
举一个例子而言,用户需要通过storm进行流计算,启动了A、B、C 3个消费实例。在有10个shard情况下,系统会自动为A、B、C分配3、3、4个Shard进行消费。
- 当消费实例A宕机情况下,系统会把A未消费的3个Shard中数据自动均衡B、C上,当A恢复后,会重新均衡。
- 当添加实例D、E情况下,系统会自动进行均衡,每个实例消费2个Shard。
- 当Shard有Merge/Split等情况下,会根据最新的Shard信息,重新均衡。
- 当read only状态的shard消费完之后,剩余的shard会重新做负载均衡。
以上整个过程不会产生数据丢失、以及重复,用户只需在代码中做三件事情:
- 填写配置参数。
- 写处理日志的代码。
- 启动消费实例。
我们强烈建议使用loghub consumer library进行数据消费,这样您只需要关心怎么处理数据,而不需要关注复杂的负载均衡、消费断点保存、按序消费、消费异常处理等问题。
术语简介
loghub consumer library中主要有4个概念,分别是consumer group、consumer、heartbeat和checkpoint,它们之间的关系如下:
- consumer group
是logstore的子资源,拥有相同consumer group 名字的消费者共同消费同一个logstore的所有数据,这些消费者之间不会重复消费数据,一个logstore下面可以最多创建5个consumer group,不可以重名,同一个logstore下面的consumer group之间消费数据不会互相影响。consumer group有两个很重要的属性:
{"order":boolean,"timeout": integer}
order属性表示是否按照写入时间顺序消费key相同的数据,timeout表示consumer group中消费者的超时时间,单位是秒,当一个消费者汇报心跳的时间间隔超过timeout,会被认为已经超时,服务端认为这个consumer此时已经下线了。
- consumer
消费者,每个consumer上会被分配若干个shard,consumer的职责就是要消费这些shard上的数据,同一个consumer group中的consumer必须不重名。
- heartbeat
消费者心跳,consumer需要定期向服务端汇报一个心跳包,用于表明自己还处于存活状态。
- checkpoint
消费者定期将分配给自己的shard消费到的位置保存到服务端,这样当这个shard被分配给其它消费者时,从服务端可以获取shard的消费断点,接着从断点继续消费数据。
如何使用loghub consumer library
- 实现loghub consumer library中的两个接口类:
- ILogHubProcessor // 每个shard对应一个实例,每个实例只消费特定shard的数据。
- ILogHubProcessorFactory // 负责生产实现ILogHubProcessor接口实例。
- 填写参数配置。
- 启动一个或多个client worker实例。
maven地址
<dependency><groupId>com.aliyun.openservices</groupId><artifactId>loghub-client-lib</artifactId><version>0.6.5</version></dependency>
使用sample
main函数
public static void main(String args[]){LogHubConfig config = new LogHubConfig(...);ClientWorker worker = new ClientWorker(new SampleLogHubProcessorFactory(), config);Thread thread = new Thread(worker);//thread运行之后,client worker会自动运行,ClientWorker扩展了Runnable接口。thread.start();Thread.sleep(60 * 60 * 1000);//调用worker的shutdown函数,退出消费实例,关联的线程也会自动停止。worker.shutdown();//ClientWorker运行过程中会生成多个异步的Task,shutdown之后最好等待还在执行的Task安全退出,建议30s。Thread.sleep(30 * 1000);}
ILogHubProcessor、ILogHubProcessorFactory 实现sample
各个shard对应的消费实例类,实际开发过程中用户主要需要关注数据消费逻辑,同一个ClientWorker实例是串行消费数据的,只会产生一个ILogHubProcessor实例,ClientWorker退出的时候会调用ILogHubProcessor的shutdown函数。
public class SampleLogHubProcessor implements ILogHubProcessor{private int mShardId;// 记录上次持久化check point的时间private long mLastCheckTime = 0;public void initialize(int shardId){mShardId = shardId;}// 消费数据的主逻辑public String process(List<LogGroupData> logGroups,ILogHubCheckPointTracker checkPointTracker){for(LogGroupData logGroup: logGroups){LogGroup lg = logGroup.GetLogGroup();System.out.println("source ip:" + lg.getSource());System.out.println("topic: " + lg.getTopic());for(Log log: lg.getLogsList()){StringBuilder content = new StringBuilder();content.append(log.getTime() + "t");for(Content cont: log.getContentsList()){content.append(cont.getKey() + "=" + cont.getValue()+ "t");}System.out.println(content.toString());}}long curTime = System.currentTimeMillis();// 每隔60秒,写一次check point到服务端,如果60秒内,worker crash,// 新启动的worker会从上一个checkpoint其消费数据,有可能有重复数据if (curTime - mLastCheckTime > 60 * 1000){try{//参数true表示立即将checkpoint更新到服务端,为false会将checkpoint缓存在本地,默认隔60s//后台会将checkpoint刷新到服务端。checkPointTracker.saveCheckPoint(true);}catch (LogHubCheckPointException e){e.printStackTrace();}mLastCheckTime = curTime;}else{try{checkPointTracker.saveCheckPoint(false);}catch (LogHubCheckPointException e){e.printStackTrace();}}// 返回空表示正常处理数据, 如果需要回滚到上个check point的点进行重试的话,可以return checkPointTracker.getCheckpoint()return null;}// 当worker退出的时候,会调用该函数,用户可以在此处做些清理工作。public void shutdown(ILogHubCheckPointTracker checkPointTracker){//将消费断点保存到服务端。try {checkPointTracker.saveCheckPoint(true);} catch (LogHubCheckPointException e) {e.printStackTrace();}}}
- 生成 ILogHubProcessor的工厂类 :
public class SampleLogHubProcessorFactory implements ILogHubProcessorFactory{public ILogHubProcessor generatorProcessor(){// 生成一个消费实例return new SampleLogHubProcessor();}}
配置说明:
public class LogHubConfig{//worker默认的拉取数据的时间间隔public static final long DEFAULT_DATA_FETCH_INTERVAL_MS = 200;//consumer group的名字,不能为空,支持[a-z][0-9]和'_','-',长度在[3-63]字符,只能以小写字母和数字开头结尾private String mConsumerGroupName;//consumer的名字,必须确保同一个consumer group下面的各个consumer不重名private String mWorkerInstanceName;//loghub数据接口地址private String mLogHubEndPoint;//项目名称private String mProject;//日志库名称private String mLogStore;//云账号的access key idprivate String mAccessId;//云账号的access keyprivate String mAccessKey;//用于指出在服务端没有记录shard的checkpoint的情况下应该从什么位置消费shard,如果服务端保存了有效的checkpoint信息,那么这些取值不起任何作用,mCursorPosition取值可以是[BEGIN_CURSOR, END_CURSOR, SPECIAL_TIMER_CURSOR]中的一个,BEGIN_CURSOR表示从shard中的第一条数据开始消费,END_CURSOR表示从shard中的当前时刻的最后一条数据开始消费,SPECIAL_TIMER_CURSOR和下面的mLoghubCursorStartTime配对使用,表示从特定的时刻开始消费数据。private LogHubCursorPosition mCursorPosition;//当mCursorPosition取值为SPECIAL_TIMER_CURSOR时,指定消费时间,单位是秒。private int mLoghubCursorStartTime = 0;// 轮询获取loghub数据的时间间隔,间隔越小,抓取越快,单位是毫秒,默认是DEFAULT_DATA_FETCH_INTERVAL_MS,建议时间间隔200ms以上。private long mDataFetchIntervalMillis;// worker向服务端汇报心跳的时间间隔,单位是毫秒,建议取值10000ms。private long mHeartBeatIntervalMillis;//是否按序消费private boolean mConsumeInOrder;}
常见问题&注意事项
- LogHubConfig 中 consumerGroupName表一个消费组,consumerGroupName相同的consumer分摊消费logstore中的shard,同一个consumerGroupName中的consumer,通过workerInstance name进行区分。
假设logstore中有shard 0 ~ shard 3 这4个shard。有3个worker,其consumerGroupName和workerinstance name分别是 :<consumer_group_name_1 , worker_A>,<consumer_group_name_1 , worker_B>,<consumer_group_name_2 , worker_C>则,这些worker和shard的分配关系是:<consumer_group_name_1 , worker_A>: shard_0, shard_1<consumer_group_name_1 , worker_B>: shard_2, shard_3<consumer_group_name_2 , worker_C>: shard_0, shard_1, shard_2, shard_3 # group name不同的worker相互不影响
- 确保实现的ILogHubProcessor process()接口每次都能顺利执行,并退出,这点很重要。
- ILogHubCheckPointTracker的saveCheckPoint()接口,无论传递的参数是true,还是false,都表示当前处理的数据已经完成,参数为true,则立刻持久化至服务端,false则每隔60秒同步一次到服务端。
- 如果LogHubConfig中配置的是子用户的accessKeyId、accessKey,需要在RAM中进行以下授权,详细内容请参考API文档:
| Action | Resource |
|---|---|
| log:GetCursorOrData | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName} |
| log:CreateConsumerGroup | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/* |
| log:ListConsumerGroup | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/* |
| log:ConsumerGroupUpdateCheckPoint | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName} |
| log:ConsumerGroupHeartBeat | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName} |
| log:GetConsumerGroupCheckPoint | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName} |
最后更新:2016-12-16 11:55:42
上一篇:
消费日志__loghub-消费_用户指南_日志服务-阿里云
下一篇:
通过Storm消费__loghub-消费_用户指南_日志服务-阿里云
任务管理__运维中心手册_用户操作指南_大数据开发套件-阿里云
迁移 RDS for PostgreSQL 数据到本地 PostgreSQL__数据迁移_用户指南_云数据库 RDS 版-阿里云
CreateLoadBalancerUDPListener__Listener相关API_API 参考_负载均衡-阿里云
包年包月__计费方式_购买指南_对象存储 OSS-阿里云
查询UV数据__资源监控接口_API 手册_CDN-阿里云
跟着Demo快速体验移动推送__快速开始_移动推送-阿里云
云服务器 ECS复制镜像
采集-日志管理__最佳实践_日志服务-阿里云
日志服务监控指标__常见问题_日志服务-阿里云
线上培训课程介绍__线上培训课程_上云培训-阿里云
相关内容
常见错误说明__附录_大数据计算服务-阿里云
发送短信接口__API使用手册_短信服务-阿里云
接口文档__Android_安全组件教程_移动安全-阿里云
运营商错误码(联通)__常见问题_短信服务-阿里云
设置短信模板__使用手册_短信服务-阿里云
OSS 权限问题及排查__常见错误及排除_最佳实践_对象存储 OSS-阿里云
消息通知__操作指南_批量计算-阿里云
设备端快速接入(MQTT)__快速开始_阿里云物联网套件-阿里云
查询API调用流量数据__API管理相关接口_API_API 网关-阿里云
使用STS访问__JavaScript-SDK_SDK 参考_对象存储 OSS-阿里云