Spark source code analysis: the Checkpoint process

Overview

The DAG of a Spark application can be huge, and the compute chain inside a single task can be very long. If a task fails partway through, the entire chain has to be recomputed, which is very expensive. It is therefore worth checkpointing RDDs that are costly to compute: when a downstream RDD fails, it can read the data back from the checkpointed RDD and continue from there. This is how the checkpoint mechanism protects applications that need to access the same data repeatedly.

Let's first look at an example of how checkpoint is used:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD

object CheckPointTest {

   def main(args: Array[String]) {

    val sc: SparkContext = SparkContext.getOrCreate(new SparkConf().setAppName("ck").setMaster("local[2]"))
    sc.setCheckpointDir("/Users/kinge/ck")

    val rdd: RDD[(String, Int)] = sc.textFile("").map{ x => (x, 1) }.reduceByKey(_ + _)
    rdd.checkpoint()

    rdd.count()
    rdd.groupBy(x => x._2).collect().foreach(println)
   }
}
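
A practical note on this example: checkpoint() only marks the RDD; the data is written by a separate job that runs after the first action (the count() above) finishes, so the RDD is computed twice unless it is also cached. Below is a minimal sketch of the commonly recommended pattern; the object name and input path are placeholders, not part of the original example.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

object CheckPointWithCacheTest {

   def main(args: Array[String]) {

    val sc: SparkContext = SparkContext.getOrCreate(new SparkConf().setAppName("ck-cache").setMaster("local[2]"))
    sc.setCheckpointDir("/tmp/ck")

    val rdd: RDD[(String, Int)] = sc.textFile("/tmp/words.txt").map{ x => (x, 1) }.reduceByKey(_ + _)

    // Cache first so the checkpoint job can reuse the cached partitions
    // instead of recomputing the whole lineage a second time.
    rdd.persist(StorageLevel.MEMORY_AND_DISK)
    rdd.checkpoint()

    rdd.count()   // first action: runs the job, then the checkpoint job writes the data
   }
}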

Checkpoint flow analysis

Checkpoint initialization

As we can see, the first call is to SparkContext's setCheckpointDir, which sets a checkpoint directory.
Let's step into this method:

  /**
   * Set the directory under which RDDs are going to be checkpointed. The directory must
   * be a HDFS path if running on a cluster.
   */
  def setCheckpointDir(directory: String) {

    // If we are running on a cluster, log a warning if the directory is local.
    // Otherwise, the driver may attempt to reconstruct the checkpointed RDD from
    // its own local file system, which is incorrect because the checkpoint files
    // are actually on the executor machines.
    if (!isLocal && Utils.nonLocalPaths(directory).isEmpty) {
      logWarning("Checkpoint directory must be non-local " +
        "if Spark is running on a cluster: " + directory)
    }

    // Create an HDFS directory using the Hadoop API
    checkpointDir = Option(directory).map { dir =>
      val path = new Path(dir, UUID.randomUUID().toString)
      val fs = path.getFileSystem(hadoopConfiguration)
      fs.mkdirs(path)
      fs.getFileStatus(path).getPath.toString
    }
  }
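
As the warning in the code points out, on a cluster the directory must be one that every executor can reach, which in practice means HDFS or another shared file system; a local path is only suitable in local mode. A quick sketch, where "namenode:8020" is a placeholder for your own cluster:

// Local development: a local directory is fine.
sc.setCheckpointDir("/tmp/spark-checkpoints")

// On a cluster: use a shared, reliable file system such as HDFS.
sc.setCheckpointDir("hdfs://namenode:8020/user/spark/checkpoints")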

This method is quite simple: it just creates a directory. Next, let's look at the RDD's core checkpoint method and step into it:

  def checkpoint(): Unit = RDDCheckpointData.synchronized {
    if (context.checkpointDir.isEmpty) {
      throw new SparkException("Checkpoint directory has not been set in the SparkContext")
    } else if (checkpointData.isEmpty) {
      checkpointData = Some(new ReliableRDDCheckpointData(this))
    }
  }

This method has no return value and its logic is a single check: checkpointDir was set just now, so it is not empty, and therefore a ReliableRDDCheckpointData is created. Let's look at ReliableRDDCheckpointData:

/**
 * An implementation of checkpointing that writes the RDD data to reliable storage.
 * This allows drivers to be restarted on failure with previously computed state.
 */
private[spark] class ReliableRDDCheckpointData[T: ClassTag](@transient rdd: RDD[T])
  extends RDDCheckpointData[T](rdd) with Logging {
   ...
}

ReliableRDDCheckpointData's parent class is RDDCheckpointData; let's keep going and look at that parent class:

/**
 * An RDD has to go through the states
 * [ Initialized --> CheckpointingInProgress --> Checkpointed ]
 * before it is considered checkpointed.
 */

private[spark] object CheckpointState extends Enumeration {
  type CheckpointState = Value
  val Initialized, CheckpointingInProgress, Checkpointed = Value
}

private[spark] abstract class RDDCheckpointData[T: ClassTag](@transient rdd: RDD[T])
  extends Serializable {

  import CheckpointState._

  // The checkpoint state of the associated RDD.
  protected var cpState = Initialized

  ...
}

An RDD has to go through the states
[ Initialized --> CheckpointingInProgress --> Checkpointed ]
before it is considered checkpointed.
This class contains an enumeration that tracks the checkpoint state; on first initialization the state is Initialized.
The checkpoint() step is now done: back in RDD, the member variable checkpointData now points to an instance of RDDCheckpointData.
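
The state itself is internal to Spark, but the end of this lifecycle is visible from user code through RDD.isCheckpointed and RDD.getCheckpointFile. A small sketch, continuing the example at the top of the article:

rdd.checkpoint()
println(rdd.isCheckpointed)     // false: the state is still Initialized, nothing has been written
println(rdd.getCheckpointFile)  // None

rdd.count()                     // the first action triggers the checkpoint job

println(rdd.isCheckpointed)     // true: the state has reached Checkpointed
println(rdd.getCheckpointFile)  // Some(<checkpoint dir>/rdd-<id>)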

Checkpoint initialization sequence diagram

[image: myuml__Collaboration1__Interaction1___0.jpg]

When does checkpoint write its data?

We know that running a Spark job ultimately calls SparkContext's runJob method to submit the tasks to the executors for execution. Let's look at runJob:

  def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      resultHandler: (Int, U) => Unit): Unit = {
    if (stopped.get()) {
      throw new IllegalStateException("SparkContext has been shutdown")
    }
    val callSite = getCallSite
    val cleanedFunc = clean(func)
    logInfo("Starting job: " + callSite.shortForm)
    if (conf.getBoolean("spark.logLineage", false)) {
      logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
    }
    dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
    progressBar.foreach(_.finishAll())
    rdd.doCheckpoint()
  }

The last line calls doCheckpoint, after dagScheduler has submitted the job to the cluster to run. Let's look at this doCheckpoint method:

  private[spark] def doCheckpoint(): Unit = {
    RDDOperationScope.withScope(sc, "checkpoint", allowNesting = false, ignoreParent = true) {
      if (!doCheckpointCalled) {
        doCheckpointCalled = true
        if (checkpointData.isDefined) {
          checkpointData.get.checkpoint()
        } else {
          // Recurse into the parent RDDs and call doCheckpoint on each of them
          dependencies.foreach(_.rdd.doCheckpoint())
        }
      }
    }
  }
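
Before following checkpointData.checkpoint(), note the else branch above: the traversal stops at the first RDD it finds with checkpointData defined and never visits that RDD's ancestors. A hedged sketch of the consequence in this version of the code (later Spark versions introduce a spark.checkpoint.checkpointAllMarkedAncestors setting to change this behaviour; the input path below is a placeholder):

val parent = sc.textFile("/tmp/words.txt").map(x => (x, 1))
val child  = parent.reduceByKey(_ + _)

parent.checkpoint()   // marked, but upstream of child
child.checkpoint()    // marked, downstream

child.count()         // doCheckpoint() starts at child and stops there

println(child.isCheckpointed)   // true
println(parent.isCheckpointed)  // false: the traversal never reached it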

This doCheckpoint is a recursion over the RDD dependency chain: when an RDD's checkpointData is not empty, checkpointData.checkpoint() is called. Remember what type checkpointData is? It is RDDCheckpointData, so let's look at its checkpoint method, shown below:

  final def checkpoint(): Unit = {
    // Guard against multiple threads checkpointing the same RDD by
    // atomically flipping the state of this RDDCheckpointData
    RDDCheckpointData.synchronized {
      if (cpState == Initialized) {
        // 1. Mark the current state as checkpointing-in-progress
        cpState = CheckpointingInProgress
      } else {
        return
      }
    }

    // 2. This calls the subclass's doCheckpoint()
    val newRDD = doCheckpoint()

    // 3. Mark the checkpoint as finished and clear the RDD's dependencies
    RDDCheckpointData.synchronized {
      cpRDD = Some(newRDD)
      cpState = Checkpointed
      rdd.markCheckpointed()
    }
  }

This method starts the actual checkpoint work and leaves doCheckpoint to the subclass, which implements the checkpoint logic. Let's see how the subclass implements doCheckpoint:

  protected override def doCheckpoint(): CheckpointRDD[T] = {

    // Create the output path for the checkpoint
    val path = new Path(cpDir)
    val fs = path.getFileSystem(rdd.context.hadoopConfiguration)
    if (!fs.mkdirs(path)) {
      throw new SparkException(s"Failed to create checkpoint path $cpDir")
    }

    // Broadcast the required configuration files (core-site.xml, etc.)
    // to the block managers on the worker nodes
    val broadcastedConf = rdd.context.broadcast(
      new SerializableConfiguration(rdd.context.hadoopConfiguration))

    // Submit a job to the cluster to perform the checkpoint,
    // serializing the RDD into the HDFS directory
    rdd.context.runJob(rdd, ReliableCheckpointRDD.writeCheckpointFile[T](cpDir, broadcastedConf) _)

    // Create a new dependency for this RDD, setting its parent to a CheckpointRDD.
    // That CheckpointRDD is responsible for later reading the checkpoint files
    // from the file system and producing this RDD's partitions.
    val newRDD = new ReliableCheckpointRDD[T](rdd.context, cpDir)
    if (newRDD.partitions.length != rdd.partitions.length) {
      throw new SparkException(
        s"Checkpoint RDD $newRDD(${newRDD.partitions.length}) has different " +
          s"number of partitions from original RDD $rdd(${rdd.partitions.length})")
    }

    // Optionally clean up the checkpoint files once the RDD reference goes out of scope
    if (rdd.conf.getBoolean("spark.cleaner.referenceTracking.cleanCheckpoints", false)) {
      rdd.context.cleaner.foreach { cleaner =>
        cleaner.registerRDDCheckpointDataForCleanup(newRDD, rdd.id)
      }
    }

    logInfo(s"Done checkpointing RDD ${rdd.id} to $cpDir, new parent is RDD ${newRDD.id}")

    // Return the newly created RDD to the parent class
    newRDD
  }

The code above finally returns the new CheckpointRDD. The parent class assigns it to the member variable cpRDD, marks the current state as Checkpointed, and clears the RDD's dependency chain. At this point the checkpoint data has been serialized onto HDFS.
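
The truncated lineage can be observed directly with rdd.toDebugString: before the checkpoint job runs it prints the full chain, and afterwards the RDD reports a ReliableCheckpointRDD as its only parent. A quick way to verify this with the example from the top of the article:

rdd.checkpoint()
println(rdd.toDebugString)   // full lineage: ShuffledRDD <- MapPartitionsRDD <- HadoopRDD ...

rdd.count()                  // runs the job, then writes the checkpoint

println(rdd.toDebugString)   // the lineage now starts from a ReliableCheckpointRDD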

Checkpoint write-path sequence diagram

[image: checkpoint.jpg]

When does checkpoint read its data back?

We know that a Task is the smallest unit of work Spark runs, and that Spark recomputes a task when it fails; the place where a task performs its computation is therefore the entry point for reading a checkpoint. Let's look at runTask, the compute method inside ShuffleMapTask, shown below:

  override def runTask(context: TaskContext): MapStatus = {

    ...

    try {
      val manager = SparkEnv.get.shuffleManager
      writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)

      // Call rdd.iterator to iterate over each partition's data, compute it, and write the result to disk
      writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])

      writer.stop(success = true).get
    } catch {
      case e: Exception =>
        try {
          if (writer != null) {
            writer.stop(success = false)
          }
        } catch {
          case e: Exception =>
            log.debug("Could not stop writer", e)
        }
        throw e
    }
  }

This is where Spark actually invokes the computation: runTask calls rdd.iterator() to compute the RDD's partition. Let's look at the RDD's iterator():

  final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
    if (storageLevel != StorageLevel.NONE) {
      SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
    } else {
      computeOrReadCheckpoint(split, context)
    }
  }

This in turn calls computeOrReadCheckpoint; let's look at that method:

  /**
   * Compute an RDD partition or read it from a checkpoint if the RDD is checkpointing.
   */
  private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
  {
    if (isCheckpointedAndMaterialized) {
      firstParent[T].iterator(split, context)
    } else {
      compute(split, context)
    }
  }

When rdd.iterator() is called to compute a partition of the RDD, it calls computeOrReadCheckpoint(split: Partition) to check whether the RDD has already been checkpointed. If it has, it calls the parent RDD's iterator(), which is CheckpointRDD.iterator(); otherwise it calls the RDD's own compute. So let's step into CheckpointRDD's compute:

  /**
   * Read the content of the checkpoint file associated with the given partition.
   */
  override def compute(split: Partition, context: TaskContext): Iterator[T] = {
    val file = new Path(checkpointPath, ReliableCheckpointRDD.checkpointFileName(split.index))
    ReliableCheckpointRDD.readCheckpointFile(file, broadcastedConf, context)
  }

There are just two lines here: they read our checkpoint data from the given Path. Let's take a look at readCheckpointFile:

  /**
   * Read the content of the specified checkpoint file.
   */
  def readCheckpointFile[T](
      path: Path,
      broadcastedConf: Broadcast[SerializableConfiguration],
      context: TaskContext): Iterator[T] = {
    val env = SparkEnv.get

    // Read the data from HDFS using the Hadoop API
    val fs = path.getFileSystem(broadcastedConf.value.value)
    val bufferSize = env.conf.getInt("spark.buffer.size", 65536)
    val fileInputStream = fs.open(path, bufferSize)
    val serializer = env.serializer.newInstance()
    val deserializeStream = serializer.deserializeStream(fileInputStream)

    // Register an on-task-completion callback to close the input stream.
    context.addTaskCompletionListener(context => deserializeStream.close())

    // Deserialize the data and expose it as an Iterator
    deserializeStream.asIterator.asInstanceOf[Iterator[T]]
  }

CheckpointRDD is responsible for reading the files on the file system and producing the RDD's partitions. This explains why an RDD that has called checkpoint gets a parent CheckpointRDD added to it.
With that, the whole checkpoint flow is complete.

Checkpoint read-path sequence diagram

[image: checkpoint.jpg]
