307
iPhone_iPad_Mac_手機_平板_蘋果apple
producer-lib__loghub-采集_用戶指南_日誌服務-阿裏雲
LogHub Producer Library是針對Java應用程序高並發寫LogHub類庫,Producer Library和Consumer Library是對LogHub讀寫包裝,降低數據收集與消費的門檻。
LogHub Producer Library解決的問題:
- 客戶端日誌不落盤:既數據產生後直接通過網絡發往服務端。
- 客戶端高並發寫入:例如一秒鍾會有百次以上寫操作。
- 客戶端計算與IO邏輯分離:打日誌不影響計算耗時。
在以上場景中,Producer Library會簡化你程序開發的代價,幫助你批量聚合寫請求,通過異步的方式發往LogHub服務端。在整個過程中,用戶可以配置批量聚合的參數,服務端異常處理的邏輯等。
以上各種接入方式的對比:
接入方式 | 優點/缺點 | 針對場景 |
---|---|---|
日誌落盤+Logtail | 日誌收集與打日誌解耦,無需修改代碼 | 常用場景 |
Syslog + Logtail | 性能較好(80MB/S),日誌不落盤,需支持syslog協議 | Syslog場景 |
SDK直發 | 不落盤,直接發往服務端,需要處理好網絡IO與程序IO之間的切換 | 日誌不落盤 |
Producer Library | 不落盤,異步合並發送服務端,吞吐量較好 | 日誌不落盤,客戶端QPS高 |
(目前Producer Library隻支持Java 版本,其他語言待開發)
LogHub Producer Library功能
- 提供異步的發送接口,線程安全。
- 可以添加多個project的配置。
- 用於發送的網絡IO線程數量可以配置。
- merge成的包的日誌數量以及大小都可以配置。
- 內存使用可控,當內存使用達到用戶配置的閾值時,producer的send接口會阻塞,直到有空閑的內存可用。
使用方法
producer使用分為以下幾個步驟:
step 1: maven工程中添加依賴:
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>log-loghub-producer</artifactId>
<version>0.1.4</version>
</dependency>
step 2:程序中配置ProducerConfig,其中各個參數說明如下。
public class ProducerConfig
{
//被緩存起來的日誌的發送超時時間,如果緩存超時,則會被立即發送,單位是毫秒
public int packageTimeoutInMS = 3000;
//每個緩存的日誌包中包含日誌數量的最大值,不能超過4096
public int logsCountPerPackage = 4096;
//每個緩存的日誌包的大小的上限,不能超過5MB,單位是字節
public int logsBytesPerPackage = 5 * 1024 * 1024;
//單個producer實例可以使用的內存的上限,單位是字節
public int memPoolSizeInByte = 1000 * 1024 * 1024;
//IO線程池最大線程數量,主要用於發送數據到日誌服務
public int maxIOThreadSizeInPool = 50;
//當使用指定shardhash的方式發送日誌時,這個參數需要被設置,否則不需要關心。後端merge線程會將映射到同一個shard的數據merge在一起,而shard關聯的是一個hash區間,
//producer在處理時會將用戶傳入的hash映射成shard關聯hash區間的最小值。每一個shard關聯的hash區間,producer會定時從從loghub拉取,該參數的含義是每隔shardHashUpdateIntervalInMS毫秒,
//更新一次shard的hash區間。
public int shardHashUpdateIntervalInMS = 10 * 60 * 1000;
//如果發送失敗,重試的次數,如果超過該值,就會將異常作為callback的參數,交由用戶處理。
public int retryTimes = 3;
}
step 3:繼承ILogCallback,callback主要用於日誌發送結果的處理,結果包括發送成功和發生異常。用戶也可以選擇不處理,這樣就不需要繼承ILogCallback。
step 4:創建producer實例,調用send接口發數據。
下麵是一個完整的示例。
示例
main:
public class ProducerSample {
private final static int ThreadsCount = 25;
public static String RandomString(int length) {
String str = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
Random random = new Random();
StringBuffer buf = new StringBuffer();
for (int i = 0; i < length; i++) {
int num = random.nextInt(62);
buf.append(str.charAt(num));
}
return buf.toString();
}
public static void main(String args[]) throws InterruptedException {
ProducerConfig producerConfig = new ProducerConfig();
//使用默認配置創建producer實例
final LogProducer producer = new LogProducer(producerConfig);
// 添加多個project配置
producer.setProjectConfig(new ProjectConfig("your project 1",
"endpoint", "your accesskey id", "your accesskey"));
producer.setProjectConfig(new ProjectConfig("your project 2",
"endpoint", "your accesskey id", "your accesskey",
"your sts token"));
// 更新project 1的配置
producer.setProjectConfig(new ProjectConfig("your project 1",
"endpoint", "your new accesskey id", "your new accesskey"));
// 刪除project 2的配置
producer.removeProjectConfig("your project 2");
// 生成日誌集合,用於測試
final Vector<Vector<LogItem>> logGroups = new Vector<Vector<LogItem>>();
for (int i = 0; i < 100000; ++i) {
Vector<LogItem> tmpLogGroup = new Vector<LogItem>();
LogItem logItem = new LogItem((int) (new Date().getTime() / 1000));
logItem.PushBack("level", "info" + System.currentTimeMillis());
logItem.PushBack("message", "test producer send perf "
+ RandomString(50));
logItem.PushBack("method", "SenderToServer " + RandomString(10));
tmpLogGroup.add(logItem);
logGroups.add(tmpLogGroup);
}
// 並發調用send發送日誌
Thread[] threads = new Thread[ThreadsCount];
for (int i = 0; i < ThreadsCount; ++i) {
threads[i] = new Thread(null, new Runnable() {
Random random = new Random();
public void run() {
int j = 0, rand = random.nextInt(99999);
while (++j < Integer.MAX_VALUE) {
producer.send("project 1", "logstore 1", "topic",
"source ip", logGroups.get(rand),
new CallbackSample("project 1", "logstore 1", "topic", "source ip", null, logGroups.get(rand), producer));
}
}
}, i + "");
threads[i].start();
}
//等待發送線程退出
Thread.sleep(1 * 60 * 60 * 1000);
//主動刷新緩存起來的還沒有被發送的日誌
producer.flush();
//關閉後台io線程,close會將調用時刻內存中緩存的數據發送出去
producer.close();
}
}
callback:
public class CallbackSample extends ILogCallback {
//保存要發送的數據,當時發生異常時,進行重試
public String project;
public String logstore;
public String topic;
public String shardHash;
public String source;
public Vector<LogItem> items;
public LogProducer producer;
public int retryTimes = 0;
public CallbackSample(String project, String logstore, String topic,
String shardHash, String source, Vector<LogItem> items, LogProducer producer) {
super();
this.project = project;
this.logstore = logstore;
this.topic = topic;
this.shardHash = shardHash;
this.source = source;
this.items = items;
this.producer = producer;
}
public void onCompletion(PutLogsResponse response, LogException e) {
if (e != null) {
// 打印異常
System.out.println(e.GetErrorCode() + ", " + e.GetErrorMessage() + ", " + e.GetRequestId());
//最多重試三次
if(retryTimes++ < 3)
{
producer.send(project, logstore, topic, source, shardHash, items, this);
}
}
else{
System.out.println("send success, request id: " + response.GetRequestId());
}
}
}
最後更新:2016-12-14 20:33:55
上一篇:
log4j-appender__loghub-采集_用戶指南_日誌服務-阿裏雲
下一篇:
syslog__loghub-采集_用戶指南_日誌服務-阿裏雲
功能實時性__常見問題_日誌服務-阿裏雲
雲數據庫Redis版__訪問其他雲產品_操作指南_高性能計算-阿裏雲
查詢應用加固結果接口__應用加固API_API手冊_移動安全-阿裏雲
步驟 2:日誌清洗__創建監控任務_用戶指南_業務實時監控服務 ARMS-阿裏雲
查詢推送列表__查詢相關_API 列表_OpenAPI 2.0_移動推送-阿裏雲
有關Object命令__osscmd_常用工具_對象存儲 OSS-阿裏雲
在哪裏可以查到OSS的權限定義___對象存儲(OSS)授權問題_授權常見問題_訪問控製-阿裏雲
性能測試核心技術__中級課程_性能測試視頻教程_性能測試-阿裏雲
使用 psql 命令遷移 PostgreSQL 數據__快速入門(PostgreSQL)_雲數據庫 RDS 版-阿裏雲
8.2 數據導入任務生產指南__第八章 在生產中使用分析型數據庫_使用手冊_分析型數據庫-阿裏雲
相關內容
常見錯誤說明__附錄_大數據計算服務-阿裏雲
發送短信接口__API使用手冊_短信服務-阿裏雲
接口文檔__Android_安全組件教程_移動安全-阿裏雲
運營商錯誤碼(聯通)__常見問題_短信服務-阿裏雲
設置短信模板__使用手冊_短信服務-阿裏雲
OSS 權限問題及排查__常見錯誤及排除_最佳實踐_對象存儲 OSS-阿裏雲
消息通知__操作指南_批量計算-阿裏雲
設備端快速接入(MQTT)__快速開始_阿裏雲物聯網套件-阿裏雲
查詢API調用流量數據__API管理相關接口_API_API 網關-阿裏雲
使用STS訪問__JavaScript-SDK_SDK 參考_對象存儲 OSS-阿裏雲