閱讀296 返回首頁    go 阿裏雲 go 技術社區[雲棲]


Flume-ng ThriftSource原理分析


Thrift IDL

Flume Thrift IDL在client包裏麵,定義如下:

namespace java org.apache.flume.thrift

struct ThriftFlumeEvent {
  1: required map <string, string> headers,
  2: required binary body,
}

enum Status {
  OK,
  FAILED,
  ERROR,
  UNKNOWN
}

service ThriftSourceProtocol {
  Status append(1: ThriftFlumeEvent event),
  Status appendBatch(1: list<ThriftFlumeEvent> events),
}

注意:event在C#裏麵是關鍵字,所以利用Thrift編譯器生成客戶端的接口時,要把所有event關鍵字改成其他的、比如events.

Thrift Service

Flume的Source分兩種:

  • 實現PollableSource接口
    通過SinkRunner管理Source
  • 實現EventDrivenSource接口
    可以自己接受數據、發送到channel。比如ThriftSource

Flume Thrift Service的實現類在core包

public class ThriftSource extends AbstractSource implements Configurable,
  EventDrivenSource {
  public static final String CONFIG_THREADS = "threads";
  public static final String CONFIG_BIND = "bind";
  public static final String CONFIG_PORT = "port";
  private Integer port;
  private String bindAddress;
  private int maxThreads = 0;
  private SourceCounter sourceCounter;
  private TServer server;
  private TServerTransport serverTransport;
  private ExecutorService servingExecutor;
  public void start() {
      //創建工作者線程池
      ...
      args.protocolFactory(new TCompactProtocol.Factory());
      args.inputTransportFactory(new TFastFramedTransport.Factory());
      args.outputTransportFactory(new TFastFramedTransport.Factory());

      //ThriftSourceProtocol是Flume Thrift Service的真正實現
      args.processor(new ThriftSourceProtocol
        .Processor<ThriftSourceHandler>(new ThriftSourceHandler()));
     /**
     * Start serving.
     */
    servingExecutor.submit(new Runnable() {
      @Override
      public void run() {
        server.serve();
      }
    });
    ...
  }

Flume Thrift Service真正的實現類是內部類ThriftSourceHandler

  private class ThriftSourceHandler implements ThriftSourceProtocol.Iface {

    @Override
    public Status append(ThriftFlumeEvent event) throws TException {
      Event flumeEvent = EventBuilder.withBody(event.getBody(),
        event.getHeaders());

      sourceCounter.incrementAppendReceivedCount();
      sourceCounter.incrementEventReceivedCount();

      try {
        //傳給channel
        getChannelProcessor().processEvent(flumeEvent);
      } catch (ChannelException ex) {
        logger.warn("Thrift source " + getName() + " could not append events " +
          "to the channel.", ex);
        return Status.FAILED;
      }
      sourceCounter.incrementAppendAcceptedCount();
      sourceCounter.incrementEventAcceptedCount();
      return Status.OK;
    }

    @Override
    public Status appendBatch(List<ThriftFlumeEvent> events) throws TException {
      sourceCounter.incrementAppendBatchReceivedCount();
      sourceCounter.addToEventReceivedCount(events.size());

      List<Event> flumeEvents = Lists.newArrayList();
      for(ThriftFlumeEvent event : events) {
        flumeEvents.add(EventBuilder.withBody(event.getBody(),
          event.getHeaders()));
      }

      try {
        getChannelProcessor().processEventBatch(flumeEvents);
      } catch (ChannelException ex) {
        logger.warn("Thrift source %s could not append events to the " +
          "channel.", getName());
        return Status.FAILED;
      }

      sourceCounter.incrementAppendBatchAcceptedCount();
      sourceCounter.addToEventAcceptedCount(events.size());
      return Status.OK;
    }
  }

最後更新:2017-04-03 12:55:13

  上一篇:go 怎樣在vs2010中添加圖片資源呢?
  下一篇:go 2012年藍橋杯【初賽試題】 轉方陣