Tech Community [雲棲]
[Kafka Source Code] The KafkaController Startup Process
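Earlier articles covered much of what Kafka loads during startup. We also know that a topic is split into many partitions hosted across the brokers, and that the replicas of each partition are divided into a leader and followers, with data replicated from the leader to the followers. So who manages all these partitions? Is there a leader/follower distinction among the brokers themselves? That is the job of KafkaController, the subject of this article.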
1. Entry Point
The entry point for starting KafkaController is likewise very short. It sits in the start method of KafkaServer.
/* start kafka controller */
kafkaController = new KafkaController(config, zkUtils, brokerState, kafkaMetricsTime, metrics, threadNamePrefix)
kafkaController.startup()
A KafkaController is instantiated first, and then the controller is started.
2. Instantiating the Controller
The instantiation code is shown below; see the comments:
this.logIdent = "[Controller " + config.brokerId + "]: "
private var isRunning = true
private val stateChangeLogger = KafkaController.stateChangeLogger
// instantiate the controller context
val controllerContext = new ControllerContext(zkUtils, config.zkSessionTimeoutMs)
// instantiate the partition state machine
val partitionStateMachine = new PartitionStateMachine(this)
// instantiate the replica state machine
val replicaStateMachine = new ReplicaStateMachine(this)
// instantiate the controller elector; onControllerFailover and onControllerResignation are its callbacks
private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, onControllerFailover,
  onControllerResignation, config.brokerId)
// have a separate scheduler for the controller to be able to start and stop independently of the
// kafka server
// scheduler for the automatic leader rebalance task
private val autoRebalanceScheduler = new KafkaScheduler(1)
// topic deletion manager
var deleteTopicManager: TopicDeletionManager = null
// leader selector for offline partitions
val offlinePartitionSelector = new OfflinePartitionLeaderSelector(controllerContext, config)
// leader selector for reassigned partitions
private val reassignedPartitionLeaderSelector = new ReassignedPartitionLeaderSelector(controllerContext)
// leader selector that picks the preferred replica
private val preferredReplicaPartitionLeaderSelector = new PreferredReplicaPartitionLeaderSelector(controllerContext)
// leader selector used during a broker's controlled shutdown
private val controlledShutdownPartitionLeaderSelector = new ControlledShutdownLeaderSelector(controllerContext)
private val brokerRequestBatch = new ControllerBrokerRequestBatch(this)
// listener for partition reassignment
private val partitionReassignedListener = new PartitionsReassignedListener(this)
// listener for preferred replica election
private val preferredReplicaElectionListener = new PreferredReplicaElectionListener(this)
// listener for ISR change notifications
private val isrChangeNotificationListener = new IsrChangeNotificationListener(this)
3. Controller Startup
Straight to the code:
def startup() = {
  inLock(controllerContext.controllerLock) {
    info("Controller starting up")
    registerSessionExpirationListener()
    isRunning = true
    controllerElector.startup
    info("Controller startup complete")
  }
}
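Calling startup does not make the current broker the controller. It only enters the broker into the election that is coordinated through ZooKeeper: as the elect method further down shows, the broker that manages to create the ephemeral election node becomes the controller. The winning broker then carries out a series of operations, which we will see below.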
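The "every broker starts up, only one becomes controller" idea can be sketched without ZooKeeper. The snippet below is a minimal illustration, not Kafka code: FakeZk and Candidate are hypothetical names, and an in-memory map with putIfAbsent stands in for creating an ephemeral node that fails if it already exists.

```scala
import scala.collection.concurrent.TrieMap

// Hypothetical in-memory stand-in for ZooKeeper: path -> data.
// putIfAbsent plays the role of "create the node, fail if it already exists".
final class FakeZk {
  private val nodes = TrieMap.empty[String, String]

  /** Returns true if this call created the node, false if it already existed. */
  def createIfAbsent(path: String, data: String): Boolean =
    nodes.putIfAbsent(path, data).isEmpty

  def read(path: String): Option[String] = nodes.get(path)

  def delete(path: String): Unit = { nodes.remove(path); () }
}

// Hypothetical candidate: every broker runs exactly the same startup code.
final class Candidate(val brokerId: Int, zk: FakeZk) {
  private val electionPath = "/controller"

  /** Tries to become the controller; returns true only for the winner. */
  def startup(): Boolean = {
    zk.createIfAbsent(electionPath, brokerId.toString)
    isController
  }

  def isController: Boolean = zk.read(electionPath).contains(brokerId.toString)
}
```

If three candidates call startup() against the same FakeZk, only the first one reports itself as controller; the others simply observe that the node already exists.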
3.1 registerSessionExpirationListener
First, the broker registers a listener for session expiration. Let's look at this listener.
private def registerSessionExpirationListener() = {
  zkUtils.zkClient.subscribeStateChanges(new SessionExpirationListener())
}
class SessionExpirationListener() extends IZkStateListener with Logging {
  this.logIdent = "[SessionExpirationListener on " + config.brokerId + "], "
  @throws(classOf[Exception])
  def handleStateChanged(state: KeeperState) {
    // do nothing, since zkclient will do reconnect for us.
  }
  /**
   * Called after the zookeeper session has expired and a new session has been created. You would have to re-create
   * any ephemeral nodes here.
   *
   * @throws Exception
   *             On any error.
   */
  @throws(classOf[Exception])
  def handleNewSession() {
    info("ZK expired; shut down all controller components and try to re-elect")
    inLock(controllerContext.controllerLock) {
      onControllerResignation()
      controllerElector.elect
    }
  }
  override def handleSessionEstablishmentError(error: Throwable): Unit = {
    //no-op handleSessionEstablishmentError in KafkaHealthCheck should handle this error in its handleSessionEstablishmentError
  }
}
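As the code shows, when the broker's session with ZooKeeper expires, the broker code does not reconnect on its own: handleStateChanged is a no-op because the ZkClient library reconnects for it. Once a new session has been created, the ephemeral nodes of the old session are gone, so handleNewSession performs two operations under the controller lock:
- onControllerResignation: the current broker gives up the controller role and cleans up its controller state
- controllerElector.elect: the broker rejoins the controller election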
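The ordering of those two operations can be shown in isolation. This is a minimal sketch under the assumption that both steps are plain callbacks; ExpirationHandler is a hypothetical class, and only the method name handleNewSession is borrowed from the listener above.

```scala
import java.util.concurrent.locks.ReentrantLock

// Hypothetical handler: the two callbacks are stand-ins for
// onControllerResignation and controllerElector.elect.
final class ExpirationHandler(resign: () => Unit, elect: () => Boolean) {
  private val controllerLock = new ReentrantLock()

  /** Clean up first, then rejoin the election, both under one lock. */
  def handleNewSession(): Boolean = {
    controllerLock.lock()
    try {
      resign()
      elect()
    } finally {
      controllerLock.unlock()
    }
  }
}
```

Holding a single lock across both steps means no other controller operation guarded by the same lock can observe the state between the cleanup and the new election.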
Let's look at each in turn.
3.1.1 onControllerResignation
The code is fairly self-explanatory: it mainly cleans up the controller's data.
/**
 * This callback is invoked by the zookeeper leader elector when the current broker resigns as the controller. This is
 * required to clean up internal controller data structures
 */
def onControllerResignation() {
  debug("Controller resigning, broker id %d".format(config.brokerId))
  // de-register listeners
  deregisterIsrChangeNotificationListener()
  deregisterReassignedPartitionsListener()
  deregisterPreferredReplicaElectionListener()
  // shutdown delete topic manager
  if (deleteTopicManager != null)
    deleteTopicManager.shutdown()
  // shutdown leader rebalance scheduler
  if (config.autoLeaderRebalanceEnable)
    autoRebalanceScheduler.shutdown()
  inLock(controllerContext.controllerLock) {
    // de-register partition ISR listener for on-going partition reassignment task
    deregisterReassignedPartitionsIsrChangeListeners()
    // shutdown partition state machine
    partitionStateMachine.shutdown()
    // shutdown replica state machine
    replicaStateMachine.shutdown()
    // shutdown controller channel manager
    if (controllerContext.controllerChannelManager != null) {
      controllerContext.controllerChannelManager.shutdown()
      controllerContext.controllerChannelManager = null
    }
    // reset controller context
    controllerContext.epoch = 0
    controllerContext.epochZkVersion = 0
    // switch this broker's state from controller back to plain broker
    brokerState.newState(RunningAsBroker)
    info("Broker %d resigned as the controller".format(config.brokerId))
  }
}
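Note that handleNewSession calls this cleanup regardless of whether the broker actually was the controller, which is why the code guards deleteTopicManager and controllerChannelManager with null checks. A minimal sketch of that "safe to call in either state" pattern follows; MiniController and ChannelManager are hypothetical, heavily simplified stand-ins, and the state is modeled as a plain string.

```scala
// Hypothetical stand-in for the controller channel manager.
final class ChannelManager {
  var running: Boolean = true
  def shutdown(): Unit = { running = false }
}

// Hypothetical, simplified controller state.
final class MiniController {
  var channelManager: ChannelManager = null // only created after winning an election
  var epoch: Int = 0
  var state: String = "RunningAsBroker"

  def becomeController(newEpoch: Int): Unit = {
    channelManager = new ChannelManager
    epoch = newEpoch
    state = "RunningAsController"
  }

  /** Safe to call whether or not this broker currently is the controller. */
  def resign(): Unit = {
    if (channelManager != null) {
      channelManager.shutdown()
      channelManager = null
    }
    epoch = 0
    state = "RunningAsBroker"
  }
}
```

Calling resign() on a broker that never became controller is a no-op apart from resetting fields that already hold their defaults.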
3.1.2 controllerElector.elect
This part reruns the controller election.
def elect: Boolean = {
  val timestamp = SystemTime.milliseconds.toString
  val electString = Json.encode(Map("version" -> 1, "brokerid" -> brokerId, "timestamp" -> timestamp))
  leaderId = getControllerID
  /*
   * We can get here during the initial startup and the handleDeleted ZK callback. Because of the potential race condition,
   * it's possible that the controller has already been elected when we get here. This check will prevent the following
   * createEphemeralPath method from getting into an infinite loop if this broker is already the controller.
   */
  if(leaderId != -1) {
    debug("Broker %d has been elected as leader, so stopping the election process.".format(leaderId))
    return amILeader
  }
  try {
    val zkCheckedEphemeral = new ZKCheckedEphemeral(electionPath,
      electString,
      controllerContext.zkUtils.zkConnection.getZookeeper,
      JaasUtils.isZkSecurityEnabled())
    zkCheckedEphemeral.create()
    info(brokerId + " successfully elected as leader")
    leaderId = brokerId
    onBecomingLeader()
  } catch {
    case e: ZkNodeExistsException =>
      // If someone else has written the path, then
      leaderId = getControllerID
      if (leaderId != -1)
        debug("Broker %d was elected as leader instead of broker %d".format(leaderId, brokerId))
      else
        warn("A leader has been elected but just resigned, this will result in another round of election")
    case e2: Throwable =>
      error("Error while electing or becoming leader on broker %d".format(brokerId), e2)
      resign()
  }
  amILeader
}
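This method performs the controller election. Let's focus on what happens after the current broker is elected controller, namely onBecomingLeader. To find out what that is, we have to go back to the instantiation code: the callback passed to ZookeeperLeaderElector there is onControllerFailover.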
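The wiring between the elector and the controller is plain constructor injection of two functions. The sketch below illustrates that shape only; CallbackElector is hypothetical, tryCreateNode is an assumed stand-in for creating the ephemeral node, and the resign method is simplified compared with whatever the real elector does.

```scala
// Hypothetical elector: the two callbacks are injected at construction time,
// in the way onControllerFailover and onControllerResignation are in the article.
final class CallbackElector(
    brokerId: Int,
    tryCreateNode: Int => Boolean, // stand-in for creating the ephemeral node
    onBecomingLeader: () => Unit,
    onResigningAsLeader: () => Unit) {

  private var leaderId: Int = -1

  def amILeader: Boolean = leaderId == brokerId

  /** Runs the "become leader" callback only if this broker won. */
  def elect(): Boolean = {
    if (tryCreateNode(brokerId)) {
      leaderId = brokerId
      onBecomingLeader()
    }
    amILeader
  }

  /** Simplified: runs the resignation callback if we were leader, then forgets the leader. */
  def resign(): Unit = {
    if (amILeader) onResigningAsLeader()
    leaderId = -1
  }
}
```

The elector itself knows nothing about partitions or replicas; everything controller-specific lives in the callbacks. The real onControllerFailover callback follows.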
def onControllerFailover() {
  if (isRunning) {
    info("Broker %d starting become controller state transition".format(config.brokerId))
    //read controller epoch from zk
    readControllerEpochFromZookeeper()
    // increment the controller epoch
    incrementControllerEpoch(zkUtils.zkClient)
    // before reading source of truth from zookeeper, register the listeners to get broker/topic callbacks
    registerReassignedPartitionsListener()
    registerIsrChangeNotificationListener()
    registerPreferredReplicaElectionListener()
    partitionStateMachine.registerListeners()
    replicaStateMachine.registerListeners()
    initializeControllerContext()
    replicaStateMachine.startup()
    partitionStateMachine.startup()
    // register the partition change listeners for all existing topics on failover
    controllerContext.allTopics.foreach(topic => partitionStateMachine.registerPartitionChangeListener(topic))
    info("Broker %d is ready to serve as the new controller with epoch %d".format(config.brokerId, epoch))
    brokerState.newState(RunningAsController)
    maybeTriggerPartitionReassignment()
    maybeTriggerPreferredReplicaElection()
    /* send partition leadership info to all live brokers */
    sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
    if (config.autoLeaderRebalanceEnable) {
      info("starting the partition rebalance scheduler")
      autoRebalanceScheduler.startup()
      autoRebalanceScheduler.schedule("partition-rebalance-thread", checkAndTriggerPartitionRebalance,
        5, config.leaderImbalanceCheckIntervalSeconds.toLong, TimeUnit.SECONDS)
    }
    deleteTopicManager.start()
  }
  else
    info("Controller has been shut down, aborting startup/failover")
}
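A lot happens in this method. In order:
- Read the controller epoch from ZooKeeper.
- Increment the epoch and write it back to ZooKeeper.
- Register a series of listeners (partition reassignment, ISR change notification, preferred replica election, and the listeners of both state machines).
- Initialize the controller context.
- Start the two state machines, the replica state machine first and then the partition state machine.
- Register partition change listeners for all existing topics.
- Mark the broker as RunningAsController, trigger partition reassignment and preferred replica election if needed, and send an update-metadata request to all live or shutting-down brokers.
- If auto leader rebalance is enabled, schedule the periodic partition rebalance check (first run after 5 seconds, then every leaderImbalanceCheckIntervalSeconds seconds).
- Start the topic deletion manager.
Each of these steps is substantial, so later articles will analyze them.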
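The first two steps, reading the epoch and writing epoch + 1, can be sketched as a read followed by a conditional write. This assumes the write is conditional on the version of the stored value; the article's code tracks an epochZkVersion, but the body of incrementControllerEpoch is not shown, so treat the version check here as an assumption. VersionedEpoch and EpochBump are hypothetical names.

```scala
// Hypothetical versioned register standing in for the epoch node in ZooKeeper.
final class VersionedEpoch {
  private var value: Int = 0
  private var version: Int = 0

  /** Returns (epoch, version) as one consistent snapshot. */
  def read: (Int, Int) = synchronized { (value, version) }

  /** Writes only if the caller saw the latest version; returns whether it succeeded. */
  def conditionalUpdate(newValue: Int, expectedVersion: Int): Boolean = synchronized {
    if (expectedVersion != version) false
    else {
      value = newValue
      version += 1
      true
    }
  }
}

object EpochBump {
  /** Read, add one, write back conditionally. None means someone else wrote first. */
  def increment(store: VersionedEpoch): Option[Int] = {
    val (epoch, version) = store.read
    if (store.conditionalUpdate(epoch + 1, version)) Some(epoch + 1) else None
  }
}
```

With a conditional write, two brokers that both believe they just became controller cannot both succeed in bumping the epoch from the same starting version.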
3.2 controllerElector.startup
def startup {
  inLock(controllerContext.controllerLock) {
    controllerContext.zkUtils.zkClient.subscribeDataChanges(electionPath, leaderChangeListener)
    elect
  }
}
The electionPath here is /controller, and leaderChangeListener, covered next, watches that node for changes.
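For orientation, the data written to /controller is the JSON built in elect from the three fields version, brokerid and timestamp. The sketch below shows one plausible rendering of that payload and a way to pull the broker id back out. ControllerNode is a hypothetical helper; the exact byte layout produced by Json.encode and the real parsing logic in KafkaController.parseControllerId are not shown in the article, and the regex here is a stand-in, not a JSON parser.

```scala
object ControllerNode {
  /** Builds a payload with the three fields that elect puts into its map. */
  def encode(brokerId: Int, timestampMs: Long): String =
    s"""{"version":1,"brokerid":$brokerId,"timestamp":"$timestampMs"}"""

  // Matches the brokerid field and captures its numeric value.
  private val BrokerIdField = """"brokerid"\s*:\s*(\d+)""".r

  /** Extracts the broker id, or None if the field is missing. */
  def parseBrokerId(data: String): Option[Int] =
    BrokerIdField.findFirstMatchIn(data).map(_.group(1).toInt)
}
```

The timestamp is rendered as a string because elect converts it with toString before encoding.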
3.2.1 leaderChangeListener
class LeaderChangeListener extends IZkDataListener with Logging {
  /**
   * Called when the leader information stored in zookeeper has changed. Record the new leader in memory
   * @throws Exception On any error.
   */
  @throws(classOf[Exception])
  def handleDataChange(dataPath: String, data: Object) {
    inLock(controllerContext.controllerLock) {
      val amILeaderBeforeDataChange = amILeader
      leaderId = KafkaController.parseControllerId(data.toString)
      info("New leader is %d".format(leaderId))
      // The old leader needs to resign leadership if it is no longer the leader
      if (amILeaderBeforeDataChange && !amILeader)
        onResigningAsLeader()
    }
  }
  /**
   * Called when the leader information stored in zookeeper has been delete. Try to elect as the leader
   * @throws Exception
   *             On any error.
   */
  @throws(classOf[Exception])
  def handleDataDeleted(dataPath: String) {
    inLock(controllerContext.controllerLock) {
      debug("%s leader change listener fired for path %s to handle data deleted: trying to elect as a leader"
        .format(brokerId, dataPath))
      if(amILeader)
        onResigningAsLeader()
      elect
    }
  }
}
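The listener watches the corresponding ZooKeeper node. When the node's data changes, handleDataChange is called; it mainly records the current leaderId. If the current broker was the leader before the change and the new leader is a different broker, it calls onResigningAsLeader to clean up its state as the former leader.
When the node is deleted, handleDataDeleted is called. If the current broker is the leader, it first calls onResigningAsLeader, and then it starts a new leader election.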
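The two transitions can be modeled without ZooKeeper to make the conditions explicit. This is a minimal sketch: LeaderTracker is hypothetical, it takes the already-parsed leader id instead of raw node data, and it omits the locking that the real listener does.

```scala
// Hypothetical model of the listener's two callbacks; no ZooKeeper involved.
final class LeaderTracker(brokerId: Int, onResigningAsLeader: () => Unit, elect: () => Unit) {
  var leaderId: Int = -1

  def amILeader: Boolean = leaderId == brokerId

  /** The node's data changed: record the new leader, resign if we lost the role. */
  def handleDataChange(newLeaderId: Int): Unit = {
    val wasLeader = amILeader
    leaderId = newLeaderId
    if (wasLeader && !amILeader) onResigningAsLeader()
  }

  /** The node was deleted: resign if we were leader, then run a new election. */
  def handleDataDeleted(): Unit = {
    if (amILeader) onResigningAsLeader()
    elect()
  }
}
```

A broker that was never the leader therefore skips the resignation callback in both paths and, on deletion, goes straight to the election.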
3.2.2 elect
This is the controller (that is, leader) election method. It is the same as in section 3.1.2.
Last updated: 2017-11-08 20:35:27