閱讀706 返回首頁    go 阿裏雲 go 技術社區[雲棲]


最近分布式係統開發小結: Slave模塊Executors設計

更新一段我在linkedin上對這個項目的描述,目前項目已經開發完在使用了。本文並不是最新的設計。

背景
解決HDFS/Hive/RDBMS/FTP/MongoDB等數據源之間的批量數據同步問題


特性
跨機房場景下的鏈路優化;多路輸入和輸出的任務模型;數據容錯和可持久化;任務失敗恢複


任務調度
把任務配置解析為物理執行計劃,Master控製任務的調度和失敗恢複,基於Mesos完成資源分配和任務調度。Slave分布在各個數據中心,具體傳輸任務的調起做到鏈路優化選擇。高並發場景下,增加Mesos Slave節點來保證可擴展性(CPU和MEM資源),Master將元數據記錄在ZK上,並通過爭搶ZK鎖實現互備。


數據傳輸
傳輸組件分為Input、Cache和Output三種Executor,各自進程內通過雙隊列優化傳輸速度。數據以bundle為單位傳輸,通常上百行為一個bundle且可壓縮,Netty作網絡通信。Input端異步備份一份數據在BookKeeper,Cache使用Beanstalkd做消息隊列,Output端處理Bundle成功或失敗,會有守護線程異步刪除或更新beanstalkd內的Message(類似Storm Topology裏的Ack),Executors會把bundle傳輸狀態更新在ZookKeeper上,某一Executor掛掉都可以在一台slave上重新調起並恢複任務繼續進行。Input和Output端的Reader和Writer是插件化的。


====================================  我是更新線 ====================================


之前在最近分布式係統開發小結裏,提到了一個在開發中的係統的大致設計,本文是我負責部分的一個詳細設計。在閱讀本文前可以先瀏覽下之前那篇文章,對於係統的功能和概況有個基本了解。


1. Slave總體設計


Slave模塊主要需要實現不同的Mesos Executors,包括Input, MemoryStorage和Output三種Executor。每個Dpump任務會由Scheduler Manager經過邏輯執行計劃和物理執行計劃的拆分,從Knowledge Center獲取知識,最終將切分後的Task分配給相應的Slaves執行,並通過Mesos Master,分配資源並調起Slave上的各自的Executor。三種Executors的執行邏輯圖如下。


數據通過Bundle形式在三種Executor之間的流通,每個Bundle有唯一ID、一個String[]、以及一個Index。Index用於標記每個Bundle最後數據輸出的最新成功行,即我們容錯粒度控製在行級別。對Input、Cache、Output作一個簡單介紹:
  •  Input,也叫Reader。每個Task內隻有一個Input Executor,負責從數據源(HDFS、FTP、MySQL、MongoDB等)讀出數據,將數據經過切分、處理、壓縮後通過Netty流式傳輸給MemoryStorage。

  • Cache,也叫MemoryStorage。每個Task內隻有一個Cache Executor,負責從Input端接收Bundle,將Bundle存取往一個隊列內,當有Output連接的時候,將Bundle取出輸送給Output

  • Output,也叫Writer。每個Task可能有多個Output Executor,負責將數據最終輸出到數據目的源。Output從Cache端得到Bundle的過程也是流式的。

整個Task的流通都是流式的,且Slave之間的網絡通信使用的是Netty這個NIO框架,傳輸過程中還涉及到Bundle高效的正反序列化和壓縮、解壓縮。最重要的一點是Input、Cache、Output三個部分各自都有容錯設計,其中Input和Output通過向Zookeeper記錄和獲取Bundle狀態保證處理Bundle的不重不漏,而Cache通過對隊列內消息內容的鈍化,保證自身已保存的Bundle不丟失,並能在新的Cache Executor起來後,可以繼續為Output提供Bundle輸出。


2. Slave 詳細設計

下麵詳細介紹三種Executor的設計,閱讀過程中請參考這張Task進程圖。



2.1 Input設計


2.1.1 數據流通

每個Input負責一次Job(每個Job對應多個Tasks)內最小粒度的文件塊讀取,比如可能是一個HDFS Block,一張Hive表的一個分區甚至是一張MySQL表。

Input內還分有Writer、Buffer(雙隊列)和Reader。Writer是一個單線程,從數據源獲取數據並切分好Bundle,每個Bundle有唯一ID和定長的字符串數組,然後將Bundle存入雙隊列的輸入頭,在雙隊列的讀出頭有若幹個Reader線程搶占Bundle,每個Reader獲取到Bundle後釋放鎖並做二次處理、壓縮,最終Reader通過Netty Client將Bundle包裝成一個傳輸格式,以二進製流的方式通過Channel流向Cache。


2.1.2 容錯

Writer端切分Bundle保證了從同一個數據源的同份文件塊讀取數據生成Bundle是有序的,每次Netty往Channel裏寫入一份Bundle的時候,會通過Companion線程異步更新此Task下znode內的Bitmap,該Bitmap標記每個Bundle在Input端是否被傳輸。每次Input啟動的時候,Netty會讀取znode上的Bitmap緩存在內存裏,發送Bundle前根據id作一次校對。所以當Input掛掉或重啟時,可以保證發送給Cache的Bundle不重不漏。

2.2 Cache設計


2.2.1 消息隊列

Cache本身是一個Netty Server,接收Input和Output多個Netty Client的連接,並對不同的Channel做不同Event處理。Cache Executor需要一個多狀態的消息隊列,這裏采用的是Beanstalkd隊列,下圖為該Beanstalkd內消息(job)的狀態變化圖。



 每次Cache將新的Bundle put進Beanstalkd的時候需要選擇一個tube(管道),Beanstalkd可以開啟多個獨立的tube,tube內存放jobs,每個job有自己唯一的job id,而job消息體就是我們的bundleBytes(Bundle存入Cache直接存的就是序列化後的byte[])。

每個job存入queue後是ready狀態,被reserve之後,就不能被客戶端再次獲取到,即Cache每次會從每個tube裏按順序reserve一個job,並發送消息體給Output(一個output對應一個tube),這個過程保證每個job被消費一次,且隻能被一個Output消費。如果Output端消費成功,則該job會被delete掉;如果該job消費失敗,則會被重新置為ready,重新置為ready可能是因為超時(每個job被reserve的時候都有一個Time-To-Run時間設置)了,也可以是客戶端release掉該job。

2.2.2 Acker設計

這裏,對於tube內job的後續處理交給Acker這個線程來做。Acker的設計靈感來源於Storm。Storm Topology內每個bolt對tuple的執行和處理最終都會給Spout一個ack響應,而拓撲過程中整棵Tuple樹的成功/失敗執行狀態會由Acker守護進程進行跟蹤,以此來保證每個tuple被完全處理,而acker對tuple的跟蹤算法是Storm的主要突破之一。

Cache端的Acker線程會監聽zookeeper上znode樹上各個節點的事件變化,從而掌握被Output消費的所有Bundle的最後狀態,對應地刪除、釋放,或者更新Queue裏的job。需要注意的是這裏還涉及到一個更新job的過程。前麵提到Bundle內維護了一個index,而Output消費bundle的時候,如果是數據行寫了一半出現了異常或者掛掉了,我們需要記錄bundle內數據行的最新index並將此信息也記錄在znode上。對於這種最壞情況,Acker負責將該fail的job從queue裏delete掉,並更改job內bundle bytes內容,重置新的index,再把新的job put進queue裏。這是我們最不希望看到的情況,同時也是我們對Bundle能做的最細粒度的容錯設計。

2.2.3 容錯

Beanstalkd啟動之後可以打開binlog開關,binlog是Beanstalkd容錯恢複的機製,將內存裏的消息隊列結構映射到硬盤上。對於Cache的容錯設計,直觀的辦法在於將這份binlog存在NFS或HDFS上,來保證Cache掛掉重啟後,能獲取到之前保存的Bundle數據,繼續提供服務。

2.3 Output設計


Output在最終的Bundle消費階段,會把數據導向新的數據源。每個Output獲取的Bundle來自於Cache裏的一個tube,而每個Bundle的執行情況也會由Companion線程異步更新到Zookeeper上。

 對於Output來說,它隻需要關心從Cache端獲取的每個Bundle都照常處理就可以了,不需要關心這個Bundle之前是否被消費過,被消費到哪裏。原因在於,Cache端的job狀態的變更和job的更新可以由Acker保障,而Acker是從zk上得到這些job的狀態並對Queue異步更新。如果Acker掛了,隻要重新起一個線程獲取znode上最新的狀態就可以了。對於Output來說,能傳過來的Bundle,對應到queue裏就是ready狀態的job,這個job可能被消費過了,但是他的index也因此得到了更新,Output端對於所有Bundle的處理是一致的,唯一需要關心的是Output需要把Bundle的信息異步更新給zk,如果Output掛了,重新起一個Output接著從Cache讀Bundle就可以了。


3. Slave模塊總結

Slave模塊三種Executor的設計,主要考慮的是各個Executor掛掉之後,怎樣保證數據處理的不重複和不遺漏。我們依賴Zookeeper的可靠性,記錄、更新、判斷Bundle的狀態,做到Input、Cache、Output各司其職,最到最小粒度的容錯。Executor本身的失敗和重啟則由Mesos保障,Mesos作為資源管理係統,由Master監控Slave上各個Executor的執行狀況,通過回調,可以在合適的Slave上再次啟動掛掉的Executor進程,保證業務Task的順利進行。



(全文完)


最後更新:2017-04-03 12:54:03

  上一篇:go J2EE中獲取IP地址
  下一篇:go 2013百度校園招聘數據挖掘工程師