

Spark Source Code Analysis: The Checkpoint Process

Overview

The checkpoint mechanism protects applications that need to access the same data repeatedly. Spark's DAG execution graph can be very large and the computation chain inside a task can be very long, so if a task fails partway through, recomputing the whole task is very expensive. It is therefore worth checkpointing RDDs that are costly to compute: when a downstream RDD fails, it can read the data back from the checkpointed RDD and carry on from there.

Let's start with an example of how checkpoint is used:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD

object CheckPointTest {

  def main(args: Array[String]) {

    val sc: SparkContext = SparkContext.getOrCreate(new SparkConf().setAppName("ck").setMaster("local[2]"))
    sc.setCheckpointDir("/Users/kinge/ck")

    // supply a real input path here; the original example leaves it empty
    val rdd: RDD[(String, Int)] = sc.textFile("").map{ x => (x, 1) }.reduceByKey(_ + _)
    rdd.checkpoint()

    rdd.count()
    rdd.groupBy(x => x._2).collect().foreach(println)
  }
}
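
One practical note that is not part of the original example: as the rest of this analysis shows, the checkpoint data is written by a separate job submitted after the action finishes, so an RDD that is not cached gets computed twice. A minimal, hedged sketch, assuming the same rdd as above:

    import org.apache.spark.storage.StorageLevel

    rdd.persist(StorageLevel.MEMORY_AND_DISK) // keep the computed partitions around for the checkpoint job
    rdd.checkpoint()                           // only marks the RDD; nothing is written yet
    rdd.count()                                // the action runs first, then a second job writes the checkpoint files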

Checkpoint flow analysis

Checkpoint initialization

We can see that the first call is to SparkContext's setCheckpointDir, which sets a checkpoint directory.
Let's step into this method:

  /**
   * Set the directory under which RDDs are going to be checkpointed. The directory must
   * be a HDFS path if running on a cluster.
   */
  def setCheckpointDir(directory: String) {

    // If we are running on a cluster, log a warning if the directory is local.
    // Otherwise, the driver may attempt to reconstruct the checkpointed RDD from
    // its own local file system, which is incorrect because the checkpoint files
    // are actually on the executor machines.
    if (!isLocal && Utils.nonLocalPaths(directory).isEmpty) {
      logWarning("Checkpoint directory must be non-local " +
        "if Spark is running on a cluster: " + directory)
    }

    // Use the Hadoop API to create an HDFS directory
    checkpointDir = Option(directory).map { dir =>
      val path = new Path(dir, UUID.randomUUID().toString)
      val fs = path.getFileSystem(hadoopConfiguration)
      fs.mkdirs(path)
      fs.getFileStatus(path).getPath.toString
    }
  }

This method is quite simple: it just creates a directory, namely the configured checkpoint root plus a random UUID.
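
To make the resulting layout concrete, here is a hedged sketch of how the full checkpoint path for an RDD is assembled; the rdd-<id> suffix comes from ReliableRDDCheckpointData (seen later), and the concrete values are hypothetical:

    val root   = "/Users/kinge/ck"                      // what the user passed to setCheckpointDir
    val ckDir  = s"$root/${java.util.UUID.randomUUID}"  // the directory created by the method above
    val rddDir = s"$ckDir/rdd-${rdd.id}"                // where this RDD's partition files will end up

With the directory in place, let's look at the RDD's core checkpoint() method: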

  def checkpoint(): Unit = RDDCheckpointData.synchronized {
    if (context.checkpointDir.isEmpty) {
      throw new SparkException("Checkpoint directory has not been set in the SparkContext")
    } else if (checkpointData.isEmpty) {
      checkpointData = Some(new ReliableRDDCheckpointData(this))
    }
  }

This method has no return value and contains only a single check: checkpointDir was set a moment ago, so it is not empty, and a ReliableRDDCheckpointData is created. Let's look at ReliableRDDCheckpointData:

/**
 * An implementation of checkpointing that writes the RDD data to reliable storage.
 * This allows drivers to be restarted on failure with previously computed state.
 */
private[spark] class ReliableRDDCheckpointData[T: ClassTag](@transient rdd: RDD[T])
  extends RDDCheckpointData[T](rdd) with Logging {
   ...
}

ReliableRDDCheckpointData extends RDDCheckpointData, so let's continue with its parent class:

/**
 * An RDD has to go through the stages
 *   [ Initialized --> CheckpointingInProgress --> Checkpointed ]
 * before it counts as checkpointed.
 */

private[spark] object CheckpointState extends Enumeration {
  type CheckpointState = Value
  val Initialized, CheckpointingInProgress, Checkpointed = Value
}

private[spark] abstract class RDDCheckpointData[T: ClassTag](@transient rdd: RDD[T])
  extends Serializable {

  import CheckpointState._

  // The checkpoint state of the associated RDD.
  protected var cpState = Initialized

  ...
}

An RDD has to go through the stages
[ Initialized --> CheckpointingInProgress --> Checkpointed ]
before it counts as checkpointed.
The class holds an enumeration that tracks the checkpoint state; when first initialized, the state is Initialized.
The checkpoint() step is now complete: back in the RDD, the member variable checkpointData points to an instance of RDDCheckpointData.
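
Seen from user code, these state transitions are visible through RDD's public API. A minimal sketch, assuming the rdd from the earlier example:

    rdd.checkpoint()
    println(rdd.isCheckpointed)    // false: the state is still Initialized, nothing has been written
    rdd.count()                    // the first action; doCheckpoint() drives the state to Checkpointed
    println(rdd.isCheckpointed)    // true
    println(rdd.getCheckpointFile) // Some(<HDFS path of this RDD's checkpoint directory>)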

Checkpoint initialization sequence diagram

[sequence diagram image: myuml__Collaboration1__Interaction1___0.jpg]

When does checkpoint write its data?

We know that a running Spark job eventually calls SparkContext's runJob method to submit the work to the executors. Let's look at runJob:

  def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      resultHandler: (Int, U) => Unit): Unit = {
    if (stopped.get()) {
      throw new IllegalStateException("SparkContext has been shutdown")
    }
    val callSite = getCallSite
    val cleanedFunc = clean(func)
    logInfo("Starting job: " + callSite.shortForm)
    if (conf.getBoolean("spark.logLineage", false)) {
      logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
    }
    dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
    progressBar.foreach(_.finishAll())
    rdd.doCheckpoint()
  }

The last line calls doCheckpoint, after dagScheduler has submitted the job to the cluster to run. Let's look at this doCheckpoint method:

  private[spark] def doCheckpoint(): Unit = {
    RDDOperationScope.withScope(sc, "checkpoint", allowNesting = false, ignoreParent = true) {
      if (!doCheckpointCalled) {
        doCheckpointCalled = true
        if (checkpointData.isDefined) {
          checkpointData.get.checkpoint()
        } else {
          // Walk the RDDs this one depends on and call doCheckpoint on each of them
          dependencies.foreach(_.rdd.doCheckpoint())
        }
      }
    }
  }

This is a recursive walk of the RDD dependency chain: when an RDD's checkpointData is not empty, that checkpointData's checkpoint() method is called and the recursion stops there; otherwise the call descends into the RDDs it depends on.
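
One consequence of the branch above, at least in the version shown here (later Spark releases add a spark.checkpoint.checkpointAllMarkedAncestors option to change this), is that only the first marked RDD reached from the action is checkpointed; marked ancestors behind it are skipped. A hedged sketch with hypothetical RDDs:

    val parent = sc.textFile("input.txt").map(w => (w, 1))  // hypothetical input path
    val child  = parent.reduceByKey(_ + _)
    parent.checkpoint()
    child.checkpoint()
    child.count()  // doCheckpoint() stops at child: child is checkpointed, parent is not

Remember what type checkpointData is? It is RDDCheckpointData. Let's look at its checkpoint method: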

  final def checkpoint(): Unit = {
    // Guard against multiple threads checkpointing the same RDD by
    // atomically flipping the state of this RDDCheckpointData
    RDDCheckpointData.synchronized {
      if (cpState == Initialized) {

        // 1. Mark the current state as "checkpointing in progress"
        cpState = CheckpointingInProgress
      } else {
        return
      }
    }

    // 2. This calls the subclass's doCheckpoint()
    val newRDD = doCheckpoint()

    // 3. Mark the checkpoint as finished and clear the RDD's dependencies
    RDDCheckpointData.synchronized {
      cpRDD = Some(newRDD)
      cpState = Checkpointed
      rdd.markCheckpointed()
    }
  }

This method kicks off the actual checkpoint work and delegates doCheckpoint to the subclass, which implements the checkpoint logic. Let's see how the subclass implements doCheckpoint:

  protected override def doCheckpoint(): CheckpointRDD[T] = {

    // Create the output path for the checkpoint
    val path = new Path(cpDir)
    val fs = path.getFileSystem(rdd.context.hadoopConfiguration)
    if (!fs.mkdirs(path)) {
      throw new SparkException(s"Failed to create checkpoint path $cpDir")
    }

    // Broadcast the required configuration files (e.g. core-site.xml) to the block managers on the other worker nodes

    val broadcastedConf = rdd.context.broadcast(
      new SerializableConfiguration(rdd.context.hadoopConfiguration))


    // Submit a job to the cluster to perform the checkpoint: serialize the RDD to the HDFS directory
    rdd.context.runJob(rdd, ReliableCheckpointRDD.writeCheckpointFile[T](cpDir, broadcastedConf) _)

    // Create a new dependency for this RDD, setting its parent to a
    // CheckpointRDD. From now on the CheckpointRDD is responsible for reading
    // the checkpoint files on the file system and producing this RDD's partitions.
    val newRDD = new ReliableCheckpointRDD[T](rdd.context, cpDir)
    if (newRDD.partitions.length != rdd.partitions.length) {
      throw new SparkException(
        s"Checkpoint RDD $newRDD(${newRDD.partitions.length}) has different " +
          s"number of partitions from original RDD $rdd(${rdd.partitions.length})")
    }

    // Optionally clean up the checkpoint files once the RDD falls out of scope (reference tracking)
    if (rdd.conf.getBoolean("spark.cleaner.referenceTracking.cleanCheckpoints", false)) {
      rdd.context.cleaner.foreach { cleaner =>
        cleaner.registerRDDCheckpointDataForCleanup(newRDD, rdd.id)
      }
    }

    logInfo(s"Done checkpointing RDD ${rdd.id} to $cpDir, new parent is RDD ${newRDD.id}")

    // Return the newly created RDD to the parent class
    newRDD
  }

The code above finally returns the new CheckpointRDD. The parent class assigns it to the member variable cpRDD, marks the state as Checkpointed, and clears the current RDD's dependency chain. At this point the checkpoint data has been serialized out to HDFS.
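
The truncated lineage can be observed directly; a minimal sketch, assuming the rdd from the first example:

    rdd.checkpoint()
    rdd.count()                 // triggers the write; markCheckpointed() drops the old dependencies
    println(rdd.toDebugString)  // the lineage now starts from a ReliableCheckpointRDD instead of the original parents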

Checkpoint write sequence diagram

[sequence diagram image: checkpoint.jpg]

When does checkpoint read its data?

We know that a Task is the smallest unit of work Spark runs, and that when a task fails Spark recomputes it; the place where a task performs its computation is the entry point for reading a checkpoint. Let's look at the runTask method of ShuffleMapTask:

  override def runTask(context: TaskContext): MapStatus = {

     ...

    try {
      val manager = SparkEnv.get.shuffleManager
      writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)

      // Call rdd.iterator to iterate over the data of this partition, compute it, and write it out
      writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])

      writer.stop(success = true).get
    } catch {
      case e: Exception =>
        try {
          if (writer != null) {
            writer.stop(success = false)
          }
        } catch {
          case e: Exception =>
            log.debug("Could not stop writer", e)
        }
        throw e
    }
  }

This is where Spark actually performs the computation: runTask calls rdd.iterator() to compute the RDD's partition. Let's look at RDD's iterator():

  final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
    if (storageLevel != StorageLevel.NONE) {
      SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
    } else {
      computeOrReadCheckpoint(split, context)
    }
  }

This in turn calls computeOrReadCheckpoint; let's look at that method:

  /**
   * Compute an RDD partition or read it from a checkpoint if the RDD is checkpointing.
   */
  private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
  {
    if (isCheckpointedAndMaterialized) {
      firstParent[T].iterator(split, context)
    } else {
      compute(split, context)
    }
  }

When rdd.iterator() is called to compute a partition of the RDD, computeOrReadCheckpoint(split: Partition) checks whether this RDD has already been checkpointed. If it has, it calls the parent RDD's iterator(), which is now CheckpointRDD.iterator(); otherwise it calls the RDD's own compute.
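
To make "the parent is now a CheckpointRDD" concrete, here is a hedged sketch (same rdd as the earlier example; it relies on the dependencies having been rewritten to a single dependency on the checkpoint RDD, as described above):

    rdd.checkpoint()
    rdd.count()
    val parent = rdd.dependencies.head.rdd
    println(parent.getClass.getSimpleName)  // expected: ReliableCheckpointRDD

So let's step into CheckpointRDD's compute: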

  /**
   * Read the content of the checkpoint file associated with the given partition.
   */
  override def compute(split: Partition, context: TaskContext): Iterator[T] = {
    val file = new Path(checkpointPath, ReliableCheckpointRDD.checkpointFileName(split.index))
    ReliableCheckpointRDD.readCheckpointFile(file, broadcastedConf, context)
  }

These two lines locate the checkpoint file for the given partition under the checkpoint path and read our checkpoint data back from it.
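
For reference, the per-partition file name that compute() resolves is derived from the partition index; this follows ReliableCheckpointRDD.checkpointFileName in this version of Spark, so treat the exact pattern as an assumption:

    // partition 3 of a checkpointed RDD lives at <checkpoint dir>/part-00003
    val fileName = "part-%05d".format(3)  // => "part-00003"

Let's take a look at readCheckpointFile: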

  /**
   * Read the content of the specified checkpoint file.
   */
  def readCheckpointFile[T](
      path: Path,
      broadcastedConf: Broadcast[SerializableConfiguration],
      context: TaskContext): Iterator[T] = {
    val env = SparkEnv.get

    // Read the data on HDFS via the Hadoop API
    val fs = path.getFileSystem(broadcastedConf.value.value)
    val bufferSize = env.conf.getInt("spark.buffer.size", 65536)
    val fileInputStream = fs.open(path, bufferSize)
    val serializer = env.serializer.newInstance()
    val deserializeStream = serializer.deserializeStream(fileInputStream)

    // Register an on-task-completion callback to close the input stream.
    context.addTaskCompletionListener(context => deserializeStream.close())

    // Deserialize the data and return it as an Iterator
    deserializeStream.asIterator.asInstanceOf[Iterator[T]]
  }

CheckpointRDD is responsible for reading the files on the file system and producing the RDD's partitions. That explains why a CheckpointRDD is added as the parent of an RDD on which checkpoint was called.
With that, the entire checkpoint flow is complete.

Checkpoint read sequence diagram

[sequence diagram image: checkpoint.jpg]
