閱讀295 返回首頁    go 阿裏雲


通過Storm消費__loghub-消費_用戶指南_日誌服務-阿裏雲

日誌服務的Loghub提供了高效、可靠的日誌通道功能,用戶可以通過logtail、sdk等多種方式來實時收集日誌數據。之後,可以在spark stream、storm等各實時係統來消費寫入到Loghub中的數據。

為了降低Storm用戶消費Loghub的代價,我們提供了Loghub Storm spout來實時讀取Loghub數據。

基本結構和流程

  • 上圖中紅色虛線框中就是提供的Loghub storm spout,每個storm topology會有一組spout,同組內的spout共同負責讀取logstore中全部數據。不同topology中的spout相互不幹擾。
  • 每個topology需要選擇唯一的loghub consume group名字來相互標識,同一topology內的spout通過Loghub client lib來完成負載均衡和自動failover。
  • spout從loghub中實時讀取數據之後,發送至topology中的bolt節點,定期保存消費完成位置作為check point到loghubserver端。

使用樣例

Spout使用示例(用於構建Topology)

  1. public static void main( String[] args )
  2. {
  3. String mode = "Local"; // 使用本地測試模式
  4. String conumser_group_name = ""; // 每個topology需要設定唯一的consumer group名字,不能為空,支持[a-z][0-9]和'_','-',長度在[3-63]字符,隻能以小寫字母和數字開頭結尾
  5. String project = ""; // 日誌服務的project
  6. String logstore = ""; // 日誌服務的logstore
  7. String endpoint = ""; // 日誌服務訪問域名
  8. String access_id = ""; // 用戶ak信息
  9. String access_key = "";
  10. // 構建一個Loghub storm spout需要使用的配置
  11. LogHubSpoutConfig config = new LogHubSpoutConfig(conumser_group_name,
  12. endpoint, project, logstore, access_id,
  13. access_key, LogHubCursorPosition.END_CURSOR);
  14. TopologyBuilder builder = new TopologyBuilder();
  15. // 構建loghub storm spout
  16. LogHubSpout spout = new LogHubSpout(config);
  17. // 在實際場景中,spout的個數可以和logstore shard個數相同
  18. builder.setSpout("spout", spout, 1);
  19. builder.setBolt("exclaim", new SampleBolt()).shuffleGrouping("spout");
  20. Config conf = new Config();
  21. conf.setDebug(false);
  22. conf.setMaxSpoutPending(1);
  23. // 如果使用 Kryo 進行數據的序列化和反序列化,則需要顯示設置LogGroupData的序列化方法 LogGroupDataSerializSerializer
  24. Config.registerSerialization(conf, LogGroupData.class, LogGroupDataSerializSerializer.class);
  25. if (mode.equals("Local")) {
  26. logger.info("Local mode...");
  27. LocalCluster cluster = new LocalCluster();
  28. cluster.submitTopology("test-jstorm-spout", conf, builder.createTopology());
  29. try {
  30. Thread.sleep(6000 * 1000); //waiting for several minutes
  31. } catch (InterruptedException e) {
  32. // TODO Auto-generated catch block
  33. e.printStackTrace();
  34. }
  35. cluster.killTopology("test-jstorm-spout");
  36. cluster.shutdown();
  37. } else if (mode.equals("Remote")) {
  38. logger.info("Remote mode...");
  39. conf.setNumWorkers(2);
  40. try {
  41. StormSubmitter.submitTopology("stt-jstorm-spout-4", conf, builder.createTopology());
  42. } catch (AlreadyAliveException e) {
  43. // TODO Auto-generated catch block
  44. e.printStackTrace();
  45. } catch (InvalidTopologyException e) {
  46. // TODO Auto-generated catch block
  47. e.printStackTrace();
  48. }
  49. } else {
  50. logger.error("invalid mode: " + mode);
  51. }
  52. }
  53. }

消費數據的bolt代碼樣例,隻打印每條日誌的內容

  1. public class SampleBolt extends BaseRichBolt {
  2. private static final long serialVersionUID = 4752656887774402264L;
  3. private static final Logger logger = Logger.getLogger(BaseBasicBolt.class);
  4. private OutputCollector mCollector;
  5. @Override
  6. public void prepare(@SuppressWarnings("rawtypes") Map stormConf, TopologyContext context,
  7. OutputCollector collector) {
  8. mCollector = collector;
  9. }
  10. @Override
  11. public void execute(Tuple tuple) {
  12. String shardId = (String) tuple
  13. .getValueByField(LogHubSpout.FIELD_SHARD_ID);
  14. @SuppressWarnings("unchecked")
  15. List<LogGroupData> logGroupDatas = (ArrayList<LogGroupData>) tuple.getValueByField(LogHubSpout.FIELD_LOGGROUPS);
  16. for (LogGroupData groupData : logGroupDatas) {
  17. // 每個LogGroup由一條或多條日誌組成
  18. LogGroup logGroup = groupData.GetLogGroup();
  19. for (Log log : logGroup.getLogsList()) {
  20. StringBuilder sb = new StringBuilder();
  21. // 每條日誌,有一個時間字段, 以及多個Key:Value對,
  22. int log_time = log.getTime();
  23. sb.append("LogTime:").append(log_time);
  24. for (Content content : log.getContentsList()) {
  25. sb.append("t").append(content.getKey()).append(":")
  26. .append(content.getValue());
  27. }
  28. logger.info(sb.toString());
  29. }
  30. }
  31. // 在Loghub spout 中,強製依賴storm的ack機製,用於確認spout將消息正確
  32. // 發送至bolt,所以在bolt中一定要調用ack
  33. mCollector.ack(tuple);
  34. }
  35. @Override
  36. public void declareOutputFields(OutputFieldsDeclarer declarer) {
  37. //do nothing
  38. }
  39. }

使用注意點

  • 為了防止濫用,每個Logstore最多支持5個consumer group,對於不再使用的consumer group,可以使用java sdk中的DeleteConsumerGroup接口進行刪除
  • Spout的個數最好和shard個數相同,否則可能會導致單個spout處理數據量過多而處理不過來。
  • 如果單個shard的數據量還是太大,超過一個spout處理能,則可以使用shard split接口分裂shard,來降低每個shard的數據量
  • 在Loghub spout 中,強製依賴storm的ack機製,用於確認spout將消息正確發送至bolt,所以在bolt中一定要調用ack進行確認

Maven

storm 1.0之前版本(如0.9.6),請使用:

  1. <dependency>
  2. <groupId>com.aliyun.openservices</groupId>
  3. <artifactId>loghub-storm-spout</artifactId>
  4. <version>0.6.3</version>
  5. </dependency>

storm 1.0版本及以後,請使用:

  1. <dependency>
  2. <groupId>com.aliyun.openservices</groupId>
  3. <artifactId>loghub-storm-1.0-spout-</artifactId>
  4. <version>0.1.0</version>
  5. </dependency>

最後更新:2016-11-24 11:23:47

  上一篇:go 多實例協同消費__loghub-消費_用戶指南_日誌服務-阿裏雲
  下一篇:go 通過StreamCompute消費__loghub-消費_用戶指南_日誌服務-阿裏雲