Spark Source Code Analysis: Executor Startup and Task Submission
Task Submission Flow
Overview
The previous articles covered the startup flows of Spark's Master and Worker. The next process to run is the Executor on the Worker, so this article continues with the full Executor startup and task submission flow.
Spark-submit
A job is submitted to the cluster through spark-submit.
The launch script starts the submit entry class; taking WordCount as the example: spark-submit --class cn.apache.spark.WordCount
bin/spark-class -> org.apache.spark.deploy.SparkSubmit, whose main method is invoked.
Inside doRunMain, the fully qualified name of the user application's main class (here cn.apache.spark.WordCount) is passed in. A reference to the class is obtained via reflection, mainClass = Utils.classForName(childMainClass), and the main method of cn.apache.spark.WordCount is then invoked, also via reflection.
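As a standalone illustration of this mechanism (not the Spark source; the class name com.example.WordCount is a placeholder), loading a class by name and invoking its static main method via reflection looks like this:

```scala
import java.lang.reflect.Modifier

object ReflectiveMainLauncher {
  def launch(className: String, args: Array[String]): Unit = {
    // Load the class through the context class loader, as SparkSubmit does.
    val mainClass = Class.forName(className, true, Thread.currentThread().getContextClassLoader)
    // Look up the static main(Array[String]) method.
    val mainMethod = mainClass.getMethod("main", classOf[Array[String]])
    require(Modifier.isStatic(mainMethod.getModifiers), "main must be static")
    // Invoke it; the receiver is null because the method is static.
    mainMethod.invoke(null, args)
  }
}

// Usage (placeholder class name and argument):
// ReflectiveMainLauncher.launch("com.example.WordCount", Array("input.txt"))
```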
Let's look at SparkSubmit's main method:
def main(args: Array[String]): Unit = {
val appArgs = new SparkSubmitArguments(args)
if (appArgs.verbose) {
printStream.println(appArgs)
}
// match on the action type
appArgs.action match {
case SparkSubmitAction.SUBMIT => submit(appArgs)
case SparkSubmitAction.KILL => kill(appArgs)
case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
}
}
Here the action is SUBMIT, so the submit method is called:
private[spark] def submit(args: SparkSubmitArguments): Unit = {
val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)
def doRunMain(): Unit = {
// ...
try {
proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
override def run(): Unit = {
// childMainClass is the fully qualified name of the class containing your application's main method
runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
}
})
} catch {
// ...
}
}
// ...
// call doRunMain defined above
doRunMain()
}
submit calls doRunMain(), which in turn calls runMain. Let's look at runMain:
private def runMain(
// ...
try {
// load the class via reflection
mainClass = Class.forName(childMainClass, true, loader)
} catch {
// ...
}
// get a handle to the main method via reflection
val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass)
if (!Modifier.isStatic(mainMethod.getModifiers)) {
throw new IllegalStateException("The main method in the given main class must be static")
}
// ...
try {
// invoke the application's main method
mainMethod.invoke(null, childArgs.toArray)
} catch {
case t: Throwable =>
throw findCause(t)
}
}
This is the core of the flow: as the comments above show, the main method of the class we wrote is invoked via reflection. That completes the submission path.
Executor Startup Flow
Once SparkSubmit has invoked our program's main method via reflection, our own code starts to run. Every Spark program needs to create a SparkContext object, so that is where we begin.
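For reference, a minimal driver of the kind this article assumes (the cn.apache.spark.WordCount example; the input and output paths are placeholders) looks roughly like this. Constructing the SparkContext is what triggers everything analyzed below.

```scala
package cn.apache.spark

import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    // Creating the SparkContext kicks off SparkEnv, TaskScheduler and DAGScheduler creation.
    val conf = new SparkConf().setAppName("WordCount")
    val sc = new SparkContext(conf)

    sc.textFile(args(0))            // placeholder input path passed on the command line
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .saveAsTextFile(args(1))      // placeholder output path

    sc.stop()
  }
}
```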
The SparkContext constructor is long; the parts we care about are:
class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient {
// ...
private[spark] def createSparkEnv(
conf: SparkConf,
isLocal: Boolean,
listenerBus: LiveListenerBus): SparkEnv = {
// create the driver-side environment via SparkEnv.createDriverEnv
SparkEnv.createDriverEnv(conf, isLocal, listenerBus)
}
// createSparkEnv is called here and returns a SparkEnv object holding many important components, most importantly the ActorSystem
private[spark] val env = createSparkEnv(conf, isLocal, listenerBus)
SparkEnv.set(env)
// create the TaskScheduler
// Create and start the scheduler
private[spark] var (schedulerBackend, taskScheduler) =
SparkContext.createTaskScheduler(this, master)
// create the DAGScheduler
dagScheduler = new DAGScheduler(this)
// start the TaskScheduler
taskScheduler.start()
// ...
}
The SparkContext constructor mainly does three things: it creates a SparkEnv, a TaskScheduler, and a DAGScheduler. Let's first look at what createTaskScheduler does:
// create a TaskScheduler based on the given master URL
private def createTaskScheduler(
// ...
// match on the URL to pick the deployment mode
master match {
// ...
// this case is Spark's Standalone mode
case SPARK_REGEX(sparkUrl) =>
// first create the TaskScheduler
val scheduler = new TaskSchedulerImpl(sc)
val masterUrls = sparkUrl.split(",").map("spark://" + _)
// very important: the backend that talks to the Master
val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
// initialize the scheduler; the scheduling mode defaults to FIFO
scheduler.initialize(backend)
(backend, scheduler)
// ...
}
}
The master URL matches the Standalone pattern, so a **SparkDeploySchedulerBackend** and a **TaskSchedulerImpl** are created. These two objects are the core of task scheduling startup. scheduler.initialize(backend) is then called to initialize the scheduler.
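initialize itself is roughly the following (a simplified paraphrase of this Spark version's TaskSchedulerImpl, not a verbatim quote): it stores the backend and builds the root scheduling pool, defaulting to the FIFO builder unless spark.scheduler.mode selects FAIR.

```scala
// Simplified paraphrase of TaskSchedulerImpl.initialize (not verbatim source)
def initialize(backend: SchedulerBackend) {
  this.backend = backend
  // The root pool that all TaskSetManagers will hang under.
  rootPool = new Pool("", schedulingMode, 0, 0)
  schedulableBuilder = schedulingMode match {
    case SchedulingMode.FIFO => new FIFOSchedulableBuilder(rootPool)       // default
    case SchedulingMode.FAIR => new FairSchedulableBuilder(rootPool, conf)
  }
  schedulableBuilder.buildPools()
}
```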
Initialization of the TaskScheduler is complete; back in the SparkContext constructor, taskScheduler.start() is then called to start it.
Starting the TaskScheduler
Let's look at the start method:
override def start() {
// call the start method of the backend implementation
backend.start()
if (!isLocal && conf.getBoolean("spark.speculation", false)) {
logInfo("Starting speculative execution thread")
import sc.env.actorSystem.dispatcher
sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds,
SPECULATION_INTERVAL milliseconds) {
Utils.tryOrExit { checkSpeculatableTasks() }
}
}
}
Here the backend is the **SparkDeploySchedulerBackend**, so its start method is called:
override def start() {
// CoarseGrainedSchedulerBackend's start method, which creates the DriverActor
super.start()
// The endpoint for executors to talk to us
// below, the arguments for launching the Java child process are prepared
val driverUrl = AkkaUtils.address(
AkkaUtils.protocol(actorSystem),
SparkEnv.driverActorSystemName,
conf.get("spark.driver.host"),
conf.get("spark.driver.port"),
CoarseGrainedSchedulerBackend.ACTOR_NAME)
val args = Seq(
"--driver-url", driverUrl,
"--executor-id", "{{EXECUTOR_ID}}",
"--hostname", "{{HOSTNAME}}",
"--cores", "{{CORES}}",
"--app-id", "{{APP_ID}}",
"--worker-url", "{{WORKER_URL}}")
val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
.map(Utils.splitCommandString).getOrElse(Seq.empty)
val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath")
.map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)
val libraryPathEntries = sc.conf.getOption("spark.executor.extraLibraryPath")
.map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)
// When testing, expose the parent class path to the child. This is processed by
// compute-classpath.{cmd,sh} and makes all needed jars available to child processes
// when the assembly is built with the "*-provided" profiles enabled.
val testingClassPath =
if (sys.props.contains("spark.testing")) {
sys.props("java.class.path").split(java.io.File.pathSeparator).toSeq
} else {
Nil
}
// Start executors with a few necessary configs for registering with the scheduler
val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
val javaOpts = sparkJavaOpts ++ extraJavaOpts
// assemble the Command with its arguments; it will eventually launch an org.apache.spark.executor.CoarseGrainedExecutorBackend child process
val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
// wrap the important parameters in an ApplicationDescription
val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
appUIAddress, sc.eventLogDir, sc.eventLogCodec)
// the AppClient created here will in turn create the ClientActor
client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
// start the ClientActor
client.start()
waitForRegistration()
}
Here the parameters for launching the Executor are assembled: the class name plus its arguments are wrapped in an ApplicationDescription, which is then passed into a newly created AppClient whose start method is called.
AppClient's start method
Next, let's focus on the start method:
def start() {
// Just launch an actor; it will call back into the listener.
actor = actorSystem.actorOf(Props(new ClientActor))
}
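For readers unfamiliar with Akka, which this Spark version uses for RPC, here is a standalone sketch unrelated to the Spark source (the actor and message names are made up) showing the lifecycle relied on above: actorOf creates the actor, Akka then invokes its preStart, and ! sends messages asynchronously.

```scala
import akka.actor.{Actor, ActorSystem, Props}

case object Register      // analogous to RegisterApplication
case object Registered    // analogous to RegisteredApplication

class ClientLikeActor extends Actor {
  override def preStart(): Unit = {
    // called by Akka right after the actor is created, just like ClientActor's preStart
    self ! Register
  }
  def receive = {
    case Register   => println("registering..."); self ! Registered
    case Registered => println("registered"); context.stop(self)
  }
}

object AkkaLifecycleDemo extends App {
  val system = ActorSystem("demo")
  system.actorOf(Props(new ClientLikeActor))   // creating the actor triggers preStart
  Thread.sleep(500)
  system.shutdown()                            // Akka 2.3 API, as used by this Spark line
}
```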
The start method creates the ClientActor that communicates with the Master; its preStart lifecycle method is then invoked to register with the Master. Let's look at preStart:
override def preStart() {
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
try {
// the ClientActor registers with the Master
registerWithMaster()
} catch {
case e: Exception =>
logWarning("Failed to connect to master", e)
markDisconnected()
context.stop(self)
}
}
It eventually calls this method to register with every Master:
def tryRegisterAllMasters() {
for (masterAkkaUrl <- masterAkkaUrls) {
logInfo("Connecting to master " + masterAkkaUrl + "...")
// obtain a reference to the Master via actorSelection
val actor = context.actorSelection(masterAkkaUrl)
// asynchronously send a RegisterApplication message to the Master
actor ! RegisterApplication(appDescription)
}
}
The registration message sent by the ClientActor carries the ApplicationDescription, which describes the resources requested, the Executor class to launch, and its arguments.
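For orientation, the shape of ApplicationDescription in this Spark 1.x line is roughly the following (field names recalled from this version, not quoted verbatim):

```scala
// Rough outline of ApplicationDescription (not verbatim source)
private[spark] class ApplicationDescription(
    val name: String,            // application name, sc.appName
    val maxCores: Option[Int],   // total cores requested; None means no explicit cap
    val memoryPerSlave: Int,     // memory per executor in MB, sc.executorMemory
    val command: Command,        // how to launch CoarseGrainedExecutorBackend
    val appUiUrl: String,        // driver web UI address
    val eventLogDir: Option[String] = None)
// (this version also carries an event-log codec field)
```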
The Master's receiver
case RegisterApplication(description) => {
if (state == RecoveryState.STANDBY) {
// ignore, don't send response
} else {
logInfo("Registering app " + description.name)
// create the ApplicationInfo; sender is the ClientActor
val app = createApplication(description, sender)
// register the application
registerApplication(app)
logInfo("Registered app " + description.name + " with ID " + app.id)
// persist the application
persistenceEngine.addApplication(app)
// reply to the ClientActor that the app has been registered successfully
sender ! RegisteredApplication(app.id, masterUrl)
// TODO schedule resources
schedule()
}
}
Let's look at registerApplication(app):
def registerApplication(app: ApplicationInfo): Unit = {
val appAddress = app.driver.path.address
if (addressToApp.contains(appAddress)) {
logInfo("Attempted to re-register application at same address: " + appAddress)
return
}
// add the app to the tracking collections
applicationMetricsSystem.registerSource(app.appSource)
apps += app
idToApp(app.id) = app
actorToApp(app.driver) = app
addressToApp(appAddress) = app
waitingApps += app
}
The Master stores the received application info in its collections, persists it, and sends a RegisteredApplication message back to the ClientActor. It then runs schedule(), which walks the workers collection and calls launchExecutor:
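schedule() itself is not shown in this article. By default the standalone Master spreads an application's cores across the usable workers in round-robin fashion and then calls launchExecutor for each assignment; the toy, self-contained sketch below (made-up data, not Spark source) illustrates just that assignment idea:

```scala
// Toy illustration of the "spread out" core-assignment idea used by Master.schedule()
object SpreadOutDemo extends App {
  case class Worker(id: String, coresFree: Int)

  def spreadOut(coresWanted: Int, workers: Seq[Worker]): Map[String, Int] = {
    val usable = workers.filter(_.coresFree > 0).sortBy(-_.coresFree)
    val assigned = Array.fill(usable.length)(0)
    var toAssign = math.min(coresWanted, usable.map(_.coresFree).sum)
    var pos = 0
    while (toAssign > 0) {
      if (usable(pos).coresFree - assigned(pos) > 0) {   // this worker still has a free core
        assigned(pos) += 1
        toAssign -= 1
      }
      pos = (pos + 1) % usable.length                    // move to the next worker (round robin)
    }
    usable.map(_.id).zip(assigned).filter(_._2 > 0).toMap
  }

  // An app asking for 5 cores on three workers ends up spread across all of them.
  println(spreadOut(5, Seq(Worker("w1", 4), Worker("w2", 2), Worker("w3", 4))))
}
```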
def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc) {
logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
// record how many resources are now used on this worker
worker.addExecutor(exec)
// the Master sends the Worker a message telling it to launch an Executor
worker.actor ! LaunchExecutor(masterUrl,
exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory)
// the Master tells the ClientActor that an executor has been added
exec.application.driver ! ExecutorAdded(
exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)
}
Here the Master sends the Worker a LaunchExecutor message: worker.actor ! LaunchExecutor(masterUrl, exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory). The application.desc field carries the launch information for the Executor class.
case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
// ...
appDirectories(appId) = appLocalDirs
// create an ExecutorRunner; this is important: it holds the Executor's launch configuration and arguments
val manager = new ExecutorRunner(
appId,
execId,
appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
cores_,
memory_,
self,
workerId,
host,
webUi.boundPort,
publicAddress,
sparkHome,
executorDir,
akkaUrl,
conf,
appLocalDirs, ExecutorState.LOADING)
executors(appId + "/" + execId) = manager
// TODO start the ExecutorRunner
manager.start()
// ...
}
}
}
The Worker's receiver gets the launch-Executor message; the appDesc object holds the Command, the Executor implementation class, and its arguments. manager.start() then creates a thread:
def start() {
// start a thread
workerThread = new Thread("ExecutorRunner for " + fullId) {
// use a child thread so the Worker can launch the Executor child process asynchronously
override def run() { fetchAndRunExecutor() }
}
workerThread.start()
// Shutdown hook that kills actors on shutdown.
shutdownHook = new Thread() {
override def run() {
killProcess(Some("Worker shutting down"))
}
}
Runtime.getRuntime.addShutdownHook(shutdownHook)
}
The thread calls fetchAndRunExecutor(); let's look at that method:
def fetchAndRunExecutor() {
try {
// Launch the process
val builder = CommandUtils.buildProcessBuilder(appDesc.command, memory,
sparkHome.getAbsolutePath, substituteVariables)
// build the launch command
val command = builder.command()
logInfo("Launch command: " + command.mkString("\"", "\" \"", "\""))
builder.directory(executorDir)
builder.environment.put("SPARK_LOCAL_DIRS", appLocalDirs.mkString(","))
// In case we are running this from within the Spark Shell, avoid creating a "scala"
// parent process for the executor command
builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")
// Add webUI log urls
val baseUrl =
s"https://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")
// start the child process
process = builder.start()
val header = "Spark Executor Command: %s\n%s\n\n".format(
command.mkString("\"", "\" \"", "\""), "=" * 40)
// Redirect its stdout and stderr to files
val stdout = new File(executorDir, "stdout")
stdoutAppender = FileAppender(process.getInputStream, stdout, conf)
val stderr = new File(executorDir, "stderr")
Files.write(header, stderr, UTF_8)
stderrAppender = FileAppender(process.getErrorStream, stderr, conf)
// Wait for it to exit; executor may exit with code 0 (when driver instructs it to shutdown)
// or with nonzero exit code
// the process is now running; wait for it to exit
val exitCode = process.waitFor()
// ...
}
}
Here the class name and arguments are assembled into a command (the details of the assembly don't matter); ultimately builder.start() forks a child JVM process whose main class is CoarseGrainedExecutorBackend. At this point the Executor process is up.
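As a standalone illustration of that mechanism (not Spark source; the class path and working directory are placeholders), forking a child JVM with ProcessBuilder looks like this:

```scala
import java.io.File
import scala.collection.JavaConverters._

object ChildJvmLauncher {
  def main(args: Array[String]): Unit = {
    // Build the command line: java -cp <classpath> <main class> <args...>
    val command = Seq(
      "java", "-cp", sys.props("java.class.path"),
      "org.apache.spark.executor.CoarseGrainedExecutorBackend") ++ args
    val builder = new ProcessBuilder(command.asJava)
    builder.directory(new File(sys.props("java.io.tmpdir")))  // placeholder working dir, like executorDir
    val process = builder.start()                             // fork the child JVM
    val exitCode = process.waitFor()                          // block until the child exits
    println(s"child exited with code $exitCode")
  }
}
```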
Starting the Executor's task-scheduling objects
Once the Executor process starts, the first thing it runs is its main method, shown below:
// entry point of the Executor process
def main(args: Array[String]) {
// ...
// parse the command-line arguments
while (!argv.isEmpty) {
argv match {
case ("--driver-url") :: value :: tail =>
driverUrl = value
argv = tail
case ("--executor-id") :: value :: tail =>
executorId = value
argv = tail
case ("--hostname") :: value :: tail =>
hostname = value
argv = tail
case ("--cores") :: value :: tail =>
cores = value.toInt
argv = tail
case ("--app-id") :: value :: tail =>
appId = value
argv = tail
case ("--worker-url") :: value :: tail =>
// Worker url is used in spark standalone mode to enforce fate-sharing with worker
workerUrl = Some(value)
argv = tail
case ("--user-class-path") :: value :: tail =>
userClassPath += new URL(value)
argv = tail
case Nil =>
case tail =>
System.err.println(s"Unrecognized options: ${tail.mkString(" ")}")
printUsageAndExit()
}
}
if (driverUrl == null || executorId == null || hostname == null || cores <= 0 ||
appId == null) {
printUsageAndExit()
}
// start running the Executor backend
run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath)
}
It then executes the run method:
private def run(
driverUrl: String,
executorId: String,
hostname: String,
cores: Int,
appId: String,
workerUrl: Option[String],
userClassPath: Seq[URL])
// ...
// create the CoarseGrainedExecutorBackend actor via the actorSystem
// the CoarseGrainedExecutorBackend communicates with the DriverActor
env.actorSystem.actorOf(
Props(classOf[CoarseGrainedExecutorBackend],
driverUrl, executorId, sparkHostPort, cores, userClassPath, env),
name = "Executor")
// ...
}
env.actorSystem.awaitTermination()
}
}
The run method creates the CoarseGrainedExecutorBackend actor, which will communicate with the DriverActor; its preStart lifecycle method is then invoked:
override def preStart() {
logInfo("Connecting to driver: " + driverUrl)
// the Executor connects to the DriverActor
driver = context.actorSelection(driverUrl)
// the Executor sends a registration message to the DriverActor
driver ! RegisterExecutor(executorId, hostPort, cores, extractLogUrls)
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
}
The Executor sends the DriverActor a registration message: driver ! RegisterExecutor(executorId, hostPort, cores, extractLogUrls). When the DriverActor's receiver gets the message:
def receiveWithLogging = {
// the registration message sent by the Executor to the DriverActor
case RegisterExecutor(executorId, hostPort, cores, logUrls) =>
Utils.checkHostPort(hostPort, "Host port expected " + hostPort)
if (executorDataMap.contains(executorId)) {
sender ! RegisterExecutorFailed("Duplicate executor ID: " + executorId)
} else {
logInfo("Registered executor: " + sender + " with ID " + executorId)
// the DriverActor replies to the Executor that registration succeeded
sender ! RegisteredExecutor
addressToExecutorId(sender.path.address) = executorId
totalCoreCount.addAndGet(cores)
totalRegisteredExecutors.addAndGet(1)
val (host, _) = Utils.parseHostPort(hostPort)
// wrap up the Executor's information
val data = new ExecutorData(sender, sender.path.address, host, cores, cores, logUrls)
// This must be synchronized because variables mutated
// in this block are read when requesting executors
CoarseGrainedSchedulerBackend.this.synchronized {
// add the Executor's info object to the map
executorDataMap.put(executorId, data)
if (numPendingExecutors > 0) {
numPendingExecutors -= 1
logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
}
}
listenerBus.post(
SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))
// later used to offer resources and launch the actual tasks
makeOffers()
}
The DriverActor's receiver stores the Executor's info in a Map and sends the reply sender ! RegisteredExecutor back to the CoarseGrainedExecutorBackend:
override def receiveWithLogging = {
case RegisteredExecutor =>
logInfo("Successfully registered with driver")
val (hostname, _) = Utils.parseHostPort(hostPort)
executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
On receiving the message, the CoarseGrainedExecutorBackend creates an Executor object that will carry out task execution. That completes the creation of the Executor; the next article covers task scheduling.