內存計算
阿裏雲數據事業部強琦為大家帶來內存計算方麵的內容,本文主要從軟硬件趨勢、分布式計算簡史與內存計算開始談起,包括HIVE、ADS的介紹,接著分析了統一的計算框架,最後講解了Spark和Flink經典的係統技術分析。一起來了解下吧。
軟硬件趨勢
我們現在使用的主流硬件從多核CPU 32核/56核,內存192G /384G,以及定製機型下更大的內存,存儲層級可以做到三T的SSD/11×6T的SATA硬盤,而網絡拓撲和帶寬從IDC內的萬兆網卡到IDC間的專線光纜,還有大數據和它的複用程度,讀寫比比較高的數據是業務價值極高的數據,我們可以針對不同的讀寫比的數據進行不同的係統優化,隨之而來會有相應的問題:
- 從小型機到分布式到單機能力提升,矛盾麼?
- 是否單機能力越強越好?
- 構建在虛擬機上的分布式?
數據密集型的計算可能會根據不同的計算平台選定不同的機型號,這就需要看計算任務到底 短板和瓶頸在哪裏,比如瓶頸在CPU,那我們適當的增加CPU核心,把混合存儲和內存降下來,這樣可以有效的提高整個的資源利用率。
從現在的軟硬件趨勢可以看到,無疑CPU越來越快、memory越來越大、存儲層級越來越豐富。
分布式計算簡史與內存計算
經典的DataBase(DB)和現在比較火的BigData(BD)有哪些異同點呢?
從DataBase來看,數據是業務用戶產生的,數據都必須schema化,而且保證強一致性,支持隨機訪問,數據實時的insert要能實時的查詢,數據的訪問按照CIUD集中的範式,機房是分散的,注重延時,還有一些其他顯著特征,這是經典的OLTP的類似功能。
現在湧現的BigData技術體係,它的數據源是業務db或者log等,強調寬表,注重掃描,它是離線數據,側重於數據計算,機房集中,BD重吞吐,最經典的領域就是數據倉庫 OLAP。
DB勢必要影響BD,而BD的一些技術最終也會推進DB領域的技術演進。
BigData
BD的數據有什麼特點如下:
Volume、Velocity、Variety。
它的數據量是極其巨大的,對成本的要求非常苛刻;速度方麵,數據不再以天、小時為單位,數據入口的吞吐、整個的時效性和用戶的體驗都有非常高的要求;多樣性方麵,數據怎麼融合,數據源采集非常廣泛,格式也是五花八門,對質量的把控也完全不一樣。
今天BigData所麵臨的情況與DataBase不可同日而語,麵臨著巨大的複雜性和難度。
BD數據量有擴展性、有成本問題,BD的技術棧表示能力也是從支持機器學習到非SQL領域, 從半結構化到非結構化,它的計算模式以掃描為主。
GFS
分布式計算作為軟件係統,也有著它與之匹配的演進。
在工業界、產業界,分布式計算蓬勃發展有賴於GFS,很快就對應著開源實現,HDFS開源社區的蓬勃發展,在某種程度上,也給國內的技術從業者提供了很大的學習機會。
- GFS主要是進行大文件或者快文件的存儲,在GFS之上的係統可以把小文件合並成大文件,然後進行存儲,流式文件同樣根據它寫入的特征特點和它讀取的特點,在大文件上可以封裝出流式文件;
- GFS具備不可修改性,在這種情況下,可以封裝出Mutable的功能特性,比如說LSM可以支持隨機讀寫;
- GFS擴展性可以達到跨核心、跨IDC、跨國;
- GFS使用廉價的服務器降低成本,通過replication來增加它的可靠性。
編程模型
在此之前,普通的技術開發者很難在集群上編寫分布式程序和並行程序,編程模型在其表達能力方麵,從RPC到MPI到SQL,它的表達能力是在不斷的降弱,但是它的應用型是在不斷的增強。
要在分布式上編程,程序怎麼能做到擴展性?
- 如何根據網絡拓撲來切分任務,怎麼利用數據存儲的本地化來避免不必要的網絡傳輸和網絡帶寬?
- 如何去容錯?1000台服務器的集群,隻要有任意一個有故障,都算做整個集群有故障。
- 長尾效應。
無論是異構機型還是異構的網絡拓普,或是根據數據分布的data scale,都會造成長尾效應,計算會被其中最慢的那個節點拖後腿,長尾會對整個用戶提交任務的延時和對雪崩造成非常大的傷害。
計算邏輯和編程模型表示方麵,ETL的工程師大多希望使用SQL建模的工程師,也會使用OLAP,而機器學習由於其對性能的極致要求,用戶會希望係統開放最底層接口。
表示能力和自由度其實是一對treadoff,表示能力越強約束就越少,如果對原語進行約束,係統就會做很多failover。
MapReduce
MapReduce約束了用戶的編寫能力、編寫範式,使得係統可以做非常多的分割容錯的工作,用戶隻關心它的業務邏輯。
MapReduce適用於通用的數據密集型編程模型,最新的Hadoop 2.0其根基在於一個係統叫做Yarn,隨著MapReduce的湧現,不同的數據密集型的編程模型也不斷的出現,例如,圖計算模型Pregel、Spark、Dremel 、Drill。
MapReduce的編程模型非常簡單,它把Data Type建模成Key-value模型,那麼Map function接受一條數據返回一個list(k,v),這個list(k,v)會通過一個本地的Combiner,Combiner接受(k,list(v)),返回list(k,v),那麼,進行這樣本地的重新組合以後,通過網絡的shuffle進行數據的重新組織,根據K來組織的Reduce function,接受(k, list(v)),輸出list(out)。
大致的執行過程如圖,提交任務後,Master會將數據進行split 切分,然後分配給不同的worker,也就是Map節點,每個Map節點回調用戶Map的implement,每個worker會回調
用戶的Map實現。每個Map實現都會對著N個Reduce,通過shuffle輸出文件,本地shuffle會進行sort和merge,進而可以調用Combiner,減少不必要的網絡傳輸,當每個Map結束,
它的N個Reduce的結果都已經存儲在本地,這時Master知道Map都已經結束,拉起Reduce節點,並且告知Reduce節點的數據,Reduce的shuffle會根據相應的地址,把它的每個Map裏麵屬於它的文件拖回到本地,然後,不同Map節點相同Reduce節點的文件進行sort merge,sort merge完畢後,調用用戶的Reduce方式,最後輸出。
MapReduce模型,每步都要落地,並且shuffle是N×M的,N假設是Map,M是Reduce數,這裏頭存在大量的文件尋道和傳輸。
MapReduce的曆史意義是非常重大,自動切分任務為多個小task,根據調度策略,兼顧本地性(gfs),將map tasks下發到執行節點,當task結束後,進行load balance,當全部map task結束後,根據reduce並發拉起進程,進行shuffle、sort merge,當本節點shuffle全部結束後,進行reduce task。
如果task失敗,重新運行task,可以看到所有數據都是Immutable,不存在版本問題;如果node掛了,挑選節點重新運行task;如果有慢的task,backuptask。
調度
第一代的MapReduce,有非常多的問題。比如,無論是hdfs還是MapReduce,其Master都是單點,一旦宕機會有較長的恢複時間或者不可恢複性;而計算的Jobtracker,全局唯一,如果使用大量的文件則會加劇元數據的膨脹,資源分配和任務內調度混合;從資源角度,靜態資源無法進行動態的劃分,一旦分配出去,資源無法與別人共享;劃分粒度較大,隔離性較差。鑒於以上原因,規模很難線性擴展,而且,單一Master造成整個升級非常困難。
圖是Hadoop2.0也就是Yarn的架構,可以看到最明顯的一個標誌,即將一層資源調度
和二層業務邏輯擺放分成了兩個角色,也就是RM與AM的分離;增加了Container,保證更精細化資源的調度。當然,現在的Hadoop軟件棧豐富了很多,從最底下的HDFS到上麵的Yarn,也出來了很多新的東西,比如說Tez。
那麼,從分布式計算來看,如何去切任務、如何去選資源、如何在這些載體裏麵擺放以最大化運行效率、如何運行下發的任務、以及如何控製時機?
Hive
Hive是facebook開源的,它總體上是工作在MapReduce基礎上,由Client 或者JDBC接受用戶的SQL請求,然後將SQL進行語法的Parser,經過邏輯執行計劃、物理執行計劃下發,將一個SQL翻譯成MapReduce的物理執行計劃,下發到MapReduce機群。
上圖是一個MapReduce Module組成的DAG有向無環圖。可以看到,HIVE是一個SQL的語法,所以可以提取出大量的元信息進行global的存儲,進行權限控製。這個圖描述了整個HIVE的下發執行邏輯,用戶提交query發布SQL,通過編譯提交編譯模塊,拿到物理執行計劃,然後通過執行引擎向job tracker提交任務,最終根據用戶的DDL信息存儲,並且通知用戶fetch結果。HIVE是完全架構在Hadoop、MapReduce的基礎上。
Back to SQL
MapReduce的工作層次太低,而SQL leval太高、表達能力很強,整個的MapReduce執行是一個串行的運行,而DAG每一步都會落在磁盤上麵,產生大量的磁盤IO、網絡IO和磁盤尋道,Hadoop引入了Distributed cache,另外,MapReduce編寫程序對整體存儲耦合過重,所以MapReduce編寫代碼成本較高。
返回SQL來看,SQL完全是麵向用戶的視角,受眾麵和用戶群非常廣,易用性非常好,已經有非常好的行業標準,而且不斷的在演進;
從係統視角來說,schema是一種知識,有了schema,就可以把數據質量控製在進入環節,可以製定很detail的數據安全策略,數據粒度可以到行、也可以到列,因為有了SQL,我們可以把基礎算子進行組合,另外 SQL的出現,使得係統可以去理解用戶的功能目的,作出大量的優化。
分布式計算領域也有按照DB的思路來做分布式計算的,比如MPP數據庫,它的架構一般都分為存儲引擎、物理執行引擎、SQL三層。而BigData的分層,一般最底下也是存儲層,但是這個存儲層跟DB的存儲層有一些不一樣,這個存儲層是分布式存儲層,BigData一般會存在物理執行引擎、這個物理執行引擎定義了最基本的原語,上麵的算子表示層可以利用這個原語實現各種不同的基礎算子,比如說distinct 、discounting 、count sum以及各種UDF 、UDEF,在此之上可以構建SQL的語意、machine learning的語意等,DB向上隻提供SQL,而BigData 向應用即暴露物理執行原語,也可以暴露表示層原語,也可以支持各種DSL。
那麼,無論是DataBase還是BigData,發展了幾十年,有相當多的理論基礎沉澱和工業實踐。DB和BD在技術層麵上肯定存在融合互相借鑒,包括Schema、邏輯執行計劃、物理執行計劃、Index、更精細化的存儲格式、數據庫領域的物化視圖、內存的有效使用等方麵,DB與BD的融合已經開始發生,無法阻擋。
數據庫領域也會學習分布式計算,支持更複雜的更多變的數據結構,支持嵌套結構、支持更複雜的計算,抽象出更多的表示層,開放出更多的表示能力,我們不希望在異構係統之間拖動數據,增加數據成本和移動成本,使得不同的計算架構在一套執行器上,在運行時複用一套元數據。如今,GFS和MapReduce的出現,Hadoop社區蓬勃發展起來,Tez 、Dremel、 Drill、Lmpala 、Hana、Percolator 、Piccolo、Spark 和Flink相繼問世,分布式計算領域係統層出不窮。
ADS
近期我們正式推出了阿裏巴巴的分析數據庫服務,叫做ADS。它支持的業務響應延時在1秒左右,它是進行OLAP分析,所處理的數據規模在千億到萬億之間;ADS無需建模,具備較高的靈活性,ADS使用了大量的數據庫索引技術和搜索的索引技術;它在成本計算模型上跟Hadoop和Impala還是有著自己獨特的定位;在請求次數少於一定層數以內,加載索引的成本高於運行時成本,這時候用它是不劃算的,但是高於這個成本以後,它基本上的使用成本
不會隨著你的調用次數而增加。
ADS完全兼容SQL92的標準,它大量引入了數據庫搜索的索引技術,支持選數據、跳略過無效用數據、掃數據,ADS支持數據庫的CRUD操作,支持多租戶,以服務化的形式提供服務。
那麼可以看到,ADS分成了三大部分:
- 一部分是Console,其中分為WebConsole和AdminConsole;
- 第二部分是控製集群,它管理整個集群的Cluster、完全用戶的Quota;
- 第三部分是計算集群,計算集群分成前端節點和計算節點,計算節點是真正一個任務,也就是SQL下發執行的地方,在每個Compute Node上執行相應表相應分區,本地將這個表的本分區任務進行計算後,匯總到一台Compute Node上進行全局的匯總,最終返回到Front Node給用戶返回。
整個ADS架設到阿裏巴巴飛天的技術平台上,ADS做到快的原因主要有:
第一,整個集群預先拉起;
第二 , 製定豐富的數據結構,比如Index和元數據、幫助計算跳數據、最大減少掃數據;
ADS采用大量的基於成本和規則的優化、以及HBO的優化來針對不同的擺放策略、數據類型、元數據,針對用戶的SQL進行物理執行計劃和SQL改寫。
內存計算
在我看來,所謂的狹義定義內存計算應該是最大化利用內存容量,並且可編程,框架內置容錯,可以對數據在內存的擺放Replication和partition進行有效控製,並且內存數據進行最大化的reuse。
流式計算引入的Batch,可以把一批數據切批,每批相對比較小,都可以放在內存裏頭運行,而Batch內進行串行運算,Batch間進行並行運算,它所需要的數據Sort Merge都可以在內存完成,而Merge是一個update oldValue的過程,即便它超出內存,我們也可以讓它全內存運算,Snapshot state可以引入Incremental一個snapshot機製進行有效的容錯和存儲訪問,並且線性擴展。
統一的計算框架
曆史上的不同計算係統,目前大有融合的趨勢。
我們從統一計算框架的視角來看,換為緯度看計算:數據是不是分批執行、數據的Shuffle方式是push還是pull、數據進程或者說用戶task是否需要預先拉起。
如圖所示,A0B1C0為傳統離線,A0B0C1為service mode,A1B0C1為流計算。
我們支持8種任意組合,隻不過有些組合效率比較低,引入一個狀態計算的計算框架MRM,提供一個靈活高效的Shuffle service以及一個靈活的APPMaster體係。
可以看到我們在進行嚐試的統一計算框架裏有很重要的角色,叫Runtime Controller,其中有Local AM,意思是在線請求或者對延時較高的請求,可以不唯一通過AM來提交任務,每個Runtime Controller裏麵有一個本地的Local AM,保留著足夠供它決策的信息;我們這裏的Session概念指的是在Session內的所有類型的job都可以複用數據,所以Session是複用數據的邊界,在Session之外的job數據複用隻能借助第三方存儲比如DFS或者類似Tachyon這樣的文件類型係統,或者其他的TV存儲;在Local AM裏有一個重要的管理組件叫做DAGManager,其中的DAGSession是管理提前拉起的物理執行dag,而與之對應的BlockManager裏的重要數據結構BlockSession管理任務間複用的數據全局運行時管理,而VertexManager管理和開放所有的控製邏輯運行時機,LocalAM保留著跟AM裏數據結構一致的本地AM,包括DAGManager、BlockManager和VertexManager,然後AM會通過心跳將新增加的DAGManager裏的信息同步給所有的Runtime Controller的localAM,所以,在線請求提交任務的時候,如果發現localAM裏已經有與這個請求相匹配的拉起的物理DAG執行資源,直接選取這些資源,然後再查看這個計算涉及到的數據是不是已經被加載在BlockSession裏頭去,如果已經有進行合適的擺放,選擇完所有所需要的資源後直接下發Worker,當然,大家可以看到不同的Runtime Controller的資源是一樣的。
算子
與傳統的離線不同的是,在Worker端會存在不同任務的競爭,所以,在Worker端會有本地的調度。算子層也提供了五類基礎算子map , reduce , merge , shuffle , union ,這五類基礎算子是“正交”的,可以組合出複合算子,或衍生出高級算子。
圖為幾個算子之間數據結構的流轉圖。
可以看到這個Case包含了離線計算、實時計算和adhoc的查詢,這段代碼是為在線準備物理拓撲,一旦拓撲建立,在線請求將不會通過AM,而直接通過本地Local AM找到合適的計算資源進行下發執行。
在線的這部分query直接可以複用離線拉起的DAGSession,從而達到秒級以內的在線請求。
問題引申
統一計算框架的引入,定義了Runtime Controller,其看到的資源基本一致,需要Worker層的本地調度,它具有靈活的表示層,但是靈活的表示層所有的數據類型都是範型,範型就會有較高的運行時和內存的代價,所以引入對象池以及內存池來盡可能的緩解這個問題;Table是一個具備schema的存儲表示,可以利用用戶schema和係統schema做大量的本地化的物理計劃執行優化和算子的改寫,我們也同時支持LocalDataSet,用戶可以指定不同的Dataset具備相同的Tag,係統在加載的時候盡量使得相同Tag擺放在相同的內存,我們定義了一些抽象行為來指導係統的優化器,比如Match接口;我們也引入了CBO和RBO的框架來進行Join和大量的Pushdown,將不同的運行方式與用戶的處理剝離出來,使得用戶的處理邏輯隻跟他的功能相關。
那麼,數據庫技術是不是可以和BigData分布式計算技術進行有效的融合和相互借鑒?
數據不拖動,更好的Schema控製,更精細化的索引元數據,本地調度策略,CBO框架以及引入更多的目標客戶,這些目標客戶已經很熟悉SQL的語意和語法。
在代碼優化方麵,從向量優化方麵來說,我們有了schema,可以進行列式存儲,列式存儲很明顯的好處是可以做高效的壓縮;另外,我們可以利用CPU單指定級多數據級的特點,如果向左邊的去運算,每次都會產生一條CPU指令,而優化成右邊,C0和C1是集中存儲,在一次迭代中,一次指令可以把多條數據進行集中運算,相比於左邊,在密集數據計算情況下可以把性能提升4到10倍以上;
大家都知道在舉證運算和圖形渲染方麵,現代CPU的技術分支預測和執行估計本質上運行時會出現比較大的問題,尤其是在switch/case、if/else、for/while這樣的環節,我們的核心要點是將運行時的不確定性變成運行時的確定性,所以,利用了Codgen技術,包括序列化/反序列化、虛函數、(Sql)表達式、DAG執行以及String優化來將運行時的不確定性變成確定性,使得整個CPU更加友好。
業界經典係統技術分析
Spark
spark在寫WordCount函數時,處理代碼非常簡單,讀一行一行的數據是udtf,把它數據按照空格打成一個一個的map,發現key,而Reduce做加法最終存儲。
spark做PageRank,這是spark做內存reuse、數據reuse很經典的案例。每輪迭代的時候,MapReduce都會把這張Link表以及PageRank表寫入HDFS,在下一輪迭代重新加載,這造成了大量的無效的IO開銷和網絡開銷,可以看到Spark將這張Link表catch以後,這張表在後麵的迭代按照partition,一直都在加載內存,而在本輪迭代的Map以後,這個Map操作的Shuffle是與這個關係表的partition是一致的,所以,保證了Map的本地化、Join的本地化,可以看到,關係表先去Join初始化的Link,然後這兩個dataset 的RDD按照partition去Shuffle切齊的,Join都是本地命中,然後,一個URL命中,通過一個flag map由一個list變成多個數據,然後每個數據去把它的新Map向自己的初邊發射出去,而這個Shuffle又會組成新的PageRank的RDD,可以看到底下reduceby就把它所有的入度給它的PageRank更新值,算出一個最終的rank值,然後進行下一輪迭代。
Flink
Flink跟Spark最大不同在於它引入了一個Pipeline的執行框架,當數據量超過了物理內存的
界限的時候,Spark要進行計算,它一定要通過sort merge,它的計算延時是線性成長上去。而Flink類似於分Batch運行,所以,每批次它都可以進行內存操作。
這是一個實測的結果圖,Flink在迭代計算有一些特殊的考量,比如說它的delta計算,大量研究表明,類似於PageRank這樣的計算,百分之三四十的節點是在最初的十輪迭代後麵的值,迭代更新已經不顯著,所以,它們完全可以不參與後階段的迭代計算,這樣就可以
節省大量的Shuffle網絡資源和IO資源,Flink支持用戶編寫非常容易的代碼來進行早停,也就是說,節點在收斂度數不大的情況下進行早停,大量的計算可以避免。在引入delta計算以後,Flink的迭代收斂速度大大提高。
在大規模數據下,stage by stage的模型其實不可避免的超過物理內存的界限,而使整個計算延遲加劇,我們到底采用怎麼樣的技術才能保證可以進行完全的內存計算呢?
對此,我們在這個方麵正在進行大量的嚐試,內存計算和統一計算框架近期發展非常迅速,但遠遠還沒有達到我們期望的階段,尚有大量的技術難點有待我們去攻克,在這個領域仍然有非常多的機會和非常大的空間等待去攻克,和大家分享就到這裏。
謝謝大家!
最後更新:2017-06-06 07:34:05