閱讀307 返回首頁    go iPhone_iPad_Mac_手機_平板_蘋果apple


producer-lib__loghub-采集_用戶指南_日誌服務-阿裏雲

LogHub Producer Library是針對Java應用程序高並發寫LogHub類庫,Producer Library和Consumer Library是對LogHub讀寫包裝,降低數據收集與消費的門檻。

LogHub Producer Library解決的問題:

  1. 客戶端日誌不落盤:既數據產生後直接通過網絡發往服務端。
  2. 客戶端高並發寫入:例如一秒鍾會有百次以上寫操作。
  3. 客戶端計算與IO邏輯分離:打日誌不影響計算耗時。

在以上場景中,Producer Library會簡化你程序開發的代價,幫助你批量聚合寫請求,通過異步的方式發往LogHub服務端。在整個過程中,用戶可以配置批量聚合的參數,服務端異常處理的邏輯等。

0c5e22da184eec0f93979cec8ff159394b1143e0

以上各種接入方式的對比:

接入方式 優點/缺點 針對場景
日誌落盤+Logtail 日誌收集與打日誌解耦,無需修改代碼 常用場景
Syslog + Logtail 性能較好(80MB/S),日誌不落盤,需支持syslog協議 Syslog場景
SDK直發 不落盤,直接發往服務端,需要處理好網絡IO與程序IO之間的切換 日誌不落盤
Producer Library 不落盤,異步合並發送服務端,吞吐量較好 日誌不落盤,客戶端QPS高

(目前Producer Library隻支持Java 版本,其他語言待開發)

LogHub Producer Library功能

  1. 提供異步的發送接口,線程安全。
  2. 可以添加多個project的配置。
  3. 用於發送的網絡IO線程數量可以配置。
  4. merge成的包的日誌數量以及大小都可以配置。
  5. 內存使用可控,當內存使用達到用戶配置的閾值時,producer的send接口會阻塞,直到有空閑的內存可用。

使用方法

producer使用分為以下幾個步驟:

step 1: maven工程中添加依賴:

  1. <dependency>
  2. <groupId>com.aliyun.openservices</groupId>
  3. <artifactId>log-loghub-producer</artifactId>
  4. <version>0.1.4</version>
  5. </dependency>

step 2:程序中配置ProducerConfig,其中各個參數說明如下。

  1. public class ProducerConfig
  2. {
  3. //被緩存起來的日誌的發送超時時間,如果緩存超時,則會被立即發送,單位是毫秒
  4. public int packageTimeoutInMS = 3000;
  5. //每個緩存的日誌包中包含日誌數量的最大值,不能超過4096
  6. public int logsCountPerPackage = 4096;
  7. //每個緩存的日誌包的大小的上限,不能超過5MB,單位是字節
  8. public int logsBytesPerPackage = 5 * 1024 * 1024;
  9. //單個producer實例可以使用的內存的上限,單位是字節
  10. public int memPoolSizeInByte = 1000 * 1024 * 1024;
  11. //IO線程池最大線程數量,主要用於發送數據到日誌服務
  12. public int maxIOThreadSizeInPool = 50;
  13. //當使用指定shardhash的方式發送日誌時,這個參數需要被設置,否則不需要關心。後端merge線程會將映射到同一個shard的數據merge在一起,而shard關聯的是一個hash區間,
  14. //producer在處理時會將用戶傳入的hash映射成shard關聯hash區間的最小值。每一個shard關聯的hash區間,producer會定時從從loghub拉取,該參數的含義是每隔shardHashUpdateIntervalInMS毫秒,
  15. //更新一次shard的hash區間。
  16. public int shardHashUpdateIntervalInMS = 10 * 60 * 1000;
  17. //如果發送失敗,重試的次數,如果超過該值,就會將異常作為callback的參數,交由用戶處理。
  18. public int retryTimes = 3;
  19. }

step 3:繼承ILogCallback,callback主要用於日誌發送結果的處理,結果包括發送成功和發生異常。用戶也可以選擇不處理,這樣就不需要繼承ILogCallback。

step 4:創建producer實例,調用send接口發數據。

下麵是一個完整的示例。

示例

main:

  1. public class ProducerSample {
  2. private final static int ThreadsCount = 25;
  3. public static String RandomString(int length) {
  4. String str = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
  5. Random random = new Random();
  6. StringBuffer buf = new StringBuffer();
  7. for (int i = 0; i < length; i++) {
  8. int num = random.nextInt(62);
  9. buf.append(str.charAt(num));
  10. }
  11. return buf.toString();
  12. }
  13. public static void main(String args[]) throws InterruptedException {
  14. ProducerConfig producerConfig = new ProducerConfig();
  15. //使用默認配置創建producer實例
  16. final LogProducer producer = new LogProducer(producerConfig);
  17. // 添加多個project配置
  18. producer.setProjectConfig(new ProjectConfig("your project 1",
  19. "endpoint", "your accesskey id", "your accesskey"));
  20. producer.setProjectConfig(new ProjectConfig("your project 2",
  21. "endpoint", "your accesskey id", "your accesskey",
  22. "your sts token"));
  23. // 更新project 1的配置
  24. producer.setProjectConfig(new ProjectConfig("your project 1",
  25. "endpoint", "your new accesskey id", "your new accesskey"));
  26. // 刪除project 2的配置
  27. producer.removeProjectConfig("your project 2");
  28. // 生成日誌集合,用於測試
  29. final Vector<Vector<LogItem>> logGroups = new Vector<Vector<LogItem>>();
  30. for (int i = 0; i < 100000; ++i) {
  31. Vector<LogItem> tmpLogGroup = new Vector<LogItem>();
  32. LogItem logItem = new LogItem((int) (new Date().getTime() / 1000));
  33. logItem.PushBack("level", "info" + System.currentTimeMillis());
  34. logItem.PushBack("message", "test producer send perf "
  35. + RandomString(50));
  36. logItem.PushBack("method", "SenderToServer " + RandomString(10));
  37. tmpLogGroup.add(logItem);
  38. logGroups.add(tmpLogGroup);
  39. }
  40. // 並發調用send發送日誌
  41. Thread[] threads = new Thread[ThreadsCount];
  42. for (int i = 0; i < ThreadsCount; ++i) {
  43. threads[i] = new Thread(null, new Runnable() {
  44. Random random = new Random();
  45. public void run() {
  46. int j = 0, rand = random.nextInt(99999);
  47. while (++j < Integer.MAX_VALUE) {
  48. producer.send("project 1", "logstore 1", "topic",
  49. "source ip", logGroups.get(rand),
  50. new CallbackSample("project 1", "logstore 1", "topic", "source ip", null, logGroups.get(rand), producer));
  51. }
  52. }
  53. }, i + "");
  54. threads[i].start();
  55. }
  56. //等待發送線程退出
  57. Thread.sleep(1 * 60 * 60 * 1000);
  58. //主動刷新緩存起來的還沒有被發送的日誌
  59. producer.flush();
  60. //關閉後台io線程,close會將調用時刻內存中緩存的數據發送出去
  61. producer.close();
  62. }
  63. }

callback:

  1. public class CallbackSample extends ILogCallback {
  2. //保存要發送的數據,當時發生異常時,進行重試
  3. public String project;
  4. public String logstore;
  5. public String topic;
  6. public String shardHash;
  7. public String source;
  8. public Vector<LogItem> items;
  9. public LogProducer producer;
  10. public int retryTimes = 0;
  11. public CallbackSample(String project, String logstore, String topic,
  12. String shardHash, String source, Vector<LogItem> items, LogProducer producer) {
  13. super();
  14. this.project = project;
  15. this.logstore = logstore;
  16. this.topic = topic;
  17. this.shardHash = shardHash;
  18. this.source = source;
  19. this.items = items;
  20. this.producer = producer;
  21. }
  22. public void onCompletion(PutLogsResponse response, LogException e) {
  23. if (e != null) {
  24. // 打印異常
  25. System.out.println(e.GetErrorCode() + ", " + e.GetErrorMessage() + ", " + e.GetRequestId());
  26. //最多重試三次
  27. if(retryTimes++ < 3)
  28. {
  29. producer.send(project, logstore, topic, source, shardHash, items, this);
  30. }
  31. }
  32. else{
  33. System.out.println("send success, request id: " + response.GetRequestId());
  34. }
  35. }
  36. }

最後更新:2016-12-14 20:33:55

  上一篇:go log4j-appender__loghub-采集_用戶指南_日誌服務-阿裏雲
  下一篇:go syslog__loghub-采集_用戶指南_日誌服務-阿裏雲