閱讀547 返回首頁    go 阿裏雲 go 技術社區[雲棲]


《Spark 官方文檔》Spark作業調度

Spark作業調度

概覽

Spark有好幾種計算資源調度的方式。首先,回憶一下集群模式概覽(cluster mode overview)中每個Spark應用(包含一個SparkContext實例)中運行了一些其獨占的執行器(executor)進程。集群管理器提供了Spark應用之間的資源調度(scheduling across applications)。其次,在各個Spark應用內部,各個線程可能並發地通過action算子提交多個Spark作業(job)。如果你的應用服務於網絡請求,那這種情況是很常見的。在Spark應用內部(對應同一個SparkContext)各個作業之間,Spark默認FIFO調度,同時也可以支持公平調度(fair scheduler)。

 

Spark應用之間的資源調度

如果在集群上運行,每個Spark應用都會獲得一批獨占的執行器JVM,來運行其任務並存儲數據。如果有多個用戶共享集群,那麼會有很多資源分配相關的選項,如何設置還取決於具體的集群管理器。

對Spark所支持的各個集群管理器而言,最簡單的資源分配,就是對資源靜態劃分。這種方式就意味著,每個Spark應用都是設定一個最大可用資源總量,並且該應用在整個生命周期內都會占住這些資源。這種方式在Spark獨立部署(standalone)和YARN調度,以及Mesos粗粒度模式(coarse-grained Mesos mode)下都可用。

  • Standalone mode: 默認情況下,Spark應用在獨立部署的集群中都會以FIFO(first-in-first-out)模式順序提交運行,並且每個Spark應用都會占用集群中所有可用節點。不過你可以通過設置spark.cores.max或者spark.deploy.defaultCores 來限製單個應用所占用的節點個數。最後,除了可以控製對CPU的使用數量之外,還可以通過 spark.executor.memory 來控製各個應用的內存占用量。
  • Mesos: 在Mesos中要使用靜態劃分的話,需要將 spark.mesos.coarse 設為true,同樣,你也需要設置 spark.cores.max 來控製各個應用的CPU總數,以及 spark.executor.memory 來控製各個應用的內存占用。
  • YARN: 在YARN中需要使用 –num-executors 選項來控製Spark應用在集群中分配的執行器的個數,對於單個執行器(executor)所占用的資源,可以使用 –executor-memory 和 –executor-cores 來控製。

Mesos上另一種可用的方式是動態共享CPU。在這種模式下,每個Spark應用的內存占用仍然是固定且獨占的(仍由spark.executor.memory決定),但是如果該Spark應用沒有在某個機器上執行任務的話,那麼其他應用可以占用該機器上的CPU。這種模式對集群中有大量不是很活躍應用的場景非常有效,例如:集群中有很多不同用戶的Spark shell session。但這種模式不適用於低延遲的場景,因為當Spark應用需要使用CPU的時候,可能需要等待一段時間才能取得CPU的使用權。要使用這種模式,隻需要在 mesos:// URL 上設置 spark.mesos.coarse 屬性為flase即可。

注意,目前還沒有任何一種資源分配模式能支持跨Spark應用的內存共享。如果你需要跨Spark應用共享內存,我們建議你用單獨用一個server來計算和保留同一個RDD查詢的結果,這樣就能在多個請求(request)之間共享同一個RDD的數據。在未來的發布版本中,一些內存存儲係統(如:Tachyon)或許能夠提供這種跨Spark應用共享RDD的能力。

動態資源分配

Spark還提供了一種基於負載來動態調節Spark應用資源占用的機製。這意味著,你的應用會在資源空閑的時候將其釋放給集群,而後續用到的時候再重新申請。這一特性在多個應用共享Spark集群資源的情況下特別有用。

注意,這個特性默認是禁用的,但是在所有的粗粒度集群管理器上都是可用的,如:獨立部署模式(standalone mode),YARN模式(YARN mode)以及Mesos粗粒度模式(Mesos coarse-grained mode)。

配置和部署

要使用這一特性有兩個前提條件。首先,你的應用必須設置 spark.dynamicAllocation.enabled 為 true。其次,你必須在每個節點上啟動一個外部混洗服務(external shuffle service),並在你的應用中將 spark.shuffle.service.enabled 設為true。外部混洗服務的目的就是為了在刪除執行器的時候,能夠保留其輸出的混洗文件(本文後續有更詳細的描述)。啟用外部混洗的方式在各個集群管理器上各不相同:

在Spark獨立部署的集群中,你隻需要在worker啟動前設置 spark.shuffle.server.enabled 為true即可。

在Mesos粗粒度模式下,你需要在各個節點上運行 ${SPARK_HOME}/sbin/start-mesos-shuffle-service.sh 並設置 spark.shuffle.service.enabled 為true 即可。例如,你可以用Marathon來啟用這一功能。

在YARN模式下,混洗服務需要按以下步驟在各個NodeManager上啟動:

  1. 首先按照YARN profile 構建Spark。如果你已經有打好包的Spark,可以忽略這一步。
  2. 找到 spark-<version>-yarn-shuffle.jar。如果你是自定義編譯,其位置應該在 ${SPARK_HOME}/network/yarn/target/scala-<version>,否則應該可以在 lib 目錄下找到這個jar包。
  3. 將該jar包添加到NodeManager的classpath路徑中。
  4. 配置各個節點上的yarn-site.xml,將 spark_shuffle 添加到 yarn.nodemanager.aux-services 中,然後將 yarn.nodemanager.aux-services.spark_shuffle.class 設為 org.apache.spark.network.yarn.YarnShuffleService,並將 spark.shuffle.service.enabled 設為 true。
  5. 最後重啟各節點上的NodeManager。

所有相關的配置都是可選的,並且都在 spark.dynamicAllocation.* 和 spark.shuffle.service.* 命名空間下。更詳細請參考:configurations page

資源分配策略

總體上來說,Spark應該在執行器空閑時將其關閉,而在後續要用時再次申請。因為沒有一個固定的方法,可以預測一個執行器在後續是否馬上回被分配去執行任務,或者一個新分配的執行器實際上是空閑的,所以我們需要一些試探性的方法,來決定是否申請或移除一個執行器。

請求策略

一個啟用了動態分配的Spark應用會在有等待任務需要調度的時候,申請額外的執行器。這種情況下,必定意味著已有的執行器已經不足以同時執行所有未完成的任務。

Spark會分輪次來申請執行器。實際的資源申請,會在任務掛起 spark.dynamicAllocation.schedulerBacklogTimeout 秒後首次觸發,其後如果等待隊列中仍有掛起的任務,則每過 spark.dynamicAlloction.sustainedSchedulerBacklogTimeout 秒觸發一次資源申請。另外,每一輪所申請的執行器個數以指數形式增長。例如,一個Spark應用可能在首輪申請1個執行器,後續的輪次申請個數可能是2個、4個、8個… … 。

采用指數級增長策略的原因有兩個:第一,對於任何一個Spark應用如果隻是需要多申請少數幾個執行器的話,那麼必須非常謹慎地啟動資源申請,這和TCP慢啟動有些類似;第二,如果一旦Spark應用確實需要申請很多個執行器的話,那麼可以確保其所需的計算資源及時地增長。

移除策略

移除執行器的策略就簡單多了。Spark應用會在某個執行器空閑超過 spark.dynamicAllocation.executorIdleTimeout 秒後將其刪除。在絕大多數情況下,執行器的移除條件和申請條件都是互斥的,也就是說,執行器在有待執行任務掛起時,不應該空閑。

優雅地關閉執行器

非動態分配模式下,執行器可能的退出原因有執行失敗或者相關Spark應用已經退出。不管是那種原因,執行器的所有狀態都已經不再需要,可以丟棄掉。但在動態分配的情形下,執行器有可能在Spark應用運行期間被移除。這時候,如果Spark應用嚐試去訪問該執行器存儲的狀態,就必須重算這一部分數據。因此,Spark需要一種機製,能夠優雅地關閉執行器,同時還保留其狀態數據。

這種需求對於混洗操作尤其重要。混洗過程中,Spark執行器首先將map輸出寫到本地磁盤,同時執行器本身又是一個文件服務器,這樣其他執行器就能夠通過該執行器獲得對應的map結果數據。一旦有某些任務執行時間過長,動態分配有可能在混洗結束前移除任務異常的執行器,而這些被移除的執行器對應的數據將會被重新計算,但這些重算其實是不必要的。

要解決這一問題,就需要用到一個外部混洗服務(external shuffle service),該服務在Spark 1.2引入。該服務在每個節點上都會啟動一個不依賴於任何Spark應用或執行器的獨立進程。一旦該服務啟用,Spark執行器不再從各個執行器上獲取shuffle文件,轉而從這個service獲取。這意味著,任何執行器輸出的混洗狀態數據都可能存留時間比對應的執行器進程還長。

除了混洗文件之外,執行器也會在磁盤或者內存中緩存數。一旦執行器被移除,其緩存數據將無法訪問。這個問題目前還沒有解決。或許在未來的版本中,可能會采用外部混洗服務類似的方法,將緩存數據保存在堆外存儲中以解決這一問題。

Spark應用內部的資源調度

在指定的Spark應用內部(對應同一SparkContext實例),多個線程可能並發地提交Spark作業(job)。在本節中,作業(job)是指,由Spark action算子(如:collect)觸發的一係列計算任務的集合。Spark調度器是完全線程安全的,並且能夠支持Spark應用同時處理多個請求(比如:來自不同用戶的查詢)。

默認,Spark應用內部使用FIFO調度策略。每個作業被劃分為多個階段(stage)(例如:map階段和reduce階段),第一個作業在其啟動後會優先獲取所有的可用資源,然後是第二個作業再申請,再第三個……。如果前麵的作業沒有把集群資源占滿,則後續的作業可以立即啟動運行,否則,後提交的作業會有明顯的延遲等待。

不過從Spark 0.8開始,Spark也能支持各個作業間的公平(Fair)調度。公平調度時,Spark以輪詢的方式給每個作業分配資源,因此所有的作業獲得的資源大體上是平均分配。這意味著,即使有大作業在運行,小的作業再提交也能立即獲得計算資源而不是等待前麵的作業結束,大大減少了延遲時間。這種模式特別適合於多用戶配置。

要啟用公平調度器,隻需設置一下 SparkContext中spark.scheduler.mode 屬性為 FAIR即可:

val conf = new SparkConf().setMaster(...).setAppName(...)
conf.set("spark.scheduler.mode", "FAIR")
val sc = new SparkContext(conf)

公平調度資源池

公平調度器還可以支持將作業分組放入資源池(pool),然後給每個資源池配置不同的選項(如:權重)。這樣你就可以給一些比較重要的作業創建一個“高優先級”資源池,或者你也可以把每個用戶的作業分到一組,這樣一來就是各個用戶平均分享集群資源,而不是各個作業平分集群資源。Spark公平調度的實現方式基本都是模仿 Hadoop Fair Scheduler 來實現的。

默認情況下,新提交的作業都會進入到默認資源池中,不過作業對應於哪個資源池,可以在提交作業的線程中用SparkContext.setLocalProperty 設定 spark.scheduler.pool 屬性。示例代碼如下:

// Assuming sc is your SparkContext variable
sc.setLocalProperty("spark.scheduler.pool", "pool1")

一旦設好了局部屬性,所有該線程所提交的作業(即:在該線程中調用action算子,如:RDD.save/count/collect 等)都會使用這個資源池。這個設置是以線程為單位保存的,你很容易實現用同一線程來提交同一用戶的所有作業到同一個資源池中。同樣,如果需要清除資源池設置,隻需在對應線程中調用如下代碼:

sc.setLocalProperty("spark.scheduler.pool", null)

資源池默認行為

默認地,各個資源池之間平分整個集群的資源(包括default資源池),但在資源池內部,默認情況下,作業是FIFO順序執行的。舉例來說,如果你為每個用戶創建了一個資源池,那麼久意味著各個用戶之間共享整個集群的資源,但每個用戶自己提交的作業是按順序執行的,而不會出現後提交的作業搶占前麵作業的資源。

配置資源池屬性

資源池的屬性需要通過配置文件來指定。每個資源池都支持以下3個屬性:

  • schedulingMode:可以是FIFO或FAIR,控製資源池內部的作業是如何調度的。
  • weight:控製資源池相對其他資源池,可以分配到資源的比例。默認所有資源池的weight都是1。如果你將某個資源池的weight設為2,那麼該資源池中的資源將是其他池子的2倍。如果將weight設得很高,如1000,可以實現資源池之間的調度優先級 – 也就是說,weight=1000的資源池總能立即啟動其對應的作業。
  • minShare:除了整體weight之外,每個資源池還能指定一個最小資源分配值(CPU個數),管理員可能會需要這個設置。公平調度器總是會嚐試優先滿足所有活躍(active)資源池的最小資源分配值,然後再根據各個池子的weight來分配剩下的資源。因此,minShare屬性能夠確保每個資源池都能至少獲得一定量的集群資源。minShare的默認值是0。

資源池屬性是一個XML文件,可以基於 conf/fairscheduler.xml.template 修改,然後在 SparkConf 的 spark.scheduler.allocation.file 屬性指定文件路徑:

conf.set("spark.scheduler.allocation.file", "/path/to/file")

資源池XML配置文件格式如下,其中每個池子對應一個<pool>元素,每個資源池可以有其獨立的配置:

<?xml version="1.0"?>
<allocations>
  <pool name="production">
    <schedulingMode>FAIR</schedulingMode>
    <weight>1</weight>
    <minShare>2</minShare>
  </pool>
  <pool name="test">
    <schedulingMode>FIFO</schedulingMode>
    <weight>2</weight>
    <minShare>3</minShare>
  </pool>
</allocations>

完整的例子可以參考 conf/fairscheduler.xml.template。注意,沒有在配置文件中配置的資源池都會使用默認配置(schedulingMode:FIFO,weight:1,minShare:0)。

 轉載自 並發編程網 - ifeve.com

最後更新:2017-05-19 16:02:01

  上一篇:go  《用戶至上:用戶研究方法與實踐》研究之前:先理解目標用戶
  下一篇:go  《用戶至上:用戶研究方法與實踐》用戶體驗入門