閱讀497 返回首頁    go 阿裏雲 go 技術社區[雲棲]


《循序漸進學Spark 》Spark 編程模型

本節書摘來自華章出版社《循序漸進學Spark 》一書中的第1章,第3節,作者 小象學院 楊 磊,更多章節內容可以訪問雲棲社區“華章計算機”公眾號查看。


Spark機製原理

本書前麵幾章分別介紹了Spark的生態係統、Spark運行模式及Spark的核心概念RDD和基本算子操作等重要基礎知識。本章重點講解Spark的主要機製原理,因為這是Spark程序得以高效執行的核心。本章先從Application、job、stage和task等層次闡述Spark的調度邏輯,並且介紹FIFO、FAIR等經典算法,然後對Spark的重要組成模塊:I/O與通信控製模塊、容錯模塊及Shuffle模塊做了深入的闡述。其中,在Spark I/O模塊中,數據以數據塊的形式管理,存儲在內存、磁盤或者Spark集群中的其他機器上。Spark集群通信機製采用了AKKA通信框架,在集群機器中傳遞命令和狀態信息。另外,容錯是分布式係統的一個重要特性,Spark采用了lineage與checkpoint機製來保證容錯性。Spark Shuffle模塊借鑒了MapReduce的Shuffle機製,但在其基礎上進行了改進與創新。

3.1 Spark應用執行機製分析

下麵對Spark Application的基本概念和執行機製進行深入介紹。

3.1.1 Spark應用的基本概念

Spark應用(Application)是用戶提交的應用程序。Spark運行模式分為:Local、Standalone、YARN、Mesos等。根據Spark Application的Driver Program是否在集群中運行,Spark應用的運行方式又可以分為Cluster模式和Client模式。

下麵介紹Spark應用涉及的一些基本概念:

1) SparkContext:Spark 應用程序的入口,負責調度各個運算資源,協調各個Worker Node 上的Executor。

2) Driver Program:運行Application的main()函數並創建SparkContext。

3) RDD:前麵已經講過,RDD是Spark的核心數據結構,可以通過一係列算子進行操作。當RDD遇到Action算子時,將之前的所有算子形成一個有向無環圖(DAG)。再在Spark中轉化為Job(Job的概念在後麵講述),提交到集群執行。一個App中可以包含多個Job。

4) Worker Node:集群中任何可以運行Application代碼的節點,運行一個或多個Executor進程。

5) Executor:為Application運行在Worker Node上的一個進程,該進程負責運行Task,並且負責將數據存在內存或者磁盤上。每個Application都會申請各自的Executor來處理任務。

下麵介紹Spark 應用(Application)執行過程中各個組件的概念:

1) Task(任務):RDD中的一個分區對應一個Task,Task是單個分區上最小的處理流程單元。

2) TaskSet(任務集): 一組關聯的,但相互之間沒有Shuffle依賴關係的Task集合。

3) Stage(調度階段):一個TaskSet對應的調度階段。每個Job會根據RDD的寬依賴關係被切分很多Stage,每個Stage都包含一個TaskSet。

4) Job(作業): 由Action算子觸發生成的由一個或多個Stage組成的計算作業。

5) Application:用戶編寫的Spark的應用程序,由一個或多個Job組成。提交到Spark之後,Spark為Application分配資源,將程序轉換並執行。

6) DAGScheduler:根據Job構建基於Stage的DAG,並提交Stage給TaskScheduler。

7) TaskScheduler:將Taskset提交給Worker Node集群運行並返回結果。

以上基本概念之間的關係如圖3-1所示。

3.1.2 Spark應用執行機製概要

Spark Application從提交後到在Worker Node執行,期間經曆了一係列變換,具體過程如圖3-2所示。

 

圖3-1 Spark基本概念之間的關係

 

圖3-2 Spark 執行流程

如圖3-2所示,前麵講過,當RDD遇見Action算子之後,觸發Job提交。提交後的Job在Spark中形成了RDD DAG有向無環圖(Directed Acyclic Graph)。RDD DAG經過DAG Scheduler調度之後,根據RDD依賴關係被切分為一係列的Stage。每個Stage包含一組task集合,再經過Task Scheduler之後,task被分配到Worker節點上的Executor線程池執行。如前文所述,RDD中的每一個邏輯分區對應一個物理的數據塊,同時每個分區對應一個Task,因此Task也有自己對應的物理數據塊,使用用戶定義的函數來處理。Spark出於節約內存的考慮,采用了延遲執行的策略,如前文所述,隻有Action算子才可以觸發整個操作序列的執行。另外,Spark對於中間計算結果也不會重新分配內存,而是在同一個數據塊上流水線操作。

Spark使用BlockManager管理數據塊,在內存或者磁盤進行存儲,如果數據不在本節點,則還可以通過遠端節點複製到本機進行計算。在計算時,Spark會在具體執行計算的Worker節點的Executor中創建線程池,Executor將需要執行的任務通過線程池來並發執行。

3.1.3 應用提交與執行

Spark使用Driver進程負責應用的解析、切分Stage並調度Task到Executor執行,包含DAGScheduler等重要對象。Driver進程的運行地點有如下兩種:

1) Driver進程運行在Client端,對應用進行管理監控。

2) Master節點指定某個Worker節點啟動Driver進程,負責監控整個應用的執行。

針對這兩種情況,應用提交及執行過程分別如下:

1. Driver運行在Client

用戶啟動Client端,在Client端啟動Driver進程。在Driver中啟動或實例化DAGS-

cheduler等組件。

1)Driver向Master注冊。

2)Worker向Master注冊,Master通過指令讓Worker啟動Executor。

3)Worker通過創建ExecutorRunner線程,進而ExecutorRunner線程啟動Executor-Backend進程。

4)ExecutorBackend啟動後,向Client端Driver進程內的SchedulerBackend注冊,因此Driver進程就可以發現計算資源。

5)Driver的DAGScheduler解析應用中的RDD DAG並生成相應的Stage,每個Stage包含的TaskSet通過TaskScheduler分配給Executor。在Executor內部啟動線程池並行化執行Task。

2. Driver運行在Worker節點

用戶啟動客戶端,客戶端提交應用程序給Master。

1)Master調度應用,指定一個Worker節點啟動Driver,即Scheduler-Backend。

2)Worker接收到Master命令後創建DriverRunner線程,在DriverRunner線程內創建SchedulerBackend進程。Driver充當整個作業的主控進程。

3)Master指定其他Worker節點啟動Exeuctor,此處流程和上麵相似,Worker創建ExecutorRunner線程,啟動ExecutorBackend進程。

4)ExecutorBackend啟動後,向Driver的SchedulerBackend注冊,這樣Driver獲取了計算資源就可以調度和將任務分發到計算節點執行。

SchedulerBackend進程中包含DAGScheduler,它會根據RDD的DAG切分Stage,生成TaskSet,並調度和分發Task到Executor。對於每個Stage的TaskSet,都會被存放到TaskScheduler中。TaskScheduler將任務分發到Executor,執行多線程並行任務。

圖3-3為Spark應用的提交與執行示意圖。

 

圖3-3 Spark應用的提交與執行

3.2 Spark調度機製

Spark調度機製是保證Spark應用高效執行的關鍵。本節從Application、job、stage和task的維度,從上層到底層來一步一步揭示Spark的調度策略。

3.2.1 Application的調度

Spark中,每個Application對應一個SparkContext。SparkContext之間的調度關係取決於Spark的運行模式。對Standalone模式而言,Spark Master節點先計算集群內的計算資源能否滿足等待隊列中的應用對內存和CPU資源的需求,如果可以,則Master創建Spark Driver,啟動應用的執行。宏觀上來講,這種對應用的調度類似於FIFO策略。在Mesos和YARN模式下,底層的資源調度係統的調度策略都是由Mesos和YARN決定的。具體分類描述如下:

1. Standalone模式

默認以用戶提交Application的順序來調度,即FIFO策略。每個應用執行時獨占所有資源。如果有多個用戶要共享集群資源,則可以使用參數spark.cores.max來配置應用在集群中可以使用的最大CPU核數。如果不配置,則采用默認參數spark.deploy.defaultCore的值來確定。

2. Mesos模式

如果在Mesos上運行Spark,用戶想要靜態配置資源的話,可以設置spark.mesos.coarse為true,這樣Mesos變為粗粒度調度模式,然後可以設置spark.cores.max指定集群中可以使用的最大核數,與上麵的Standalone模式類似。同時,在Mesos模式下,用戶還可以設置參數spark.executor.memory來配置每個executor的內存使用量。如果想使Mesos在細粒度模式下運行,可以通過mesos://<url-info>設置動態共享cpu core的執行模式。在這種模式下,應用不執行時的空閑CPU資源得以被其他用戶使用,提升了CPU使用率。

3. YARN模式

如果在YARN上運行Spark,用戶可以在YARN的客戶端上設置--num-executors 來控製為應用分配的Executor數量,然後設置--executor-memory指定每個Executor的內存大小,設置--executor-cores指定Executor占用的CPU核數。

3.2.2 job的調度

前麵章節提到過,Spark應用程序實際上是一係列對RDD的操作,這些操作直至遇見Action算子,才觸發Job的提交。事實上,在底層實現中,Action算子最後調用了runJob函數提交Job給Spark。其他的操作隻是生成對應的RDD關係鏈。如在RDD.scala程序文件中,count函數源碼所示。

def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum

其中sc為SparkContext的對象。可見在Spark中,對Job的提交都是在Action算子中隱式完成的,並不需要用戶顯式地提交作業。在SparkContext中Job提交的實現中,最後會調用DAGScheduler中的Job提交接口。DAGScheduler最重要的任務之一就是計算Job與Task的依賴關係,製定調度邏輯。

Job調度的基本工作流程如圖3-4所示,每個Job從提交到完成,都要經曆一係列步驟,拆分成以Tsk為最小單位,按照一定邏輯依賴關係的執行序列。

 

圖3-4 Job的調度流程

圖3-5則從Job調度流程中的細節模塊出發,揭示了工作流程與對應模塊之間的關係。從整體上描述了各個類在Job調度流程中的交互關係。

 

圖3-5 Job調度流程細節

在Spark1.5.0的調度目錄下的SchedulingAlgorithm.scala文件中,描述了Spark對Job的調度模式。

1. FIFO模式

默認情況下,Spark對Job以FIFO(先進先出)的模式進行調度。在SchedulingAlgorithm.scala文件中聲明了FIFO算法實現。

private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {

  override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {

    //定義優先級

    val priority1 = s1.priority

    val priority2 = s2.priority

    var res = math.signum(priority1 - priority2)

    if (res == 0) {

      val stageId1 = s1.stageId

      val stageId2 = s2.stageId

      //signum是符號函數,返回0(參數等於0)、1(參數大於0)或-1(參數小於0)。

      res = math.signum(stageId1 - stageId2)

    }

    if (res < 0) {

      true

    } else {

      false

    }

  }

}

2. FAIR模式

Spark在FAIR的模式下,采用輪詢的方式為多個Job分配資源,調度Job。所有的任務優先級大致相同,共享集群計算資源。具體實現代碼在SchedulingAlgorithm.scala文件中,聲明如下:

private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm {

  override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {

    val minShare1 = s1.minShare

    val minShare2 = s2.minShare

    val runningTasks1 = s1.runningTasks

    val runningTasks2 = s2.runningTasks

    val s1Needy = runningTasks1 < minShare1

    val s2Needy = runningTasks2 < minShare2

    val minShareRatio1 = runningTasks1.toDouble / math.max(minShare1, 1.0).toDouble

    val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0).toDouble

    val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble

    val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble

    var compare: Int = 0

 

    if (s1Needy && !s2Needy) {

      return true

    } else if (!s1Needy && s2Needy) {

      return false

    } else if (s1Needy && s2Needy) {

      compare = minShareRatio1.compareTo(minShareRatio2)

    } else {

      compare = taskToWeightRatio1.compareTo(taskToWeightRatio2)

    }

 

    if (compare < 0) {

      true

    } else if (compare > 0) {

      false

    } else {

      s1.name < s2.name

    }

  }

}

3. 配置調度池

DAGScheduler構建了具有依賴關係的任務集。TaskScheduler負責提供任務給Task-SetManager作為調度的先決條件。TaskSetManager負責具體任務集內部的調度任務。調度池(pool)則用於調度每個SparkContext運行時並存的多個互相獨立無依賴關係的任務集。調度池負責管理下一級的調度池和TaskSetManager對象。

用戶可以通過配置文件定義調度池的屬性。一般調度池支持如下3個參數:

1)調度模式Scheduling mode:用戶可以設置FIFO或者FAIR調度方式。

2)weight:調度池的權重,在獲取集群資源上權重高的可以獲取多個資源。

3)miniShare:代表計算資源中的CPU核數。

用戶可以通過conf/fairscheduler.xml配置調度池的屬性,同時要在SparkConf對象中配置屬性。

3.2.3 stage(調度階段)和TasksetManager的調度

1. Stage劃分

當一個Job被提交後,DAGScheduler會從RDD依賴鏈的末端觸發,遍曆整個RDD依賴鏈,劃分Stage(調度階段)。劃分依據主要基於ShuffleDependency依賴關係。換句話說,當某RDD在計算中需要將數據進行Shuffle操作時,這個包含Shuffle操作的RDD將會被用來作為輸入信息,構成一個新的Stage。以這個基準作為劃分Stage,可以保證存在依賴關係的數據按照正確數據得到處理和運算。在Spark1.5.0的源代碼中,DAGScheduler.scala中的getParentStages函數的實現從一定角度揭示了Stage的劃分邏輯。

/**

 * 對於給定的RDD構建或獲取父Stage的鏈表。新的Stage構建時會包含參數中提供的firstJobId

 */

private def getParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {

   val parents = new HashSet[Stage]

   val visited = new HashSet[RDD[_]]

   // We are manually maintaining a stack here to prevent StackOverflowError

   // caused by recursively visiting

   val waitingForVisit = new Stack[RDD[_]]

   def visit(r: RDD[_]) {

     if (!visited(r)) {

       visited += r

       // Kind of ugly: need to register RDDs with the cache here since

       // we can't do it in its constructor because # of partitions is unknown

       /* 遍曆RDD的依賴鏈 */

       for (dep <- r.dependencies) {

         dep match {

           /*如果遇見ShuffleDependency,則依據此依賴關係劃分Stage,並添加該Stage的父Stage到哈希列表中*/

           case shufDep: ShuffleDependency[_, _, _] =>

             parents += getShuffleMapStage(shufDep, firstJobId)

           case _ =>

             waitingForVisit.push(dep.rdd)

      }

    }

  }

}

2. Stage調度

在第一步的Stage劃分過程中,會產生一個或者多個互相關聯的Stage。其中,真正執行Action算子的RDD所在的Stage被稱為Final Stage。DAGScheduler會從這個final stage生成作業實例。

在Stage提交時,DAGScheduler首先會判斷該Stage的父Stage的執行結果是否可用。如果所有父Stage的執行結果都可用,則提交該Stage。如果有任意一個父Stage的結果不可用,則嚐試迭代提交該父Stage。所有結果不可用的Stage都將會被加入waiting隊列,等待執行,如圖3-6所示。

 

圖3-6 Stage依賴

在圖3-6中,虛箭頭表示依賴關係。Stage序號越小,表示Stage越靠近上遊。

圖3-6中的Stage調度運行順序如圖3-7所示。

 

圖3-7 Stage執行順序

從圖3-7可以看出,上遊父Stage先得到執行,waiting queue中的stage隨後得到執行。

3. TasksetManager

每個Stage的提交會被轉化為一組task的提交。DAGScheduler最終通過調用taskscheduler的接口來提交這組任務。在taskScheduler內部實現中創建了taskSetManager實例來管理任務集taskSet的生命周期。事實上可以說每個stage對應一個tasksetmanager。

至此,DAGScheduler的工作基本完畢。taskScheduler在得到集群計算資源時,taskSet-Manager會分配task到具體worker節點上執行。在Spark1.5.0的taskSchedulerImpl.scala文件中,提交task的函數實現如下:

override def submitTasks(taskSet: TaskSet) {

    val tasks = taskSet.tasks

    logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")

    this.synchronized {

      /*創建TaskSetManager實例以管理stage包含的任務集*/

      val manager = createTaskSetManager(taskSet, maxTaskFailures)

      val stage = taskSet.stageId

      val stageTaskSets =

        taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])

      stageTaskSets(taskSet.stageAttemptId) = manager

      val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>

        ts.taskSet != taskSet && !ts.isZombie

      }

      if (conflictingTaskSet) {

        throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +

          s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")

      }

      /*將TaskSetManager添加到全局的調度隊列*/

      schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

 

      if (!isLocal && !hasReceivedTask) {

        starvationTimer.scheduleAtFixedRate(new TimerTask() {

          override def run() {

            if (!hasLaunchedTask) {

              logWarning("Initial job has not accepted any resources; " +

                "check your cluster UI to ensure that workers are registered " +

                "and have sufficient resources")

            } else {

              this.cancel()

            }

          }

        }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)

      }

      hasReceivedTask = true

    }

    backend.reviveOffers()

  }

當taskSetManager進入到調度池中時,會依據job id對taskSetManager排序,總體上先進入的taskSetManager先得到調度。對於同一job內的taskSetManager而言,job id較小的先得到調度。如果有的taskSetManager父Stage還未執行完,則該taskSet-Manager不會被放到調度池。

3.2.4 task的調度

在DAGScheduler.scala中,定義了函數submitMissingTasks,讀者閱讀完整實現,從中可以看到task的調度方式。限於篇幅,以下截取部分代碼。

private def submitMissingTasks(stage: Stage, jobId: Int) {

  logDebug("submitMissingTasks(" + stage + ")")

  // Get our pending tasks and remember them in our pendingTasks entry

  stage.pendingTasks.clear()

 

  // First figure out the indexes of partition ids to compute.

  /*過濾出計算位置,用以執行計算*/

  val (allPartitions: Seq[Int], partitionsToCompute: Seq[Int]) = {

    stage match {

      /*針對shuffleMap類型的Stage*/

      case stage: ShuffleMapStage =>

        val allPartitions = 0 until stage.numPartitions

        val filteredPartitions = allPartitions.filter { id =>  stage.outputLocs(id).isEmpty }

        (allPartitions, filteredPartitions)

      /*針對Result類型的Stage*/

      case stage: ResultStage =>

        val job = stage.resultOfJob.get

        val allPartitions = 0 until job.numPartitions

        val filteredPartitions = allPartitions.filter { id => ! job.finished(id) }

        (allPartitions, filteredPartitions)

    }

  }

  .....[以下代碼略]

 

  /*獲取task執行的優先節點*/

  private[spark]

  def getPreferredLocs(rdd: RDD[_], partition: Int): Seq[TaskLocation]    = {

    getPreferredLocsInternal(rdd, partition, new HashSet)

  }

計算task執行的優先節點位置的代碼實現在getPreferredLocsInternal函數中,具體如下:

/*計算位置的遞歸實現*/

private def getPreferredLocsInternal(

      rdd: RDD[_],

      partition: Int,

      visited: HashSet[(RDD[_], Int)]): Seq[TaskLocation] = {

    // If the partition has already been visited, no need to re-visit.

    // This avoids exponential path exploration.  SPARK-695

    if (!visited.add((rdd, partition))) {

      // Nil has already been returned for previously visited partitions.

      return Nil

    }

    // 如果調用cache緩存過,則計算緩存位置,讀取緩存分區中的數據

    val cached = getCacheLocs(rdd)(partition)

    if (cached.nonEmpty) {

      return cached

    }

    // 如果能直接獲取到執行地點,則返回作為該task的執行地點

    val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList

    if (rddPrefs.nonEmpty) {

      return rddPrefs.map(TaskLocation(_))

    }

 

    /*針對窄依賴關係的RDD, 取出第一個窄依賴的父RDD分區的執行地點*/

    rdd.dependencies.foreach {

      case n: NarrowDependency[_] =>

        for (inPart <- n.getParents(partition)) {

          val locs = getPreferredLocsInternal(n.rdd, inPart, visited)

          if (locs != Nil) {

            return locs

          }

        }

      case _ =>

    }

 

    /*對於shuffle依賴的rdd,選取至少含REDUCER_PREF_LOCS_FRACTION這麼多數據的位置作為優先節點*/

    if (shuffleLocalityEnabled && rdd.partitions.length < SHUFFLE_PREF_REDUCE_THRESHOLD) {

      rdd.dependencies.foreach {

        case s: ShuffleDependency[_, _, _] =>

          if (s.rdd.partitions.length < SHUFFLE_PREF_MAP_THRESHOLD) {

            // Get the preferred map output locations for this reducer

            val topLocsForReducer = mapOutputTracker.getLocationsWithLargestOu-tputs(s.shuffleId,

              partition, rdd.partitions.length, REDUCER_PREF_LOCS_FRACTION)

            if (topLocsForReducer.nonEmpty) {

              return topLocsForReducer.get.map(loc => TaskLocation(loc.host, loc.executorId))

            }

          }

        case _ =>

      }

    }

    Nil

  }

3.3 Spark存儲與I/O

前麵已經講過,RDD是按照partition分區劃分的,所以RDD可以看作由一些分布在不同節點上的分區組成。由於partition分區與數據塊是一一對應的,所以RDD中保存了partitionID與物理數據塊之間的映射。物理數據塊並非都保存在磁盤上,也有可能保存在內存中。

3.3.1 Spark存儲係統概覽

Spark I/O機製可以分為兩個層次:

1)通信層:用於Master與Slave之間傳遞控製指令、狀態等信息,通信層在架構上也采用Master-Slave結構。

2)存儲層:同於保存數據塊到內存、磁盤,或遠端複製數據塊。

下麵介紹幾個Spark存儲方麵的功能模塊。

1)BlockManager:Spark提供操作Storage的統一接口類。

2)BlockManagerMasterActor:Master創建,Slave利用該模塊向Master傳遞信息。

3)BlockManagerSlaveActor:Slave創建,Master利用該模塊向Slave節點傳遞控製命令,控製Slave節點對block的讀寫。

4)BlockManagerMaster: 管理Actor通信。

5)DiskStore:支持以文件方式讀寫的方式操作block。

6)MemoryStore: 支持內存中的block讀寫。

7)BlockManagerWorker: 對遠端異步傳輸進行管理。

8)ConnectionManager:支持本地節點與遠端節點數據block的傳輸。

圖3-8概要性地揭示了Spark存儲係統各個主要模塊之間的通信。

 

圖3-8 Spark存儲係統概覽

3.3.2 BlockManager中的通信

存儲係統的通信仍然類似Master-Slave架構,節點之間傳遞命令與狀態。總體而言,Master向Slave傳遞命令,Slave向Master傳遞信息和狀態。這些Master與Slave節點之間的信息傳遞通過Actor對象實現(關於Actor的詳細功能會在下一節Spark通信機製中講述)。但在BlockManager中略有不同,下麵分別講述。

1)Master節點上的BlockManagerMaster包含內容如下:

①BlockManagerMasterActor的Actor引用。

②BlockManagerSlaveActor的Ref引用。

2)Slave節點上的BlockManagerMaster包含內容如下:

①BlockManagerMasterActor的Ref引用。

②BlockManagerSlaveActor的Actor引用。

其中,在Ref與Actor之間的通信由BlockManagerMasterActor和BlockManagerSlave-Actor完成。這個部分相關的源碼篇幅較多,此處省略,感興趣的讀者請自行研究。

3.4 Spark通信機製

前麵介紹過,Spark的部署模式可以分為local、standalone、Mesos、YARN等。

本節以Spark部署在standalone模式下為例,介紹Spark的通信機製(其他模式類似)。

3.4.1 分布式通信方式

先介紹分布式通信的幾種基本方式。

1. RPC

遠程過程調用協議(Remote Procedure Call Protocol,RPC)是一種通過網絡從遠程計算機程序上請求服務,而不需要了解底層網絡技術的協議。RPC假定某些傳輸協議的存在,如TCP或UDP,為通信程序之間攜帶信息數據。在OSI網絡通信模型中,RPC跨越了傳輸層和應用層。RPC使得開發分布式應用更加容易。RPC采用C/S架構。請求程序就是一個Client,而服務提供程序就是一個Server。首先,Client調用進程發送一個有進程參數的調用信息到Service進程,然後等待應答信息。在Server端,進程保持睡眠狀態直到調用信息到達為止。當一個調用信息到達時,Server獲得進程參數,計算結果,發送答複信息,然後等待下一個調用信息,最後,Client調用進程接收答複信息,獲得進程結果,然後調用執行繼續進行。

2.  RMI

遠程方法調用(Remote Method Invocation,RMI)是Java的一組擁護開發分布式應用程序的API。RMI使用Java語言接口定義了遠程對象,它集合了Java序列化和Java遠程方法協議(Java Remote Method Protocol)。簡單地說,這樣使原先的程序在同一操作係統的方法調用,變成了不同操作係統之間程序的方法調用。由於J2EE是分布式程序平台,它以RMI機製實現程序組件在不同操作係統之間的通信。比如,一個EJB可以通過RMI調用Web上另一台機器上的EJB遠程方法。RMI可以被看作是RPC的Java版本,但是傳統RPC並不能很好地應用於分布式對象係統。Java RMI 則支持存儲於不同地址空間的程序級對象之間彼此進行通信,實現遠程對象之間的無縫遠程調用。

3.  JMS

Java消息服務(Java Message Service,JMS)是一個與具體平台無關的API,用來訪問消息收發。JMS 使用戶能夠通過消息收發服務(有時稱為消息中介程序或路由器)從一個 JMS 客戶機向另一個JMS客戶機發送消息。消息是 JMS 中的一種類型對象,由兩部分組成:報頭和消息主體。報頭由路由信息以及有關該消息的元數據組成。消息主體則攜帶著應用程序的數據或有效負載。JMS定義了5種消息正文格式,以及調用的消息類型,允許發送並接收以一些不同形式的數據,提供現有消息格式的一些級別的兼容性。

 StreamMessage:Java原始值的數據流。

 MapMessage:一套名稱–值對。

 TextMessage:一個字符串對象。

 ObjectMessage:一個序列化的 Java對象。

 BytesMessage:一個未解釋字節的數據流。

4. EJB

JavaEE服務器端組件模型(Enterprise JavaBean,EJB)的設計目標是部署分布式應用程序。簡單來說就是把已經編寫好的程序打包放在服務器上執行。EJB定義了一個用於開發基於組件的企業多重應用程序的標準。EJB的核心是會話Bean(Session Bean)、實體Bean(Entity Bean)和消息驅動Bean(Message Driven Bean)。

5. Web Service

Web Service是一個平台獨立的、低耦合的、自包含的、基於可編程的Web應用程序。可以使用開放的XML(標準通用標記語言下的一個子集)標準來描述、發布、發現、協調和配置這些應用程序,用於開發分布式的應用程序。Web Service技術能使得運行在不同機器上的不同應用無須借助第三方軟硬件, 就可相互交換數據或集成。Web Service減少了應用接口的花費。Web Service為整個企業甚至多個組織之間的業務流程的集成提供了一個通用機製。

3.4.2 通信框架AKKA

AKKA是一個用Scala語言編寫的庫,用於簡化編寫容錯的、高可伸縮性的Java和Scala的Actor模型應用。它分為開發庫和運行環境,可以用於構建高並發、分布式、可容錯、事件驅動的基於JVM的應用。AKKA使構建高並發的分布式應用變得更加容易。Akka已經被成功運用在眾多行業的眾多大企業,從投資業到商業銀行、從零售業到社會媒體、仿真、遊戲和賭博、汽車和交通係統、數據分析等。任何需要高吞吐率和低延遲的係統都是使用AKKA的候選,因此Spark選擇AKKA通信框架來支持模塊間的通信。

Actor模型常見於並發編程,它由Carl Hewitt於20世紀70年代早期提出,目的是解決分布式編程中的一係列問題。其特點如下:

1) 係統中的所有事物都可以扮演一個Actor。

2) Actor之間完全獨立。

3) 在收到消息時Actor采取的所有動作都是並行的。

4) Actor有標識和對當前行為的描述。

Actor可以看作是一個個獨立的實體,它們之間是毫無關聯的。但是,它們可以通過消息來通信。當一個Actor收到其他Actor的信息後,它可以根據需要做出各種響應。消息的類型和內容都可以是任意的。這點與Web Service類似,隻提供接口服務,不必了解內部實現。一個Actor在處理多個Actor的請求時,通常先建立一個消息隊列,每次收到消息後,就放入隊列。Actor每次也可以從隊列中取出消息體來處理,而且這個過程是可循環的,這個特點讓Actor可以時刻處理發送來的消息。

AKKA的優勢如下:

1) 易於構建並行與分布式應用(simple concurrency & distribution):AKKA采用異步通信與分布式架構,並對上層進行抽象,如Actors、Futures、STM等。

2) 可靠性(resilient by design):係統具備自愈能力,在本地/遠程都有監護。

3) 高性能(high performance):在單機中每秒可發送5000萬個消息。內存占用小,1GB內存中可保存250萬個actors。

4) 彈性,無中心(elastic — decentralized):自適應的負責均衡、路由、分區、配置。

5) 可擴展性(extensible):可以使用Akka擴展包進行擴展。

3.4.3 Client、Master 和 Worker之間的通信

Client、Master與Worker之間的交互代碼實現位於如下路徑:

(spark-root)/core/src/main/scala/org/apache/spark/deploy

主要涉及的類包括Client.scala、Master.scala和Worker.scala。這三大模塊之間的通信框架如圖3-9所示:

 

圖3-9 Client、Master和Worker之間的通信

以Standalone部署模式為例,三大模塊分工如下:

1)Client:提交作業給Master。

2)Master:接收Client提交的作業,管理Worker,並命令Worker啟動Driver和Executor。

3)Worker:負責管理本節點的資源,定期向Master匯報心跳信息,接收Master的命令,如啟動Driver和Executor。

下麵列出Client、Master與Worker的實現代碼,讀者可以從中看到三個模塊間的通信交互。

1. Client端通信

private class ClientEndpoint(

   override val rpcEnv: RpcEnv,

   driverArgs: ClientArguments,

   masterEndpoints: Seq[RpcEndpointRef],

   conf: SparkConf)

   extends ThreadSafeRpcEndpoint with Logging {

 

   <限於篇幅,此處代碼省略……>

 

  override def onStart(): Unit = {

  driverArgs.cmd match {

 

    case "launch" =>

 

      val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"

 

      val classPathConf = "spark.driver.extraClassPath"

      val classPathEntries = sys.props.get(classPathConf).toSeq.flatMap { cp =>

        cp.split(java.io.File.pathSeparator)

      }

 

      val libraryPathConf = "spark.driver.extraLibraryPath"

      val libraryPathEntries = sys.props.get (libraryPathConf).toSeq.flatMap { cp =>

        cp.split(java.io.File.pathSeparator)

      }

 

      val extraJavaOptsConf = "spark.driver.extraJavaOptions"

      val extraJavaOpts = sys.props.get(extraJavaOptsConf)

        .map(Utils.splitCommandString).getOrElse(Seq.empty)

      val sparkJavaOpts = Utils.sparkJavaOpts(conf)

      val javaOpts = sparkJavaOpts ++ extraJavaOpts

      val command = new Command(mainClass,

        Seq("{{WORKER_URL}}", "{{USER_JAR}}", driverArgs.mainClass) ++ driverArgs.driverOptions,

        sys.env, classPathEntries, libraryPathEntries, javaOpts)

 

      /* 創建driverDescription對象 */

      val driverDescription = new DriverDescription(

        driverArgs.jarUrl,

        driverArgs.memory,

        driverArgs.cores,

        driverArgs.supervise,

        command)

 

      /* 此處向Master的Actor提交Driver*/

      ayncSendToMasterAndForwardReply[SubmitDriverResponse](

        RequestSubmitDriver(driverDescription))

 

    case "kill" =>

      val driverId = driverArgs.driverId

 

      /* 接收停止Driver是否成功的通知 */

      ayncSendToMasterAndForwardReply[KillDriverResponse](RequestKill-Driver(driverId))

  }

}

 

 /* 向Master發送消息,並異步地轉發返回信息給Client */

  private def ayncSendToMasterAndForwardReply[T: ClassTag](message: Any): Unit = {

    for (masterEndpoint <- masterEndpoints) {

      masterEndpoint.ask[T](message).onComplete {

        case Success(v) => self.send(v)

        case Failure(e) =>

          logWarning(s"Error sending messages to master $masterEndpoint", e)

      }(forwardMessageExecutionContext)

    }

  }

2. Master端通信

private[deploy] class Master(

    override val rpcEnv: RpcEnv,

    address: RpcAddress,

    webUiPort: Int,

    val securityMgr: SecurityManager,

    val conf: SparkConf)

  extends ThreadSafeRpcEndpoint with Logging with LeaderElectable {

  ……

 

  override def receive: PartialFunction[Any, Unit] = {

     /* 選舉為Master,當狀態為RecoveryState.RECOVERING時恢複 */

     case ElectedLeader => {

       val (storedApps, storedDrivers, storedWorkers) = persistenceEngine.readPersistedData(rpcEnv)

       state = if (storedApps.isEmpty && storedDrivers.isEmpty && storedWorkers.isEmpty) {

       RecoveryState.ALIVE

       } else {

       RecoveryState.RECOVERING

       }

       logInfo("I have been elected leader! New state: " + state)

       if (state == RecoveryState.RECOVERING) {

       beginRecovery(storedApps, storedDrivers, storedWorkers)

       recoveryCompletionTask = forwardMessageThread.schedule(new Runnable {

         override def run(): Unit = Utils.tryLogNonFatalError {

         self.send(CompleteRecovery)

         }

       }, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)

      }

      }

     /* 完成恢複 */

     case CompleteRecovery => completeRecovery()

 

     case RevokedLeadership => {

        logError("Leadership has been revoked -- master shutting down.")

        System.exit(0)

     }

     /* 注冊worker */

     case RegisterWorker(

        id, workerHost, workerPort, workerRef, cores, memory, workerUiPort, publicAddress) => {

        logInfo("Registering worker %s:%d with %d cores, %s RAM".format(

        workerHost, workerPort, cores, Utils.megabytesToString(memory)))

 

        /* 當狀態為RecoveryState.STANDBY時,不注冊 */

        if (state == RecoveryState.STANDBY) {

        // ignore, don't send response

        } else if (idToWorker.contains(id)) {

 

        /* 重複注冊,通知注冊失敗 */

          workerRef.send(RegisterWorkerFailed("Duplicate worker ID"))

       } else {

          val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,

          workerRef, workerUiPort, publicAddress)

          if (registerWorker(worker)) {

 

             /* 注冊成功,通知worker節點 */

             persistenceEngine.addWorker(worker)

             workerRef.send(RegisteredWorker(self, masterWebUiUrl))

             schedule()

          } else {

             val workerAddress = worker.endpoint.address

             logWarning("Worker registration failed. Attempted to re-register worker at same " +"address: " + workerAddress)

 

             /* 注冊失敗,通知Worker節點 */

             workerRef.send(RegisterWorkerFailed("Attempted to re-register worker at same address: "+ workerAddress))

          }

        }

      }

 

      /* 通知Executor的Driver更新狀態 */

      case ExecutorStateChanged(appId, execId, state, message, exitStatus) => {

      ……

 

 

 

 

 override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {

 

  case RequestSubmitDriver(description) => {

 

     /* 當Master狀態不為ALIVE的時候,通知Client無法提交Driver */

     if (state != RecoveryState.ALIVE) {

       val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +

         "Can only accept driver submissions in ALIVE state."

       context.reply(SubmitDriverResponse(self, false, None, msg))

     } else {

       logInfo("Driver submitted " + description.command.mainClass)

       val driver = createDriver(description)

       persistenceEngine.addDriver(driver)

       waitingDrivers += driver

       drivers.add(driver)

       schedule()

 

       /* 提交Driver */

       context.reply(SubmitDriverResponse(self, true, Some(driver.id), s"Driver successfully submitted as ${driver.id}"))

     }

   }  

 

   case RequestKillDriver(driverId) => {

     if (state != RecoveryState.ALIVE) {

        val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " + s"Can only kill drivers in ALIVE state."

 

        /* 當Master不為ALIVE時,通知無法終止Driver */

        context.reply(KillDriverResponse(self, driverId, success = false, msg))

     } else {

        logInfo("Asked to kill driver " + driverId)

        val driver = drivers.find(_.id == driverId)

        driver match {

        case Some(d) =>

          if (waitingDrivers.contains(d)) {

 

            /* 當想kill的Driver在等待隊列中時,刪除Driver並更新狀態為KILLED */

            waitingDrivers -= d

            self.send(DriverStateChanged(driverId, DriverState.KILLED, None))

          } else {

 

            /* 通知worker,Driver被終止 */

            d.worker.foreach { w =>

              w.endpoint.send(KillDriver(driverId))

            }

          }

          // TODO: It would be nice for this to be a synchronous response

          val msg = s"Kill request for $driverId submitted"

          logInfo(msg)

 

          /* 通知請求者,終止Driver的請求已提交 */

          context.reply(KillDriverResponse(self, driverId, success = true, msg))

      case None =>

        val msg = s"Driver $driverId has already finished or does not exist"

        logWarning(msg)

 

        /* 通知請求者,Driver已被終止或不存在 */

        context.reply(KillDriverResponse(self, driverId, success = false, msg))

     }

   }

 }

 ……

3. Worker端通信邏輯

private[deploy] class Worker(

   override val rpcEnv: RpcEnv,

   webUiPort: Int,

   cores: Int,

   memory: Int,

   masterRpcAddresses: Array[RpcAddress],

   systemName: String,

   endpointName: String,

   workDirPath: String = null,

   val conf: SparkConf,

   val securityMgr: SecurityManager)

 extends ThreadSafeRpcEndpoint with Logging {

 

   ……

   override def receive: PartialFunction[Any, Unit] = {

      /* 注冊worker */

      case RegisteredWorker(masterRef, masterWebUiUrl) =>

          ……

 

      /* 向Master發送心跳 */

      case SendHeartbeat =>

          if (connected) { sendToMaster(Heartbeat(workerId, self)) }

 

      /* 清理舊應用的工作目錄 */

      case WorkDirCleanup =>

          // Spin up a separate thread (in a future) to do the dir cleanup; don't tie up worker

          // rpcEndpoint.

          // Copy ids so that it can be used in the cleanup thread.

          val appIds = executors.values.map(_.appId).toSet

          val cleanupFuture = concurrent.future {

          ……

 

      /* 新Master選舉產生時,Work更新Master相關信息,包括URL等 */

      case MasterChanged(masterRef, masterWebUiUrl) =>

          logInfo("Master has changed, new master is at " + masterRef.address.toSparkURL)

          changeMaster(masterRef, masterWebUiUrl)

          ……

      /* worker向主節點注冊失敗 */

      case RegisterWorkerFailed(message) =>

         if (!registered) {

            logError("Worker registration failed: " + message)

            System.exit(1)

      }

 

      /* worker重新連接向Master注冊 */

      case ReconnectWorker(masterUrl) =>

          logInfo(s"Master with url $masterUrl requested this worker to reconnect.")

          registerWithMaster()

 

      /* 啟動Executor */

      case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>

          ……

 

          /* 啟動ExecutorRunner */

          val manager = new ExecutorRunner(

          ……

 

      /* executor狀態改變 */

      case executorStateChanged @ ExecutorStateChanged(appId, execId, state, message, exitStatus) =>

          /* 通知Master executor狀態改變 */

          handleExecutorStateChanged(executorStateChanged)

 

      /* 終止當前節點上運行的Executor */

      case KillExecutor(masterUrl, appId, execId) =>

          if (masterUrl != activeMasterUrl) {

             logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor " + execId)

          } else {

             val fullId = appId + "/" + execId

             executors.get(fullId) match {

                case Some(executor) =>

                   logInfo("Asked to kill executor " + fullId)

                   executor.kill()

                case None =>

                   logInfo("Asked to kill unknown executor " + fullId)

          }

          ……

 

      /* 啟動Driver */

      case LaunchDriver(driverId, driverDesc) => {

         logInfo(s"Asked to launch driver $driverId")

         /* 創建DriverRunner */

         val driver = new DriverRunner(...)

         drivers(driverId) = driver

         /* 啟動Driver */

         driver.start()

         ……

 

       /* 終止worker節點上運行的Driver */

       case KillDriver(driverId) => {

         logInfo(s"Asked to kill driver $driverId")

         drivers.get(driverId) match {

             case Some(runner) =>

                runner.kill()

             case None =>

                logError(s"Asked to kill unknown driver $driverId")

         ……

 

       /* Driver狀態更新 */             

       case driverStateChanged @ DriverStateChanged(driverId, state, exception) => {

          handleDriverStateChanged(driverStateChanged)

       }

 

       ……

3.5 容錯機製及依賴

一般而言,對於分布式係統,數據集的容錯性通常有兩種方式:

1) 數據檢查點(在Spark中對應Checkpoint機製)。

2) 記錄數據的更新(在Spark中對應Lineage血統機製)。

對於大數據分析而言,數據檢查點操作成本較高,需要通過數據中心的網絡連接在機器之間複製龐大的數據集,而網絡帶寬往往比內存帶寬低,同時會消耗大量存儲資源。

Spark選擇記錄更新的方式。但更新粒度過細時,記錄更新成本也不低。因此,RDD隻支持粗粒度轉換,即隻記錄單個塊上執行的單個操作,然後將創建RDD的一係列變換序列記錄下來,以便恢複丟失的分區。

3.5.1 Lineage(血統)機製

每個RDD除了包含分區信息外,還包含它從父輩RDD變換過來的步驟,以及如何重建某一塊數據的信息,因此RDD的這種容錯機製又稱“血統”(Lineage)容錯。Lineage本質上很類似於數據庫中的重做日誌(Redo Log),隻不過這個重做日誌粒度很大,是對全局數據做同樣的重做以便恢複數據。

相比其他係統的細顆粒度的內存數據更新級別的備份或者LOG機製,RDD的Lineage記錄的是粗顆粒度的特定數據Transformation操作(如filter、map、join等)。當這個RDD的部分分區數據丟失時,它可以通過Lineage獲取足夠的信息來重新計算和恢複丟失的數據分區。但這種數據模型粒度較粗,因此限製了Spark的應用場景。所以可以說Spark並不適用於所有高性能要求的場景,但同時相比細顆粒度的數據模型,也帶來了性能方麵的提升。

RDD在Lineage容錯方麵采用如下兩種依賴來保證容錯方麵的性能:

窄依賴(Narrow Dependeny):窄依賴是指父RDD的每一個分區最多被一個子RDD的分區所用,表現為一個父RDD的分區對應於一個子RDD的分區,或多個父RDD的分區對應於一個子RDD的分區。也就是說一個父RDD的一個分區不可能對應一個子RDD的多個分區。其中,1個父RDD分區對應1個子RDD分區,可以分為如下兩種情況:

子RDD分區與父RDD分區一一對應(如map、filter等算子)。

一個子RDD分區對應N個父RDD分區(如co-paritioned(協同劃分)過的Join)。

 寬依賴(Wide Dependency,源碼中稱為Shuffle Dependency):

寬依賴是指一個父RDD分區對應多個子RDD分區,可以分為如下兩種情況:

一個父RDD對應所有子RDD分區(未經協同劃分的Join)。

一個父RDD對應多個RDD分區(非全部分區)(如groupByKey)。

窄依賴與寬依賴關係如圖3-10所示。

從圖3-10可以看出對依賴類型的劃分:根據父RDD分區是對應一個還是多個子RDD分區來區分窄依賴(父分區對應一個子分區)和寬依賴(父分區對應多個子分區)。如果對應多個,則當容錯重算分區時,對於需要重新計算的子分區而言,隻需要父分區的一部分數據,因此其餘數據的重算就導致了冗餘計算。

 

圖3-10 兩種依賴關係

對於寬依賴,Stage計算的輸入和輸出在不同的節點上,對於輸入節點完好,而輸出節點死機的情況,在通過重新計算恢複數據的情況下,這種方法容錯是有效的,否則無效,因為無法重試,需要向上追溯其祖先看是否可以重試(這就是lineage,血統的意思),窄依賴對於數據的重算開銷要遠小於寬依賴的數據重算開銷。

窄依賴和寬依賴的概念主要用在兩個地方:一個是容錯中相當於Redo日誌的功能;另一個是在調度中構建DAG作為不同Stage的劃分點(前麵調度機製中已講過)。

依賴關係在lineage容錯中的應用總結如下:

1)窄依賴可以在某個計算節點上直接通過計算父RDD的某塊數據計算得到子RDD對應的某塊數據;寬依賴則要等到父RDD所有數據都計算完成,並且父RDD的計算結果進行hash並傳到對應節點上之後,才能計算子RDD。

2)數據丟失時,對於窄依賴,隻需要重新計算丟失的那一塊數據來恢複;對於寬依賴,則要將祖先RDD中的所有數據塊全部重新計算來恢複。所以在長“血統”鏈特別是有寬依賴時,需要在適當的時機設置數據檢查點(checkpoint機製在下節講述)。可見Spark在容錯性方麵要求對於不同依賴關係要采取不同的任務調度機製和容錯恢複機製。

在Spark容錯機製中,如果一個節點宕機了,而且運算屬於窄依賴,則隻要重算丟失的父RDD分區即可,不依賴於其他節點。而寬依賴需要父RDD的所有分區都存在,重算就很昂貴了。更深入地來說:在窄依賴關係中,當子RDD的分區丟失,重算其父RDD分區時,父RDD相應分區的所有數據都是子RDD分區的數據,因此不存在冗餘計算。而在寬依賴情況下,丟失一個子RDD分區重算的每個父RDD的每個分區的所有數據並不是都給丟失的子RDD分區使用,其中有一部分數據對應的是其他不需要重新計算的子RDD分區中的數據,因此在寬依賴關係下,這樣計算就會產生冗餘開銷,這也是寬依賴開銷更大的原因。為了減少這種冗餘開銷,通常在Lineage血統鏈比較長,並且含有寬依賴關係的容錯中使用Checkpoint機製設置檢查點。

3.5.2 Checkpoint(檢查點)機製

通過上述分析可以看出Checkpoint的本質是將RDD寫入Disk來作為檢查點。這種做法是為了通過lineage血統做容錯的輔助,lineage過長會造成容錯成本過高,這樣就不如在中間階段做檢查點容錯,如果之後有節點出現問題而丟失分區,從做檢查點的RDD開始重做Lineage,就會減少開銷。

下麵從代碼層麵介紹Checkpoint的實現。

1. 設置檢查點數據的存取路徑[SparkContext.scala]

/* 設置作為RDD檢查點的目錄,如果是集群上運行,則必須為HDFS路徑 */

def setCheckpointDir(directory: String) {

 

    // If we are running on a cluster, log a warning if the directory is local.

    // Otherwise, the driver may attempt to reconstruct the checkpointed RDD from

    // its own local file system, which is incorrect because the checkpoint files

    // are actually on the executor machines.

    if (!isLocal && Utils.nonLocalPaths(directory).isEmpty) {

       logWarning("Checkpoint directory must be non-local " +

       "if Spark is running on a cluster: " + directory)

    }

 

    checkpointDir = Option(directory).map { dir =>

        val path = new Path(dir, UUID.randomUUID().toString)

        val fs = path.getFileSystem(hadoopConfiguration)

        fs.mkdirs(path)

        fs.getFileStatus(path).getPath.toString

    }

 }

2. 設置檢查點的具體實現

[RDD.scala]

/* 設置檢查點入口 */

private[spark] def doCheckpoint(): Unit = {

    RDDOperationScope.withScope(sc, "checkpoint", allowNesting = false, ignoreParent = true) {

      if (!doCheckpointCalled) {

          doCheckpointCalled = true

      if (checkpointData.isDefined) {

          checkpointData.get.checkpoint()

      } else {

          /*  */              

          dependencies.foreach(_.rdd.doCheckpoint())

      }

    }

   }

}

 

[RDDCheckPointData.scala]

/* 設置檢查點,在子類中會覆蓋此函數以實現具體功能 */

protected def doCheckpoint(): CheckpointRDD[T]

 

[ReliableRDDCheckpointData.scala]

/* 設置檢查點,將RDD內容寫入可靠的分布式文件係統中 */

protected override def doCheckpoint(): CheckpointRDD[T] = {

 

    /* 為檢查點創建輸出目錄 */

    val path = new Path(cpDir)

    val fs = path.getFileSystem(rdd.context.hadoopConfiguration)

    if (!fs.mkdirs(path)) {

        throw new SparkException(s"Failed to create checkpoint path $cpDir")

    }

 

    /* 保存為文件,加載時作為一個RDD加載 */

    val broadcastedConf = rdd.context.broadcast(

       new SerializableConfiguration(rdd.context.hadoopConfiguration))

 

    /* 重新計算RDD */

    rdd.context.runJob(rdd, ReliableCheckpointRDD.writeCheckpointFile[T](cpDir, broadcastedConf) _)

    val newRDD = new ReliableCheckpointRDD[T](rdd.context, cpDir)

    if (newRDD.partitions.length != rdd.partitions.length) {

    throw new SparkException(

        s"Checkpoint RDD $newRDD(${newRDD.partitions.length}) has different " +

        s"number of partitions from original RDD $rdd(${rdd.partitions.length})")

    }

 

    /* 當引用不在此範圍時,清除檢查點文件 */

    if (rdd.conf.getBoolean("spark.cleaner.referenceTracking.cleanCheckpoints", false)) {

         rdd.context.cleaner.foreach { cleaner =>

         cleaner.registerRDDCheckpointDataForCleanup(newRDD, rdd.id)

    }

   }

 

   logInfo(s"Done checkpointing RDD ${rdd.id} to $cpDir, new parent is RDD ${newRDD.id}")

 

   newRDD

 

   }

}

3.6 Shuffle機製

在MapReduce框架中,Shuffle是連接Map和Reduce之間的橋梁,Map的輸出要用到Reduce中必須經過Shuffle這個環節,Shuffle的性能高低直接影響了整個程序的性能和吞吐量。Spark作為MapReduce框架的一種實現,自然也實現了Shuffle的邏輯。對於大數據計算框架而言,Shuffle階段的效率是決定性能好壞的關鍵因素之一。

3.6.1 什麼是Shuffle

Shuffle是MapReduce框架中的一個特定的階段,介於Map階段和Reduce階段之間,當Map的輸出結果要被Reduce使用時,輸出結果需要按關鍵字值(key)哈希,並且分發到每一個Reducer上,這個過程就是Shuffle。直觀來講,Spark Shuffle機製是將一組無規則的數據轉換為一組具有一定規則數據的過程。由於Shuffle涉及了磁盤的讀寫和網絡的傳輸,因此Shuffle性能的高低直接影響整個程序的運行效率。

在MapReduce計算框架中,Shuffle連接了Map階段和Reduce階段,即每個Reduce Task從每個Map Task產生的數據中讀取一片數據,極限情況下可能觸發M*R個數據拷貝通道(M是Map Task數目,R是Reduce Task數目)。通常Shuffle分為兩部分:Map階段的數據準備和Reduce階段的數據拷貝。首先,Map階段需根據Reduce階段的Task數量決定每個Map Task輸出的數據分片數目,有多種方式存放這些數據分片:

1) 保存在內存中或者磁盤上(Spark和MapReduce都存放在磁盤上)。

2) 每個分片對應一個文件(現在Spark采用的方式,以及以前MapReduce采用的方式),或者所有分片放到一個數據文件中,外加一個索引文件記錄每個分片在數據文件中的偏移量(現在MapReduce采用的方式)。

因此可以認為Spark Shuffle與Mapreduce Shuffle的設計思想相同,但在實現細節和優化方式上不同。

在Spark中,任務通常分為兩種,Shuffle mapTask和reduceTask,具體邏輯如圖3-11所示:

 

圖3-11 Spark Shuffle

圖3-11中的主要邏輯如下:

1)首先每一個MapTask會根據ReduceTask的數量創建出相應的bucket,bucket的數量是M×R,其中M是Map的個數,R是Reduce的個數。

2)其次MapTask產生的結果會根據設置的partition算法填充到每個bucket中。這裏的partition算法是可以自定義的,當然默認的算法是根據key哈希到不同的bucket中。

當ReduceTask啟動時,它會根據自己task的id和所依賴的Mapper的id從遠端或本地的block manager中取得相應的bucket作為Reducer的輸入進行處理。

這裏的bucket是一個抽象概念,在實現中每個bucket可以對應一個文件,可以對應文件的一部分或是其他等。Spark shuffle可以分為兩部分:

1) 將數據分成bucket,並將其寫入磁盤的過程稱為Shuffle Write。

2) 在存儲Shuffle數據的節點Fetch數據,並執行用戶定義的聚集操作,這個過程稱為Shuffle Fetch。

3.6.2 Shuffle曆史及細節

下麵介紹Shuffle Write與Fetch。

1. Shuffle Write

在Spark的早期版本實現中,Spark在每一個MapTask中為每個ReduceTask創建一個bucket,並將RDD計算結果放進bucket中。

但早期的Shuffle Write有兩個比較大的問題。

1)Map的輸出必須先全部存儲到內存中,然後寫入磁盤。這對內存是非常大的開銷,當內存不足以存儲所有的Map輸出時就會出現OOM(Out of Memory)。

2)每個MapTask會產生與ReduceTask數量一致的Shuffle文件,如果MapTask個數是1k,ReduceTask個數也是1k,就會產生1M個Shuffle文件。這對於文件係統是比較大的壓力,同時在Shuffle數據量不大而Shuffle文件又非常多的情況下,隨機寫也會嚴重降低IO的性能。

後來到了Spark 0.8版實現時,顯著減少了Shuffle的內存壓力,現在Map輸出不需要先全部存儲在內存中,再flush到硬盤,而是record-by-record寫入磁盤中。對於Shuffle文件的管理也獨立出新的ShuffleBlockManager進行管理,而不是與RDD cache文件在一起了。

但是Spark 0.8版的Shuffle Write仍然有兩個大的問題沒有解決。

1)Shuffle文件過多的問題。這會導致文件係統的壓力過大並降低IO的吞吐量。

2)雖然Map輸出數據不再需要預先存儲在內存中然後寫入磁盤,從而顯著減少了內存壓力。但是新引入的DiskObjectWriter所帶來的buffer開銷也是不容小視的內存開銷。假定有1k個MapTask和1k個ReduceTask,就會有1M個bucket,相應地就會有1M個write handler,而每一個write handler默認需要100KB內存,那麼總共需要100GB內存。這樣僅僅是buffer就需要這麼多的內存。因此當ReduceTask數量很多時,內存開銷會很大。

為了解決shuffle文件過多的情況,Spark後來引入了新的Shuffle consolidation,以期顯著減少Shuffle文件的數量。

Shuffle consolidation的原理如圖3-12所示:

在圖3-12中,假定該job有4個Mapper和4個Reducer,有2個core能並行

最後更新:2017-05-19 15:03:50

  上一篇:go  微服務
  下一篇:go  用SLF4J和Guidce記錄日誌