

How HdfsSink Works

Let's first go over a few concepts:

  • batchSize — the sink takes up to batchSize events from the channel before it flushes/syncs them to HDFS
  • transactionCapacity — the maximum number of events per channel transaction; the source puts events to the channel in batches of up to transactionCapacity events
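To make the two parameters concrete, here is a sketch of how they might appear in a Flume configuration file. The agent, channel, and sink names (`a1`, `c1`, `k1`) and the values are illustrative assumptions, not from the original post:

```properties
# Hypothetical agent a1; names and values are examples only
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
# Maximum number of events per channel transaction
a1.channels.c1.transactionCapacity = 1000

a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events
# The sink takes up to 100 events from the channel per flush/sync to HDFS
a1.sinks.k1.hdfs.batchSize = 100
```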

Next, let's look at the class diagram:

HDFSEventSink

HDFSEventSink is the Java class that corresponds to specifying type = hdfs on a sink in the Flume configuration file. It has three main methods:

  • start() — initializes the thread pools, etc.
  • stop() — cleanup work
  • process() — the method invoked by SinkRunner

process()

The main processing of the HDFS sink happens in the process() method.

// Loop up to batchSize times, or until the channel is empty
for (txnEventCount = 0; txnEventCount < batchSize; txnEventCount++) {
  Event event = channel.take();
  if (event == null) {
    break;
  }
  ......
  // sfWriters is an LRU cache of open-file handlers; the maximum number of
  // open files is controlled by the hdfs.maxOpenFiles parameter
  BucketWriter bucketWriter = sfWriters.get(lookupPath);
  // If there is no cached writer for this path, construct one
  if (bucketWriter == null) {
    // HDFSWriterFactory creates an HDFSWriter according to the file type,
    // controlled by the hdfs.fileType parameter; e.g. HDFSDataStream
    HDFSWriter hdfsWriter = writerFactory.getWriter(fileType);
    // idleCallback removes the bucketWriter from the LRU cache once it has been flushed
    WriterCallback idleCallback = null;
    if (idleTimeout != 0) {
      idleCallback = new WriterCallback() {
        @Override
        public void run(String bucketPath) {
          sfWriters.remove(bucketPath);
        }
      };
    }
    bucketWriter = new BucketWriter(rollInterval, rollSize, rollCount,
        batchSize, context, realPath, realName, inUsePrefix, inUseSuffix,
        suffix, codeC, compType, hdfsWriter, timedRollerPool,
        proxyTicket, sinkCounter, idleTimeout, idleCallback,
        lookupPath, callTimeout, callTimeoutPool);
    sfWriters.put(lookupPath, bucketWriter);
  }
  ......
  // Track the buckets touched within this transaction
  if (!writers.contains(bucketWriter)) {
    writers.add(bucketWriter);
  }
  // Write the event to HDFS
  bucketWriter.append(event); // ----- inside append(): -----

  open(); // if the underlying filesystem supports append, open via the
          // append API; otherwise via create
  // Decide whether to roll the file: compare the current replica count with
  // the target replication; if they do not match, doRotate = false
  if (doRotate) {
    close();
    open();
  }

  // Write the data; the timeout is hdfs.callTimeout
  callWithTimeout(new CallRunner<Void>() {
      @Override
      public Void call() throws Exception {
        writer.append(event); // could block
        return null;
      }
    });
  if (batchCounter == batchSize) { // flush once batchSize events have been written
    // flush() -> doFlush() -> HDFSWriter.sync() -> FSDataOutputStream.flush()/sync()
    flush();
  }
}

// Before committing the transaction, flush all the buckets
for (BucketWriter bucketWriter : writers) {
  bucketWriter.flush();
}
transaction.commit();
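The sfWriters cache mentioned above evicts the least-recently-used BucketWriter once hdfs.maxOpenFiles is exceeded. A minimal sketch of that eviction behavior, using an access-ordered LinkedHashMap (this is an illustration of the technique, not Flume's actual WriterLinkedHashMap, and the paths and values are made up):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class LruSketch {
    // Returns a map that evicts its least-recently-used entry once the
    // size exceeds maxOpenFiles; in Flume the evicted BucketWriter would
    // be closed at this point.
    static <K, V> Map<K, V> lruCache(int maxOpenFiles) {
        // The 'true' flag makes iteration order follow access order, not insertion order
        return new LinkedHashMap<K, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                return size() > maxOpenFiles;
            }
        };
    }

    public static void main(String[] args) {
        Map<String, String> writers = lruCache(2);
        writers.put("/logs/a", "writerA");
        writers.put("/logs/b", "writerB");
        writers.get("/logs/a");            // touch /logs/a, so /logs/b is now eldest
        writers.put("/logs/c", "writerC"); // exceeds capacity, evicts /logs/b
        System.out.println(writers.keySet()); // /logs/b is gone
    }
}
```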

BucketWriter

BucketWriter mainly wraps the Hadoop HDFS API.
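One part of that wrapping is the callWithTimeout pattern seen in the process() walkthrough: potentially blocking HDFS calls run on an executor, and the caller waits at most hdfs.callTimeout milliseconds. A minimal sketch of the idea, assuming a plain ExecutorService/Future (this is not Flume's actual implementation, just the underlying technique):

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class CallTimeoutSketch {
    // Runs the call on the pool and waits at most timeoutMs for its result;
    // on timeout the task is interrupted and an exception is raised.
    static <T> T callWithTimeout(ExecutorService pool, long timeoutMs,
                                 Callable<T> call) throws Exception {
        Future<T> future = pool.submit(call);
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt the blocked call
            throw new Exception("call timed out after " + timeoutMs + " ms", e);
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        // A fast call returns normally.
        System.out.println(callWithTimeout(pool, 1000, () -> "appended"));
        // A call that blocks longer than the timeout fails.
        try {
            callWithTimeout(pool, 50, () -> { Thread.sleep(5000); return "late"; });
        } catch (Exception e) {
            System.out.println("timeout");
        }
        pool.shutdownNow();
    }
}
```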

Reference

https://boylook.blog.51cto.com/7934327/d-4

Please credit the source when reposting: https://blog.csdn.net/wsscy2004

Last updated: 2017-04-03 12:55:52
