閱讀870 返回首頁    go 小米 go 小米6


Spark技術內幕:Worker源碼與架構解析

首先通過一張Spark的架構圖來了解Worker在Spark中的作用和地位:


Worker所起的作用有以下幾個:

1. 接受Master的指令,啟動或者殺掉Executor

2. 接受Master的指令,啟動或者殺掉Driver

3. 報告Executor/Driver的狀態到Master

4. 心跳到Master,心跳超時則Master認為Worker已經掛了不能工作了

5. 向GUI報告Worker的狀態


說白了,Worker就是整個集群真正幹活的。首先看一下Worker重要的數據結構:

  val executors = new HashMap[String, ExecutorRunner]
  val finishedExecutors = new HashMap[String, ExecutorRunner]
  val drivers = new HashMap[String, DriverRunner]
  val finishedDrivers = new HashMap[String, DriverRunner]

這些Hash Map存儲了名字和實體時間的對應關係,方便通過名字直接找到實體進行調用。

看一下如何啟動Executor:

case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
      if (masterUrl != activeMasterUrl) {
        logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
      } else {
        try {
          logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
          val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
            self, workerId, host,
            appDesc.sparkHome.map(userSparkHome => new File(userSparkHome)).getOrElse(sparkHome),
            workDir, akkaUrl, ExecutorState.RUNNING)
          executors(appId + "/" + execId) = manager
          manager.start()
          coresUsed += cores_
          memoryUsed += memory_
          masterLock.synchronized {
            master ! ExecutorStateChanged(appId, execId, manager.state, None, None)
          }
        } catch {
          case e: Exception => {
            logError("Failed to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
            if (executors.contains(appId + "/" + execId)) {
              executors(appId + "/" + execId).kill()
              executors -= appId + "/" + execId
            }
            masterLock.synchronized {
              master ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, None, None)
            }
          }
        }


1行到3行是驗證該命令是否發自一個合法的Master。7到10行定義了一個ExecutorRunner,實際上係統並沒有一個類叫做Executor,我們所說的Executor實際上是由ExecutorRunner實現的,這個名字起得也比較貼切。11行將新建的executor放到上麵提到的Hash Map中。然後12行啟動這個Executor。13行和14行將現在已經使用的core和memory進行的統計。15到17行實際上是向Master報告Executor的狀態。這裏需要加鎖。

如果在這過程中有異常拋出,那麼需要check是否是executor已經加到Hash Map中,如果有則首先停止它,然後從Hash Map中刪除它。並且向Master report Executor是FAILED的。Master會重新啟動新的Executor。


接下來看一下Driver的Hash Map的使用,通過KillDriver:

    case KillDriver(driverId) => {
      logInfo(s"Asked to kill driver $driverId")
      drivers.get(driverId) match {
        case Some(runner) =>
          runner.kill()
        case None =>
          logError(s"Asked to kill unknown driver $driverId")
      }
    }

這個KillDirver的命令實際上由Master發出的,而Master實際上接收了Client的kill driver的命令。這個也可以看出Scala語言的簡潔性。



最後更新:2017-04-03 05:39:42

  上一篇:go UIImage擴展
  下一篇:go IOS搖一搖功能實現