HdfsSink Internals
Let's first go over a few concepts:
- batchSize: the sink takes up to batchSize events from the channel before flushing (sync) to HDFS
- transactionCapacity: the maximum number of events a channel transaction can hold; a source puts at most this many events into the channel per transaction
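These two parameters are set on the sink and the channel respectively. A minimal configuration sketch (the agent and component names `a1`, `c1`, `k1` and the HDFS path are placeholders, not from the original post):

```properties
# hypothetical agent "a1"; names are illustrative
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
# max events per channel transaction
a1.channels.c1.transactionCapacity = 1000

a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = hdfs://namenode/flume/events
# events taken from the channel per flush/sync
a1.sinks.k1.hdfs.batchSize = 1000
```

Note that batchSize should not exceed transactionCapacity, since the sink takes a whole batch inside a single channel transaction.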
Next, the class diagram (image not included here):
HDFSEventSink
HDFSEventSink is the Java class behind `type = hdfs` in the sink section of a Flume configuration file. It has three main methods:
- start(): initializes the thread pools and other resources
- stop(): cleanup
- process(): the method invoked by SinkRunner
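The relationship between SinkRunner and these three methods can be sketched in plain Java. This is a simplified stand-in, not the real Flume API: `Status`, `Sink`, and `ToySink` here are hypothetical minimal versions of the actual interfaces.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Minimal sketch of the Sink lifecycle; Status/Sink are simplified
// stand-ins for the real Flume interfaces.
public class SinkLifecycleSketch {
    enum Status { READY, BACKOFF }

    interface Sink {
        void start();     // initialize resources (thread pools, etc.)
        Status process(); // drain one batch; called repeatedly by the runner
        void stop();      // cleanup
    }

    // A toy sink that drains an in-memory "channel".
    static class ToySink implements Sink {
        final Queue<String> channel = new ArrayDeque<>();
        int processed = 0;
        boolean started = false;

        public void start() { started = true; }

        public Status process() {
            // Take at most one "event" per call; BACKOFF when the channel is empty.
            String event = channel.poll();
            if (event == null) return Status.BACKOFF;
            processed++;
            return Status.READY;
        }

        public void stop() { started = false; }
    }

    public static void main(String[] args) {
        ToySink sink = new ToySink();
        sink.channel.add("e1");
        sink.channel.add("e2");
        sink.start();
        // The runner loop: keep calling process() until the sink backs off.
        while (sink.process() == Status.READY) { }
        sink.stop();
        System.out.println(sink.processed); // prints 2
    }
}
```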
process()
The main work of the HDFS sink happens in the process() method.
// Loop up to batchSize times, or until the channel is empty
for (txnEventCount = 0; txnEventCount < batchSize; txnEventCount++) {
  Event event = channel.take();
  if (event == null) {
    break;
  }
  ......
  // sfWriters is an LRU cache of open file handles; the maximum number of
  // open files is controlled by the hdfs.maxOpenFiles parameter
  BucketWriter bucketWriter = sfWriters.get(lookupPath);
  // If not cached, construct one and cache it
  if (bucketWriter == null) {
    // HDFSWriterFactory creates an HDFSWriter for the configured file type,
    // controlled by the hdfs.fileType parameter; e.g. HDFSDataStream
    HDFSWriter hdfsWriter = writerFactory.getWriter(fileType);
    // idleCallback removes the bucketWriter from the LRU cache once it
    // has been flushed and closed as idle
    WriterCallback idleCallback = null;
    if (idleTimeout != 0) {
      idleCallback = new WriterCallback() {
        @Override
        public void run(String bucketPath) {
          sfWriters.remove(bucketPath);
        }
      };
    }
    bucketWriter = new BucketWriter(rollInterval, rollSize, rollCount,
        batchSize, context, realPath, realName, inUsePrefix, inUseSuffix,
        suffix, codeC, compType, hdfsWriter, timedRollerPool,
        proxyTicket, sinkCounter, idleTimeout, idleCallback,
        lookupPath, callTimeout, callTimeoutPool);
    sfWriters.put(lookupPath, bucketWriter);
  }
  ......
  // Track the buckets touched within this transaction
  if (!writers.contains(bucketWriter)) {
    writers.add(bucketWriter);
  }
  // Write the event to HDFS
  bucketWriter.append(event); // ------- append() internally does the following ------->
  open(); // if the underlying stream supports append, open via the append API; otherwise create
  // Decide whether to roll the file:
  // compare the current replica count against the target replication;
  // if the block is under-replicated, set doRotate = false
  if (doRotate) {
    close();
    open();
  }
  // Write the data; the timeout is controlled by hdfs.callTimeout
  callWithTimeout(new CallRunner<Void>() {
    @Override
    public Void call() throws Exception {
      writer.append(event); // could block
      return null;
    }
  });
  if (batchCounter == batchSize) { // flush once batchSize events have been appended
    flush(); // ----> doFlush() ----> HDFSWriter.sync() ----> FSDataOutputStream.flush()/sync()
  }
// Before committing the transaction, flush all buckets
for (BucketWriter bucketWriter : writers) {
  bucketWriter.flush();
}
transaction.commit();
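The callWithTimeout wrapper above submits the blocking HDFS call to a dedicated executor and waits up to hdfs.callTimeout. The pattern can be sketched with the standard Future-based idiom; this is the generic technique, not Flume's exact implementation:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class CallTimeoutSketch {
    // Run a potentially blocking call, failing if it exceeds timeoutMs.
    static <T> T callWithTimeout(ExecutorService pool, Callable<T> call,
                                 long timeoutMs) throws Exception {
        Future<T> future = pool.submit(call);
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt the stuck call
            throw e;
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        // A fast call completes normally...
        String ok = callWithTimeout(pool, () -> "appended", 1000);
        System.out.println(ok); // prints "appended"
        // ...while a call that blocks past the timeout throws TimeoutException.
        try {
            callWithTimeout(pool, () -> { Thread.sleep(5000); return "never"; }, 100);
        } catch (TimeoutException expected) {
            System.out.println("timed out");
        }
        pool.shutdownNow();
    }
}
```

Cancelling the future on timeout interrupts the worker thread, which is why the HDFS call runs on a separate pool (callTimeoutPool) rather than the sink's own thread.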
BucketWriter
BucketWriter mainly wraps the Hadoop HDFS API.
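BucketWriter's count- and size-based rolling (the rollCount and rollSize parameters seen in the constructor above; rollInterval is time-based and handled by a scheduled task on timedRollerPool) can be sketched roughly as follows. This is a hypothetical simplification, not BucketWriter's actual code:

```java
// Rough sketch of a size/count based roll decision, in the spirit of
// BucketWriter's rollCount/rollSize parameters (0 disables a criterion).
public class RollPolicySketch {
    final long rollCount;  // roll after this many events (0 = disabled)
    final long rollSize;   // roll after this many bytes   (0 = disabled)
    long eventCounter = 0;
    long processSize = 0;

    RollPolicySketch(long rollCount, long rollSize) {
        this.rollCount = rollCount;
        this.rollSize = rollSize;
    }

    // Called on each append with the size of the incoming event.
    boolean shouldRotate(int eventBytes) {
        eventCounter++;
        processSize += eventBytes;
        boolean byCount = rollCount > 0 && eventCounter >= rollCount;
        boolean bySize = rollSize > 0 && processSize >= rollSize;
        return byCount || bySize;
    }

    public static void main(String[] args) {
        RollPolicySketch policy = new RollPolicySketch(3, 0); // roll every 3 events
        System.out.println(policy.shouldRotate(10)); // false
        System.out.println(policy.shouldRotate(10)); // false
        System.out.println(policy.shouldRotate(10)); // true: third event triggers a roll
    }
}
```

When shouldRotate returns true, the writer closes the current in-use file, renames it to its final name, and opens a new one, which is the close()/open() pair shown in the append() walkthrough.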
Reference
https://boylook.blog.51cto.com/7934327/d-4
Last updated: 2017-04-03 12:55:52