閱讀406 返回首頁    go 阿裏雲 go 技術社區[雲棲]


Spark on Angel:Spark機器學習的核心加速器

Spark的核心概念是RDD,而RDD的關鍵特性之一是其不可變性,來規避分布式環境下複雜的各種並行問題。這個抽象,在數據分析的領域是沒有問題的,它能最大化的解決分布式問題,簡化各種算子的複雜度,並提供高性能的分布式數據處理運算能力。

然而在機器學習領域,RDD的弱點很快也暴露了。機器學習的核心是迭代和參數更新。RDD憑借著邏輯上不落地的內存計算特性,可以很好的解決迭代的問題,然而RDD的不可變性,卻非常不適合參數反複多次更新的需求。這本質上的不匹配性,導致了Spark的MLlib庫,發展一直非常緩慢,從2015年開始就沒有實質性的創新,性能也不好。

為此,Angel在設計生態圈的時候,優先考慮了Spark。在V1.0.0推出的時候,就已經具備了Spark on Angel的功能,基於Angel為Spark加上了PS功能,在不變中加入了變化的因素,可謂如虎添翼。

我們將以L-BFGS為例,來分析Spark在機器學習算法的實現上的問題,以及Spark on Angel是如何解決Spark在機器學習任務中的遇到的瓶頸,讓Spark的機器學習更加強大。

1. L-BFGS算法說明

L-BFGS模型參數更新過程如下:

20170821111213867.png

計算pk = Hk-1 gk 偽代碼如下所示,這是人們常說的two-loop recursion算法,是Limited-BFGS算法的核心部分。 
返回值 r 是我們說要的pk。

20170821111213735.png

其中,H0-1 是單位陣,yk=gk-gk-1, sk=wk-w k-1k-1,L-BFGS算法將最近 m 輪生成的 yk 和 sk 序列,記做 {yk} 和 {sk}。基於計算 {yk} 和 {sk} 計算 pk 。

2.L-BFGS的Spark實現

2.1 實現框架

Spark中的driver負責協調整個Spark任務執行的同時,需要保存最近 m 輪的 {yk} 和 {sk} 序列,並在driver上執行two-loop recursion算法。而executor負責分布式地計算梯度向量。

20170821111213755.png

迭代過程: 
(1)每輪迭代,將每個executor計算的梯度Aggregate到driver 
(2)yk 和 sk 保存在driver上,在driver端執行two-loop recursion算法 
(3)driver上更新模型 w,並將 w 廣播到每個Executor

2.2 性能分析

基於Spark的L-BFGS實現的算法優點比較明顯:

HDFS I/O   
Spark可以快速讀寫HDFS上的訓練數據;

細粒度的負載均衡   
並行計算梯度時,Spark具有強大的並行調度機製,保證task快速執行;

容錯機製   
當計算節點掛掉、任務失敗,Spark會根據RDD的DAG關係鏈實現數據的重計算。但是對於迭代式算法,每輪迭代要用RDD的action操作,打斷RDD的DAG,避免因為重計算引起邏輯的錯亂;

基於內存的計算   
基於內存的計算過程,可以加速機器學習算法中計算梯度過程的耗時。

該實現的缺點:

treeAggregate引起的網絡瓶頸   
Spark用treeAggregate聚合梯度時,如果模型維度達到億級,每個梯度向量都可能達到幾百兆;此時treeAggregate的shuffle的效率非常低;

driver單點

保存{yk}和{sk}序列需要較大的內存空間; two-loop recursion算法是由driver單點執行,該過程是多個高維度的向量的運算; 每輪迭代,driver都需要和executor完成高維度向量的aggregate和broadcast。

3.L-BFGS的Spark on Angel實現

3.1 實現框架

Spark on Angel借助Angel PS-Service的功能為Spark引入PS的角色,減輕整個算法流程對driver的依賴。two-loop recursion算法的運算交給PS,而driver隻負責任務的調度,大大減輕的對driver性能的依賴。

Angel PS由一組分布式節點組成,每個vector、matrix被切分成多個partition保存到不同的節點上,同時支持vector和matrix之間的運算;

{yk} 和 {sk} 序列分布式地保存到Angel PS上,two-loop recursion算法中高維度的向量計算也是在PS上完成。Spark executor每輪迭代過程會從PS上Pull w 到本地,並將計算的梯度向量Push到PS。

20170821111213779.png

  迭代過程:

(1)每輪迭代,executor 將PS上的模型 w pull 到本地,計算梯度,然後梯度向量push給PS

(2)yk 和 sk 保存在PS上,在PS端執行two-loop recursion算法

(3)PS上更新模型 w

3.2 性能分析

整個算法過程,driver隻負責任務調度,而複雜的two-loop recursion運算在PS上運行,梯度的Aggregate和模型的同步是executor和PS之間進行,所有運算都變成分布式。在網絡傳輸中,高維度的PSVector會被切成小的數據塊再發送到目標節點,這種節點之間多對多的傳輸大大提高了梯度聚合和模型同步的速度。

這樣Spark on Angel完全避開了Spark中driver單點的瓶頸,以及網絡傳輸高維度向量的問題。

4.“輕易強快”的Spark on Angel

Spark on Angel是Angel為解決Spark在機器學習模型訓練中的缺陷而設計的“插件”,沒有對Spark做"侵入式"的修改,是一個獨立的框架。可以用 “”、“”、“”、“” 來概括Spark on Angel的特點。

4.1 輕——"插件式"的框架

Spark on Angel是Angel為解決Spark在機器學習模型訓練中的缺陷而設計的“插件”。Spark on Angel沒有對Spark中的RDD做侵入式的修改,Spark on Angel是依賴於Spark和Angel的框架,同時其邏輯又獨立於Spark和Angel。

因此,Spark用戶使用Spark on Angel非常簡單,隻需在Spark的提交腳本裏做三處改動即可,詳情可見Angel的Github Spark on Angel Quick Start文檔。

可以看到提交的Spark on Angel任務,其本質上依然是一個Spark任務,整個任務的執行過程與Spark一樣的。

source ${Angel_HOME}/bin/spark-on-angel-env.sh$SPARK_HOME/bin/spark-submit --master yarn-cluster --conf spark.ps.jars=$SONA_ANGEL_JARS --conf spark.ps.instances=20 --conf spark.ps.cores=4 --conf spark.ps.memory=10g --jars $SONA_SPARK_JARS ....

Spark on Angel能夠成為如此輕量級的框架,得益於Angel對PS-Service的封裝,使Spark的driver和executor可以通過PsAgent、PSClient與Angel PS做數據交互。

20170821111213205.jpg

  4.2 強——功能強大,支持breeze庫

breeze庫是scala實現的麵向機器學習的數值運算庫。Spark MLlib的大部分數值優化算法都是通過調用breeze來完成的。如下所示,Spark和Spark on Angel兩種實現都是通過調用breeze.optimize.LBFGS實現的。Spark的實現是傳入的類型是breeze庫的DenseVector,而Spark on Angel的實現是傳入BreezePSVector。

BreezePSVector是指Angel PS上的Vector,該Vector實現了breeze NumericOps下的方法,如常用的 dot,scale,axpy,add等運算,因此在LBFGS[BreezePSVector] two-loop recursion算法中的高維度向量運算是BreezePSVector之間的運算,而BreezePSVector之間全部在Angel PS上分布式完成。

Spark的L-BFGS實現import breeze.optimize.LBFGS val lbfgs = new LBFGS[DenseVector](maxIter, m, tol) val states = lbfgs.iterations(Cost(trainData), initWeight)
 
Spark on Angel的L-BFGS實現  
接口調用裏的Vector泛型從DenseVector變成 BreezePSVector import breeze.optimize.LBFGS val lbfgs = new LBFGS[BreezePSVector](maxIter, m, tol) val states = lbfgs.iterations(PSCost(trainData), initWeightPS)
 

4.3 易——編程接口簡單

Spark能夠在大數據領域這麼流行的另外一個原因是:其編程方式簡單、容易理解,Spark on Angel同樣繼承了這個特性。

Spark on Angel本質是一個Spark任務,整個代碼實現邏輯跟Spark是一致的;當需要與PSVector做運算時,調用相應的接口即可。

如下代碼所示,LBFGS在Spark和Spark on Angel上的實現,二者代碼的整體思路是一樣的,主要的區別是梯度向量的Aggregate和模型 的pull/push。 
因此,如果將Spark的算法改造成Spark on Angel的任務,隻需要修改少量的代碼即可。

L-BFGS需要用戶實現DiffFunction,DiffFunction的calculte接口輸入參數是 ,遍曆訓練數據並返回 loss 和 gradient。

其完整代碼,請前往Github SparseLogistic。

Spark的DiffFunction實現 case class Cost(trainData: RDD[Instance]) extends DiffFunction[DenseVector] { def calculate(w: DenseVector): (Double, DenseVector) = { // 廣播 w val bcW = sc.broadcast(w) // 通過treeAggregate的方式計算loss和gradient val (cumGradient, cumLoss) = trainData .treeAggregate((new DenseVector(x.length), 0.0)) (seqOp, combOp) val resGradient = new DenseVector(cumGradient.toArray.map(_ / sampleNum)) (cumLoss / sampleNum, resGradient) }
 
Spark on Angel的DiffFunction實現

calculate接口輸入參數是 w ,遍曆訓練數據並返回 loss 和 cumGradient。其中 w 和 cumGradient都是BreezePSVector;計算梯度時,需要將 Pull 到本地,本地的gradient值,需要通過PSVector的incrementAndFlush方式Push到遠程PS上的cumGradient向量。

case class PSCost(trainData: RDD[Instance]) extends DiffFunction[BreezePSVector] { override def calculate(w: BreezePSVector): (Double, BreezePSVector) = { // 初始化gradient向量:cumGradient val cumGradient = pool.createZero().mkBreeze() // 計算梯度和loss val cumLoss = trainData.mapPartitions { iter => // pull模型 w 到 executor 本地 val localW = w.toRemote.pull() val (gradient, loss) = calculateGradAndLoss(iter, localW) // incement本地的grad到PS的cumGradient cumGradient.toRemote.incrementAndFlush(gradient) Iterator.single(loss) }.sum() cumGradient *= 1.0 / sampleNum (cumLoss / sampleNum, cumGradient) } }

4.4 快——性能強勁

我們分別實現了SGD、LBFGS、OWLQN三種優化方法的LR,並在Spark和Spark on Angel上做了實驗對比。 
該實驗代碼請前往Github SparseLRWithX.scala。

數據集:騰訊內部某業務的一份數據集,2.3億樣本,5千萬維度 實驗設置:  
說明1:三組對比實驗的資源配置如下,我們盡可能保證所有任務在資源充足的情況下執行,因此配置的資源比實際需要的偏多;  
說明2:執行Spark任務時,需要加大spark.driver.maxResultSize參數;而Spark on Angel就不用配置此參數。

20170821111213466.png

如上數據所示,Spark on Angel相較於Spark在訓練LR模型時有50%以上的加速;對於越複雜的模型,其加速的比例越大。

5.結語

Spark on Angel的出現可以高效、低成本地克服Spark在機器學習領域遇到的瓶頸;我們將繼續優化Spark on Angel,並提高其性能。也歡迎大家在Github上一起參與我們的改進。


本文轉自d1net(轉載)

最後更新:2017-08-21 17:03:09

  上一篇:go  物聯網將如何改變大數據分析
  下一篇:go  PHP Notice: Undefined index錯誤提示的解決辦法