

【Kafka Source Code】The Kafka Startup Process

Kafka is normally started from the command line, but the startup script ultimately just invokes a main method in the code, so we focus on the entry class Kafka. Once you have the source checked out, you can also start Kafka by running the main method in Kafka.scala directly (you need to pass a startup argument: the path to server.properties). Because Kafka depends on ZooKeeper, start ZooKeeper first, point server.properties at the zk address, and then start the broker.
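Before diving in, here is a minimal sketch (plain Scala, not Kafka code) of what passing the server.properties path as the startup argument boils down to: the file is loaded into a java.util.Properties object. loadServerProps is a made-up helper name, though broker.id and zookeeper.connect are real server.properties keys.

```scala
import java.io.{FileInputStream, FileWriter}
import java.util.Properties

// Hypothetical stand-in for what getPropsFromArgs(args) does with the
// first argument: treat it as a path and load it into Properties.
def loadServerProps(path: String): Properties = {
  val props = new Properties()
  val in = new FileInputStream(path)
  try props.load(in) finally in.close()
  props
}

// Usage: write a tiny config file and read it back.
val f = java.io.File.createTempFile("server", ".properties")
val w = new FileWriter(f)
w.write("broker.id=0\nzookeeper.connect=localhost:2181\n")
w.close()

val serverProps = loadServerProps(f.getPath)
println(serverProps.getProperty("zookeeper.connect")) // localhost:2181
```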

Let's look at the main() method first:


def main(args: Array[String]): Unit = {
  try {
    val serverProps = getPropsFromArgs(args)
    val kafkaServerStartable = KafkaServerStartable.fromProps(serverProps)

    // attach shutdown handler to catch control-c
    Runtime.getRuntime().addShutdownHook(new Thread() {
      override def run() = {
        kafkaServerStartable.shutdown
      }
    })

    kafkaServerStartable.startup
    kafkaServerStartable.awaitShutdown
  }
  catch {
    case e: Throwable =>
      fatal(e)
      System.exit(1)
  }
  System.exit(0)
}
Let's walk through it. First, getPropsFromArgs(args) reads the configuration file and assigns its contents to serverProps. The second step is KafkaServerStartable.fromProps(serverProps):

object KafkaServerStartable {
  def fromProps(serverProps: Properties) = {
    KafkaMetricsReporter.startReporters(new VerifiableProperties(serverProps))
    new KafkaServerStartable(KafkaConfig.fromProps(serverProps))
  }
}
This mainly starts the metrics reporters, an internal monitoring service (internal state monitoring).
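As an aside, pluggable components like these reporters (and, later in startup, the authorizer via CoreUtils.createObject) are instantiated reflectively from a fully qualified class name taken from the config. A minimal sketch of that pattern, with createObject as a hypothetical stand-in for the no-arg path of CoreUtils.createObject and java.util.ArrayList standing in for a user-configured class name:

```scala
// Sketch of reflective instantiation: turn a class name from the
// config into an instance via its no-arg constructor.
def createObject[T](className: String): T =
  Class.forName(className)
    .getDeclaredConstructor()
    .newInstance()
    .asInstanceOf[T]

// Usage: java.util.ArrayList plays the role of a configured class name.
val list = createObject[java.util.List[String]]("java.util.ArrayList")
list.add("broker-0")
println(list.get(0)) // broker-0
```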

Next comes a shutdown hook, a pattern common in Java: when the JVM shuts down, it runs cleanup logic so the program exits safely. After that is the main event of startup: kafkaServerStartable.startup. Stepping into it, we can see that it simply calls the startup method of KafkaServer, so let's focus on that method (it is fairly long):

def startup() {
  try {
    info("starting")

  if(isShuttingDown.get)
    throw new IllegalStateException("Kafka server is still shutting down, cannot re-start!")

  if(startupComplete.get)
    return

  val canStartup = isStartingUp.compareAndSet(false, true)
  if (canStartup) {
    metrics = new Metrics(metricConfig, reporters, kafkaMetricsTime, true)

    brokerState.newState(Starting)

    /* start scheduler */
    kafkaScheduler.startup()

    /* setup zookeeper */
    zkUtils = initZk()

    /* start log manager */
    logManager = createLogManager(zkUtils.zkClient, brokerState)
    logManager.startup()

    /* generate brokerId */
    config.brokerId =  getBrokerId
    this.logIdent = "[Kafka Server " + config.brokerId + "], "

    socketServer = new SocketServer(config, metrics, kafkaMetricsTime)
    socketServer.startup()

    /* start replica manager */
    replicaManager = new ReplicaManager(config, metrics, time, kafkaMetricsTime, zkUtils, kafkaScheduler, logManager,
      isShuttingDown)
    replicaManager.startup()

    /* start kafka controller */
    kafkaController = new KafkaController(config, zkUtils, brokerState, kafkaMetricsTime, metrics, threadNamePrefix)
    kafkaController.startup()

    /* start group coordinator */
    groupCoordinator = GroupCoordinator(config, zkUtils, replicaManager, kafkaMetricsTime)
    groupCoordinator.startup()

    /* Get the authorizer and initialize it if one is specified.*/
    authorizer = Option(config.authorizerClassName).filter(_.nonEmpty).map { authorizerClassName =>
      val authZ = CoreUtils.createObject[Authorizer](authorizerClassName)
      authZ.configure(config.originals())
      authZ
    }

    /* start processing requests */
    apis = new KafkaApis(socketServer.requestChannel, replicaManager, groupCoordinator,
      kafkaController, zkUtils, config.brokerId, config, metadataCache, metrics, authorizer)
    requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
    brokerState.newState(RunningAsBroker)

    Mx4jLoader.maybeLoad()

    /* start dynamic config manager */
    dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> new TopicConfigHandler(logManager, config),
                                                       ConfigType.Client -> new ClientIdConfigHandler(apis.quotaManagers))

    // Apply all existing client configs to the ClientIdConfigHandler to bootstrap the overrides
    // TODO: Move this logic to DynamicConfigManager
    AdminUtils.fetchAllEntityConfigs(zkUtils, ConfigType.Client).foreach {
      case (clientId, properties) => dynamicConfigHandlers(ConfigType.Client).processConfigChanges(clientId, properties)
    }

    // Create the config manager. start listening to notifications
    dynamicConfigManager = new DynamicConfigManager(zkUtils, dynamicConfigHandlers)
    dynamicConfigManager.startup()

    /* tell everyone we are alive */
    val listeners = config.advertisedListeners.map {case(protocol, endpoint) =>
      if (endpoint.port == 0)
        (protocol, EndPoint(endpoint.host, socketServer.boundPort(protocol), endpoint.protocolType))
      else
        (protocol, endpoint)
    }
    kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, listeners, zkUtils, config.rack,
      config.interBrokerProtocolVersion)
    kafkaHealthcheck.startup()

    // Now that the broker id is successfully registered via KafkaHealthcheck, checkpoint it
    checkpointBrokerId(config.brokerId)

    /* register broker metrics */
    registerStats()

    shutdownLatch = new CountDownLatch(1)
    startupComplete.set(true)
    isStartingUp.set(false)
    AppInfoParser.registerAppInfo(jmxPrefix, config.brokerId.toString)
    info("started")
  }
  }
  catch {
    case e: Throwable =>
      fatal("Fatal error during KafkaServer startup. Prepare to shutdown", e)
      isStartingUp.set(false)
      shutdown()
      throw e
  }
}
First, it checks whether the server is currently shutting down or has already completed startup; either case throws an exception. Then a CAS operation on isStartingUp guards against concurrent startup attempts and decides whether this caller may proceed. If it can, the startup sequence begins.
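The guard described above can be sketched with a plain AtomicBoolean. tryStartup is a made-up name, but the compareAndSet pattern matches the source: only the caller that atomically flips isStartingUp from false to true runs the startup work.

```scala
import java.util.concurrent.atomic.AtomicBoolean

val isStartingUp    = new AtomicBoolean(false)
val startupComplete = new AtomicBoolean(false)

def tryStartup(): Boolean = {
  if (startupComplete.get) return false           // already running: no-op
  // CAS: exactly one concurrent caller sees false -> true succeed
  val canStartup = isStartingUp.compareAndSet(false, true)
  if (canStartup) {
    // ... the real startup work would happen here ...
    startupComplete.set(true)
    isStartingUp.set(false)
  }
  canStartup
}

println(tryStartup()) // true  — first caller performs the startup
println(tryStartup()) // false — subsequent calls are no-ops
```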

Construct the Metrics object
Set the broker state to Starting
Start the scheduler: kafkaScheduler.startup()
Construct zkUtils: use the zk settings from the config to start a zk client
Start the log manager: it reads configuration from zk, including __consumer_offsets and __system.topic__. The key part is scheduling periodic tasks: deleting logs that meet the retention criteria (cleanupLogs), flushing dirty logs to disk (flushDirtyLogs), and checkpointing recovery points to a file so that not every log file has to be recovered on startup (checkpointRecoveryPointOffsets).
/**
 *  Start the background threads to flush logs and do log cleanup
 */
def startup() {
  /* Schedule the cleanup task to delete old logs */
  if(scheduler != null) {
    info("Starting log cleanup with a period of %d ms.".format(retentionCheckMs))
    scheduler.schedule("kafka-log-retention", cleanupLogs,
                       delay = InitialTaskDelayMs, period = retentionCheckMs, TimeUnit.MILLISECONDS)
    info("Starting log flusher with a default period of %d ms.".format(flushCheckMs))
    scheduler.schedule("kafka-log-flusher", flushDirtyLogs,
                       delay = InitialTaskDelayMs, period = flushCheckMs, TimeUnit.MILLISECONDS)
    scheduler.schedule("kafka-recovery-point-checkpoint", checkpointRecoveryPointOffsets,
                       delay = InitialTaskDelayMs, period = flushCheckpointMs, TimeUnit.MILLISECONDS)
  }
  if(cleanerConfig.enableCleaner)
    cleaner.startup()
}
Next, obtain the brokerId
Start the NIO socket server
Start the replica manager: this starts the ISR expiration thread
Start the Kafka controller: register the session-expiry listener and kick off controller leader election
Start the group coordinator
Set up the authorizer, if one is configured, for permission checks
Start the request handler threads and begin processing requests
Start the dynamic config listener: it watches zk node data changes and broadcasts them to all brokers
Start the health check: currently this just registers the broker node in zk; if registration succeeds the broker is alive, otherwise it is considered dead
Register the startup app info
Startup complete
Finally, main waits on the shutdown CountDownLatch; once shutdownLatch reaches 0, Kafka shuts down
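The startup/awaitShutdown/shutdown trio from main can be sketched with a CountDownLatch. TinyServer is a toy stand-in for KafkaServerStartable/KafkaServer, not Kafka code:

```scala
import java.util.concurrent.CountDownLatch

// Sketch: startup creates a CountDownLatch(1); awaitShutdown blocks the
// main thread on it; shutdown counts it down to 0, releasing awaitShutdown
// so main can exit.
class TinyServer {
  private var shutdownLatch: CountDownLatch = null

  def startup(): Unit       = { shutdownLatch = new CountDownLatch(1) }
  def shutdown(): Unit      = { shutdownLatch.countDown() }
  def awaitShutdown(): Unit = { shutdownLatch.await() }
  def isShutdown: Boolean   = shutdownLatch.getCount == 0
}

val server = new TinyServer
server.startup()

// Stand-in for the shutdown hook: some other thread calls shutdown().
new Thread(() => server.shutdown()).start()

server.awaitShutdown() // returns once the latch reaches 0
println("stopped")
```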

Last updated: 2017-10-23 17:33:35
