最近分布式係統開發小結: Slave模塊Executors設計
更新一段我在linkedin上對這個項目的描述,目前項目已經開發完在使用了。本文並不是最新的設計。
背景
解決HDFS/Hive/RDBMS/FTP/MongoDB等數據源之間的批量數據同步問題
特性
跨機房場景下的鏈路優化;多路輸入和輸出的任務模型;數據容錯和可持久化;任務失敗恢複
任務調度
把任務配置解析為物理執行計劃,Master控製任務的調度和失敗恢複,基於Mesos完成資源分配和任務調度。Slave分布在各個數據中心,具體傳輸任務的調起做到鏈路優化選擇。高並發場景下,增加Mesos Slave節點來保證可擴展性(CPU和MEM資源),Master將元數據記錄在ZK上,並通過爭搶ZK鎖實現互備。
數據傳輸
傳輸組件分為Input、Cache和Output三種Executor,各自進程內通過雙隊列優化傳輸速度。數據以bundle為單位傳輸,通常上百行為一個bundle且可壓縮,Netty作網絡通信。Input端異步備份一份數據在BookKeeper,Cache使用Beanstalkd做消息隊列,Output端處理Bundle成功或失敗,會有守護線程異步刪除或更新beanstalkd內的Message(類似Storm Topology裏的Ack),Executors會把bundle傳輸狀態更新在ZookKeeper上,某一Executor掛掉都可以在一台slave上重新調起並恢複任務繼續進行。Input和Output端的Reader和Writer是插件化的。
==================================== 我是更新線 ====================================
之前在最近分布式係統開發小結裏,提到了一個在開發中的係統的大致設計,本文是我負責部分的一個詳細設計。在閱讀本文前可以先瀏覽下之前那篇文章,對於係統的功能和概況有個基本了解。
1. Slave總體設計

Input,也叫Reader。每個Task內隻有一個Input Executor,負責從數據源(HDFS、FTP、MySQL、MongoDB等)讀出數據,將數據經過切分、處理、壓縮後通過Netty流式傳輸給MemoryStorage。
Cache,也叫MemoryStorage。每個Task內隻有一個Cache Executor,負責從Input端接收Bundle,將Bundle存取往一個隊列內,當有Output連接的時候,將Bundle取出輸送給Output
Output,也叫Writer。每個Task可能有多個Output Executor,負責將數據最終輸出到數據目的源。Output從Cache端得到Bundle的過程也是流式的。
整個Task的流通都是流式的,且Slave之間的網絡通信使用的是Netty這個NIO框架,傳輸過程中還涉及到Bundle高效的正反序列化和壓縮、解壓縮。最重要的一點是Input、Cache、Output三個部分各自都有容錯設計,其中Input和Output通過向Zookeeper記錄和獲取Bundle狀態保證處理Bundle的不重不漏,而Cache通過對隊列內消息內容的鈍化,保證自身已保存的Bundle不丟失,並能在新的Cache Executor起來後,可以繼續為Output提供Bundle輸出。
2. Slave 詳細設計
下麵詳細介紹三種Executor的設計,閱讀過程中請參考這張Task進程圖。
2.1 Input設計
2.1.1 數據流通
每個Input負責一次Job(每個Job對應多個Tasks)內最小粒度的文件塊讀取,比如可能是一個HDFS Block,一張Hive表的一個分區甚至是一張MySQL表。
Input內還分有Writer、Buffer(雙隊列)和Reader。Writer是一個單線程,從數據源獲取數據並切分好Bundle,每個Bundle有唯一ID和定長的字符串數組,然後將Bundle存入雙隊列的輸入頭,在雙隊列的讀出頭有若幹個Reader線程搶占Bundle,每個Reader獲取到Bundle後釋放鎖並做二次處理、壓縮,最終Reader通過Netty Client將Bundle包裝成一個傳輸格式,以二進製流的方式通過Channel流向Cache。
2.1.2 容錯
2.2 Cache設計
2.2.1 消息隊列
Cache本身是一個Netty Server,接收Input和Output多個Netty Client的連接,並對不同的Channel做不同Event處理。Cache Executor需要一個多狀態的消息隊列,這裏采用的是Beanstalkd隊列,下圖為該Beanstalkd內消息(job)的狀態變化圖。

每次Cache將新的Bundle put進Beanstalkd的時候需要選擇一個tube(管道),Beanstalkd可以開啟多個獨立的tube,tube內存放jobs,每個job有自己唯一的job id,而job消息體就是我們的bundleBytes(Bundle存入Cache直接存的就是序列化後的byte[])。
每個job存入queue後是ready狀態,被reserve之後,就不能被客戶端再次獲取到,即Cache每次會從每個tube裏按順序reserve一個job,並發送消息體給Output(一個output對應一個tube),這個過程保證每個job被消費一次,且隻能被一個Output消費。如果Output端消費成功,則該job會被delete掉;如果該job消費失敗,則會被重新置為ready,重新置為ready可能是因為超時(每個job被reserve的時候都有一個Time-To-Run時間設置)了,也可以是客戶端release掉該job。2.2.2 Acker設計
這裏,對於tube內job的後續處理交給Acker這個線程來做。Acker的設計靈感來源於Storm。Storm Topology內每個bolt對tuple的執行和處理最終都會給Spout一個ack響應,而拓撲過程中整棵Tuple樹的成功/失敗執行狀態會由Acker守護進程進行跟蹤,以此來保證每個tuple被完全處理,而acker對tuple的跟蹤算法是Storm的主要突破之一。
Cache端的Acker線程會監聽zookeeper上znode樹上各個節點的事件變化,從而掌握被Output消費的所有Bundle的最後狀態,對應地刪除、釋放,或者更新Queue裏的job。需要注意的是這裏還涉及到一個更新job的過程。前麵提到Bundle內維護了一個index,而Output消費bundle的時候,如果是數據行寫了一半出現了異常或者掛掉了,我們需要記錄bundle內數據行的最新index並將此信息也記錄在znode上。對於這種最壞情況,Acker負責將該fail的job從queue裏delete掉,並更改job內bundle bytes內容,重置新的index,再把新的job put進queue裏。這是我們最不希望看到的情況,同時也是我們對Bundle能做的最細粒度的容錯設計。2.2.3 容錯
Beanstalkd啟動之後可以打開binlog開關,binlog是Beanstalkd容錯恢複的機製,將內存裏的消息隊列結構映射到硬盤上。對於Cache的容錯設計,直觀的辦法在於將這份binlog存在NFS或HDFS上,來保證Cache掛掉重啟後,能獲取到之前保存的Bundle數據,繼續提供服務。2.3 Output設計
Output在最終的Bundle消費階段,會把數據導向新的數據源。每個Output獲取的Bundle來自於Cache裏的一個tube,而每個Bundle的執行情況也會由Companion線程異步更新到Zookeeper上。
對於Output來說,它隻需要關心從Cache端獲取的每個Bundle都照常處理就可以了,不需要關心這個Bundle之前是否被消費過,被消費到哪裏。原因在於,Cache端的job狀態的變更和job的更新可以由Acker保障,而Acker是從zk上得到這些job的狀態並對Queue異步更新。如果Acker掛了,隻要重新起一個線程獲取znode上最新的狀態就可以了。對於Output來說,能傳過來的Bundle,對應到queue裏就是ready狀態的job,這個job可能被消費過了,但是他的index也因此得到了更新,Output端對於所有Bundle的處理是一致的,唯一需要關心的是Output需要把Bundle的信息異步更新給zk,如果Output掛了,重新起一個Output接著從Cache讀Bundle就可以了。
3. Slave模塊總結
Slave模塊三種Executor的設計,主要考慮的是各個Executor掛掉之後,怎樣保證數據處理的不重複和不遺漏。我們依賴Zookeeper的可靠性,記錄、更新、判斷Bundle的狀態,做到Input、Cache、Output各司其職,最到最小粒度的容錯。Executor本身的失敗和重啟則由Mesos保障,Mesos作為資源管理係統,由Master監控Slave上各個Executor的執行狀況,通過回調,可以在合適的Slave上再次啟動掛掉的Executor進程,保證業務Task的順利進行。
(全文完)
最後更新:2017-04-03 12:54:03
上一篇:
J2EE中獲取IP地址
下一篇:
2013百度校園招聘數據挖掘工程師
python xpath
C# Winform OpenFileDialog 控件
ITEXT實例學習與研究(二) 之 創建一個細長的淺黃色背景的頁麵以及縱向頁麵與橫向頁麵之間的切換
GridView + ViewFlipper布局界麵,模仿“機鋒市場”
Android下阻止係統掃描SD卡上的媒體圖像文件
優雲經驗談:交付自動化的探索與展望
Android TextView中文字通過SpannableString來設置超鏈接、顏色、字體等屬性
使用Go代替Ruby,將服務器數量從30降到2
馬雲&白若溪:有一種懂得,來自用心聆聽《雲棲蝦米音樂節》
物聯網數據本身用處不大 經過分析才有價值