閱讀462 返回首頁    go 阿裏雲 go 技術社區[雲棲]


跟我學Kafka之Controller控製器詳解

我們的kafka源碼分享已經進行過很多期了,主要的內容也都分享的差不多了,那麼在今後的分享中,主要集中在kafka性能優化和使用。

Kafka集群中的其中一個Broker會被選舉為Controller,主要負責Partition管理和副本狀態管理,也會執行類似於重分配Partition之類的管理任務。如果當前的Controller失敗,會從其他正常的Broker中重新選舉Controller。

進入KafkaController.scala文件看到如下代碼:

class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerState: BrokerState) extends Logging with KafkaMetricsGroup {
  this.logIdent = "[Controller " + config.brokerId + "]: "
  private var isRunning = true
  private val stateChangeLogger = KafkaController.stateChangeLogger
  val controllerContext = new ControllerContext(zkClient, config.zkSessionTimeoutMs)
  val partitionStateMachine = new PartitionStateMachine(this)
  val replicaStateMachine = new ReplicaStateMachine(this)
  private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, onControllerFailover,
    onControllerResignation, config.brokerId)
  // have a separate scheduler for the controller to be able to start and stop independently of the
  // kafka server
  private val autoRebalanceScheduler = new KafkaScheduler(1)
  var deleteTopicManager: TopicDeletionManager = null
  val offlinePartitionSelector = new OfflinePartitionLeaderSelector(controllerContext, config)
  private val reassignedPartitionLeaderSelector = new ReassignedPartitionLeaderSelector(controllerContext)
  private val preferredReplicaPartitionLeaderSelector = new PreferredReplicaPartitionLeaderSelector(controllerContext)
  private val controlledShutdownPartitionLeaderSelector = new ControlledShutdownLeaderSelector(controllerContext)
  private val brokerRequestBatch = new ControllerBrokerRequestBatch(this)

  private val partitionReassignedListener = new PartitionsReassignedListener(this)
  private val preferredReplicaElectionListener = new PreferredReplicaElectionListener(this)

在KafkaController類中定義了很多屬性,我們先重點了解下麵的PartitionLeaderSelector對象,主要是為分區選舉出leader broker,該trait隻定義了一個方法selectLeader,接收一個TopicAndPartition對象和一個LeaderAndIsr對象。TopicAndPartition表示要選leader的分區,而第二個參數表示zookeeper中保存的該分區的當前leader和ISR記錄。該方法會返回一個元組包括了推舉出來的leader和ISR以及需要接收LeaderAndISr請求的一組副本。

trait PartitionLeaderSelector {  
    def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int])
}

通過我們上麵的代碼,可以看到在KafkaController中共定義了五種selector選舉器:

  • 1、NoOpLeaderSelector
  • 2、OfflinePartitionLeaderSelector
  • 3、ReassignedPartitionLeaderSelector
  • 4、PreferredReplicaPartitionLeaderSelector
  • 5、ControlledShutdownLeaderSelector

我們在解釋這五個選擇器之前,先了解一下在Kafka中Partition的四種狀態:

  • NonExistentPartition —— 這個狀態表示該分區要麼沒有被創建過或曾經被創建過但後麵被刪除了。
  • NewPartition —— 分區創建之後就處於NewPartition狀態。在這個狀態中,分區應該已經分配了副本,但是還沒有選舉出leader和ISR。
  • OnlinePartition —— 一旦分區的leader被推選出來,它就處於OnlinePartition狀態。
  • OfflinePartition —— 如果leader選舉出來後,leader broker宕機了,那麼該分區就處於OfflinePartition狀態。

四種狀態的轉換關係如下:

NonExistentPartition -> NewPartition

  1. 首先將第一個可用的副本broker作為leader broker並把所有可用的副本對象都裝入ISR,然後寫leader和ISR信息到zookeeper中保存
  2. 對於這個分區而言,發送LeaderAndIsr請求到每個可用的副本broker,以及UpdateMetadata請求到每個可用的broker上

OnlinePartition, OfflinePartition -> OnlinePartition

為該分區選取新的leader和ISR以及接收LeaderAndIsr請求的一組副本,然後寫入leader和ISR信息到zookeeper中保存。

NewPartition, OnlinePartition -> OfflinePartition

標記分區狀態為離線(offline)。

OfflinePartition -> NonExistentPartition

離線狀態標記為不存在分區,表示該分區失敗或者被刪除。

在介紹完最基本的概念之後,下麵我們將重點介紹上麵提到過的五種選舉器:
1、ReassignedPartitionLeaderSelector
從可用的ISR中選取第一個作為leader,把當前的ISR作為新的ISR,將重分配的副本集合作為接收LeaderAndIsr請求的副本集合。
2、PreferredReplicaPartitionLeaderSelector
如果從assignedReplicas取出的第一個副本就是分區leader的話,則拋出異常,否則將第一個副本設置為分區leader。
3、ControlledShutdownLeaderSelector
將ISR中處於關閉狀態的副本從集合中去除掉,返回一個新新的ISR集合,然後選取第一個副本作為leader,然後令當前AR作為接收LeaderAndIsr請求的副本。
4、NoOpLeaderSelector
原則上不做任何事情,返回當前的leader和isr。
5、OfflinePartitionLeaderSelector
從活著的ISR中選擇一個broker作為leader,如果ISR中沒有活著的副本,則從assignedReplicas中選擇一個副本作為leader,leader選舉成功後注冊到Zookeeper中,並更新所有的緩存。

所有的leader選擇完成後,都要通過請求把具體的request路由到對應的handler處理。目前kafka並沒有把handler抽象出來,而是每個handler都是一個函數,混在KafkaApi類中。
其實也就是如下的代碼:

def handle(request: RequestChannel.Request) {  
  try{  
    trace("Handling request: " + request.requestObj + " from client: " + request.remoteAddress)  
    request.requestId match {  
      case RequestKeys.ProduceKey => handleProducerRequest(request)  // producer  
      case RequestKeys.FetchKey => handleFetchRequest(request)       // consumer  
      case RequestKeys.OffsetsKey => handleOffsetRequest(request)  
      case RequestKeys.MetadataKey => handleTopicMetadataRequest(request)  
      case RequestKeys.LeaderAndIsrKey => handleLeaderAndIsrRequest(request) //成為leader或follower設置同步副本組信息  
      case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request)  
      case RequestKeys.UpdateMetadataKey => handleUpdateMetadataRequest(request)  
      case RequestKeys.ControlledShutdownKey => handleControlledShutdownRequest(request)  //shutdown broker  
      case RequestKeys.OffsetCommitKey => handleOffsetCommitRequest(request)  
      case RequestKeys.OffsetFetchKey => handleOffsetFetchRequest(request)  
      case requestId => throw new KafkaException("Unknown api code " + requestId)  
    }  
  } catch {  
    case e: Throwable =>  
      request.requestObj.handleError(e, requestChannel, request)  
      error("error when handling request %s".format(request.requestObj), e)  
  } finally  
    request.apiLocalCompleteTimeMs = SystemTime.milliseconds  
}

這裏麵的每個請求在上麵給出的鏈接的文章中都有過解釋說明,在這裏不多解釋。

RequestKeys.LeaderAndIsr詳細分析
在上麵的代碼中咱們看到ReequestKeys.LeaderAndlst對應的方法其實是KeyhandleLeaderAndIsrRequest。

def handleLeaderAndIsrRequest(request: RequestChannel.Request) {
    // ensureTopicExists is only for client facing requests
    // We can't have the ensureTopicExists check here since the controller sends it as an advisory to all brokers so they
    // stop serving data to clients for the topic being deleted
    val leaderAndIsrRequest = request.requestObj.asInstanceOf[LeaderAndIsrRequest]
    try {
      val (response, error) = replicaManager.becomeLeaderOrFollower(leaderAndIsrRequest, offsetManager)
      val leaderAndIsrResponse = new LeaderAndIsrResponse(leaderAndIsrRequest.correlationId, response, error)
      requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(leaderAndIsrResponse)))
    } catch {
      case e: KafkaStorageException =>
        fatal("Disk error during leadership change.", e)
        Runtime.getRuntime.halt(1)
    }
  }

將request.requestObj轉換成LeaderAndIstRequest對象類型。

Sample Flowchart Template.png

流程圖說明

1、如果請求中controllerEpoch小於當前最新的controllerEpoch,則直接返回ErrorMapping.StaleControllerEpochCode。

2、如果partitionStateInfo中的leader epoch大於當前ReplicManager中存儲的(topic, partitionId)對應的partition的leader epoch,則:

2.1、如果當前brokerid(或者說replica id)在partitionStateInfo中,則將該partition及partitionStateInfo存入一個名為partitionState的HashMap中。
否則說明該Broker不在該Partition分配的Replica list中,將該信息記錄於log中

3、如果partitionStateInfo中的leader epoch小於當前ReplicManager則將相應的Error code(ErrorMapping.StaleLeaderEpochCode)存入Response中。

4、篩選出partitionState中Leader與當前Broker ID相等的所有記錄存入partitionsTobeLeader中,其它記錄存入partitionsToBeFollower中。
如果partitionsTobeLeader不為空,則對其執行makeLeaders方。
如果partitionsToBeFollower不為空,則對其執行makeFollowers方法。

最後更新:2017-05-19 17:32:06

  上一篇:go  跟我學Kafka之NIO通信機製
  下一篇:go  Akka簡單性能分析