[Kafka Source Code] The chain reaction after a broker is elected controller
[TOC]
Today we look at what a broker does once it has been elected controller. The facade code comes first:
def onControllerFailover() {
if (isRunning) {
info("Broker %d starting become controller state transition".format(config.brokerId))
//read controller epoch from zk
readControllerEpochFromZookeeper()
// increment the controller epoch
incrementControllerEpoch(zkUtils.zkClient)
// before reading source of truth from zookeeper, register the listeners to get broker/topic callbacks
registerReassignedPartitionsListener()
registerIsrChangeNotificationListener()
registerPreferredReplicaElectionListener()
partitionStateMachine.registerListeners()
replicaStateMachine.registerListeners()
initializeControllerContext()
replicaStateMachine.startup()
partitionStateMachine.startup()
// register the partition change listeners for all existing topics on failover
controllerContext.allTopics.foreach(topic => partitionStateMachine.registerPartitionChangeListener(topic))
info("Broker %d is ready to serve as the new controller with epoch %d".format(config.brokerId, epoch))
brokerState.newState(RunningAsController)
maybeTriggerPartitionReassignment()
maybeTriggerPreferredReplicaElection()
/* send partition leadership info to all live brokers */
sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
if (config.autoLeaderRebalanceEnable) {
info("starting the partition rebalance scheduler")
autoRebalanceScheduler.startup()
autoRebalanceScheduler.schedule("partition-rebalance-thread", checkAndTriggerPartitionRebalance,
5, config.leaderImbalanceCheckIntervalSeconds.toLong, TimeUnit.SECONDS)
}
deleteTopicManager.start()
}
else
info("Controller has been shut down, aborting startup/failover")
}
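This single facade touches many listeners and other components, so we will go through them one by one.
1. Controller epoch
The controller first reads the previous epoch from the ZooKeeper node /controller_epoch, then increments it by 1 and persists the new value back to ZooKeeper.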
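The read-increment-write cycle can be sketched as below. This is only an illustration: `VersionedNode`, `conditionalUpdate` and `incrementEpoch` are hypothetical in-memory stand-ins, not Kafka's ZkUtils API, and the sketch assumes the write is guarded by the node version that was read, so that a concurrent writer makes the update fail instead of being silently overwritten.

```scala
// Hypothetical in-memory stand-in for a versioned ZooKeeper node (not Kafka's ZkUtils).
final case class VersionedNode(data: Int, version: Int)

object EpochSketch {
  // Conditional update: succeeds only if the node still has the version we expect.
  def conditionalUpdate(node: VersionedNode, newData: Int, expectedVersion: Int): Option[VersionedNode] =
    if (node.version == expectedVersion) Some(VersionedNode(newData, node.version + 1))
    else None

  // Read the current epoch, add 1, and write it back guarded by the version that was read.
  def incrementEpoch(node: VersionedNode): Option[VersionedNode] =
    conditionalUpdate(node, node.data + 1, node.version)
}
```

A `None` result stands for the case where some other broker changed the node in the meantime.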
2. Registering listeners
This part subscribes to ZooKeeper nodes; when a node changes, the controller reacts accordingly.
2.1 registerReassignedPartitionsListener
private def registerReassignedPartitionsListener() = {
zkUtils.zkClient.subscribeDataChanges(ZkUtils.ReassignPartitionsPath, partitionReassignedListener)
}
The subscribed path is /admin/reassign_partitions, which represents partition reassignment. When its data changes, the following code runs:
/**
* Starts the partition reassignment process unless -
* 1. Partition previously existed
* 2. New replicas are the same as existing replicas
* 3. Any replica in the new set of replicas are dead
* If any of the above conditions are satisfied, it logs an error and removes the partition from list of reassigned
* partitions.
*/
class PartitionsReassignedListener(controller: KafkaController) extends IZkDataListener with Logging {
this.logIdent = "[PartitionsReassignedListener on " + controller.config.brokerId + "]: "
val zkUtils = controller.controllerContext.zkUtils
val controllerContext = controller.controllerContext
/**
* Invoked when some partitions are reassigned by the admin command
* @throws Exception On any error.
*/
@throws(classOf[Exception])
def handleDataChange(dataPath: String, data: Object) {
debug("Partitions reassigned listener fired for path %s. Record partitions to be reassigned %s"
.format(dataPath, data))
// parse the data stored on the zk node
val partitionsReassignmentData = zkUtils.parsePartitionReassignmentData(data.toString)
// collect the partitions to reassign, skipping those already being reassigned
val partitionsToBeReassigned = inLock(controllerContext.controllerLock) {
partitionsReassignmentData.filterNot(p => controllerContext.partitionsBeingReassigned.contains(p._1))
}
partitionsToBeReassigned.foreach { partitionToBeReassigned =>
inLock(controllerContext.controllerLock) {
// if the topic is queued for deletion, remove the partition from the reassignment list
if (controller.deleteTopicManager.isTopicQueuedUpForDeletion(partitionToBeReassigned._1.topic)) {
error("Skipping reassignment of partition %s for topic %s since it is currently being deleted"
.format(partitionToBeReassigned._1, partitionToBeReassigned._1.topic))
controller.removePartitionFromReassignedPartitions(partitionToBeReassigned._1)
} else { // otherwise start the reassignment
val context = new ReassignedPartitionsContext(partitionToBeReassigned._2)
controller.initiateReassignReplicasForTopicPartition(partitionToBeReassigned._1, context)
}
}
}
}
/**
* Called when the leader information stored in zookeeper has been delete. Try to elect as the leader
*
* @throws Exception
* On any error.
*/
@throws(classOf[Exception])
def handleDataDeleted(dataPath: String) {
}
}
Next we look at the reassignment itself, that is, what initiateReassignReplicasForTopicPartition does.
2.1.1 initiateReassignReplicasForTopicPartition
def initiateReassignReplicasForTopicPartition(topicAndPartition: TopicAndPartition,reassignedPartitionContext: ReassignedPartitionsContext) {
val newReplicas = reassignedPartitionContext.newReplicas
val topic = topicAndPartition.topic
val partition = topicAndPartition.partition
// keep only the new replicas whose brokers are alive
val aliveNewReplicas = newReplicas.filter(r => controllerContext.liveBrokerIds.contains(r))
try {
val assignedReplicasOpt = controllerContext.partitionReplicaAssignment.get(topicAndPartition)
assignedReplicasOpt match {
case Some(assignedReplicas) =>
if (assignedReplicas == newReplicas) {
throw new KafkaException("Partition %s to be reassigned is already assigned to replicas".format(topicAndPartition) +
" %s. Ignoring request for partition reassignment".format(newReplicas.mkString(",")))
} else {
if (aliveNewReplicas == newReplicas) {
info("Handling reassignment of partition %s to new replicas %s".format(topicAndPartition, newReplicas.mkString(",")))
// first register ISR change listener
watchIsrChangesForReassignedPartition(topic, partition, reassignedPartitionContext)
controllerContext.partitionsBeingReassigned.put(topicAndPartition, reassignedPartitionContext)
// mark topic ineligible for deletion for the partitions being reassigned
deleteTopicManager.markTopicIneligibleForDeletion(Set(topic))
onPartitionReassignment(topicAndPartition, reassignedPartitionContext)
} else {
// some replica in RAR is not alive. Fail partition reassignment
throw new KafkaException("Only %s replicas out of the new set of replicas".format(aliveNewReplicas.mkString(",")) +
" %s for partition %s to be reassigned are alive. ".format(newReplicas.mkString(","), topicAndPartition) +
"Failing partition reassignment")
}
}
case None => throw new KafkaException("Attempt to reassign partition %s that doesn't exist"
.format(topicAndPartition))
}
} catch {
case e: Throwable => error("Error completing reassignment of partition %s".format(topicAndPartition), e)
// remove the partition from the admin path to unblock the admin client
removePartitionFromReassignedPartitions(topicAndPartition)
}
}
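The three rejection cases in the method above can be condensed into one pure function. This is a minimal sketch under simplifying assumptions: `ReassignmentCheck`, `Outcome`, `Proceed` and `Rejected` are hypothetical names, and brokers and replicas are plain `Int` ids rather than Kafka's own types.

```scala
object ReassignmentCheck {
  sealed trait Outcome
  case object Proceed extends Outcome
  final case class Rejected(reason: String) extends Outcome

  // current: the existing assignment of the partition, or None if the partition is unknown.
  def check(current: Option[Seq[Int]], newReplicas: Seq[Int], liveBrokers: Set[Int]): Outcome =
    current match {
      case None =>
        Rejected("partition does not exist")
      case Some(assigned) if assigned == newReplicas =>
        Rejected("already assigned to these replicas")
      case Some(_) if !newReplicas.forall(r => liveBrokers.contains(r)) =>
        Rejected("some new replicas are not alive")
      case Some(_) =>
        Proceed
    }
}
```

In the real method every rejection is raised as a KafkaException, and the catch block then removes the partition from the admin path.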
Looking into watchIsrChangesForReassignedPartition, the data listener it registers ends up calling onPartitionReassignment as well, so we go straight to onPartitionReassignment, which is the core of the reassignment.
2.1.2 onPartitionReassignment
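This method is invoked by the reassigned-partitions listener. When an admin command initiates a reassignment, it creates the /admin/reassign_partitions path, which fires the ZooKeeper listener. Reassigning the replicas of a partition then goes through the following steps:
RAR = Reassigned replicas
OAR = Original list of replicas for the partition
AR = Current assigned replicas
- 1. Update AR in ZooKeeper with OAR + RAR.
- 2. Send a LeaderAndIsr request to every replica in OAR + RAR (with AR as OAR + RAR). This is done by forcing an update of the leader epoch in ZooKeeper.
- 3. Start the new replicas by moving the replicas in RAR - OAR to the NewReplica state.
- 4. Wait until all replicas in RAR are in sync with the leader.
- 5. Move all replicas in RAR to the OnlineReplica state.
- 6. Set AR to RAR in memory.
- 7. If the leader is not in RAR, elect a new leader from RAR; in that case a LeaderAndIsr request is sent as part of the election. Otherwise the leader epoch is incremented in ZooKeeper and a LeaderAndIsr request is sent. Either way the request carries AR = RAR, which prevents the leader from adding replicas that are no longer assigned (OAR - RAR) back into the ISR.
- 8. Move all replicas in OAR - RAR to the OfflineReplica state. As part of this state change, OAR - RAR is removed from the ISR in ZooKeeper and a LeaderAndIsr request is sent to the leader to notify it of the shrunk ISR. After that, a StopReplica request is sent to the replicas in OAR - RAR.
- 9. Move all replicas in OAR - RAR to the NonExistentReplica state. This sends them a StopReplica request that physically deletes the replicas from disk.
- 10. Update AR in ZooKeeper with RAR.
- 11. Update the ZooKeeper node /admin/reassign_partitions to remove this partition.
- 12. After the leader election the replica and ISR information has changed, so resend the UpdateMetadata request to every broker.
The whole procedure is fairly convoluted and takes some care to follow. The simple walk-through below can serve as a reference.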
* For example, if OAR = {1, 2, 3} and RAR = {4,5,6}, the values in the assigned replica (AR) and leader/isr path in ZK
* may go through the following transition.
* AR                 leader/isr
* {1,2,3}            1/{1,2,3}           (initial state)
* {1,2,3,4,5,6}      1/{1,2,3}           (step 2)
* {1,2,3,4,5,6}      1/{1,2,3,4,5,6}     (step 4)
* {1,2,3,4,5,6}      4/{1,2,3,4,5,6}     (step 7)
* {1,2,3,4,5,6}      4/{4,5,6}           (step 8)
* {4,5,6}            4/{4,5,6}           (step 10)
*
* Note that we have to update AR in ZK with RAR last since it's the only place where we store OAR persistently.
* This way, if the controller crashes before that step, we can still recover.
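The replica sets used in the steps are plain set arithmetic over OAR and RAR. The sketch below recomputes them for the example above; `ReassignmentSets`, `Plan`, `plan` and `shrinkIsr` are hypothetical names chosen for illustration and do not appear in Kafka.

```scala
object ReassignmentSets {
  final case class Plan(expandedAr: Seq[Int], toStart: Seq[Int], toStop: Seq[Int], finalAr: Seq[Int])

  def plan(oar: Seq[Int], rar: Seq[Int]): Plan = Plan(
    expandedAr = (oar ++ rar).distinct,           // step 1: AR = OAR + RAR
    toStart = rar.filter(r => !oar.contains(r)),  // step 3: RAR - OAR, the replicas to create
    toStop = oar.filter(r => !rar.contains(r)),   // steps 8 and 9: OAR - RAR, the replicas to retire
    finalAr = rar                                 // step 10: AR = RAR
  )

  // Step 8: the ISR once the retired replicas have been removed from it.
  def shrinkIsr(isr: Seq[Int], toStop: Seq[Int]): Seq[Int] =
    isr.filter(r => !toStop.contains(r))
}
```

For OAR = {1,2,3} and RAR = {4,5,6} the two sets are disjoint, so every original replica is retired and every new one has to be created from scratch.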
2.2 registerIsrChangeNotificationListener
Registers a listener on the path /isr_change_notification.
/**
* Called when leader intimates of isr change
*
* @param controller
*/
class IsrChangeNotificationListener(controller: KafkaController) extends IZkChildListener with Logging {
override def handleChildChange(parentPath: String, currentChildren: util.List[String]): Unit = {
import scala.collection.JavaConverters._
inLock(controller.controllerContext.controllerLock) {
debug("[IsrChangeNotificationListener] Fired!!!")
val childrenAsScala: mutable.Buffer[String] = currentChildren.asScala
try {
val topicAndPartitions: immutable.Set[TopicAndPartition] = childrenAsScala.map(x => getTopicAndPartition(x)).flatten.toSet
if (topicAndPartitions.nonEmpty) {
controller.updateLeaderAndIsrCache(topicAndPartitions)
processUpdateNotifications(topicAndPartitions)
}
} finally {
// delete processed children
childrenAsScala.map(x => controller.controllerContext.zkUtils.deletePath(
ZkUtils.IsrChangeNotificationPath + "/" + x))
}
}
}
It mainly refreshes the cached leader and ISR information, including the controller epoch stored with it, and then sends an UpdateMetadata request.
2.3 registerPreferredReplicaElectionListener
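Listens for data changes on the path /admin/preferred_replica_election. The preferred replica is the first replica in a partition's assigned replicas list; a preferred replica election makes it the leader again, provided it is alive and in the ISR.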
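As a small illustration of "first replica in the list", the sketch below picks the preferred replica and checks whether leadership currently sits elsewhere. `PreferredReplica`, `preferred` and `leaderIsElsewhere` are hypothetical helper names, and broker ids are plain `Int` values.

```scala
object PreferredReplica {
  // The preferred replica is the head of the assigned replicas list, if the list is non-empty.
  def preferred(assignedReplicas: Seq[Int]): Option[Int] = assignedReplicas.headOption

  // True when a preferred replica exists and the current leader is some other broker.
  def leaderIsElsewhere(assignedReplicas: Seq[Int], currentLeader: Int): Boolean =
    preferred(assignedReplicas).exists(_ != currentLeader)
}
```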
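3. Partition and replica state machines
3.1 Registering the partition state machine listeners
We start with the partition state machine. A partition can be in one of these states:
- NonExistentPartition: the partition does not exist. The only valid previous state is OfflinePartition.
- NewPartition: a new partition for which no leader has been elected yet. The valid previous state is NonExistentPartition.
- OnlinePartition: the partition is online and a leader has been elected. Valid previous states are NewPartition and OfflinePartition.
- OfflinePartition: the partition is offline. Valid previous states are NewPartition and OnlinePartition.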
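The list above amounts to a small transition table. A minimal sketch of that table, encoding only the previous states named in the list (the object and method names are hypothetical and this is not the code of Kafka's PartitionStateMachine):

```scala
object PartitionStates {
  sealed trait PartitionState
  case object NonExistentPartition extends PartitionState
  case object NewPartition extends PartitionState
  case object OnlinePartition extends PartitionState
  case object OfflinePartition extends PartitionState

  // Valid previous states for each target state, as listed in the text.
  def validPrevious(target: PartitionState): Set[PartitionState] = target match {
    case NonExistentPartition => Set(OfflinePartition)
    case NewPartition         => Set(NonExistentPartition)
    case OnlinePartition      => Set(NewPartition, OfflinePartition)
    case OfflinePartition     => Set(NewPartition, OnlinePartition)
  }

  def canMove(from: PartitionState, to: PartitionState): Boolean =
    validPrevious(to).contains(from)
}
```

With this table a freshly created partition has to pass through NewPartition before it can go online.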
// register topic and partition change listeners
def registerListeners() {
registerTopicChangeListener()
if(controller.config.deleteTopicEnable)
registerDeleteTopicListener()
}
This watches the path /brokers/topics for changes and, if topic deletion is enabled, also watches the path /admin/delete_topics.
Let us look at what the two listeners do.
3.1.1 registerTopicChangeListener
This listener handles topic changes under /brokers/topics, including the follow-up work for added and deleted topics.
/**
* This is the zookeeper listener that triggers all the state transitions for a partition
*/
class TopicChangeListener extends IZkChildListener with Logging {
this.logIdent = "[TopicChangeListener on Controller " + controller.config.brokerId + "]: "
@throws(classOf[Exception])
def handleChildChange(parentPath : String, children : java.util.List[String]) {
inLock(controllerContext.controllerLock) {
if (hasStarted.get) {
try {
val currentChildren = {
import JavaConversions._
debug("Topic change listener fired for path %s with children %s".format(parentPath, children.mkString(",")))
(children: Buffer[String]).toSet
}
val newTopics = currentChildren -- controllerContext.allTopics
val deletedTopics = controllerContext.allTopics -- currentChildren
controllerContext.allTopics = currentChildren
val addedPartitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(newTopics.toSeq)
controllerContext.partitionReplicaAssignment = controllerContext.partitionReplicaAssignment.filter(p =>
!deletedTopics.contains(p._1.topic))
controllerContext.partitionReplicaAssignment.++=(addedPartitionReplicaAssignment)
info("New topics: [%s], deleted topics: [%s], new partition replica assignment [%s]".format(newTopics,
deletedTopics, addedPartitionReplicaAssignment))
if(newTopics.size > 0)
controller.onNewTopicCreation(newTopics, addedPartitionReplicaAssignment.keySet.toSet)
} catch {
case e: Throwable => error("Error while handling new topic", e )
}
}
}
}
}
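The bookkeeping in the listener above boils down to two set differences plus a map update. A minimal sketch, assuming a simplified `TP` key in place of Kafka's TopicAndPartition and a hypothetical `applyChange` helper that takes the assignment of the new topics as an argument instead of reading it from ZooKeeper:

```scala
object TopicChangeSketch {
  // Hypothetical simplified key; Kafka uses TopicAndPartition.
  final case class TP(topic: String, partition: Int)

  // Returns (new topics, deleted topics, updated replica assignment).
  def applyChange(
      knownTopics: Set[String],
      currentTopics: Set[String],
      assignment: Map[TP, Seq[Int]],
      assignmentOfNewTopics: Map[TP, Seq[Int]]): (Set[String], Set[String], Map[TP, Seq[Int]]) = {
    val newTopics = currentTopics -- knownTopics
    val deletedTopics = knownTopics -- currentTopics
    // Drop the partitions of deleted topics, then add the partitions of new topics.
    val kept = assignment.filter { case (tp, _) => !deletedTopics.contains(tp.topic) }
    (newTopics, deletedTopics, kept ++ assignmentOfNewTopics)
  }
}
```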
3.1.2 registerDeleteTopicListener
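It watches the ZooKeeper node and puts the topics to be deleted into the deletion queue. Kafka then carries out the deletion, which mainly removes the related ZooKeeper nodes and the log files.
3.2 Registering the replica state machine listeners
The replica state machine has the following states:
- NewReplica: the controller can create new replicas during partition reassignment. In this state a replica can only receive a become-follower request. The valid previous state is NonExistentReplica.
- OnlineReplica: the state of a replica once it has started. In this state it can receive either a become-leader or a become-follower request. Valid previous states are NewReplica, OnlineReplica and OfflineReplica.
- OfflineReplica: the state of a replica that has died, for example because its broker is down. Valid previous states are NewReplica and OnlineReplica.
- ReplicaDeletionStarted: the state when replica deletion starts. The valid previous state is OfflineReplica.
- ReplicaDeletionSuccessful: the state when the replica answers a delete request with no error code. The valid previous state is ReplicaDeletionStarted.
- ReplicaDeletionIneligible: the state when replica deletion fails. The valid previous state is ReplicaDeletionStarted.
- NonExistentReplica: the state after a replica has been deleted successfully. The valid previous state is ReplicaDeletionSuccessful.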
3.2.1 registerBrokerChangeListener
Watches for child changes under the path /brokers/ids, that is, whether brokers have been added or removed, and reacts accordingly.
/**
* This is the zookeeper listener that triggers all the state transitions for a replica
*/
class BrokerChangeListener() extends IZkChildListener with Logging {
this.logIdent = "[BrokerChangeListener on Controller " + controller.config.brokerId + "]: "
def handleChildChange(parentPath : String, currentBrokerList : java.util.List[String]) {
info("Broker change listener fired for path %s with children %s".format(parentPath, currentBrokerList.sorted.mkString(",")))
inLock(controllerContext.controllerLock) {
if (hasStarted.get) {
ControllerStats.leaderElectionTimer.time {
try {
val curBrokers = currentBrokerList.map(_.toInt).toSet.flatMap(zkUtils.getBrokerInfo)
val curBrokerIds = curBrokers.map(_.id)
val liveOrShuttingDownBrokerIds = controllerContext.liveOrShuttingDownBrokerIds
val newBrokerIds = curBrokerIds -- liveOrShuttingDownBrokerIds
val deadBrokerIds = liveOrShuttingDownBrokerIds -- curBrokerIds
val newBrokers = curBrokers.filter(broker => newBrokerIds(broker.id))
controllerContext.liveBrokers = curBrokers
val newBrokerIdsSorted = newBrokerIds.toSeq.sorted
val deadBrokerIdsSorted = deadBrokerIds.toSeq.sorted
val liveBrokerIdsSorted = curBrokerIds.toSeq.sorted
info("Newly added brokers: %s, deleted brokers: %s, all live brokers: %s"
.format(newBrokerIdsSorted.mkString(","), deadBrokerIdsSorted.mkString(","), liveBrokerIdsSorted.mkString(",")))
newBrokers.foreach(controllerContext.controllerChannelManager.addBroker)
deadBrokerIds.foreach(controllerContext.controllerChannelManager.removeBroker)
if(newBrokerIds.size > 0)
controller.onBrokerStartup(newBrokerIdsSorted)
if(deadBrokerIds.size > 0)
controller.onBrokerFailure(deadBrokerIdsSorted)
} catch {
case e: Throwable => error("Error while handling broker changes", e)
}
}
}
}
}
}
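The listener derives the new and dead brokers by comparing the ids currently registered in ZooKeeper with the ids the controller already knows. A minimal sketch with plain `Int` ids; `BrokerDiff`, `Result` and `diff` are hypothetical names:

```scala
object BrokerDiff {
  final case class Result(newIds: Seq[Int], deadIds: Seq[Int])

  // liveOrShuttingDown: ids the controller knew before the change.
  // current: ids currently registered under /brokers/ids.
  def diff(liveOrShuttingDown: Set[Int], current: Set[Int]): Result =
    Result(
      newIds = (current -- liveOrShuttingDown).toSeq.sorted,
      deadIds = (liveOrShuttingDown -- current).toSeq.sorted
    )
}
```

In the listener, a non-empty set of new ids leads to onBrokerStartup and a non-empty set of dead ids leads to onBrokerFailure.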
3.3 Initializing the controller context
This step loads the initial data, such as topics and partitions, and then starts several managers.
private def initializeControllerContext() {
// update controller cache with delete topic information
// brokers that are currently alive
controllerContext.liveBrokers = zkUtils.getAllBrokersInCluster().toSet
// all topics
controllerContext.allTopics = zkUtils.getAllTopics().toSet
// replica assignment of every partition of every topic
controllerContext.partitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(controllerContext.allTopics.toSeq)
// leader information per partition
controllerContext.partitionLeadershipInfo = new mutable.HashMap[TopicAndPartition, LeaderIsrAndControllerEpoch]
// brokers that are shutting down, initially empty
controllerContext.shuttingDownBrokerIds = mutable.Set.empty[Int]
// update the leader and isr cache for all existing partitions from Zookeeper
updateLeaderAndIsrCache()
// start the channel manager
startChannelManager()
initializePreferredReplicaElection()
initializePartitionReassignment()
initializeTopicDeletion()
info("Currently active brokers in the cluster: %s".format(controllerContext.liveBrokerIds))
info("Currently shutting brokers in the cluster: %s".format(controllerContext.shuttingDownBrokerIds))
info("Current list of topics in the cluster: %s".format(controllerContext.allTopics))
}
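The first few lines are commented and clear enough, so we start from startChannelManager. What is the ChannelManager? It is the channel through which the controller communicates with each broker, and the manager takes care of sending those requests.
It mainly handles these request types: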
- LEADER_AND_ISR
- STOP_REPLICA
- UPDATE_METADATA_KEY
Once the channel has started, three things are initialized:
- initializePreferredReplicaElection
- initializePartitionReassignment
- initializeTopicDeletion
3.4 Starting the replica state machine
This is replicaStateMachine.startup(). It initializes the replica states from the partition information read from ZooKeeper, then moves all live replicas to OnlineReplica.
/**
* Invoked on successful controller election. First registers a broker change listener since that triggers all
* state transitions for replicas. Initializes the state of replicas for all partitions by reading from zookeeper.
* Then triggers the OnlineReplica state change for all replicas.
*/
def startup() {
// initialize replica state
initializeReplicaState()
// set started flag
hasStarted.set(true)
// move all Online replicas to Online
handleStateChanges(controllerContext.allLiveReplicas(), OnlineReplica)
info("Started replica state machine with initial state -> " + replicaState.toString())
}
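3.5 Starting the partition state machine
Like the replica state machine, this initializes the partition states and then tries to move new or offline partitions to OnlinePartition.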
/**
* Invoked on successful controller election. First registers a topic change listener since that triggers all
* state transitions for partitions. Initializes the state of partitions by reading from zookeeper. Then triggers
* the OnlinePartition state change for all new or offline partitions.
*/
def startup() {
// initialize partition state
initializePartitionState()
// set started flag
hasStarted.set(true)
// try to move partitions to online state
triggerOnlinePartitionStateChange()
info("Started partition state machine with initial state -> " + partitionState.toString())
}
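3.6 Automatic leader rebalance scheduler
If auto.leader.rebalance.enable is turned on, the partition rebalance scheduler is started. The setting leader.imbalance.check.interval.seconds controls how often the check runs, in seconds.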
if (config.autoLeaderRebalanceEnable) {
info("starting the partition rebalance scheduler")
autoRebalanceScheduler.startup()
autoRebalanceScheduler.schedule("partition-rebalance-thread", checkAndTriggerPartitionRebalance,
5, config.leaderImbalanceCheckIntervalSeconds.toLong, TimeUnit.SECONDS)
}
The method worth a closer look is checkAndTriggerPartitionRebalance.
private def checkAndTriggerPartitionRebalance(): Unit = {
if (isActive()) {
trace("checking need to trigger partition rebalance")
// get all the active brokers
var preferredReplicasForTopicsByBrokers: Map[Int, Map[TopicAndPartition, Seq[Int]]] = null
inLock(controllerContext.controllerLock) {
preferredReplicasForTopicsByBrokers =
controllerContext.partitionReplicaAssignment.filterNot(p => deleteTopicManager.isTopicQueuedUpForDeletion(p._1.topic)).groupBy {
case (topicAndPartition, assignedReplicas) => assignedReplicas.head
}
}
debug("preferred replicas by broker " + preferredReplicasForTopicsByBrokers)
// for each broker, check if a preferred replica election needs to be triggered
preferredReplicasForTopicsByBrokers.foreach {
case (leaderBroker, topicAndPartitionsForBroker) => {
var imbalanceRatio: Double = 0
var topicsNotInPreferredReplica: Map[TopicAndPartition, Seq[Int]] = null
inLock(controllerContext.controllerLock) {
topicsNotInPreferredReplica =
topicAndPartitionsForBroker.filter {
case (topicPartition, replicas) => {
controllerContext.partitionLeadershipInfo.contains(topicPartition) &&
controllerContext.partitionLeadershipInfo(topicPartition).leaderAndIsr.leader != leaderBroker
}
}
debug("topics not in preferred replica " + topicsNotInPreferredReplica)
val totalTopicPartitionsForBroker = topicAndPartitionsForBroker.size
val totalTopicPartitionsNotLedByBroker = topicsNotInPreferredReplica.size
imbalanceRatio = totalTopicPartitionsNotLedByBroker.toDouble / totalTopicPartitionsForBroker
trace("leader imbalance ratio for broker %d is %f".format(leaderBroker, imbalanceRatio))
}
// check ratio and if greater than desired ratio, trigger a rebalance for the topic partitions
// that need to be on this broker
if (imbalanceRatio > (config.leaderImbalancePerBrokerPercentage.toDouble / 100)) {
topicsNotInPreferredReplica.foreach {
case (topicPartition, replicas) => {
inLock(controllerContext.controllerLock) {
// do this check only if the broker is live and there are no partitions being reassigned currently
// and preferred replica election is not in progress
if (controllerContext.liveBrokerIds.contains(leaderBroker) &&
controllerContext.partitionsBeingReassigned.size == 0 &&
controllerContext.partitionsUndergoingPreferredReplicaElection.size == 0 &&
!deleteTopicManager.isTopicQueuedUpForDeletion(topicPartition.topic) &&
controllerContext.allTopics.contains(topicPartition.topic)) {
onPreferredReplicaElection(Set(topicPartition), true)
}
}
}
}
}
}
}
}
}
}
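The core of the method is a ratio per preferred broker: the number of its partitions currently led by another broker, divided by the number of partitions for which it is the preferred replica. A minimal sketch of that arithmetic, without locking or deletion checks; `ImbalanceSketch`, `TP`, `imbalanceRatios` and `needsRebalance` are hypothetical names, and every replica list is assumed to be non-empty:

```scala
object ImbalanceSketch {
  // Hypothetical simplified key; Kafka uses TopicAndPartition.
  final case class TP(topic: String, partition: Int)

  // For each preferred broker (head of the replica list), the fraction of its
  // partitions whose current leader is a different broker.
  def imbalanceRatios(assignment: Map[TP, Seq[Int]], leaders: Map[TP, Int]): Map[Int, Double] =
    assignment.groupBy { case (_, replicas) => replicas.head }.map {
      case (broker, partitions) =>
        val notLed = partitions.count { case (tp, _) => leaders.get(tp).exists(_ != broker) }
        (broker, notLed.toDouble / partitions.size)
    }

  // Mirrors the comparison against leaderImbalancePerBrokerPercentage.
  def needsRebalance(ratio: Double, thresholdPercent: Int): Boolean =
    ratio > thresholdPercent.toDouble / 100
}
```

A broker that is preferred for two partitions but leads only one of them has a ratio of 0.5, which exceeds a threshold of 10 percent and would trigger a preferred replica election for the partition it does not lead.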
3.7 Starting topic deletion
If topic deletion is enabled (delete.topic.enable=true), the topic deletion thread is started.
Last updated: 2017-11-13 17:34:16