閱讀354 返回首頁    go 阿裏雲 go 技術社區[雲棲]


Spark Core源碼分析: Spark任務模型

概述

一個Spark的Job分為多個stage,最後一個stage會包括一個或多個ResultTask,前麵的stages會包括一個或多個ShuffleMapTasks。

ResultTask執行並將結果返回給driver application。

ShuffleMapTask將task的output根據task的partition分離到多個buckets裏。一個ShuffleMapTask對應一個ShuffleDependency的partition,而總partition數同並行度、reduce數目是一致的。


Task

Task的代碼在scheduler package下。

抽象類Task構造參數如下:

private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) extends Serializable

Task對應一個stageId和partitionId。

提供runTask()接口、kill()接口等。

提供killed變量、TaskMetrics變量、TaskContext變量等。

除了上述基本接口和變量,Task的伴生對象提供了序列化和反序列化應用依賴的jar包的方法。原因是Task需要保證工作節點具備本次Task需要的其他依賴,注冊到SparkContext下,所以提供了把依賴轉成流寫入寫出的方法。


Task的兩種實現


ShuffleMapTask

ShuffleMapTask構造參數如下,

private[spark] class ShuffleMapTask(
    stageId: Int,
    var rdd: RDD[_],
    var dep: ShuffleDependency[_,_],
    _partitionId: Int,
    @transient private var locs: Seq[TaskLocation])
  extends Task[MapStatus](stageId, _partitionId)

RDD partitioner對應的是ShuffleDependency。

 

ShuffleMapTask複寫了MapStatus向外讀寫的方法,因為向外讀寫的內容包括:stageId,rdd,dep,partitionId,epoch和split(某個partition)。對於其中的stageId,rdd,dep有統一的序列化和反序列化操作並會cache在內存裏,再放到ObjectOutput裏寫出去。序列化操作使用的是Gzip,序列化信息會維護在serializedInfoCache = newHashMap[Int, Array[Byte]]。這部分需要序列化並保存的原因是:stageId,rdd,dep真正代表了本次Shuffle Task的信息,為了減輕master節點負擔,把這部分序列化結果cache了起來。


Stage執行邏輯

主要步驟如下:

val ser = Serializer.getSerializer(dep.serializer)
shuffle = shuffleBlockManager.forMapTask(dep.shuffleId, partitionId, numOutputSplits, ser)

這一步是初始化一個ShuffleWriterGroup,Group裏麵是一個BlockObjectWriter數組。


for (elem <- rdd.iterator(split, context)) {
val pair = elem.asInstanceOf[Product2[Any, Any]]
  val bucketId = dep.partitioner.getPartition(pair._1)
  shuffle.writers(bucketId).write(pair)
}

這一步是為每個Writer對應一個bucket,調用每個BlockObjectWriter的write()方法寫數據


var totalBytes = 0L
var totalTime = 0L
val compressedSizes: Array[Byte] = 
shuffle.writers.map { writer: BlockObjectWriter =>
    writer.commit()
    writer.close()
val size = writer.fileSegment().length
    totalBytes += size
totalTime += writer.timeWriting()
MapOutputTracker.compressSize(size)
}

這一步是執行writer.commit(),並得到結果file segment大小,對總大小壓縮


val shuffleMetrics = new ShuffleWriteMetrics
shuffleMetrics.shuffleBytesWritten = totalBytes
shuffleMetrics.shuffleWriteTime = totalTime
metrics.get.shuffleWriteMetrics = Some(shuffleMetrics)

success = true
new MapStatus(blockManager.blockManagerId, compressedSizes)

這一步是記錄metrcis信息,最後返回一個MapStatus類,裏麵是本地ShuffleMapTask結果的相關信息。

 

最後會release writers,讓對應的shuffle文件得到記錄和重用(ShuffleBlockManager管理這些file,這些file是Shuffle Task中一組Writer寫的對象)。

主要把下圖看懂。


重要類

介紹涉及到的重要外部類,幫助理解。


ShuffleBlockManager

整體梳理:

ShuffleState維護了兩個ShuffleFileGroup的ConcurrentLinkedQueue,以記錄目前shuffle的state。

ShuffleState記錄了一次shuffle操作的文件組狀態,在ShuffleBlockManager內用Map為每個shuffleId維護了一個ShuffleState。

每個shuffleId通過forMapTask()方法得到一組writer,即ShuflleWriterGroup。這組裏的writers共享一個shuffleId和mapId,但是每個對應不同的bucketId和file。在為writer分配FileGroup的時候,會從shuffleId對應的shuffle state裏先取unusedFileGroup,如果不存在,則在HDFS上新建File。

對於HDFS上的目標file,writer是可以append寫的。在新建file的時候,是根據shuffleId和bucket number和一個遞增的fileId來創建新的文件的。


ShuffleFileGroup的重用files和記錄mapId,index,offset這塊似懂非懂。

 

重要方法:

def forMapTask(shuffleId: Int, mapId: Int, numBuckets: Int, serializer: Serializer) = { new ShuffleWriterGroup {} }

該方法被一個ShuffleMapTask調用,傳入了這次shuffle操作的id,mapId是partitionId。Buckects數目等於分區數目。該方法返回的ShuffleWriterGroup裏麵是一組DiskBlockObjectWriter,每一個writer都屬於這一次shuffle操作,所以他們有共同的shuffleId,mapId,但是他們對應了不同的bucket,並且各自對應一個file。

 

在shuffle run裏的調用和參數傳入:

val ser = Serializer.getSerializer(dep.serializer)
shuffle = shuffleBlockManager.forMapTask(dep.shuffleId, partitionId, numOutputSplits, ser)

shuffleId是由ShuffleDependency獲得的全局唯一id,代表本次shuffle任務id

mapId等於partitionId

Bucket數目等於分區數目

 

產生writers:

Writer類型是DiskBlockObjectWriter,數目等於buckets數目。bufferSize的設置:

conf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024

blockId產生自:

blockId = ShuffleBlockId(shuffleId, mapId, bucketId)

在生成writer的時候調用的是BlockManager的getDiskWriter方法,ShuffleBlockManager初始化的時候綁定BlockManager。

private[spark] class DiskBlockObjectWriter(
    blockId: BlockId,
    file: File,
    serializer: Serializer,
    bufferSize: Int,
    compressStream: OutputStream => OutputStream,
    syncWrites: Boolean)
  extends BlockObjectWriter(blockId)


ShuffleFileGroup:私有內部類,對應了一組shuffle files,每個file對應一個reducer。一個Mapper會分到一個ShuffleFileGroup,把mapper的結果寫到這組File裏去。

MapStatus

注意到ShuffleMapTask的類型是MapStatus類。MapStatus類是ShuffleMapTask要返回給scheduler的執行結果,包括兩個東西:

class MapStatus(var location: BlockManagerId, var compressedSizes: Array[Byte])

前者是run這次task的block manager地址(BlockManagerId是一個類,保存了executorId,host, port, nettyPort),後者是output大小,該值會傳給接下來的reduce任務。該size是被MapOutputTracker壓縮過的。


MapStatus類提供了兩個方法如下,ShuffleMapTask進行了複寫。

  def writeExternal(out: ObjectOutput) {
    location.writeExternal(out)
    out.writeInt(compressedSizes.length)
    out.write(compressedSizes)
  }

  def readExternal(in: ObjectInput) {
    location = BlockManagerId(in)
    compressedSizes = new Array[Byte](in.readInt())
    in.readFully(compressedSizes)
  }

BlockManagerId

BlockManagerId類構造依賴executorId, host, port, nettyPort這些信息。伴生對象維護了一個blockManagerIdCache ,實現為ConcurrentHashMap[BlockManagerId,BlockManagerId]() 。

比如MapStatus的readExternal方法把ObjectInput傳入BlockManagerId構造函數的時候,BlockManagerId的apply()方法就會根據ObjectInput取出executorId, host, port,nettyPort信息,把這個BlockManagerIdobj維護到blockManagerIdCache


ResultTask

構造參數

private[spark] class ResultTask[T, U](
    stageId: Int,
    var rdd: RDD[T],
    var func: (TaskContext, Iterator[T]) => U,
    _partitionId: Int,
    @transient locs: Seq[TaskLocation],
    var outputId: Int)
  extends Task[U](stageId, _partitionId) with Externalizable {

ResultTask比較簡單,runTask方法調用的是rdd的迭代器:

  override def runTask(context: TaskContext): U = {
    metrics = Some(context.taskMetrics)
    try {
      func(context, rdd.iterator(split, context))
    } finally {
      context.executeOnCompleteCallbacks()
    }
  }

進程模型 vs. 線程模型

Spark同節點上的任務以多線程的方式運行在一個JVM進程中。

 

優點:

啟動任務快

共享內存,適合內存密集型任務

Executor所占資源可重複利用

 

缺點:

同節點上的所有任務運行在一個進程中,會出現嚴重的資源爭用,難以細粒度控製每個任務的占用資源。MapReduce為Map Task和Reduce Task設置不同資源,細粒度控製任務占用資源量。

 

MapReduce的每個Task都是一個JVM進程,都要經曆:資源申請->運行任務->釋放資源的過程

 

每個節點可以有一個或多個Executor,Executor配有一定數量slots,Executor內可以跑多個Result Task和ShuffleMap Task。

在共享內存方麵,broadcast的變量會在每個executor裏存一份,這個executor內的任務可以共享。




全文完 :)



最後更新:2017-04-03 12:56:20

  上一篇:go Spark Core源碼分析: Spark任務執行模型
  下一篇:go 九度題目1091:棋盤遊戲