閱讀259 返回首頁    go 阿裏雲 go 技術社區[雲棲]


Spark Core源碼分析: Spark任務執行模型

DAGScheduler

麵向stage的調度層,為job生成以stage組成的DAG,提交TaskSet給TaskScheduler執行。

每一個Stage內,都是獨立的tasks,他們共同執行同一個compute function,享有相同的shuffledependencies。DAG在切分stage的時候是依照出現shuffle為界限的。

private[spark]
class DAGScheduler(
    taskScheduler: TaskScheduler,
    listenerBus: LiveListenerBus,
    mapOutputTracker: MapOutputTrackerMaster,
    blockManagerMaster: BlockManagerMaster,
    env: SparkEnv)
  extends Logging {

// Actor模式收取發來的DAGSchedulerEvent,並進行processEvent
private var eventProcessActor: ActorRef = _

  private[scheduler] val nextJobId = new AtomicInteger(0)
  private[scheduler] def numTotalJobs: Int = nextJobId.get()
  private val nextStageId = new AtomicInteger(0)

  // 一係列信息維護,很清晰
  private[scheduler] val jobIdToStageIds = new HashMap[Int, HashSet[Int]]
  private[scheduler] val stageIdToJobIds = new HashMap[Int, HashSet[Int]]
  private[scheduler] val stageIdToStage = new HashMap[Int, Stage]
  private[scheduler] val shuffleToMapStage = new HashMap[Int, Stage]
  private[scheduler] val jobIdToActiveJob = new HashMap[Int, ActiveJob]
  private[scheduler] val resultStageToJob = new HashMap[Stage, ActiveJob]
  private[scheduler] val stageToInfos = new HashMap[Stage, StageInfo]

  // 不同狀態stages的維護,很清晰
  // Stages we need to run whose parents aren't done
  private[scheduler] val waitingStages = new HashSet[Stage]

  // Stages we are running right now
  private[scheduler] val runningStages = new HashSet[Stage]

  // Stages that must be resubmitted due to fetch failures
  private[scheduler] val failedStages = new HashSet[Stage]

  // Missing tasks from each stage
  private[scheduler] val pendingTasks = new HashMap[Stage, HashSet[Task[_]]]

  private[scheduler] val activeJobs = new HashSet[ActiveJob]

  // Contains the locations that each RDD's partitions are cached on
  private val cacheLocs = new HashMap[Int, Array[Seq[TaskLocation]]]

在start()方法中會初始化Actor,然後接收DAGSchedulerEvent處理。Scheduler會在SparkContext裏start起來。


Event處理

源碼的閱讀入口:可以根據processEvent(event:DAGSchedulerEvent)方法展開。

處理的事件包括這麼一些:



Submit Job

JobSubmitted事件:



提交任務的事件傳入參數如下

case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties)

處理過程可以拆成三步看,每一步裏麵涉及的具體邏輯在下麵補充展開


finalStage = newStage(rdd, partitions.size, None, jobId, Some(callSite))

本次newStage()操作可以對應新的result stage或者shuffle stage。返回Stage類(裏麵記錄一些信息)。Stage類會傳入Option[ShuffleDependency[_,_]]參數,內部有一個isShuffleMap變量,以標識該Stage是shuffle or result。


val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)

ActiveJob類也是記錄一些信息的類,可以當作是一個VO類


if (allowLocal && finalStage.parents.size == 0 && partitions.length == 1) {
// Compute very short actions like first() or take() 
// with no parent stages locally.
listenerBus.post(SparkListenerJobStart(
job.jobId, Array[Int](), properties))
runLocally(job)
} else {
jobIdToActiveJob(jobId) = job
activeJobs += job
resultStageToJob(finalStage) = job
listenerBus.post(
SparkListenerJobStart(
job.jobId, jobIdToStageIds(jobId).toArray, properties))
submitStage(finalStage)
}

首先判斷stage沒有父親依賴,且partition為1的話,就執行本地任務。否則,submitStage。

 

submitStage的邏輯為,首先尋找本次stage的parents。如果沒有missing的parent stage,那麼就submitMissingTask,即提交本次stage的tasks了。如果有,會對parent stage進行遞歸submitStage,而且getMissingParentStages得到的結果集是按id降序排的,也就是說遞歸submitStage的時候會按parent stage的id順序進行。

 

submitMissingTask處理的是stage的parent已經available的stage。主要邏輯如下:

第一步:通過stage.isShuffleMap來決定生成ShuffleMapTask還是ResultTask,生成的ShuffleMapTask數目和partition數目相等。

第二步:把生成的tasks組建成一個TaskSet,提交給TaskScheduler的submitTasks方法。


TaskScheduler

DAGScheduler以stage為單位,提tasks給TaskScheduer,實現類為TaskSchedulerImpl。

 

TaskSchedulerImpl幾個內部部件:

SchedulerBackend

SchedulableBuilder

DAGScheduler

TaskSetManager

TaskResultGetter

Tasks信息(taskIdToTaskSetId,taskIdToExecutorId,activeExecutorIds)

別的信息(SchedulerMode)

 

TaskScheduler做接收task、接收分到的資源和executor、維護信息、與backend打交道、把任務分配好等事情。

 

start(),stop()的時候,backend的start(),stop()


submitTasks(TaskSet)邏輯:

為這批Task生成新的TaskSetManager,把TaskSetManager加到SchedulerBuilder裏,然後向backend進行一次reviveOffer()操作。


SchedulerBuilder

SchedulableBuilder有FIFO和Fair兩種實現, addTaskSetManager會把TaskSetManager加到pool裏。FIFO的話隻有一個pool。Fair有多個pool,Pool也分FIFO和Fair兩種模式。

SchedulableBuilder的rootPool裏麵可以新增pool或者TaskSetManager,兩者都是Scheduable的繼承類,所以SchedulableBuilder用於維護rootPool這棵Scheduable 樹結構。Pool是樹上的非葉子節點,而TaskSetManager就是葉子節點。

在TaskScheduler初始化的時候會buildDafaultPool。


                            


TaskSetManager

TaskSetManager負責這批Tasks的啟動,失敗重試,感知本地化等事情。每次reourseOffer方法會尋找合適(符合條件execId, host, locality)的Task並啟動它。

 

reourseOffer方法,

  def resourceOffer(
      execId: String,
      host: String,
      maxLocality: TaskLocality.TaskLocality)

尋找符合execId, host和locality的task,找到的話就啟動這個Task。啟動的時候,把task加到runningTask的HashSet裏,然後調DAGScheduler的taskStarted方法,taskStarted方法向eventProcessorActor發出BeginEvent的DAGSchedulerEvent。


TaskResultGetter

維護一個線程池,用來反序列化和從遠端獲取task結果。


def enqueueSuccessfulTask(taskSetManager: TaskSetManager, tid: Long, serializedData: ByteBuffer)

把序列化的數據反序列化解析出來之後,有兩種情況:直接可讀的result和間接task result。


前者是DirectTaskResult[T]類:

class DirectTaskResult[T](var valueBytes: ByteBuffer, var accumUpdates: Map[Long, Any], var metrics: TaskMetrics)

後者是IndirectTaskResult[T]類:

case class IndirectTaskResult[T](blockId: BlockId) extends TaskResult[T] with Serializable

在反解析出IndirectTaskResult後,可以得到BlockId這個類,他的實現有這麼些:


在TaskResultGetter裏,會通過blockManager的getRemoteBytes(BlockId)方法來獲得序列化的task result,對這個task result進行反解析後得到DirectTaskResult類,從而獲得反序列化後的真正結果數據。

這是大致的一個過程,具體還有一些細節在之中,比如會向scheduler發送不同的event、blockManager會調用BlockManagerMaster把該Block remove掉。

 

BlockId類有這麼些關鍵變量:

private[spark] sealed abstract class BlockId {
  /** A globally unique identifier for this Block. Can be used for ser/de. */
  def name: String

  // convenience methods
  def asRDDId = if (isRDD) Some(asInstanceOf[RDDBlockId]) else None
  def isRDD = isInstanceOf[RDDBlockId]
  def isShuffle = isInstanceOf[ShuffleBlockId]
  def isBroadcast = isInstanceOf[BroadcastBlockId]

下麵看BlockManager如何通過BlockId獲得數據:

調用的是BlockManager的內部方法

private def doGetRemote(blockId: BlockId, asValues: Boolean): Option[Any] = {
require(blockId != null, "BlockId is null")
// 通過BlockManagerMaster獲得這個blockId的locations
    val locations = Random.shuffle(master.getLocations(blockId))
    for (loc <- locations) {
      logDebug("Getting remote block " + blockId + " from " + loc)
      // 使用BlockManagerWorker來獲得block的數據
      val data = BlockManagerWorker.syncGetBlock(
        GetBlock(blockId), ConnectionManagerId(loc.host, loc.port))
      if (data != null) {
        if (asValues) {
          // 取到就返回
          return Some(dataDeserialize(blockId, data))
        } else {
          return Some(data)
        }
      }
      logDebug("The value of block " + blockId + " is null")
    }
    logDebug("Block " + blockId + " not found")
    None
  }

思路是通過BlockManagerMaster來獲得block的位置信息,得到的集合打亂後,遍曆位置信息,通過BlockManagerWorker去獲得數據,隻要得到了,就反序列化之後返回。

 

在TaskResultGetter處理的時候,成功和失敗分別向Scheduler調用handleSuccessfulTask和handleFailedTask方法。

handleSuccessfulTask在DAGScheduler裏,會發出CompletionEvent事件,這一步結尾工作會有很多細節處理,這裏先不閱讀了。

handleFailedTask的話,隻要TaskSetManager不是zombie,task沒有被kill,那麼會繼續調用backend.reviveOffers()來re-run。



全文完 :)


最後更新:2017-04-03 12:56:20

  上一篇:go Spark Core源碼分析: RDD基礎
  下一篇:go Spark Core源碼分析: Spark任務模型