949
技術社區[雲棲]
分布式日誌收集係統Apache Flume的設計介紹
概述
Flume是Cloudera公司的一款高性能、高可能的分布式日誌收集係統。現在已經是Apache Top項目。Github地址。同Flume相似的日誌收集係統還有Facebook Scribe,Apache
Chuwka,Apache Kafka(也是LinkedIn的)。Flume是後起之秀,本文嚐試簡要分析Flume數據流通過程中提供的組件、可靠性保證來介紹Flume的主要設計,不涉及Flume具體的安裝使用,也不涉及代碼層麵的剖析。寫博文來記錄這個工具主要是覺得與最近開發的一個流式的數據搬運的工具在設計上有相似之處,想看看有沒有可以參考的地方。在博文的基礎上,還需要瀏覽一下源碼。
數據流通
Flume傳輸的數據的基本單位是event,如果是文本文件,通常是一行記錄,這也是事務的基本單位。flume運行的核心是agent。它是一個完整的數據收集工具,含有三個核心組件,分別是source、channel、sink。Event從Source,流向Channel,再到Sink,本身為一個byte數組,並可攜帶headers信息。Event代表著一個數據流的最小完整單元,從外部數據源來,向外部的目的地去。Source:完成對日誌數據的收集,分成transtion 和 event 打入到channel之中。Channel:主要提供一個隊列的功能,對source提供中的數據進行簡單的緩存。Sink:取出Channel中的數據,進行相應的存儲文件係統,數據庫,或者提交到遠程服務器。通過這些組件,event可以從一個地方流向另一個地方,如下圖所示。

Source消費從外部流進的Events,如AvroSource接收外部客戶端傳來的或是從別的agent流出來的Avro Event。Source可以把event送往一個或多個channel。channel是一個隊列,持有event等待sink來消費,一種Channel的實現:FileChannel使用本地文件係統來作為它的存儲。Sink的作用是把Event從channel裏移除,送往外部數據倉庫或給下一站agent的Source,如HDFSEventSink送往HDFS。同個agent下的source和sink是異步的。下麵再舉幾個數據流通的例子,說明不同的使用方式。



Source接入
Client端操作消費數據的來源,Flume支持Avro,log4j,syslog和http post(body為json格式)。可以讓應用程序同已有的Source直接打交道,如AvroSource,SyslogTcpSource。也可以寫一個Source,以IPC或RPC的方式接入自己的應用,Avro和Thrift都可以(分別有NettyAvroRpcClient和ThriftRpcClient實現了RpcClient接口),其中Avro是默認的RPC協議。具體代碼級別的Client端數據接入,可以參考官方手冊。
對現有程序改動最小的使用方式是使用是直接讀取程序原來記錄的日誌文件,基本可以實現無縫接入,不需要對現有程序進行任何改動。
對於直接讀取文件Source,有兩種方式:
對現有程序改動最小的使用方式是使用是直接讀取程序原來記錄的日誌文件,基本可以實現無縫接入,不需要對現有程序進行任何改動。
對於直接讀取文件Source,有兩種方式:
- ExecSource:以運行Linux命令的方式,持續的輸出最新的數據,如tail -F 文件名指令,在這種方式下,取的文件名必須是指定的。 ExecSource可以實現對日誌的實時收集,但是存在Flume不運行或者指令執行出錯時,將無法收集到日誌數據,無法保證日誌數據的完整性。
- SpoolSource:監測配置的目錄下新增的文件,並將文件中的數據讀取出來。需要注意兩點:拷貝到spool目錄下的文件不可以再打開編輯;spool目錄下不可包含相應的子目錄。SpoolSource雖然無法實現實時的收集數據,但是可以使用以分鍾的方式分割文件,趨近於實時。如果應用無法實現以分鍾切割日誌文件的話,可以兩種收集方式結合使用。 在實際使用的過程中,可以結合log4j使用,使用log4j的時候,將log4j的文件分割機製設為1分鍾一次,將文件拷貝到spool的監控目錄。log4j有一個TimeRolling的插件,可以把log4j分割的文件到spool目錄。基本實現了實時的監控。Flume在傳完文件之後,將會修改文件的後綴,變為.COMPLETED(後綴也可以在配置文件中靈活指定)
public class MySource extends AbstractSource implements Configurable, PollableSource { private String myProp; @Override public void configure(Context context) { String myProp = context.getString("myProp", "defaultValue"); // Process the myProp value (e.g. validation, convert to another type, ...) // Store myProp for later retrieval by process() method this.myProp = myProp; } @Override public void start() { // Initialize the connection to the external client } @Override public void stop () { // Disconnect from external client and do any additional cleanup // (e.g. releasing resources or nulling-out field values) .. } @Override public Status process() throws EventDeliveryException { Status status = null; // Start transaction Channel ch = getChannel(); Transaction txn = ch.getTransaction(); txn.begin(); try { // This try clause includes whatever Channel operations you want to do // Receive new data Event e = getSomeData(); // Store the Event into this Source's associated Channel(s) getChannelProcessor().processEvent(e) txn.commit(); status = Status.READY; } catch (Throwable t) { txn.rollback(); // Log exception, handle individual exceptions as needed status = Status.BACKOFF; // re-throw all Errors if (t instanceof Error) { throw (Error)t; } } finally { txn.close(); } return status; }}
Channel
Channel有多種方式:有MemoryChannel,JDBC Channel,MemoryRecoverChannel,FileChannel。
MemoryChannel可以實現高速的吞吐,但是無法保證數據的完整性。
MemoryRecoverChannel在官方文檔的建議上已經建義使用FileChannel來替換。
FileChannel保證數據的完整性與一致性。在具體配置不現的FileChannel時,建議FileChannel設置的目錄和程序日誌文件保存的目錄設成不同的磁盤,以便提高效率。
MemoryChannel可以實現高速的吞吐,但是無法保證數據的完整性。
MemoryRecoverChannel在官方文檔的建議上已經建義使用FileChannel來替換。
FileChannel保證數據的完整性與一致性。在具體配置不現的FileChannel時,建議FileChannel設置的目錄和程序日誌文件保存的目錄設成不同的磁盤,以便提高效率。
Sink
Sink在設置存儲數據時,可以向文件係統、數據庫、hadoop存數據,在日誌數據較少時,可以將數據存儲在文件係中,並且設定一定的時間間隔保存數據。在日誌數據較多時,可以將相應的日誌數據存儲到Hadoop中,便於日後進行相應的數據分析。
更多sink的內容可以參考官方手冊。
更多sink的內容可以參考官方手冊。
public class MySink extends AbstractSink implements Configurable { private String myProp; @Override public void configure(Context context) { String myProp = context.getString("myProp", "defaultValue"); // Process the myProp value (e.g. validation) // Store myProp for later retrieval by process() method this.myProp = myProp; } @Override public void start() { // Initialize the connection to the external repository (e.g. HDFS) that // this Sink will forward Events to .. } @Override public void stop () { // Disconnect from the external respository and do any // additional cleanup (e.g. releasing resources or nulling-out // field values) .. } @Override public Status process() throws EventDeliveryException { Status status = null; // Start transaction Channel ch = getChannel(); Transaction txn = ch.getTransaction(); txn.begin(); try { // This try clause includes whatever Channel operations you want to do Event event = ch.take(); // Send the Event to the external repository. // storeSomeData(e); txn.commit(); status = Status.READY; } catch (Throwable t) { txn.rollback(); // Log exception, handle individual exceptions as needed status = Status.BACKOFF; // re-throw all Errors if (t instanceof Error) { throw (Error)t; } } finally { txn.close(); } return status; }}
可靠性
Flume的核心是把數據從數據源收集過來,再送到目的地。為了保證輸送一定成功,在送到目的地之前,會先緩存數據,待數據真正到達目的地後,刪除自己緩存的數據。
Flume使用事務性的方式保證傳送Event整個過程的可靠性。Sink必須在Event被存入Channel後,或者,已經被傳達到下一站agent裏,又或者,已經被存入外部數據目的地之後,才能把Event從Channel中remove掉。這樣數據流裏的event無論是在一個agent裏還是多個agent之間流轉,都能保證可靠,因為以上的事務保證了event會被成功存儲起來。而Channel的多種實現在可恢複性上有不同的保證。也保證了event不同程度的可靠性。比如Flume支持在本地保存一份文件channel作為備份,而memory channel將event存在內存queue裏,速度快,但丟失的話無法恢複。
具體看一下Transaction。Source和Sink封裝了Channel提供的對Event的事務存、取接口,下圖為一個transaction過程:
Flume使用事務性的方式保證傳送Event整個過程的可靠性。Sink必須在Event被存入Channel後,或者,已經被傳達到下一站agent裏,又或者,已經被存入外部數據目的地之後,才能把Event從Channel中remove掉。這樣數據流裏的event無論是在一個agent裏還是多個agent之間流轉,都能保證可靠,因為以上的事務保證了event會被成功存儲起來。而Channel的多種實現在可恢複性上有不同的保證。也保證了event不同程度的可靠性。比如Flume支持在本地保存一份文件channel作為備份,而memory channel將event存在內存queue裏,速度快,但丟失的話無法恢複。
具體看一下Transaction。Source和Sink封裝了Channel提供的對Event的事務存、取接口,下圖為一個transaction過程:

一個Channel的實現裏會包括一個transaction的實現,每個與channel打交道的source和sink都得帶有一個transaction對象。下麵的例子中可以看到一個Event的狀態和變化會在一次transation中完成。transaction的狀態也對應了時序圖中的各個狀態。
Channel ch = new MemoryChannel(); Transaction txn = ch.getTransaction(); txn.begin(); try { // This try clause includes whatever Channel operations you want to do Event eventToStage = EventBuilder.withBody("Hello Flume!", Charset.forName("UTF-8")); ch.put(eventToStage); // Event takenEvent = ch.take(); // ... txn.commit(); } catch (Throwable t) { txn.rollback(); // Log exception, handle individual exceptions as needed // re-throw all Errors if (t instanceof Error) { throw (Error)t; } } finally { txn.close(); }
(全文完)
最後更新:2017-04-03 12:54:31