通過Storm消費__loghub-消費_用戶指南_日誌服務-阿裏雲
日誌服務的Loghub提供了高效、可靠的日誌通道功能,用戶可以通過logtail、sdk等多種方式來實時收集日誌數據。之後,可以在spark stream、storm等各實時係統來消費寫入到Loghub中的數據。
為了降低Storm用戶消費Loghub的代價,我們提供了Loghub Storm spout來實時讀取Loghub數據。
基本結構和流程
- 上圖中紅色虛線框中就是提供的Loghub storm spout,每個storm topology會有一組spout,同組內的spout共同負責讀取logstore中全部數據。不同topology中的spout相互不幹擾。
- 每個topology需要選擇唯一的loghub consume group名字來相互標識,同一topology內的spout通過Loghub client lib來完成負載均衡和自動failover。
- spout從loghub中實時讀取數據之後,發送至topology中的bolt節點,定期保存消費完成位置作為check point到loghubserver端。
使用樣例
Spout使用示例(用於構建Topology)
public static void main( String[] args )
{
String mode = "Local"; // 使用本地測試模式
String conumser_group_name = ""; // 每個topology需要設定唯一的consumer group名字,不能為空,支持[a-z][0-9]和'_','-',長度在[3-63]字符,隻能以小寫字母和數字開頭結尾
String project = ""; // 日誌服務的project
String logstore = ""; // 日誌服務的logstore
String endpoint = ""; // 日誌服務訪問域名
String access_id = ""; // 用戶ak信息
String access_key = "";
// 構建一個Loghub storm spout需要使用的配置
LogHubSpoutConfig config = new LogHubSpoutConfig(conumser_group_name,
endpoint, project, logstore, access_id,
access_key, LogHubCursorPosition.END_CURSOR);
TopologyBuilder builder = new TopologyBuilder();
// 構建loghub storm spout
LogHubSpout spout = new LogHubSpout(config);
// 在實際場景中,spout的個數可以和logstore shard個數相同
builder.setSpout("spout", spout, 1);
builder.setBolt("exclaim", new SampleBolt()).shuffleGrouping("spout");
Config conf = new Config();
conf.setDebug(false);
conf.setMaxSpoutPending(1);
// 如果使用 Kryo 進行數據的序列化和反序列化,則需要顯示設置LogGroupData的序列化方法 LogGroupDataSerializSerializer
Config.registerSerialization(conf, LogGroupData.class, LogGroupDataSerializSerializer.class);
if (mode.equals("Local")) {
logger.info("Local mode...");
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test-jstorm-spout", conf, builder.createTopology());
try {
Thread.sleep(6000 * 1000); //waiting for several minutes
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
cluster.killTopology("test-jstorm-spout");
cluster.shutdown();
} else if (mode.equals("Remote")) {
logger.info("Remote mode...");
conf.setNumWorkers(2);
try {
StormSubmitter.submitTopology("stt-jstorm-spout-4", conf, builder.createTopology());
} catch (AlreadyAliveException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (InvalidTopologyException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
} else {
logger.error("invalid mode: " + mode);
}
}
}
消費數據的bolt代碼樣例,隻打印每條日誌的內容
public class SampleBolt extends BaseRichBolt {
private static final long serialVersionUID = 4752656887774402264L;
private static final Logger logger = Logger.getLogger(BaseBasicBolt.class);
private OutputCollector mCollector;
@Override
public void prepare(@SuppressWarnings("rawtypes") Map stormConf, TopologyContext context,
OutputCollector collector) {
mCollector = collector;
}
@Override
public void execute(Tuple tuple) {
String shardId = (String) tuple
.getValueByField(LogHubSpout.FIELD_SHARD_ID);
@SuppressWarnings("unchecked")
List<LogGroupData> logGroupDatas = (ArrayList<LogGroupData>) tuple.getValueByField(LogHubSpout.FIELD_LOGGROUPS);
for (LogGroupData groupData : logGroupDatas) {
// 每個LogGroup由一條或多條日誌組成
LogGroup logGroup = groupData.GetLogGroup();
for (Log log : logGroup.getLogsList()) {
StringBuilder sb = new StringBuilder();
// 每條日誌,有一個時間字段, 以及多個Key:Value對,
int log_time = log.getTime();
sb.append("LogTime:").append(log_time);
for (Content content : log.getContentsList()) {
sb.append("t").append(content.getKey()).append(":")
.append(content.getValue());
}
logger.info(sb.toString());
}
}
// 在Loghub spout 中,強製依賴storm的ack機製,用於確認spout將消息正確
// 發送至bolt,所以在bolt中一定要調用ack
mCollector.ack(tuple);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
//do nothing
}
}
使用注意點
- 為了防止濫用,每個Logstore最多支持5個consumer group,對於不再使用的consumer group,可以使用java sdk中的DeleteConsumerGroup接口進行刪除
- Spout的個數最好和shard個數相同,否則可能會導致單個spout處理數據量過多而處理不過來。
- 如果單個shard的數據量還是太大,超過一個spout處理能,則可以使用shard split接口分裂shard,來降低每個shard的數據量
- 在Loghub spout 中,強製依賴storm的ack機製,用於確認spout將消息正確發送至bolt,所以在bolt中一定要調用ack進行確認
Maven
storm 1.0之前版本(如0.9.6),請使用:
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>loghub-storm-spout</artifactId>
<version>0.6.3</version>
</dependency>
storm 1.0版本及以後,請使用:
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>loghub-storm-1.0-spout-</artifactId>
<version>0.1.0</version>
</dependency>
最後更新:2016-11-24 11:23:47
上一篇:
多實例協同消費__loghub-消費_用戶指南_日誌服務-阿裏雲
下一篇:
通過StreamCompute消費__loghub-消費_用戶指南_日誌服務-阿裏雲
阿裏雲等4家企業違反《網絡安全法》被責令整改
步驟 3:安裝 WordPress__搭建 WordPress 網站_建站教程_雲服務器 ECS-阿裏雲
GetConfig__Logtail配置相關接口_API-Reference_日誌服務-阿裏雲
單行數據操作__Java-SDK_SDK 參考_表格存儲-阿裏雲
阿裏雲棲大會超詳幹貨分享!他宣布,3年內,將投入1000億搞研發
ForwardEntryItemType__數據類型_API 參考_雲服務器 ECS-阿裏雲
打開空的SQL窗口__使用SQL窗口_SQL操作_用戶指南(RDBMS)_數據管理-阿裏雲
通用錯誤碼__API-Reference_日誌服務-阿裏雲
分享文件__快速入門_對象存儲 OSS-阿裏雲
根據標簽檢索__標簽_用戶指南_雲服務器 ECS-阿裏雲
相關內容
常見錯誤說明__附錄_大數據計算服務-阿裏雲
發送短信接口__API使用手冊_短信服務-阿裏雲
接口文檔__Android_安全組件教程_移動安全-阿裏雲
運營商錯誤碼(聯通)__常見問題_短信服務-阿裏雲
設置短信模板__使用手冊_短信服務-阿裏雲
OSS 權限問題及排查__常見錯誤及排除_最佳實踐_對象存儲 OSS-阿裏雲
消息通知__操作指南_批量計算-阿裏雲
設備端快速接入(MQTT)__快速開始_阿裏雲物聯網套件-阿裏雲
查詢API調用流量數據__API管理相關接口_API_API 網關-阿裏雲
使用STS訪問__JavaScript-SDK_SDK 參考_對象存儲 OSS-阿裏雲