Flume-ng Startup Process Analysis

Application.java

Entry point main():

    ...
    // Load the Flume configuration file and initialize the Sink, Source, and Channel factory classes
    PropertiesFileConfigurationProvider configurationProvider =
            new PropertiesFileConfigurationProvider(agentName,
                configurationFile);
    application = new Application();
    // Sinks, Sources, and Channels are instantiated inside configurationProvider.getConfiguration()
    application.handleConfigurationEvent(configurationProvider.getConfiguration());-------getConfiguration------>
            // Maps that hold all Sinks, Sources, and Channels
            Map<String, ChannelComponent> channelComponentMap = Maps.newHashMap();
            Map<String, SourceRunner> sourceRunnerMap = Maps.newHashMap();
            Map<String, SinkRunner> sinkRunnerMap = Maps.newHashMap();
            // Instantiate the channels first
            loadChannels(agentConf, channelComponentMap);
            // Register each Source's channels with its ChannelSelector; the Source obtains its Channels through the ChannelSelector
            loadSources(agentConf, channelComponentMap, sourceRunnerMap);
            // Register the Channel with each Sink
            loadSinks(agentConf, channelComponentMap, sinkRunnerMap);
            ...
            conf.addChannel(channelName, channelComponent.channel);
            ...
            for(Map.Entry<String, SourceRunner> entry : sourceRunnerMap.entrySet()) {
              conf.addSourceRunner(entry.getKey(), entry.getValue());
            }
            for(Map.Entry<String, SinkRunner> entry : sinkRunnerMap.entrySet()) {
              conf.addSinkRunner(entry.getKey(), entry.getValue());
            }
            ...
            return conf;

    ...
    //application.handleConfigurationEvent(conf)---->
            stopAllComponents();
            startAllComponents(conf);
    final Application appReference = application;
    // Hook invoked when the program shuts down
    Runtime.getRuntime().addShutdownHook(new Thread("agent-shutdown-hook") {
      @Override
      public void run() {
        appReference.stop();
      }
    });
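
The loading order in getConfiguration() matters because sources and sinks are wired to channels that are looked up by name in the map built by loadChannels(). The following is a minimal sketch of that idea only. ToyChannel, ToyEndpoint, and wire() are hypothetical stand-ins invented for illustration and are not Flume classes or methods.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of the wiring order; these are not Flume's real classes. */
public class WiringSketch {

    /** Toy channel that only remembers its name. */
    static final class ToyChannel {
        final String name;
        ToyChannel(String name) { this.name = name; }
    }

    /** Toy source or sink that holds the channels it was wired to. */
    static final class ToyEndpoint {
        final String name;
        final List<ToyChannel> channels = new ArrayList<>();
        ToyEndpoint(String name) { this.name = name; }
    }

    /** Step 1: create every channel and index it by name. */
    static Map<String, ToyChannel> loadChannels(List<String> names) {
        Map<String, ToyChannel> map = new LinkedHashMap<>();
        for (String n : names) {
            map.put(n, new ToyChannel(n));
        }
        return map;
    }

    /** Steps 2 and 3: resolve channel names against the map built in step 1. */
    static ToyEndpoint wire(String name, List<String> channelNames,
                            Map<String, ToyChannel> channels) {
        ToyEndpoint endpoint = new ToyEndpoint(name);
        for (String cn : channelNames) {
            ToyChannel ch = channels.get(cn);
            if (ch == null) {
                // A source or sink cannot be wired to a channel that was never loaded.
                throw new IllegalStateException("Unknown channel: " + cn);
            }
            endpoint.channels.add(ch);
        }
        return endpoint;
    }

    public static void main(String[] args) {
        Map<String, ToyChannel> channels = loadChannels(Arrays.asList("c1", "c2"));
        ToyEndpoint source = wire("r1", Arrays.asList("c1", "c2"), channels);
        ToyEndpoint sink = wire("k1", Arrays.asList("c1"), channels);
        System.out.println(source.name + " wired to " + source.channels.size() + " channel(s)");
        System.out.println(sink.name + " wired to " + sink.channels.size() + " channel(s)");
    }
}
```

If the channels were not loaded first, the lookups in wire() would have nothing to resolve against, which is presumably why the original code passes channelComponentMap into both loadSources() and loadSinks().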

startAllComponents(conf):

    // Components are started through the LifecycleSupervisor class
    // Start a MonitorRunnable to monitor each Channel
    for (Entry<String, Channel> entry :
      materializedConfiguration.getChannels().entrySet()) {
      try{
        logger.info("Starting Channel " + entry.getKey());
        supervisor.supervise(entry.getValue(),
            new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
      } catch (Exception e){
        logger.error("Error while starting {}", entry.getValue(), e);
      }
    }

    // Wait until every channel has started (or is in an error state)
    for(Channel ch: materializedConfiguration.getChannels().values()){
      while(ch.getLifecycleState() != LifecycleState.START
          && !supervisor.isComponentInErrorState(ch)){
        try {
          logger.info("Waiting for channel: " + ch.getName() +
              " to start. Sleeping for 500 ms");
          Thread.sleep(500);
        } catch (InterruptedException e) {
          logger.error("Interrupted while waiting for channel to start.", e);
          Throwables.propagate(e);
        }
      }
    }

    // Start a MonitorRunnable to monitor each sink
    for (Entry<String, SinkRunner> entry : materializedConfiguration.getSinkRunners()
        .entrySet()) {
      try{
        logger.info("Starting Sink " + entry.getKey());
        supervisor.supervise(entry.getValue(),
          new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
      } catch (Exception e) {
        logger.error("Error while starting {}", entry.getValue(), e);
      }
    }

    // Start a MonitorRunnable to monitor each source
    for (Entry<String, SourceRunner> entry : materializedConfiguration
        .getSourceRunners().entrySet()) {
      try{
        logger.info("Starting Source " + entry.getKey());
        supervisor.supervise(entry.getValue(),
          new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
      } catch (Exception e) {
        logger.error("Error while starting {}", entry.getValue(), e);
      }
    }

    this.loadMonitoring();
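
The order used by startAllComponents() (channels, then a blocking wait, then sinks, then sources) can be illustrated with a small self-contained sketch. ToyComponent, startAsync(), and startAll() are hypothetical names made up for this example, the error-state check from the original loop is left out, and the poll interval is shortened from 500 ms to 10 ms so the demo finishes quickly.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of the start order; these are not Flume's real classes. */
public class StartOrderSketch {
    enum State { IDLE, START }

    /** Toy component whose state is flipped to START by a background thread. */
    static final class ToyComponent {
        final String name;
        volatile State state = State.IDLE;
        ToyComponent(String name) { this.name = name; }
    }

    /** Asks the pool to start one component and records when the start actually runs. */
    static void startAsync(ExecutorService pool, ToyComponent c, List<String> order) {
        pool.execute(() -> {
            order.add(c.name);
            c.state = State.START;
        });
    }

    /** Channels first, block until they are up, then sinks, then sources. */
    static List<String> startAll(List<ToyComponent> channels,
                                 List<ToyComponent> sinks,
                                 List<ToyComponent> sources) {
        List<String> order = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            for (ToyComponent ch : channels) {
                startAsync(pool, ch, order);
            }
            // Poll until every channel reports START before touching sinks or sources.
            for (ToyComponent ch : channels) {
                while (ch.state != State.START) {
                    Thread.sleep(10);
                }
            }
            for (ToyComponent sink : sinks) {
                startAsync(pool, sink, order);
            }
            for (ToyComponent source : sources) {
                startAsync(pool, source, order);
            }
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while starting components", e);
        } finally {
            pool.shutdownNow();
        }
        return order;
    }

    public static void main(String[] args) {
        List<String> order = startAll(
            Arrays.asList(new ToyComponent("c1")),
            Arrays.asList(new ToyComponent("k1")),
            Arrays.asList(new ToyComponent("r1")));
        System.out.println("First component started: " + order.get(0));
    }
}
```

Only the channel is guaranteed to start first here. The sink and the source are handed to the pool one after the other without waiting, so their relative order is not fixed, much as supervise() in the original only schedules the start rather than performing it inline.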

LifecycleSupervisor.java

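LifecycleSupervisor is the class that starts and monitors Flume components; among other things, it restarts a component that has failed.
The more important fields inside LifecycleSupervisor: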

// Thread pool that runs the monitoring tasks
ScheduledThreadPoolExecutor monitorService
Map<LifecycleAware, ScheduledFuture<?>> monitorFutures
Map<LifecycleAware, Supervisoree> supervisedProcesses
// Start supervising a component
  public synchronized void supervise(LifecycleAware lifecycleAware,
      SupervisorPolicy policy, LifecycleState desiredState) {
    ...

    // State of the supervised component
    Supervisoree process = new Supervisoree();
    process.status = new Status();
    process.policy = policy;
    process.status.desiredState = desiredState;
    process.status.error = false;

    // Monitoring task; it is what calls start() on the component. For example:
    MonitorRunnable monitorRunnable = new MonitorRunnable();---->
            lifecycleAware.start();--->
                // if the component is a sink
                sinkRunner.start()---->
                    runnerThread = new Thread(runner);
                    runnerThread.setName("SinkRunner-PollingRunner-" +
                    policy.getClass().getSimpleName());
                    runnerThread.start();---->
                        // called in a loop on the new thread
                        DefaultSinkProcessor.process();---->
                            // the sink takes data from the channel and processes it
                            sink.process();



    monitorRunnable.lifecycleAware = lifecycleAware;
    monitorRunnable.supervisoree = process;
    monitorRunnable.monitorService = monitorService;

    supervisedProcesses.put(lifecycleAware, process);

    // Check the component's health every three seconds (fixed delay, measured from the end of the previous check)
    ScheduledFuture<?> future = monitorService.scheduleWithFixedDelay(
        monitorRunnable, 0, 3, TimeUnit.SECONDS);
    monitorFutures.put(lifecycleAware, future);
  }
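
The supervise() pattern above, a periodic task that compares a component's current state with the desired one and calls start() when they differ, can be sketched in isolation as follows. This is a simplified illustration, not Flume's MonitorRunnable: ToyComponent, crash(), waitUntil(), and demo() are hypothetical, policies and error tracking are omitted, and the delay is 20 ms instead of 3 seconds.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

/** Hypothetical sketch of a restart-on-failure supervisor; not Flume's real classes. */
public class SupervisorSketch {
    enum State { IDLE, START, ERROR }

    /** Toy component that counts how often it has been started. */
    static final class ToyComponent {
        volatile State state = State.IDLE;
        final AtomicInteger startCalls = new AtomicInteger();
        void start() { startCalls.incrementAndGet(); state = State.START; }
        void crash() { state = State.ERROR; }
    }

    /** Schedules a periodic check that (re)starts the component whenever it is not in START. */
    static ScheduledFuture<?> supervise(ScheduledExecutorService pool,
                                        ToyComponent component, long delayMillis) {
        Runnable monitor = () -> {
            if (component.state != State.START) {
                component.start();
            }
        };
        return pool.scheduleWithFixedDelay(monitor, 0, delayMillis, TimeUnit.MILLISECONDS);
    }

    /** Polls a condition, giving up after five seconds so the demo cannot hang. */
    static void waitUntil(BooleanSupplier condition) {
        long deadline = System.currentTimeMillis() + 5000;
        while (!condition.getAsBoolean()) {
            if (System.currentTimeMillis() > deadline) {
                throw new IllegalStateException("Timed out waiting for condition");
            }
            try {
                Thread.sleep(5);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
    }

    /** Starts a component, simulates one failure, and returns how many times start() ran. */
    static int demo() {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
        try {
            ToyComponent component = new ToyComponent();
            ScheduledFuture<?> future = supervise(pool, component, 20);
            waitUntil(() -> component.state == State.START);  // initial start
            component.crash();                                // simulated failure
            waitUntil(() -> component.startCalls.get() >= 2); // monitor restarts it
            future.cancel(false);
            return component.startCalls.get();
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("start() calls: " + demo());
    }
}
```

As in the original, the component is never started directly by the caller: the first start() and every later restart come from the same scheduled monitoring task, and the returned ScheduledFuture is what lets the supervisor cancel that monitoring later.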


Last updated: 2017-04-03 12:55:50
