Flume-ng啟動過程分析
Application.java
入口函數main():
...
//加載flume的配置文件,初始化Sink,Source,Channel的工廠類
PropertiesFileConfigurationProvider configurationProvider =
new PropertiesFileConfigurationProvider(agentName,
configurationFile);
application = new Application();
//configurationProvider.getConfiguration()中實例化Sink,Source,Channel
application.handleConfigurationEvent(configurationProvider.getConfiguration());-------getConfiguration------>
//Map用於存儲所有Sink,Source,Channel
Map<String, ChannelComponent> channelComponentMap = Maps.newHashMap();
Map<String, SourceRunner> sourceRunnerMap = Maps.newHashMap();
Map<String, SinkRunner> sinkRunnerMap = Maps.newHashMap();
//先實例化channel
loadChannels(agentConf, channelComponentMap);
//將Source對應的channel注冊到ChannelSelector,Source通過ChannelSelector獲取Channel
loadSources(agentConf, channelComponentMap, sourceRunnerMap);
//向Sink注冊Channel
loadSinks(agentConf, channelComponentMap, sinkRunnerMap);
...
conf.addChannel(channelName, channelComponent.channel);
...
for(Map.Entry<String, SourceRunner> entry : sourceRunnerMap.entrySet()) {
conf.addSourceRunner(entry.getKey(), entry.getValue());
}
for(Map.Entry<String, SinkRunner> entry : sinkRunnerMap.entrySet()) {
conf.addSinkRunner(entry.getKey(), entry.getValue());
}
...
return conf;
...
//application.handleConfigurationEvent(conf)---->
stopAllComponents();
startAllComponents(conf);
final Application appReference = application;
//關閉程序時,調用的鉤子
Runtime.getRuntime().addShutdownHook(new Thread("agent-shutdown-hook") {
@Override
public void run() {
appReference.stop();
}
});
startAllComponents(conf):
//通過LifecycleSupervisor類啟動組件
//啟動MonitorRunnable,監控Channel
for (Entry<String, Channel> entry :
materializedConfiguration.getChannels().entrySet()) {
try{
logger.info("Starting Channel " + entry.getKey());
supervisor.supervise(entry.getValue(),
new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
} catch (Exception e){
logger.error("Error while starting {}", entry.getValue(), e);
}
}
//等待啟動
for(Channel ch: materializedConfiguration.getChannels().values()){
while(ch.getLifecycleState() != LifecycleState.START
&& !supervisor.isComponentInErrorState(ch)){
try {
logger.info("Waiting for channel: " + ch.getName() +
" to start. Sleeping for 500 ms");
Thread.sleep(500);
} catch (InterruptedException e) {
logger.error("Interrupted while waiting for channel to start.", e);
Throwables.propagate(e);
}
}
}
//啟動MonitorRunnable,監控sink
for (Entry<String, SinkRunner> entry : materializedConfiguration.getSinkRunners()
.entrySet()) {
try{
logger.info("Starting Sink " + entry.getKey());
supervisor.supervise(entry.getValue(),
new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
} catch (Exception e) {
logger.error("Error while starting {}", entry.getValue(), e);
}
}
//啟動MonitorRunnable,監控source
for (Entry<String, SourceRunner> entry : materializedConfiguration
.getSourceRunners().entrySet()) {
try{
logger.info("Starting Source " + entry.getKey());
supervisor.supervise(entry.getValue(),
new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
} catch (Exception e) {
logger.error("Error while starting {}", entry.getValue(), e);
}
}
this.loadMonitoring();
LifecycleSupervisor.java
負責啟動和監控Flume組件的類,功能如:失敗重啟組件
LifecycleSupervisor內部比較重要的幾個變量:
//監控進程的線程池
ScheduledThreadPoolExecutor monitorService
Map<LifecycleAware, ScheduledFuture<?>> monitorFutures
Map<LifecycleAware, Supervisoree> supervisedProcesses
//啟動監控
public synchronized void supervise(LifecycleAware lifecycleAware,
SupervisorPolicy policy, LifecycleState desiredState) {
...
//組件狀態
Supervisoree process = new Supervisoree();
process.status = new Status();
process.policy = policy;
process.status.desiredState = desiredState;
process.status.error = false;
//監控線程:MonitorRunnable運行時會調用組件的start()來啟動組件,下面以sink為例說明調用鏈
MonitorRunnable monitorRunnable = new MonitorRunnable();---->
lifecycleAware.start();--->
//如果是sink
sinkRunner.start()---->
runnerThread = new Thread(runner);
runnerThread.setName("SinkRunner-PollingRunner-" +
policy.getClass().getSimpleName());
runnerThread.start();---->
//在新線程裏循環調用
DefaultSinkProcessor.process();---->
//sink從channel中取數據,進行處理
sink.process();
monitorRunnable.lifecycleAware = lifecycleAware;
monitorRunnable.supervisoree = process;
monitorRunnable.monitorService = monitorService;
supervisedProcesses.put(lifecycleAware, process);
//立即開始第一次檢查,之後每次檢查結束後間隔3秒再次監控組件運行狀況
ScheduledFuture<?> future = monitorService.scheduleWithFixedDelay(
monitorRunnable, 0, 3, TimeUnit.SECONDS);
monitorFutures.put(lifecycleAware, future);
}
最後更新:2017-04-03 12:55:50