802
京東網上商城
從Storm和Spark 學習流式實時分布式計算的設計
0. 背景
最近我在做流式實時分布式計算係統的架構設計,而正好又要參加CSDN博文大賽的決賽。本來想就寫Spark源碼分析的文章吧。但是又想畢竟是決賽,要拿出一些自己的幹貨出來,僅僅是源碼分析貌似分量不夠。因此,我將最近一直在做的係統架構的思路整理出來,形成此文。為什麼要參考Storm和Spark,因為沒有參照效果可能不會太好,尤其是對於Storm和Spark由了解的同學來說,可能通過對比,更能體會到每個具體實現背後的意義。
本文對流式係統出現的背景,特點,數據HA,服務HA,節點間和計算邏輯間的消息傳遞,存儲模型,計算模型,與生產環境融合都有涉及。希望對大家的工作和學習有所幫助。如果本文對您有所幫助,別忘了投一票!點我投票 (如果已經在投票頁麵,請接著向下看)
正文開始:
流式實時分布式計算係統在互聯網公司占有舉足輕重的地位,尤其在在線和近線的海量數據處理上。在線係統負責處理在線請求,因此低延時高可靠是核心指標。在線係統是互聯網公司的核心,係統的好壞直接影響了流量,而流量對互聯網公司來說意味著一切。在線係統使用的數據是來自於後台的計算係統產生的。
對於在線(區別於響應互聯網用戶請求的在線係統,這個在線係統主要是內部使用的,也就是說並不直接服務於互聯網用戶)/近線係統來說,處理的是線上產生的數據,比如在線係統產生的日誌,記錄用戶行為的數據庫等,因此近線係統也需要低延時高可靠的處理海量數據。對於那些時效性很強的數據,比如新聞熱點,電商的促銷,微博熱詞等都需要在很短的時間內完成數據處理以供在線係統使用。
而處理這些海量數據的,就是實時流式計算係統。Spark是實時計算的係統,支持流式計算,批處理和實時查詢。它使用一個通用的stack解決了很多問題,畢竟任何公司都想要Unified的平台去處理遇到的問題,可以減少開發和維護的人力成本和部署平台的物力成本。除了Spark,流式計算係統最有名的就是Twitter的Storm和Yahoo的S4(其實Spark的流式計算還是要弱於Storm的,個人認為互聯網公司對於Storm的部署還是多於Spark的)。
本文主要探討流式計算係統的設計要點,並且通過對Spark和Storm的實現來給出實例。通過對於係統設計要點的梳理,也可以幫助我們更好的學習這些係統的實現。最後,看一下國內互聯網公司對於這些流式係統的應用(僅限於公開發表的內容)。
1. 流式計算的背景和特點
現在很多公司每天都會產生數以TB級的大數據,如何對這些數據進行挖掘,分析成了很重要的課題。比如:
- 電子商務:需要處理並且挖掘用戶行為產生的數據,產生推薦,從而帶來更多的流量和收益。最理想的推薦就是根據興趣推薦給用戶本來不需要的東西!而每天處理海量的用戶數據,需要一個低延時高可靠的實時流式分布式計算係統。
- 新聞聚合:新聞時效性非常重要,如果在一個重大事情發生後能夠實時的推薦給用戶,那麼肯定能增大用戶粘性,帶來可觀的流量。
- 社交網站:大家每天都會去社交網站是為了看看現在發生了什麼,周圍人在做什麼。流式計算可以把用戶關注的熱點聚合,實時反饋給用戶,從而達到一個圈子的聚合效果。
- 交通監管部門:每個城市的交通監管部門每天都要產生海量的視頻數據,這些視頻數據也是以流的形式源源不斷的輸係統中。實時流式計算係統需要以最快的速度來處理這些數據。
- 數據挖掘和機器學習:它們實際上是互聯網公司內部使用的係統,主要為線上服務提供數據支撐。它們可以說是互聯網公司的最核心的平台之一。係統的效率是挖掘的關鍵,理想條件下就是每天產生的海量數據都能得到有效處理,對於原來的數據進行全量更新。
- 大型集群的監控:自動化運維很重要,集群監控的實時預警機製也非常重要,而流式係統對於日誌的實時處理,往往是監控係統的關鍵。
- 等等。
流式實時分布式計算係統就是要解決上述問題的。這些係統的共同特征是什麼?
- 非常方便的運行用戶編寫的計算邏輯:就如Hadoop定義了Map和Reduce的原語一樣,這些係統也需要讓用戶關注與數據處理的具體邏輯上,他們不應該也不需要去了解這些usder defined codes是如何在分布式係統上運轉起來的。因為他們僅僅關注與數據處理的邏輯,因此可以極大的提高效率。而且應該盡量不要限製編程語言,畢竟不同的公司甚至同一公司的不同部門使用的語言可能是千差萬別的。支持多語言無疑可以搶占更多的用戶。
- Scale-out的設計:分布式係統天生就是scale-out的。
- 無數據丟失:係統需要保證無數據丟失,這也是係統高可用性的保證。係統為了無數據丟失,需要在數據處理失敗的時候選擇另外的執行路徑進行replay(係統不是簡單的重新提交運算,而是重新執行調度,否則按照來源的call stack有可能使得係統永遠都在相同的地方出同樣的錯誤)。
- 容錯透明:用戶不會也不需要關心容錯。係統會自動處理容錯,調度並且管理資源,而這些行為對於運行於其上的應用來說都是透明的。
- 數據持久化:為了保證高可用性和無數據丟失,數據持久化是無法躲避的問題。的確,數據持久化可能在低延時的係統中比較影響性能,但是這無法避免。當然了,如果考慮到出錯情況比較少,在出錯的時候我們能夠忍受數據可以從頭replay,那麼中間的運算可以不進行持久化。注意,這隻有在持久化的成本要比計算的replay高的情況下有效。一般來說,計算的結果需要replica,當然了,可以使用將數據replica到其他的節點的內存中去(這又會占用集群的網絡帶寬)。
- 超時設置:超時之所以在在這裏被提出來,因為超時時間的大小設置需要重視,如果太短可以會誤殺正常運行的計算,如果太長則不能快速的檢測錯誤。還有就是對於錯誤的快速發現可以這類係統的一個設計要點,畢竟,超時了才發現錯誤很多時候在時效性上是不可接受的。
2. 原語設計
Hadoop定義了Map和Reduce,使得應用者隻需要實現MR就可以實現數據處理。而流式係統的特點,允許它們可以進行更加具體一些的原語設計。流式的數據的特點就是數據時源源不斷進入係統的,而這些數據的處理一般都需要幾個階段。拿普通的日誌處理來說,我們可能僅僅關注Error的日誌,那麼係統的第一個計算邏輯就是進行filer。接下來可能需要對這個日誌進行分段,分段後可能交給不同的規則處理器進行處理。因此,數據處理一般是分階段的,可以說是一個有向無環圖,或者說是一個拓撲。實際上,Spark抽象出的運算邏輯就是由RDD(Resilient Distributed Dataset)構成DAG(Directed Acyclic Graph),而Storm則有Spout和Blot構成Topology(拓撲)。
2.1 Spark的設計
Spark Streaming是將流式計算分解成一係列短小的批處理作業。這裏的批處理引擎是Spark,也就是把Spark Streaming的輸入數據按照batch size(如1秒)分成一段一段的數據,每一段數據都轉換成Spark中的RDD,然後將Spark Streaming中對DStream的Transformation操作變為針對Spark中對RDD的Transformation操作,將RDD經過操作變成中間結果保存在內存中。整個流式計算根據業務的需求可以對中間的結果進行疊加,或者存儲到外部設備。下圖顯示了Spark Streaming的整個流程。
WordCount的例子:
// Create the context and set up a network input stream to receive from a host:port val ssc = new StreamingContext(args(0), "NetworkWordCount", Seconds(1)) val lines = ssc.socketTextStream(args(1), args(2).toInt) // Split the lines into words, count them, and print some of the counts on the master val words = lines.flatMap(_.split(" ")) val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) wordCounts.print() // Start the computation ssc.start()
這個例子使用Scala寫的,一個簡單優雅的函數式編程語言,同時也是基於JVM的後Java類語言。
2.2 Storm的設計
Storm將計算邏輯成為Topology,其中Spout是Topology的數據源,這個數據源可能是文件係統的某個日誌,也可能是MessageQueue的某個消息隊列,也有可能是數據庫的某個表等等;Bolt負責數據的護理。Bolt有可能由另外兩個Bolt的join而來。
而Storm最核心的抽象Streaming就是連接Spout,Bolt以及Bolt與Bolt之間的數據流。而數據流的組成單位就是Tuple(元組),這個Tuple可能由多個Fields構成,每個Field的含義都在Bolt的定義的時候製定。也就是說,對於一個Bolt來說,Tuple的格式是定義好的。
2.3 原語設計的要點
流式係統的原語設計,要關注一下幾點:
- 如何定義計算拓撲:要方便算法開發者開發算法與策略。最好的實現是定義一個算法與框架的交互方式,定義好算法的輸入結構和算法的輸出結構。然後拓撲能夠組合不同的算法來為用戶提供一個統一的服務。計算平台最大的意義在於算法開發者不需要了解程序的運行,並發的處理,高可用性的實現,隻需要提供算法與計算邏輯即可以快速可靠的處理海量的數據。
- 拓撲的加載與啟動:對於每個節點來說,啟動時需要加載拓撲,節點需要其他的信息,比如上遊的數據來源與下遊的數據輸出。當然了下遊的數據輸出的拓撲信息可以存儲到Tuple中,對於數據需要放到那裏去拓撲本身是無狀態的。這就取決於具體的設計了。
- 拓撲的在線更新:對於每個算法邏輯來說,更新是不可避免的,如何在不停止服務的情況下進行更新是必要的。由於實現了架構與算法的剝離,因此算法可以以一個單獨的個體進行更新。可以操作如下:Master將算法實體保存到一個Worker可見的地方,比如HDFS或者是NFS或者ZK,然後通過心跳發送命令到拓撲,拓撲會暫時停止處理數據而加載新的算法實體,加載之後重新開始處理數據。數據一般都會放到buffer中,這個buffer可能是一個queue。但是從外界看來,拓撲實際上是一直處於服務狀態的。
- 數據如何流動:流式係統最重要的抽象就是Streaming了。那麼Steaming如何流動?實際上涉及到消息的傳遞和分發,數據如何從一個節點傳遞到另外一個節點,這是拓撲定義的,具體實現可以參照第三小節。
- 計算的終點及結果處理:流式計算的特點就是計算一直在進行,流是源源不斷的流入到係統中的。但是對於每個數據單位來說它的處理結果是確定的,這個結果一般是需要返回調用者或者需要持久化的。比如處理一個時間段的交通違章,那麼輸入的數據是一段時間的視頻監控,輸出這是違章的信息,比如車牌,還有違章時刻的抓拍的圖片。這個數據要麼返回調用者,由調用者負責數據的處理,包括持久化等。或者是拓撲最後的節點將這些信息進行持久化。係統需要對這些常見的case進行指導性的說明,需要在Programmer Guide的sample中給出使用例子。
3. 消息傳遞和分發
對於實現的邏輯來說,它們都是有向無環圖的一個節點,那麼如何設計它們之間的消息傳遞呢?或者說數據如何流動的?因為對於分布式係統來說,我們不能假定整個運算都是在同一個節點上(事實上,對於閉源軟件來說,這是可以的,比如就是滿足一個特定運算下的計算,計算平台也不需要做的那麼通用,那麼對於一個運算邏輯讓他在一個節點完成也是可以了,畢竟節省了調度和網絡傳輸的開銷)。或者說,對於一個通用的計算平台來說,我們不能假定任何事情。
消息傳遞和分發是取決於係統的具體實現的。通過對比Storm和Spark,你就明白我為什麼這麼說了。
3.1 Spark的消息傳遞
對於Spark來說,數據流是在通過將用戶定義的一係列的RDD轉化成DAG圖,然後DAG Scheduler把這個DAG轉化成一個TaskSet,而這個TaskSet就可以向集群申請計算資源,集群把這個TaskSet部署到Worker中去運算了。當然了,對於開發者來說,他的任務是定義一些RDD,在RDD上做相應的轉化動作,最後係統會將這一係列的RDD投放到Spark的集群中去運行。
3.2 Storm的消息傳遞
對於Storm來說,他的消息分發機製是在定義Topology的時候就顯式定義好的。也就是說,應用程序的開發者需要清楚的定義各個Bolts之間的關係,下遊的Bolt是以什麼樣的方式獲取上遊的Bolt發出的Tuple。Storm有六種消息分發模式:
- Shuffle Grouping: 隨機分組,Storm會盡量把數據平均分發到下遊Bolt中。
- Fields Grouping:按字段分組, 比如按userid來分組, 具有同樣userid的tuple會被分到相同的Bolt。這個對於類似於WordCount這種應用非常有幫助。
- All Grouping: 廣播, 對於每一個Tuple, 所有的Bolts都會收到。這種分發模式要慎用,會造成資源的極大浪費。
- Global Grouping: 全局分組, 這個Tuple被分配到storm中的一個bolt的其中一個task。這個對於實現事務性的Topology非常有用。
- Non Grouping: 不分組, 這個分組的意思是說stream不關心到底誰會收到它的tuple。目前這種分組和Shuffle grouping是一樣的效果, 有一點不同的是storm會把這個bolt放到這個bolt的訂閱者同一個線程裏麵去執行。
- Direct Grouping: 直接分組, 這是一種比較特別的分組方法,用這種分組意味著消息的發送者指定由消息接收者的哪個task處理這個消息。
3.3 消息傳遞要點
消息隊列現在是模塊之間通信的非常通用的解決方案了。消息隊列使得進程間的通信可以跨越物理機,這對於分布式係統尤為重要,畢竟我們不能假定進程究竟是部署在同一台物理機上還是部署到不同的物理機上。RabbitMQ是應用比較廣泛的MQ,關於RabbitMQ可以看我的一個專欄:RabbitMQ
提到MQ,不得不提的是ZeroMQ。ZeroMQ封裝了Socket,引用官方的說法: “ZMQ (以下 ZeroMQ 簡稱 ZMQ)是一個簡單好用的傳輸層,像框架一樣的一個 socket library,他使得 Socket 編程更加簡單、簡潔和性能更高。是一個消息處理隊列庫,可在多個線程、內核和主機盒之間彈性伸縮。ZMQ 的明確目標是“成為標準網絡協議棧的一部分,之後進入 Linux 內核”。現在還未看到它們的成功。但是,它無疑是極具前景的、並且是人們更加需要的“傳統”BSD 套接字之上的一層封裝。ZMQ 讓編寫高性能網絡應用程序極為簡單和有趣。”
因此, ZeroMQ不是傳統意義上的MQ。它比較適用於節點之間和節點與Master之間的通信。Storm在0.8之前的Worker之間的通信就是通過ZeroMQ。但是為什麼0.9就是用Netty替代了ZeroMQ呢?說替代不大合適,隻是0.9的默認的Worker之間的通信是使用了Netty,ZeroMQ還是支持的。Storm官方認為ZeroMQ有以下缺點:
- 不容易部署,尤其是在雲環境下:以為ZMQ是以C寫的,因此它還是緊依賴於操作係統環境的。
- 無法限製其內存。通過JVM可以很容易的限製java所占用的內存。但是ZMQ對於Storm來說是個黑盒似得存在。
- Storm無法從ZMQ獲取信息。比如Storm無法知道當前buffer中有多少數據為發送。
當然了還有所謂的性能問題,具體可以訪問Netty作者的blog。結論就是Netty的性能比ZMQ(在默認配置下)好兩倍。不知道所謂的ZMQ的默認配置是什麼。反正我對這個結果挺驚訝。當然了,Netty使用Java實現的確方便了在Worker之間的通信加上授權和認證機製。這個使用ZMQ的確是不太好做。
4. 高可用性
HA是分布式係統的必要屬性。如果沒有HA,其實係統是不可用的。那麼如果實現HA?對於Storm來說,它認為Master節點Nimbus是無狀態的,無狀態意味著可以快速恢複,因此Nimbus並沒有實現HA(不知道以後的Nimbus是否會實現HA,實際上使用ZooKeeper實現節點的HA是開源領域的通用做法)。為什麼說Nimbus是無狀態的呢?因為集群所有的元數據都保存到了ZooKeeper(ZK)中。Nimbus定時從ZK獲取信息,並且通過向ZK寫信息來控製Worker。Worker也是通過從ZK中獲取信息,通過這種方式,Worker執行從Nimbus傳遞過來的命令。
Storm的這種使用ZK的方式還是很值得借鑒的。
Spark是如何實現HA的?我的另外一篇文章分析過Spark的Master是怎麼實現HA的:Spark技術內幕:Master基於ZooKeeper的High Availability(HA)源碼實現 。
也是通過ZK的leader 選舉實現的。Spark使用了百行代碼的級別實現了Master的HA,由此可見ZK的功力。
除了這些Master的HA,還有每個Worker的HA。或者說Worker的HA說法不太準確,因此對於集群裏的工作節點來說,它可以非常容易失敗的。這裏的HA可以說是如何讓Worker失敗後快速重啟,重新提供服務。實現方式也可以由很多種。一個簡單的方法就是使用一個容器(Container)啟動Worker並且監控Worker的狀態,如果Worker異常退出,那麼就重新啟動它。這個方法很簡單也很有效。
如果是節點宕機呢?上述方法肯定是不能用的。這種情況下Master會檢測到Worker的心跳超時,那麼就會從資源池中把這個節點刪除。回到正題,宕機後的節點重啟涉及到了運維方麵的知識。對於一個集群來說,硬件宕機這種情況應該需要統一的管理,也就是集群也可以由一個Master,維持每個節點的心跳來確定硬件的狀態。如果節點宕機,那麼集群首先是重啟它。如果啟動失敗可能會通過電話或者短信或者郵件通知運維人員。因此運維人員為了保證集群的高可用性付出了很多的努力,尤其是大型互聯網公司的運維人員,非常值得點讚。當然了這個已經不是Storm或者Spark所能涵蓋的了。
5. 存儲模型與數據不丟失
其實,數據不丟失有時候和處理速度是矛盾的。為了數據不丟失就要進行數據持久化,數據持久化意味著要寫硬盤,在固態硬盤還沒有成為標配的今天,硬盤的IO速度永遠是係統的痛點。當然了可以在另外節點的內存上進行備份,但是這涉及到了集群的兩個稀缺資源:內存和網絡。如果因為備份而占用了大量的網絡帶寬的話,那必將影響係統的性能,吞吐量。
當然了,可以使用日誌的方式。但是日誌的話對於錯誤恢複的時間又是不太能接受的。流式計算係統的特點就是要快,如果錯誤恢複時間太長,那麼可能不如直接replay來的快,而且係統設計還更為簡單。
其實如果不是為了追求100%的數據丟失,可以使用checkpoint的機製,允許一個時間窗口內的數據丟失。
回到係統設計本身,實際上流式計算係統主要是為了離線和近線的機器學習和數據挖掘,因此肯定要保證數據的處理速度:至少係統可以處理一天的新增數據,否則數據堆積越來越大。因此即使有的數據處理丟失了數據,可以讓源頭重新發送數據。
還有另外一個話題,就是係統的元數據信心如何保存,因為係統的路由信息等需要是全局可見的,需要保存類似的這些數據以供集群查詢。當然了Master節點保持了和所有節點的心跳,它完全可以保存這些數據,並且在心跳中可以返回這些數據。實際上HDFS的NameNode就是這麼做的。HDFS的NN這種設計非常合理,為什麼這麼說?HDFS的元數據包含了非常多的數據:
- 目錄文件樹結構和文件與數據塊的對應關係:會持久化到物理存儲中,文件名叫做fsimage。
- DN與數據塊的對應關係,即數據塊存儲在哪些DN中:在DN啟動時會上報到NN它所維護的數據塊。這個是動態建立的,不會持久化。因此,集群的啟動可能需要比較長的時間。
那麼對於流式計算係統這種算得上輕量級的元數據來說,Master處理這些元數據實際上要簡單的多,當然了,Master需要實現服務的HA和數據的HA。這些不是一個輕鬆的事情。實際上,可以采用ZooKeeper來保存係統的元數據。ZooKeeper使用一個目錄樹的結構來保存集群的元數據。節點可以監控感興趣的數據,如果數據有變化,那麼節點會收到通知,然後就保證了係統級別的數據一致性。這點對於係統比較重要,因為節點都是不穩定的,因此係統的其他服務可能都會因為節點失效而發生變化,這些都需要通知相關的節點更新器服務列表,保證了部分節點的失效並不會影響係統的整體的服務,從而也就實現了故障對於用戶的透明性。
6. 如何與公司已有的生產環境進行融合
包括Spark和Storm,在國內著名的互聯網公司比如百度,淘寶和阿裏巴巴都有應用,但是它究竟貢獻了多少流量是不得而知的。我了解到的是實際上大部分的流量,尤其是核心流量還是走公司的老架構的。著名的博主陳皓在微博上關於閉源軟件和開源軟件“特點”之爭算是引起了軒然大波,具體討論可以見知乎。之所以引用這個爭論也是為了切合本小節的主題:如何與公司已有的生產環境進行融合。
雖然互聯網公司的產品迭代很快,但是公司的核心算法和架構基本上改動不會那麼多,因此公司不可能為了推動Storm和Spark這種開源產品而進行大規模的重新開發。隻有那麼後起的項目,從零開始的項目,比如小規模的調研項目才可能用這些產品。當然了開源產品首先是一個通用的平台,但是通用有可能產生的代價就是不那麼高效,對於某些特殊地方的不能根據特殊的應用場景進行優化。如果對這個開源平台進行二次開發,使得性能方麵滿足自己的需求,首先不管法務上的問題,對於自己私有版本和社區版本進行merge也是個很大的challenge。就像現在很多公司對於Linux進行了二次裁剪,開發自己需要的Linux一樣。都需要一些對於這些架構非常熟悉,並且非常熟悉社區動態的人去做這些事情。而這些在互聯網公司,基本上是不可能的。因此大部分時候,都是自己做一個係統,去非常高效切合的去滿足自身的需求。
當然了,開源社區的閃光點也會影響到閉源產品,閉源產品也會影響開源產品,這個相互影響是良性的,可以推動技術向前發展。
7. 總結
Storm和Spark的設計,絕對不是一篇文章所能解決的。它裏邊由非常多的哲學需要我們仔細去學習。它們可以說是我們進行係統設計的良好的範例。本博客在接下來的半年會通過Spark的源碼來學習Spark的係統架構。敬請期待!
如果本文對您有所幫助,別忘了投一票!點我投票 如果已經在投票頁麵,那麼點擊下麵吧!
最後更新:2017-04-03 05:39:37