閱讀834 返回首頁    go 人物


Spark源碼閱讀筆記一——part of core

內部accumulator通過心跳報告給driver
task運行時可以累加accumulator,但是不能讀取value,value隻能在driver獲取
spark內部用一個weakhashmap保存accumulator,便於gc的清理

CacheManager
spark的類用於負責傳遞RDD的分區內容給BlockManager,並保證一個節點不會載入一個rdd的兩份拷貝,這個通過一個hashset實現,已載入的rdd會將id保存到set中
獲取和計算rdd時,先判斷是否已經計算,如果沒有再從blockmanager獲取block然後計算結果。
除非是本地模式,不然rdd的計算結果都會緩存
如果rdd不需要在內存中緩存,則直接將計算結果通過iterator直接傳給blockmanager
在rdd需要緩存時,我們必須小心不能在內存中一次性展開全部的partition,否則如果jvm沒有足夠的空間給這個單個的partition可能會引發OOM異常。
取而代之的是,我們展開這些value,小心的、可能的放棄並丟掉這個partition到磁盤如果合適。
如果空間足夠就全部緩存到內存中,否則如果使用磁盤就放到磁盤,不然直接就返回value

Dependency
NarrowDependency:一個子partition依賴於多個父partition
ShuffleDependency:shuffle stage的輸出依賴,在shuffle中,rdd是短暫的因為我們在executor端不需要它

ExecutorAllocationClient
與cluster manager請求或殺掉executor的客戶端
根據我們的調度需要更新集群,依賴於三個信息
1 executor的數量,我們需要的全部的executor數,cluster manager不能殺掉任何運行中的executor來達到這個數量,這是我們想要分配的executor數量
2 所有要運行的stage中有本地偏好的task數量,包括運行等待和完成的task
3 task到運行host的map

ExecutorAllocationManager(EAM)
一個代理,根據工作負載動態的分配和移除executor
EAM維護一個移動的目標executor數量,定期的同步到cluster manager。target的數量從配置的一個初始值開始,並根據等待和運行task數變化
在當前的target數量多於需要控製的當前的負載時,會減少target數量。target總是會一次性減到可以運行所以當前運行和等待task的數量
當需要響應積壓的等待需要調度的task時,會增加target的數量。如果一個隊列在N秒內沒有排空,則新的executor被加入。如果這個隊列仍然在另外的M秒內存在,則更多的executor會被加入。增加的數量在每輪以上一輪的指數級增加,直到達到上限。上限是基於一個配置的屬性和當前運行和等待任務的數量。
指數增長有雙重理由。
1 executor應該在開始緩慢的增加,以防萬一額外需要的executor數量很小。否則我們增加了多於我們需要的executor數量則我們需要在後麵移除他們。
2 executor的數量需要快速增加,以防萬一executor的數量最大值非常高,否則在繁重的工作負載下性能提升需要很長時間。
executor移除的策略很簡單,如果一個executor已經空閑了K秒,意味著它沒有被調度用於執行任何task,因而移除它。
這裏沒有重試的邏輯,因為我們假定cluster manager最終會異步的執行所有它收到的請求。
相關的spark屬性如下

成員變量initializing,是否需要一直等待初始化的executor集合被分配,當這個變量為true的時候,我們不會取消未執行的executor請求。這個在下麵兩種情況會被設置成false
1 一個stage被提交
2 一個executor的空閑時間超時
用於增加減少executor的調度任務是一個定時任務,每100毫秒執行一次
調度方法上,首先基於添加時間和我們當前的需要調整我們請求的executor,然後如果一個已存在的executor已經過期了,則殺掉。

updateAndSyncNumExecutorsTarget:更新target數量並同步結果到cluster manager。檢查我們已存在的分配和之前的請求超過我們現在的需要。如果滿足,truncate target數量並讓cluster manager知道以便於它可以取消不需要的等待的請求。如果不滿足,並且添加的時間超時,看看我們是否能請求新的executor,並刷新添加時間。

當一個executor(程它為executor X)因為到達了下限而沒有被刪除,則它不會再被標記位空閑。當有新的executor加入,我們不再在最低下限,則我們必須再次標記executor X為空閑,以使我們不會忘記它是一個被移除的候選。
當scheduler的隊列是空的時候,就會將addtime設為未設置
所有cache的block會被報告給driver,但不包括廣播的block
當executor執行任務了(busy),就會清除它的idle time
private val stageIdToExecutorPlacementHints = new mutable.HashMap[Int, (Int, Map[String, Int])]
stageid到tuple的map,tuple是節點和將會在這個節點上運行的task的數量
taskstart和blockmanageradded這些事件是在不同的線程執行的,因而順序不一定,taskstart事件中將對應的executor置為busy
taskend,如果executor不再運行任何調度的任務,則標記為idle
如果task失敗,則會將scheduler置為積壓任務的狀態,將這個task從這個stage對應的task列表中移除

最後更新:2017-07-14 20:32:36

  上一篇:go  Android 連麥Demo App 接入指南
  下一篇:go  阿裏雲第三期榮譽榜單公布,居然沒有我的名字?