295
阿裏雲
通過Storm消費__loghub-消費_用戶指南_日誌服務-阿裏雲
日誌服務的Loghub提供了高效、可靠的日誌通道功能,用戶可以通過logtail、sdk等多種方式來實時收集日誌數據。之後,可以在spark stream、storm等各實時係統來消費寫入到Loghub中的數據。
為了降低Storm用戶消費Loghub的代價,我們提供了Loghub Storm spout來實時讀取Loghub數據。
基本結構和流程

- 上圖中紅色虛線框中就是提供的Loghub storm spout,每個storm topology會有一組spout,同組內的spout共同負責讀取logstore中全部數據。不同topology中的spout相互不幹擾。
- 每個topology需要選擇唯一的loghub consume group名字來相互標識,同一topology內的spout通過Loghub client lib來完成負載均衡和自動failover。
- spout從loghub中實時讀取數據之後,發送至topology中的bolt節點,定期保存消費完成位置作為check point到loghubserver端。
使用樣例
Spout使用示例(用於構建Topology)
public static void main( String[] args ){String mode = "Local"; // 使用本地測試模式String conumser_group_name = ""; // 每個topology需要設定唯一的consumer group名字,不能為空,支持[a-z][0-9]和'_','-',長度在[3-63]字符,隻能以小寫字母和數字開頭結尾String project = ""; // 日誌服務的projectString logstore = ""; // 日誌服務的logstoreString endpoint = ""; // 日誌服務訪問域名String access_id = ""; // 用戶ak信息String access_key = "";// 構建一個Loghub storm spout需要使用的配置LogHubSpoutConfig config = new LogHubSpoutConfig(conumser_group_name,endpoint, project, logstore, access_id,access_key, LogHubCursorPosition.END_CURSOR);TopologyBuilder builder = new TopologyBuilder();// 構建loghub storm spoutLogHubSpout spout = new LogHubSpout(config);// 在實際場景中,spout的個數可以和logstore shard個數相同builder.setSpout("spout", spout, 1);builder.setBolt("exclaim", new SampleBolt()).shuffleGrouping("spout");Config conf = new Config();conf.setDebug(false);conf.setMaxSpoutPending(1);// 如果使用 Kryo 進行數據的序列化和反序列化,則需要顯示設置LogGroupData的序列化方法 LogGroupDataSerializSerializerConfig.registerSerialization(conf, LogGroupData.class, LogGroupDataSerializSerializer.class);if (mode.equals("Local")) {logger.info("Local mode...");LocalCluster cluster = new LocalCluster();cluster.submitTopology("test-jstorm-spout", conf, builder.createTopology());try {Thread.sleep(6000 * 1000); //waiting for several minutes} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();}cluster.killTopology("test-jstorm-spout");cluster.shutdown();} else if (mode.equals("Remote")) {logger.info("Remote mode...");conf.setNumWorkers(2);try {StormSubmitter.submitTopology("stt-jstorm-spout-4", conf, builder.createTopology());} catch (AlreadyAliveException e) {// TODO Auto-generated catch blocke.printStackTrace();} catch (InvalidTopologyException e) {// TODO Auto-generated catch blocke.printStackTrace();}} else {logger.error("invalid mode: " + mode);}}}
消費數據的bolt代碼樣例,隻打印每條日誌的內容
public class SampleBolt extends BaseRichBolt {private static final long serialVersionUID = 4752656887774402264L;private static final Logger logger = Logger.getLogger(BaseBasicBolt.class);private OutputCollector mCollector;@Overridepublic void prepare(@SuppressWarnings("rawtypes") Map stormConf, TopologyContext context,OutputCollector collector) {mCollector = collector;}@Overridepublic void execute(Tuple tuple) {String shardId = (String) tuple.getValueByField(LogHubSpout.FIELD_SHARD_ID);@SuppressWarnings("unchecked")List<LogGroupData> logGroupDatas = (ArrayList<LogGroupData>) tuple.getValueByField(LogHubSpout.FIELD_LOGGROUPS);for (LogGroupData groupData : logGroupDatas) {// 每個LogGroup由一條或多條日誌組成LogGroup logGroup = groupData.GetLogGroup();for (Log log : logGroup.getLogsList()) {StringBuilder sb = new StringBuilder();// 每條日誌,有一個時間字段, 以及多個Key:Value對,int log_time = log.getTime();sb.append("LogTime:").append(log_time);for (Content content : log.getContentsList()) {sb.append("t").append(content.getKey()).append(":").append(content.getValue());}logger.info(sb.toString());}}// 在Loghub spout 中,強製依賴storm的ack機製,用於確認spout將消息正確// 發送至bolt,所以在bolt中一定要調用ackmCollector.ack(tuple);}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {//do nothing}}
使用注意點
- 為了防止濫用,每個Logstore最多支持5個consumer group,對於不再使用的consumer group,可以使用java sdk中的DeleteConsumerGroup接口進行刪除
- Spout的個數最好和shard個數相同,否則可能會導致單個spout處理數據量過多而處理不過來。
- 如果單個shard的數據量還是太大,超過一個spout處理能,則可以使用shard split接口分裂shard,來降低每個shard的數據量
- 在Loghub spout 中,強製依賴storm的ack機製,用於確認spout將消息正確發送至bolt,所以在bolt中一定要調用ack進行確認
Maven
storm 1.0之前版本(如0.9.6),請使用:
<dependency><groupId>com.aliyun.openservices</groupId><artifactId>loghub-storm-spout</artifactId><version>0.6.3</version></dependency>
storm 1.0版本及以後,請使用:
<dependency><groupId>com.aliyun.openservices</groupId><artifactId>loghub-storm-1.0-spout-</artifactId><version>0.1.0</version></dependency>
最後更新:2016-11-24 11:23:47
上一篇:
多實例協同消費__loghub-消費_用戶指南_日誌服務-阿裏雲
下一篇:
通過StreamCompute消費__loghub-消費_用戶指南_日誌服務-阿裏雲
阿裏雲等4家企業違反《網絡安全法》被責令整改
步驟 3:安裝 WordPress__搭建 WordPress 網站_建站教程_雲服務器 ECS-阿裏雲
GetConfig__Logtail配置相關接口_API-Reference_日誌服務-阿裏雲
單行數據操作__Java-SDK_SDK 參考_表格存儲-阿裏雲
阿裏雲棲大會超詳幹貨分享!他宣布,3年內,將投入1000億搞研發
ForwardEntryItemType__數據類型_API 參考_雲服務器 ECS-阿裏雲
打開空的SQL窗口__使用SQL窗口_SQL操作_用戶指南(RDBMS)_數據管理-阿裏雲
通用錯誤碼__API-Reference_日誌服務-阿裏雲
分享文件__快速入門_對象存儲 OSS-阿裏雲
根據標簽檢索__標簽_用戶指南_雲服務器 ECS-阿裏雲
相關內容
常見錯誤說明__附錄_大數據計算服務-阿裏雲
發送短信接口__API使用手冊_短信服務-阿裏雲
接口文檔__Android_安全組件教程_移動安全-阿裏雲
運營商錯誤碼(聯通)__常見問題_短信服務-阿裏雲
設置短信模板__使用手冊_短信服務-阿裏雲
OSS 權限問題及排查__常見錯誤及排除_最佳實踐_對象存儲 OSS-阿裏雲
消息通知__操作指南_批量計算-阿裏雲
設備端快速接入(MQTT)__快速開始_阿裏雲物聯網套件-阿裏雲
查詢API調用流量數據__API管理相關接口_API_API 網關-阿裏雲
使用STS訪問__JavaScript-SDK_SDK 參考_對象存儲 OSS-阿裏雲