Spark Internals: Client, Master, and Worker Communication Source Code Analysis
Spark's Cluster Manager supports several deployment modes:
- Standalone
- Mesos
- YARN
- EC2
- Local
After a computing job is submitted to the cluster, the execution model is that the SparkContext defined in the Driver Program submits to the App Master, and the App Master schedules compute resources and ultimately completes the computation. For a detailed discussion, see 《Spark:大數據的電花火石!》.

So in Standalone mode, how do the Client, Master, and Worker communicate, register, and bring up their services?
1. RPC between nodes - Akka
There are many mature implementations of inter-module communication, and modern frameworks have long freed us from raw socket programming. Broadly categorized, they fall into message-based passing and synchronization based on shared resources.
Among message-passing mechanisms, the Message Queue is widely used. A message queue is a method of application-to-application communication: applications communicate by reading and writing messages (application-specific data) to and from queues, with no dedicated connection linking them. Message passing means that programs communicate by sending data in messages rather than by calling each other directly, as techniques such as remote procedure calls do. Queuing means that applications communicate through queues, which removes the requirement that the sending and receiving applications execute at the same time. Mature MQ products include IBM WebSphere MQ and RabbitMQ (an open-source implementation of AMQP, now maintained by Pivotal).
ZeroMQ also deserves a mention: a socket-based programming framework that aspires to enter the Linux kernel. In the official description: “ZeroMQ is an easy-to-use transport layer, a socket library that acts like a framework; it makes socket programming simpler, more concise, and higher-performance. It is a message-processing queue library that scales elastically across threads, cores, and host machines.” ZMQ's stated goal is to “become part of the standard networking stack and then enter the Linux kernel”.
For communication between many of its modules, Spark chooses Akka, which Scala supports natively: a library written in Scala for simplifying the construction of fault-tolerant, highly scalable Actor-model applications in Java and Scala. Akka advertises the following five characteristics (a minimal example follows the list):
- Simple Concurrency & Distribution: Akka was designed around asynchronous messaging and a distributed architecture, with high-level abstractions such as Actors, Futures, and STM.
- Resilient by Design: systems are self-healing, with supervision both locally and remotely.
- High Performance: up to 50,000,000 messages per second on a single machine, with a small memory footprint of roughly 2,500,000 actors per GB of heap.
- Elastic & Decentralized: adaptive load balancing, routing, partitioning, and configuration.
- Extensible: functionality can be added through Akka Extensions.
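To make the actor model concrete, here is a minimal, self-contained example using the classic Akka API of that era (the Greet/Greeter names are ours, purely for illustration, not anything in Spark):

import akka.actor._

// Hypothetical message and actor, only to illustrate the model Spark builds on.
case class Greet(name: String)

class Greeter extends Actor {
  def receive = {
    case Greet(name) => println(s"Hello, $name")
  }
}

object GreeterApp {
  def main(args: Array[String]) {
    val system = ActorSystem("demo")
    val greeter = system.actorOf(Props[Greeter], "greeter")
    // Fire-and-forget send: the same pattern the Client uses to talk to the Master.
    greeter ! Greet("Spark")
    system.shutdown()
  }
}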
In Spark, the Client, Master, and Worker are each in fact an actor. Take the Client as an example:
import akka.actor._
import akka.pattern.ask
import akka.remote.{AssociationErrorEvent, DisassociatedEvent, RemotingLifecycleEvent}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.Master
import org.apache.spark.util.{AkkaUtils, Utils}

private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends Actor with Logging {
  var masterActor: ActorSelection = _
  val timeout = AkkaUtils.askTimeout(conf)

  override def preStart() = {
    masterActor = context.actorSelection(Master.toAkkaUrl(driverArgs.master))
    context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
    println(s"Sending ${driverArgs.cmd} command to ${driverArgs.master}")

    driverArgs.cmd match {
      case "launch" =>
        ...
        masterActor ! RequestSubmitDriver(driverDescription)

      case "kill" =>
        val driverId = driverArgs.driverId
        val killFuture = masterActor ! RequestKillDriver(driverId)
    }
  }

  override def receive = {
    case SubmitDriverResponse(success, driverId, message) =>
      println(message)
      if (success) pollAndReportStatus(driverId.get) else System.exit(-1)

    case KillDriverResponse(driverId, success, message) =>
      println(message)
      if (success) pollAndReportStatus(driverId) else System.exit(-1)

    case DisassociatedEvent(_, remoteAddress, _) =>
      println(s"Error connecting to master ${driverArgs.master} ($remoteAddress), exiting.")
      System.exit(-1)

    case AssociationErrorEvent(cause, _, remoteAddress, _) =>
      println(s"Error connecting to master ${driverArgs.master} ($remoteAddress), exiting.")
      println(s"Cause was: $cause")
      System.exit(-1)
  }
}

/**
 * Executable utility for starting and terminating drivers inside of a standalone cluster.
 */
object Client {
  def main(args: Array[String]) {
    println("WARNING: This client is deprecated and will be removed in a future version of Spark.")
    println("Use ./bin/spark-submit with \"--master spark://host:port\"")
    val conf = new SparkConf()
    val driverArgs = new ClientArguments(args)

    if (!driverArgs.logLevel.isGreaterOrEqual(Level.WARN)) {
      conf.set("spark.akka.logLifecycleEvents", "true")
    }
    conf.set("spark.akka.askTimeout", "10")
    conf.set("akka.loglevel", driverArgs.logLevel.toString.replace("WARN", "WARNING"))
    Logger.getRootLogger.setLevel(driverArgs.logLevel)

    // TODO: See if we can initialize akka so return messages are sent back using the same TCP
    // flow. Else, this (sadly) requires the DriverClient be routable from the Master.
    val (actorSystem, _) = AkkaUtils.createActorSystem(
      "driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))

    actorSystem.actorOf(Props(classOf[ClientActor], driverArgs, conf))

    actorSystem.awaitTermination()
  }
}
The key line in preStart() above is the one that submits the Driver request to the Master:
masterActor ! RequestSubmitDriver(driverDescription)
The Master handles this request in its own receive, while the Client actor's receive block, shown above, handles the replies and remoting events the Client itself gets back.
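Besides fire-and-forget sends with !, the Client also relies on Akka's ask pattern (note the akka.pattern.ask import and the timeout field at the top of ClientActor): ? returns a Future for the reply, which is the style pollAndReportStatus uses to query the Master. A minimal, self-contained sketch of the pattern (Ping/Pong are our own illustrative messages, not Spark's):

import scala.concurrent.Await
import scala.concurrent.duration._
import akka.actor._
import akka.pattern.ask
import akka.util.Timeout

case object Ping
case object Pong

class Ponger extends Actor {
  def receive = {
    case Ping => sender ! Pong  // reply to whoever asked
  }
}

object AskDemo {
  def main(args: Array[String]) {
    val system = ActorSystem("ask-demo")
    val ponger = system.actorOf(Props[Ponger], "ponger")
    implicit val timeout = Timeout(10.seconds)
    // ask (?) returns a Future; block on it here for simplicity.
    val reply = Await.result(ponger ? Ping, timeout.duration)
    println(reply)  // Pong
    system.shutdown()
  }
}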
As you can see, Akka makes inter-module communication remarkably simple and efficient to implement; this can fairly be called a hallmark of Spark's RPC.
2. Client, Master, and Worker startup communication in detail
Source location: spark-1.0.0\core\src\main\scala\org\apache\spark\deploy. The main classes involved are Client.scala, Master.scala, and Worker.scala. The communication framework among these three modules is shown in the figure below.
The roles present in Standalone mode:
- Client: submits jobs to the Master.
- Master: accepts jobs submitted by the Client, manages the Workers, and commands the Workers to launch Drivers and Executors.
- Worker: manages the resources of its own node, reports heartbeats to the Master periodically, and carries out the Master's commands, such as launching Drivers and Executors.

In reality, the Master and Worker handle many more messages than these; the figure only reflects the main messages processed while the cluster starts up and while a computation is submitted to it.
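The messages that appear in the figure are ordinary Scala case classes defined in DeployMessages.scala under the same deploy package. A few representative ones, simplified here for readability (the real definitions extend DeployMessage):

case class RegisterWorker(id: String, host: String, port: Int, cores: Int, memory: Int,
    webUiPort: Int, publicAddress: String)
case class RegisteredWorker(masterUrl: String, masterWebUiUrl: String)
case class Heartbeat(workerId: String)
case class RequestSubmitDriver(driverDescription: DriverDescription)
case class SubmitDriverResponse(success: Boolean, driverId: Option[String], message: String)

Because they are case classes, each side can pattern match on them in receive, which is exactly what the walkthroughs below show.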
Next, we will walk through the source code of each of the three roles.
2.1 Client source code analysis
Client startup:
object Client {
  def main(args: Array[String]) {
    println("WARNING: This client is deprecated and will be removed in a future version of Spark.")
    println("Use ./bin/spark-submit with \"--master spark://host:port\"")
    val conf = new SparkConf()
    val driverArgs = new ClientArguments(args)

    if (!driverArgs.logLevel.isGreaterOrEqual(Level.WARN)) {
      conf.set("spark.akka.logLifecycleEvents", "true")
    }
    conf.set("spark.akka.askTimeout", "10")
    conf.set("akka.loglevel", driverArgs.logLevel.toString.replace("WARN", "WARNING"))
    Logger.getRootLogger.setLevel(driverArgs.logLevel)

    // TODO: See if we can initialize akka so return messages are sent back using the same TCP
    // flow. Else, this (sadly) requires the DriverClient be routable from the Master.
    val (actorSystem, _) = AkkaUtils.createActorSystem(
      "driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))

    // Create the ClientActor inside the actor system
    actorSystem.actorOf(Props(classOf[ClientActor], driverArgs, conf))

    // Run until the actor system terminates
    actorSystem.awaitTermination()
  }
}
As the code shows, the core logic is implemented by ClientActor, an extension of akka.Actor. For any actor, its override of receive reveals exactly which messages it has to handle.
override def receive = {
  case SubmitDriverResponse(success, driverId, message) =>
    println(message)
    if (success) pollAndReportStatus(driverId.get) else System.exit(-1)

  case KillDriverResponse(driverId, success, message) =>
    println(message)
    if (success) pollAndReportStatus(driverId) else System.exit(-1)

  case DisassociatedEvent(_, remoteAddress, _) =>
    println(s"Error connecting to master ${driverArgs.master} ($remoteAddress), exiting.")
    System.exit(-1)

  case AssociationErrorEvent(cause, _, remoteAddress, _) =>
    println(s"Error connecting to master ${driverArgs.master} ($remoteAddress), exiting.")
    println(s"Cause was: $cause")
    System.exit(-1)
}
2.2 Master source code analysis
The analysis is given in the inline comments.
override def receive = {
  case ElectedLeader => {
    // This node has been elected Master. First check whether this Master was
    // previously active; if so, perform recovery.
  }
  case CompleteRecovery => completeRecovery() // Remove unresponsive workers and apps, and assign a worker to every driver that has none
  case RevokedLeadership => {
    // This Master is about to shut down.
  }
  case RegisterWorker(id, workerHost, workerPort, cores, memory, workerUiPort, publicAddress) => {
    // If this Master is not active, do nothing and return.
    // If this worker id has already been registered, return an error to the sender:
    sender ! RegisterWorkerFailed("Duplicate worker ID")
    // Register the worker; if registration succeeds, reply with a success message
    // and trigger scheduling:
    sender ! RegisteredWorker(masterUrl, masterWebUiUrl)
    schedule()
    // If registration fails, send an error message back to the sender:
    sender ! RegisterWorkerFailed("Attempted to re-register worker at same address: " + workerAddress)
  }
  case RequestSubmitDriver(description) => {
    // If the master is not active, return an error:
    sender ! SubmitDriverResponse(false, None, msg)
    // Otherwise create the driver and reply with a success message:
    sender ! SubmitDriverResponse(true, Some(driver.id),
      s"Driver successfully submitted as ${driver.id}")
  }
  case RequestKillDriver(driverId) => {
    if (state != RecoveryState.ALIVE) {
      // If the master is not active, return an error.
      val msg = s"Can only kill drivers in ALIVE state. Current state: $state."
      sender ! KillDriverResponse(driverId, success = false, msg)
    } else {
      logInfo("Asked to kill driver " + driverId)
      val driver = drivers.find(_.id == driverId)
      driver match {
        case Some(d) =>
          // If the driver is still in the waiting queue, remove it from the queue
          // and update its state to KILLED. Otherwise, tell the worker to kill the
          // driver with this id; the result comes back when the worker sends
          // DriverStateChanged to the master:
          d.worker.foreach { w => w.actor ! KillDriver(driverId) }
          // Note: at this point the driver has not necessarily been killed; the
          // master has only told the worker to kill it.
          sender ! KillDriverResponse(driverId, success = true, msg)
        case None =>
          // The driver has already finished or been killed; return the result directly.
          sender ! KillDriverResponse(driverId, success = false, msg)
      }
    }
  }
  case RequestDriverStatus(driverId) => {
    // Look up the requested driver; if found, return its status.
    (drivers ++ completedDrivers).find(_.id == driverId) match {
      case Some(driver) =>
        sender ! DriverStatusResponse(found = true, Some(driver.state),
          driver.worker.map(_.id), driver.worker.map(_.hostPort), driver.exception)
      case None =>
        sender ! DriverStatusResponse(found = false, None, None, None, None)
    }
  }
  case RegisterApplication(description) => {
    // If this Master is standby, ignore the message.
    // Otherwise register the application, reply with the result, and start scheduling.
  }
  case ExecutorStateChanged(appId, execId, state, message, exitStatus) => {
    // Look up the app via idToApp, then the app's executors, and finally the executor via execId.
    val execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId))
    execOption match {
      case Some(exec) => {
        exec.state = state
        exec.application.driver ! ExecutorUpdated(execId, state, message, exitStatus)
        if (ExecutorState.isFinished(state)) {
          val appInfo = idToApp(appId)
          // Remove this executor from the worker and app
          logInfo("Removing executor " + exec.fullId + " because it is " + state)
          appInfo.removeExecutor(exec)
          exec.worker.removeExecutor(exec)
        }
      }
    }
  }
  case DriverStateChanged(driverId, state, exception) => {
    // If the driver's state is ERROR | FINISHED | KILLED | FAILED, remove it.
  }
  case Heartbeat(workerId) => {
    // Update the worker's timestamp: workerInfo.lastHeartbeat = System.currentTimeMillis()
  }
  case MasterChangeAcknowledged(appId) => {
    // Set the state of the app with this appId to WAITING, in preparation for the Master failover.
  }
  case WorkerSchedulerStateResponse(workerId, executors, driverIds) => {
    // Find the worker by workerId and set its state to ALIVE; then keep the executors
    // whose appId is still defined, add those executors back to their apps, and record
    // them on the worker. This can be understood as the Worker's recovery on the Master side.
    idToWorker.get(workerId) match {
      case Some(worker) =>
        logInfo("Worker has been re-registered: " + workerId)
        worker.state = WorkerState.ALIVE
        val validExecutors = executors.filter(exec => idToApp.get(exec.appId).isDefined)
        for (exec <- validExecutors) {
          val app = idToApp.get(exec.appId).get
          val execInfo = app.addExecutor(worker, exec.cores, Some(exec.execId))
          worker.addExecutor(execInfo)
          execInfo.copyState(exec)
        }
        // Set all of the drivers to RUNNING and attach them to the worker.
        for (driverId <- driverIds) {
          drivers.find(_.id == driverId).foreach { driver =>
            driver.worker = Some(worker)
            driver.state = DriverState.RUNNING
            worker.drivers(driverId) = driver
          }
        }
    }
  }
  case DisassociatedEvent(_, address, _) => {
    // Sent when a Worker or an App disconnects. Remove the Worker and App at this address.
    // If recovery can now be completed, finish recovery.
  }
  case RequestMasterState => {
    // Return the master's state to the sender.
    sender ! MasterStateResponse(host, port, workers.toArray, apps.toArray, completedApps.toArray,
      drivers.toArray, completedDrivers.toArray, state)
  }
  case CheckForWorkerTimeOut => {
    // Remove workers that have timed out.
  }
  case RequestWebUIPort => {
    // Return the web UI port to the sender.
    sender ! WebUIPortResponse(webUi.boundPort)
  }
}
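Note how both the RegisterWorker and RequestSubmitDriver handlers end with a call to schedule(): whenever resources or demands may have changed, the Master reschedules. A heavily abridged sketch of its structure (see Master.scala for the real logic):

private def schedule() {
  if (state != RecoveryState.ALIVE) { return }
  // 1) Launch waiting drivers on any ALIVE worker with enough free memory and cores.
  // 2) Allocate executors for waiting apps across the workers, either spreading them
  //    out or consolidating them, depending on spark.deploy.spreadOut.
}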
2.3 Worker source code analysis
Having walked through the Client and Master source code, you can surely work out for yourself how the Worker communicates with the Master. Sure enough, the answer lies below:
override def receive
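For orientation, here is a condensed sketch of the Worker's receive in spark-1.0.0, with the handler bodies abbreviated to comments (consult Worker.scala for the full list of messages and the actual code):

override def receive = {
  case RegisteredWorker(masterUrl, masterWebUiUrl) =>
    // Registration succeeded: remember the master and start the heartbeat timer.
  case SendHeartbeat =>
    // Fired periodically: master ! Heartbeat(workerId)
  case MasterChanged(masterUrl, masterWebUiUrl) =>
    // A standby Master has taken over: re-report this worker's executors and drivers,
    // which the new Master handles as WorkerSchedulerStateResponse (see 2.2).
  case RegisterWorkerFailed(message) =>
    // Registration rejected (e.g. a duplicate worker ID): log and exit.
  case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
    // Start an ExecutorRunner, then report master ! ExecutorStateChanged(...)
  case KillExecutor(masterUrl, appId, execId) =>
    // Kill the corresponding ExecutorRunner.
  case LaunchDriver(driverId, driverDesc) =>
    // Start a DriverRunner for the submitted driver.
  case KillDriver(driverId) =>
    // Sent by the Master for RequestKillDriver (see 2.2): kill the DriverRunner;
    // this eventually produces a DriverStateChanged back to the Master.
  case DriverStateChanged(driverId, state, exception) =>
    // Forward the driver's final state to the Master and clean up locally.
}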
References:
Spark source code 1.0.0.