Spark技術內幕:Worker源碼與架構解析
首先通過一張Spark的架構圖來了解Worker在Spark中的作用和地位:
Worker所起的作用有以下幾個:
1. 接受Master的指令,啟動或者殺掉Executor
2. 接受Master的指令,啟動或者殺掉Driver
3. 報告Executor/Driver的狀態到Master
4. 心跳到Master,心跳超時則Master認為Worker已經掛了不能工作了
5. 向GUI報告Worker的狀態
說白了,Worker就是整個集群真正幹活的。首先看一下Worker重要的數據結構:
val executors = new HashMap[String, ExecutorRunner] val finishedExecutors = new HashMap[String, ExecutorRunner] val drivers = new HashMap[String, DriverRunner] val finishedDrivers = new HashMap[String, DriverRunner]
這些Hash Map存儲了名字和實體時間的對應關係,方便通過名字直接找到實體進行調用。
看一下如何啟動Executor:
case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) => if (masterUrl != activeMasterUrl) { logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.") } else { try { logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name)) val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_, self, workerId, host, appDesc.sparkHome.map(userSparkHome => new File(userSparkHome)).getOrElse(sparkHome), workDir, akkaUrl, ExecutorState.RUNNING) executors(appId + "/" + execId) = manager manager.start() coresUsed += cores_ memoryUsed += memory_ masterLock.synchronized { master ! ExecutorStateChanged(appId, execId, manager.state, None, None) } } catch { case e: Exception => { logError("Failed to launch executor %s/%d for %s".format(appId, execId, appDesc.name)) if (executors.contains(appId + "/" + execId)) { executors(appId + "/" + execId).kill() executors -= appId + "/" + execId } masterLock.synchronized { master ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, None, None) } } }
1行到3行是驗證該命令是否發自一個合法的Master。7到10行定義了一個ExecutorRunner,實際上係統並沒有一個類叫做Executor,我們所說的Executor實際上是由ExecutorRunner實現的,這個名字起得也比較貼切。11行將新建的executor放到上麵提到的Hash Map中。然後12行啟動這個Executor。13行和14行將現在已經使用的core和memory進行的統計。15到17行實際上是向Master報告Executor的狀態。這裏需要加鎖。
如果在這過程中有異常拋出,那麼需要check是否是executor已經加到Hash Map中,如果有則首先停止它,然後從Hash Map中刪除它。並且向Master report Executor是FAILED的。Master會重新啟動新的Executor。
接下來看一下Driver的Hash Map的使用,通過KillDriver:
case KillDriver(driverId) => { logInfo(s"Asked to kill driver $driverId") drivers.get(driverId) match { case Some(runner) => runner.kill() case None => logError(s"Asked to kill unknown driver $driverId") } }
這個KillDirver的命令實際上由Master發出的,而Master實際上接收了Client的kill driver的命令。這個也可以看出Scala語言的簡潔性。
最後更新:2017-04-03 05:39:42