295
阿里云
通过Storm消费__loghub-消费_用户指南_日志服务-阿里云
日志服务的Loghub提供了高效、可靠的日志通道功能,用户可以通过logtail、sdk等多种方式来实时收集日志数据。之后,可以在spark stream、storm等各实时系统来消费写入到Loghub中的数据。
为了降低Storm用户消费Loghub的代价,我们提供了Loghub Storm spout来实时读取Loghub数据。
基本结构和流程
- 上图中红色虚线框中就是提供的Loghub storm spout,每个storm topology会有一组spout,同组内的spout共同负责读取logstore中全部数据。不同topology中的spout相互不干扰。
- 每个topology需要选择唯一的loghub consume group名字来相互标识,同一topology内的spout通过Loghub client lib来完成负载均衡和自动failover。
- spout从loghub中实时读取数据之后,发送至topology中的bolt节点,定期保存消费完成位置作为check point到loghubserver端。
使用样例
Spout使用示例(用于构建Topology)
public static void main( String[] args )
{
String mode = "Local"; // 使用本地测试模式
String conumser_group_name = ""; // 每个topology需要设定唯一的consumer group名字,不能为空,支持[a-z][0-9]和'_','-',长度在[3-63]字符,只能以小写字母和数字开头结尾
String project = ""; // 日志服务的project
String logstore = ""; // 日志服务的logstore
String endpoint = ""; // 日志服务访问域名
String access_id = ""; // 用户ak信息
String access_key = "";
// 构建一个Loghub storm spout需要使用的配置
LogHubSpoutConfig config = new LogHubSpoutConfig(conumser_group_name,
endpoint, project, logstore, access_id,
access_key, LogHubCursorPosition.END_CURSOR);
TopologyBuilder builder = new TopologyBuilder();
// 构建loghub storm spout
LogHubSpout spout = new LogHubSpout(config);
// 在实际场景中,spout的个数可以和logstore shard个数相同
builder.setSpout("spout", spout, 1);
builder.setBolt("exclaim", new SampleBolt()).shuffleGrouping("spout");
Config conf = new Config();
conf.setDebug(false);
conf.setMaxSpoutPending(1);
// 如果使用 Kryo 进行数据的序列化和反序列化,则需要显示设置LogGroupData的序列化方法 LogGroupDataSerializSerializer
Config.registerSerialization(conf, LogGroupData.class, LogGroupDataSerializSerializer.class);
if (mode.equals("Local")) {
logger.info("Local mode...");
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test-jstorm-spout", conf, builder.createTopology());
try {
Thread.sleep(6000 * 1000); //waiting for several minutes
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
cluster.killTopology("test-jstorm-spout");
cluster.shutdown();
} else if (mode.equals("Remote")) {
logger.info("Remote mode...");
conf.setNumWorkers(2);
try {
StormSubmitter.submitTopology("stt-jstorm-spout-4", conf, builder.createTopology());
} catch (AlreadyAliveException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (InvalidTopologyException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
} else {
logger.error("invalid mode: " + mode);
}
}
}
消费数据的bolt代码样例,只打印每条日志的内容
public class SampleBolt extends BaseRichBolt {
private static final long serialVersionUID = 4752656887774402264L;
private static final Logger logger = Logger.getLogger(BaseBasicBolt.class);
private OutputCollector mCollector;
@Override
public void prepare(@SuppressWarnings("rawtypes") Map stormConf, TopologyContext context,
OutputCollector collector) {
mCollector = collector;
}
@Override
public void execute(Tuple tuple) {
String shardId = (String) tuple
.getValueByField(LogHubSpout.FIELD_SHARD_ID);
@SuppressWarnings("unchecked")
List<LogGroupData> logGroupDatas = (ArrayList<LogGroupData>) tuple.getValueByField(LogHubSpout.FIELD_LOGGROUPS);
for (LogGroupData groupData : logGroupDatas) {
// 每个LogGroup由一条或多条日志组成
LogGroup logGroup = groupData.GetLogGroup();
for (Log log : logGroup.getLogsList()) {
StringBuilder sb = new StringBuilder();
// 每条日志,有一个时间字段, 以及多个Key:Value对,
int log_time = log.getTime();
sb.append("LogTime:").append(log_time);
for (Content content : log.getContentsList()) {
sb.append("t").append(content.getKey()).append(":")
.append(content.getValue());
}
logger.info(sb.toString());
}
}
// 在Loghub spout 中,强制依赖storm的ack机制,用于确认spout将消息正确
// 发送至bolt,所以在bolt中一定要调用ack
mCollector.ack(tuple);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
//do nothing
}
}
使用注意点
- 为了防止滥用,每个Logstore最多支持5个consumer group,对于不再使用的consumer group,可以使用java sdk中的DeleteConsumerGroup接口进行删除
- Spout的个数最好和shard个数相同,否则可能会导致单个spout处理数据量过多而处理不过来。
- 如果单个shard的数据量还是太大,超过一个spout处理能,则可以使用shard split接口分裂shard,来降低每个shard的数据量
- 在Loghub spout 中,强制依赖storm的ack机制,用于确认spout将消息正确发送至bolt,所以在bolt中一定要调用ack进行确认
Maven
storm 1.0之前版本(如0.9.6),请使用:
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>loghub-storm-spout</artifactId>
<version>0.6.3</version>
</dependency>
storm 1.0版本及以后,请使用:
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>loghub-storm-1.0-spout-</artifactId>
<version>0.1.0</version>
</dependency>
最后更新:2016-11-24 11:23:47
上一篇:
多实例协同消费__loghub-消费_用户指南_日志服务-阿里云
下一篇:
通过StreamCompute消费__loghub-消费_用户指南_日志服务-阿里云
阿里云等4家企业违反《网络安全法》被责令整改
步骤 3:安装 WordPress__搭建 WordPress 网站_建站教程_云服务器 ECS-阿里云
GetConfig__Logtail配置相关接口_API-Reference_日志服务-阿里云
单行数据操作__Java-SDK_SDK 参考_表格存储-阿里云
阿里云栖大会超详干货分享!他宣布,3年内,将投入1000亿搞研发
ForwardEntryItemType__数据类型_API 参考_云服务器 ECS-阿里云
打开空的SQL窗口__使用SQL窗口_SQL操作_用户指南(RDBMS)_数据管理-阿里云
通用错误码__API-Reference_日志服务-阿里云
分享文件__快速入门_对象存储 OSS-阿里云
根据标签检索__标签_用户指南_云服务器 ECS-阿里云
相关内容
常见错误说明__附录_大数据计算服务-阿里云
发送短信接口__API使用手册_短信服务-阿里云
接口文档__Android_安全组件教程_移动安全-阿里云
运营商错误码(联通)__常见问题_短信服务-阿里云
设置短信模板__使用手册_短信服务-阿里云
OSS 权限问题及排查__常见错误及排除_最佳实践_对象存储 OSS-阿里云
消息通知__操作指南_批量计算-阿里云
设备端快速接入(MQTT)__快速开始_阿里云物联网套件-阿里云
查询API调用流量数据__API管理相关接口_API_API 网关-阿里云
使用STS访问__JavaScript-SDK_SDK 参考_对象存储 OSS-阿里云