閱讀844 返回首頁    go 阿裏雲 go 技術社區[雲棲]


Spark Core源碼分析: RDD基礎

RDD

RDD初始參數:上下文和一組依賴

abstract class RDD[T: ClassTag](
    @transient private var sc: SparkContext,
    @transient private var deps: Seq[Dependency[_]]
  ) extends Serializable

以下需要仔細理清:

A list of Partitions

Function to compute split (sub RDD impl)

A list of Dependencies

Partitioner for K-V RDDs (Optional)

Preferred locations to compute each spliton (Optional)


Dependency

Dependency代表了RDD之間的依賴關係,即血緣


RDD中的使用

RDD給子類提供了getDependencies方法來製定如何依賴父類RDD

protected def getDependencies: Seq[Dependency[_]] = deps

事實上,在獲取first parent的時候,子類經常會使用下麵這個方法

protected[spark] def firstParent[U: ClassTag] = {
  dependencies.head.rdd.asInstanceOf[RDD[U]]
}

可以看到,Seq裏的第一個dependency應該是直接的parent,從而從第一個dependency類裏獲得了rdd,這個rdd就是父RDD。


一般的RDD子類都會這麼實現compute和getPartition方法,以SchemaRDD舉例:

override def compute(split: Partition, context: TaskContext): Iterator[Row] =
    firstParent[Row].compute(split, context).map(_.copy())

override def getPartitions: Array[Partition] = firstParent[Row].partitions

compute()方法調用了第一個父類的compute,把結果RDD copy返回

getPartitions返回的就是第一個父類的partitions

 

下麵看一下Dependency類及其子類的實現。


寬依賴和窄依賴

abstract class Dependency[T](val rdd: RDD[T]) extends Serializable

Dependency裏傳入的rdd,就是父RDD本身。

繼承結構如下:



NarrowDependency代表窄依賴,即父RDD的分區,最多被子RDD的一個分區使用。所以支持並行計算。

子類需要實現方法:

def getParents(partitionId: Int): Seq[Int]

OneToOneDependency表示父RDD和子RDD的分區依賴是一對一的。

 

RangeDependency表示在一個range範圍內,依賴關係是一對一的,所以初始化的時候會有一個範圍,範圍外的partitionId,傳進去之後返回的是Nil。


下麵介紹寬依賴。

class ShuffleDependency[K, V](
    @transient rdd: RDD[_ <: Product2[K, V]],
    val partitioner: Partitioner,
    val serializer: Serializer = null)
  extends Dependency(rdd.asInstanceOf[RDD[Product2[K, V]]]) {

  // 上下文增量定義的Id
  val shuffleId: Int = rdd.context.newShuffleId()

  // ContextCleaner的作用和實現在SparkContext章節敘述
  rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
}

寬依賴針對的RDD是KV形式的,需要一個partitioner指定分區方式(下一節介紹),需要一個序列化工具類,序列化工具目前的實現如下:



寬依賴和窄依賴對失敗恢複時候的recompute有不同程度的影響,寬依賴可能是要全部計算的。


Partition

Partition具體表示RDD每個數據分區。

Partition提供trait類,內含一個index和hashCode()方法,具體子類實現與RDD子類有關,種類如下:


在分析每個RDD子類的時候再涉及。


Partitioner

Partitioner決定KV形式的RDD如何根據key進行partition


abstract class Partitioner extends Serializable {
  def numPartitions: Int // 總分區數
  def getPartition(key: Any): Int
}

在ShuffleDependency裏對應一個Partitioner,來完成寬依賴下,子RDD如何獲取父RDD。


默認Partitioner

Partitioner的伴生對象提供defaultPartitioner方法,邏輯為:

傳入的RDD(至少兩個)中,遍曆(順序是partition數目從大到小)RDD,如果已經有Partitioner了,就使用。如果RDD們都沒有Partitioner,則使用默認的HashPartitioner。而HashPartitioner的初始化partition數目,取決於是否設置了spark.default.parallelism,如果沒有的話就取RDD中partition數目最大的值。

如果上麵這段文字看起來費解,代碼如下:

  def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
    val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.size).reverse
    for (r <- bySize if r.partitioner.isDefined) {
      return r.partitioner.get
    }
    if (rdd.context.conf.contains("spark.default.parallelism")) {
      new HashPartitioner(rdd.context.defaultParallelism)
    } else {
      new HashPartitioner(bySize.head.partitions.size)
    }
  }


HashPartitioner

HashPartitioner基於java的Object.hashCode。會有個問題是Java的Array有自己的hashCode,不基於Array裏的內容,所以RDD[Array[_]]或RDD[(Array[_], _)]使用HashPartitioner會有問題。

 

顧名思義,getPartition方法實現如下

  def getPartition(key: Any): Int = key match {
    case null => 0
    case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
  }

RangePartitioner

RangePartitioner處理的KV RDD要求Key是可排序的,即滿足Scala的Ordered[K]類型。所以它的構造如下:

class RangePartitioner[K <% Ordered[K]: ClassTag, V](
    partitions: Int,
    @transient rdd: RDD[_ <: Product2[K,V]],
    private val ascending: Boolean = true)
  extends Partitioner {

內部會計算一個rangBounds(上界),在getPartition的時候,如果rangBoundssize小於1000,則逐個遍曆獲得;否則二分查找獲得partitionId。


Persist

默認cache()過程是將RDD persist在內存裏,persist()操作可以為RDD重新指定StorageLevel,

class StorageLevel private(
    private var useDisk_ : Boolean,
    private var useMemory_ : Boolean,
    private var useOffHeap_ : Boolean,
    private var deserialized_ : Boolean,
    private var replication_ : Int = 1)

object StorageLevel {
  val NONE = new StorageLevel(false, false, false, false)
  val DISK_ONLY = new StorageLevel(true, false, false, false)
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  val MEMORY_ONLY = new StorageLevel(false, true, false, true)
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  val OFF_HEAP = new StorageLevel(false, false, true, false) // Tachyon

RDD的persist()和unpersist()操作,都是由SparkContext執行的(SparkContext的persistRDD和unpersistRDD方法)。

 

Persist過程是把該RDD存在上下文的TimeStampedWeakValueHashMap裏維護起來。也就是說,其實persist並不是action,並不會觸發任何計算。

Unpersist過程如下,會交給SparkEnv裏的BlockManager處理。

  private[spark] def unpersistRDD(rddId: Int, blocking: Boolean = true) {
    env.blockManager.master.removeRdd(rddId, blocking)
    persistentRdds.remove(rddId)
    listenerBus.post(SparkListenerUnpersistRDD(rddId))
  }

Checkpoint

RDD Actions api裏提供了checkpoint()方法,會把本RDD save到SparkContext CheckpointDir

目錄下。建議該RDD已經persist在內存中,否則需要recomputation。

 

如果該RDD沒有被checkpoint過,則會生成新的RDDCheckpointData。RDDCheckpointData類與一個RDD關聯,記錄了checkpoint相關的信息,並且記錄checkpointRDD的一個狀態,

[ Initialized --> marked for checkpointing--> checkpointing in progress --> checkpointed ]

內部有一個doCheckpoint()方法(會被下麵調用)。


執行邏輯

真正的checkpoint觸發,在RDD私有方法doCheckpoint()裏。doCheckpoint()會被DAGScheduler調用,且是在此次job裏使用這個RDD完畢之後,此時這個RDD就已經被計算或者物化過了。可以看到,會對RDD的父RDD進行遞歸。

  private[spark] def doCheckpoint() {
    if (!doCheckpointCalled) {
      doCheckpointCalled = true
      if (checkpointData.isDefined) {
        checkpointData.get.doCheckpoint()
      } else {
        dependencies.foreach(_.rdd.doCheckpoint())
      }
    }
  }

RDDCheckpointData的doCheckpoint()方法關鍵代碼如下:
// Create the output path for the checkpoint
val path = new Path(rdd.context.checkpointDir.get, "rdd-" + rdd.id)
val fs = path.getFileSystem(rdd.context.hadoopConfiguration)
if (!fs.mkdirs(path)) {
  throw new SparkException("Failed to create checkpoint path " + path)
}

// Save to file, and reload it as an RDD
val broadcastedConf = rdd.context.broadcast(
  new SerializableWritable(rdd.context.hadoopConfiguration))
// 這次runJob最終調的是dagScheduler的runJob
rdd.context.runJob(rdd, 
CheckpointRDD.writeToFile(path.toString, broadcastedConf) _)
// 此時rdd已經記錄到磁盤上
val newRDD = new CheckpointRDD[T](rdd.context, path.toString)
if (newRDD.partitions.size != rdd.partitions.size) {
  throw new SparkException("xxx")
}

runJob最終調的是dagScheduler的runJob。做完後,生成一個CheckpointRDD。

具體CheckpointRDD相關內容可以參考其他章節。


API

子類需要實現的方法

// 計算某個分區
def compute(split: Partition, context: TaskContext): Iterator[T]

protected def getPartitions: Array[Partition]
// 依賴的父RDD,默認就是返回整個dependency序列
protected def getDependencies: Seq[Dependency[_]] = deps

protected def getPreferredLocations(split: Partition): Seq[String] = Nil

Transformations

略。

Actions

略。


SubRDDs

部分RDD子類的實現分析,包括以下幾個部分:

1)  子類本身構造參數

2)  子類的特殊私有變量

3)  子類的Partitioner實現

4)  子類的父類函數實現

def compute(split: Partition, context: TaskContext): Iterator[T]
protected def getPartitions: Array[Partition]
protected def getDependencies: Seq[Dependency[_]] = deps
protected def getPreferredLocations(split: Partition): Seq[String] = Nil

CheckpointRDD

class CheckpointRDD[T: ClassTag](sc: SparkContext, val checkpointPath: String)
  extends RDD[T](sc, Nil)

CheckpointRDDPartition繼承自Partition,沒有什麼增加。


有一個被廣播的hadoop conf變量,在compute方法裏使用(readFromFile的時候用)

val broadcastedConf = sc.broadcast(
new SerializableWritable(sc.hadoopConfiguration))

getPartitions: Array[Partition]方法:

根據checkpointPath去查看Path下有多少個partitionFile,File個數為partition數目。getPartitions方法返回的Array[Partition]內容為New CheckpointRDDPartition(i),i為[0, 1, …, partitionNum]

 

getPreferredLocations(split:Partition): Seq[String]方法:

文件位置信息,借助hadoop core包,獲得block location,把得到的結果按照host打散(flatMap)並過濾掉localhost,返回。

 

compute(split: Partition, context:TaskContext): Iterator[T]方法:

調用CheckpointRDD.readFromFile(file, broadcastedConf,context)方法,其中file為hadoopfile path,conf為廣播過的hadoop conf。


Hadoop文件讀寫及序列化

伴生對象提供writeToFile方法和readFromFile方法,主要用於讀寫hadoop文件,並且利用env下的serializer進行序列化和反序列化工作。兩個方法具體實現如下:

def writeToFile[T](
 path: String,
 broadcastedConf: Broadcast[SerializableWritable[Configuration]],
 blockSize: Int = -1
)(ctx: TaskContext, iterator: Iterator[T]) {

創建hadoop文件的時候會若存在會拋異常。把hadoop的outputStream放入serializer的stream裏,serializeStream.writeAll(iterator)寫入。

 

writeToFile的調用在RDDCheckpointData類的doCheckpoint方法裏,如下:

rdd.context.runJob(rdd, 
CheckpointRDD.writeToFile(path.toString, broadcastedConf) _)

def readFromFile[T](
  path: Path,
  broadcastedConf: Broadcast[SerializableWritable[Configuration]],
  context: TaskContext
): Iterator[T] = {

打開Hadoop的inutStream,讀取的時候使用env下的serializer得到反序列化之後的流。返回的時候,DeserializationStream這個trait提供了asIterator方法,每次next操作可以進行一次readObject。

在返回之前,調用了TaskContext提供的addOnCompleteCallback回調,用於關閉hadoop的inputStream。


NewHadoopRDD

class NewHadoopRDD[K, V](
    sc : SparkContext,
    inputFormatClass: Class[_ <: InputFormat[K, V]],
    keyClass: Class[K],
    valueClass: Class[V],
    @transient conf: Configuration)
  extends RDD[(K, V)](sc, Nil)
  with SparkHadoopMapReduceUtil

private[spark] class NewHadoopPartition(
    rddId: Int,
    val index: Int,
    @transient rawSplit: InputSplit with Writable)
  extends Partition {

  val serializableHadoopSplit = new SerializableWritable(rawSplit)

  override def hashCode(): Int = 41 * (41 + rddId) + index
}

getPartitions操作:

根據inputFormatClass和conf,通過hadoop InputFormat實現類的getSplits(JobContext)方法得到InputSplits。(ORCFile在此處的優化)

這樣獲得的split同RDD的partition直接對應。

 

compute操作:

針對本次split(partition),調用InputFormat的createRecordReader(split)方法,

得到RecordReader<K,V>。這個RecordReader包裝在Iterator[(K,V)]類內,複寫Iterator的next()和hasNext方法,讓compute返回的InterruptibleIterator[(K,V)]能夠被迭代獲得RecordReader取到的數據。


getPreferredLocations(split: Partition)操作:

theSplit.serializableHadoopSplit.value.getLocations.filter(_ != "localhost")

在NewHadoopPartition裏SerializableWritable將split序列化,然後調用InputSplit本身的getLocations接口,得到有數據分布節點的nodes name列表。


WholeTextFileRDD

NewHadoopRDD的子類

private[spark] class WholeTextFileRDD(
    sc : SparkContext,
    inputFormatClass: Class[_ <: WholeTextFileInputFormat],
    keyClass: Class[String],
    valueClass: Class[String],
    @transient conf: Configuration,
    minSplits: Int)
  extends NewHadoopRDD[String, String](sc, inputFormatClass, keyClass, valueClass, conf) {

複寫了getPartitions方法:

NewHadoopRDD有自己的inputFormat實現類和recordReader實現類。在spark/input package下專門寫了這兩個類的實現。感覺是種參考。


InputFormat

WholeTextFileRDD在spark裏實現了自己的inputFormat。讀取的File以K,V的結構獲取,K為path,V為整個file的content。

 

複寫createRecordReader以使用WholeTextFileRecordReader

 

複寫setMaxSplitSize方法,由於用戶可以傳入minSplits數目,計算平均大小(splits files總大小除以split數目)的時候就變了。


RecordReader

複寫nextKeyValue方法,會讀出指定path下的file的內容,生成new Text()給value,結果是String。如果文件正在被別的進行打開著,會返回false。否則把file內容讀進value裏。


使用場景

在SparkContext下提供wholeTextFile方法,

def wholeTextFiles(path: String, minSplits: Int = defaultMinSplits):
  RDD[(String, String)]

用於讀取一個路徑下的所有text文件,以K,V的形式返回,K為一個文件的path,V為文件內容。比較適合小文件。



全文完  :)



最後更新:2017-04-03 12:56:20

  上一篇:go Android開發小白日記2 (20 Apr) 關於Fragment
  下一篇:go Spark Core源碼分析: Spark任務執行模型