Spark Core源碼分析: Spark任務模型
概述
一個Spark的Job分為多個stage,最後一個stage會包括一個或多個ResultTask,前麵的stages會包括一個或多個ShuffleMapTasks。
ResultTask執行並將結果返回給driver application。
ShuffleMapTask將task的output根據task的partition分離到多個buckets裏。一個ShuffleMapTask對應一個ShuffleDependency的partition,而總partition數同並行度、reduce數目是一致的。
Task
Task的代碼在scheduler package下。
抽象類Task構造參數如下:
private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) extends Serializable
Task對應一個stageId和partitionId。
提供runTask()接口、kill()接口等。
提供killed變量、TaskMetrics變量、TaskContext變量等。
除了上述基本接口和變量,Task的伴生對象提供了序列化和反序列化應用依賴的jar包的方法。原因是Task需要保證工作節點具備本次Task需要的其他依賴,注冊到SparkContext下,所以提供了把依賴轉成流寫入寫出的方法。
Task的兩種實現
ShuffleMapTask
ShuffleMapTask構造參數如下,
private[spark] class ShuffleMapTask( stageId: Int, var rdd: RDD[_], var dep: ShuffleDependency[_,_], _partitionId: Int, @transient private var locs: Seq[TaskLocation]) extends Task[MapStatus](stageId, _partitionId)
RDD partitioner對應的是ShuffleDependency。
ShuffleMapTask複寫了MapStatus向外讀寫的方法,因為向外讀寫的內容包括:stageId,rdd,dep,partitionId,epoch和split(某個partition)。對於其中的stageId,rdd,dep有統一的序列化和反序列化操作並會cache在內存裏,再放到ObjectOutput裏寫出去。序列化操作使用的是Gzip,序列化信息會維護在serializedInfoCache = newHashMap[Int, Array[Byte]]。這部分需要序列化並保存的原因是:stageId,rdd,dep真正代表了本次Shuffle Task的信息,為了減輕master節點負擔,把這部分序列化結果cache了起來。
Stage執行邏輯
主要步驟如下:
val ser = Serializer.getSerializer(dep.serializer) shuffle = shuffleBlockManager.forMapTask(dep.shuffleId, partitionId, numOutputSplits, ser)
這一步是初始化一個ShuffleWriterGroup,Group裏麵是一個BlockObjectWriter數組。
for (elem <- rdd.iterator(split, context)) { val pair = elem.asInstanceOf[Product2[Any, Any]] val bucketId = dep.partitioner.getPartition(pair._1) shuffle.writers(bucketId).write(pair) }
這一步是為每個Writer對應一個bucket,調用每個BlockObjectWriter的write()方法寫數據
var totalBytes = 0L var totalTime = 0L val compressedSizes: Array[Byte] = shuffle.writers.map { writer: BlockObjectWriter => writer.commit() writer.close() val size = writer.fileSegment().length totalBytes += size totalTime += writer.timeWriting() MapOutputTracker.compressSize(size) }
這一步是執行writer.commit(),並得到結果file segment大小,對總大小壓縮
val shuffleMetrics = new ShuffleWriteMetrics shuffleMetrics.shuffleBytesWritten = totalBytes shuffleMetrics.shuffleWriteTime = totalTime metrics.get.shuffleWriteMetrics = Some(shuffleMetrics) success = true new MapStatus(blockManager.blockManagerId, compressedSizes)
這一步是記錄metrcis信息,最後返回一個MapStatus類,裏麵是本地ShuffleMapTask結果的相關信息。
最後會release writers,讓對應的shuffle文件得到記錄和重用(ShuffleBlockManager管理這些file,這些file是Shuffle Task中一組Writer寫的對象)。
主要把下圖看懂。
重要類
介紹涉及到的重要外部類,幫助理解。
ShuffleBlockManager
整體梳理:
ShuffleState維護了兩個ShuffleFileGroup的ConcurrentLinkedQueue,以記錄目前shuffle的state。
ShuffleState記錄了一次shuffle操作的文件組狀態,在ShuffleBlockManager內用Map為每個shuffleId維護了一個ShuffleState。
每個shuffleId通過forMapTask()方法得到一組writer,即ShuflleWriterGroup。這組裏的writers共享一個shuffleId和mapId,但是每個對應不同的bucketId和file。在為writer分配FileGroup的時候,會從shuffleId對應的shuffle state裏先取unusedFileGroup,如果不存在,則在HDFS上新建File。
對於HDFS上的目標file,writer是可以append寫的。在新建file的時候,是根據shuffleId和bucket number和一個遞增的fileId來創建新的文件的。
ShuffleFileGroup的重用files和記錄mapId,index,offset這塊似懂非懂。
重要方法:
def forMapTask(shuffleId: Int, mapId: Int, numBuckets: Int, serializer: Serializer) = { new ShuffleWriterGroup {} }
該方法被一個ShuffleMapTask調用,傳入了這次shuffle操作的id,mapId是partitionId。Buckects數目等於分區數目。該方法返回的ShuffleWriterGroup裏麵是一組DiskBlockObjectWriter,每一個writer都屬於這一次shuffle操作,所以他們有共同的shuffleId,mapId,但是他們對應了不同的bucket,並且各自對應一個file。
在shuffle run裏的調用和參數傳入:
val ser = Serializer.getSerializer(dep.serializer) shuffle = shuffleBlockManager.forMapTask(dep.shuffleId, partitionId, numOutputSplits, ser)
shuffleId是由ShuffleDependency獲得的全局唯一id,代表本次shuffle任務id
mapId等於partitionId
Bucket數目等於分區數目
產生writers:
Writer類型是DiskBlockObjectWriter,數目等於buckets數目。bufferSize的設置:
conf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
blockId產生自:
blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
在生成writer的時候調用的是BlockManager的getDiskWriter方法,ShuffleBlockManager初始化的時候綁定BlockManager。
private[spark] class DiskBlockObjectWriter( blockId: BlockId, file: File, serializer: Serializer, bufferSize: Int, compressStream: OutputStream => OutputStream, syncWrites: Boolean) extends BlockObjectWriter(blockId)
ShuffleFileGroup:私有內部類,對應了一組shuffle files,每個file對應一個reducer。一個Mapper會分到一個ShuffleFileGroup,把mapper的結果寫到這組File裏去。
MapStatus
注意到ShuffleMapTask的類型是MapStatus類。MapStatus類是ShuffleMapTask要返回給scheduler的執行結果,包括兩個東西:
class MapStatus(var location: BlockManagerId, var compressedSizes: Array[Byte])
前者是run這次task的block manager地址(BlockManagerId是一個類,保存了executorId,host, port, nettyPort),後者是output大小,該值會傳給接下來的reduce任務。該size是被MapOutputTracker壓縮過的。
MapStatus類提供了兩個方法如下,ShuffleMapTask進行了複寫。
def writeExternal(out: ObjectOutput) { location.writeExternal(out) out.writeInt(compressedSizes.length) out.write(compressedSizes) } def readExternal(in: ObjectInput) { location = BlockManagerId(in) compressedSizes = new Array[Byte](in.readInt()) in.readFully(compressedSizes) }
BlockManagerId
BlockManagerId類構造依賴executorId, host, port, nettyPort這些信息。伴生對象維護了一個blockManagerIdCache ,實現為ConcurrentHashMap[BlockManagerId,BlockManagerId]() 。
比如MapStatus的readExternal方法把ObjectInput傳入BlockManagerId構造函數的時候,BlockManagerId的apply()方法就會根據ObjectInput取出executorId, host, port,nettyPort信息,把這個BlockManagerIdobj維護到blockManagerIdCache內
ResultTask
構造參數
private[spark] class ResultTask[T, U]( stageId: Int, var rdd: RDD[T], var func: (TaskContext, Iterator[T]) => U, _partitionId: Int, @transient locs: Seq[TaskLocation], var outputId: Int) extends Task[U](stageId, _partitionId) with Externalizable {
ResultTask比較簡單,runTask方法調用的是rdd的迭代器:
override def runTask(context: TaskContext): U = { metrics = Some(context.taskMetrics) try { func(context, rdd.iterator(split, context)) } finally { context.executeOnCompleteCallbacks() } }
進程模型 vs. 線程模型
Spark同節點上的任務以多線程的方式運行在一個JVM進程中。
優點:
啟動任務快
共享內存,適合內存密集型任務
Executor所占資源可重複利用
缺點:
同節點上的所有任務運行在一個進程中,會出現嚴重的資源爭用,難以細粒度控製每個任務的占用資源。MapReduce為Map Task和Reduce Task設置不同資源,細粒度控製任務占用資源量。
MapReduce的每個Task都是一個JVM進程,都要經曆:資源申請->運行任務->釋放資源的過程
每個節點可以有一個或多個Executor,Executor配有一定數量slots,Executor內可以跑多個Result Task和ShuffleMap Task。
在共享內存方麵,broadcast的變量會在每個executor裏存一份,這個executor內的任務可以共享。
最後更新:2017-04-03 12:56:20