閱讀794 返回首頁    go 阿裏雲 go 技術社區[雲棲]


解析Kafka High Available 二

Broker Failover過程

一、Controller對Broker failure的處理過程

    1、假設隻有一台broker宕機
    2、Controller在ZooKeeper的/brokers/ids節點上注冊Watch。一旦有Broker宕機,其在ZooKeeper對應的Znode會自動被刪除,ZooKeeper會fire Controller注冊的Watch,Controller即可獲取最新的幸存的Broker列表。
    3、Controller決定set_p,該集合包含了宕機的Broker上的所有Partition分區
        3.1、set_p包含了, 所有topic下麵分布在該broker上的partition分區leader
    4、對set_p中的每一個Partition:

        4.1、從/brokers/topics/[topic]/partitions/[partition]/state讀取某一個Partition的ISR(容災備份分區列表)。
            a、在這裏需要注意的一個問題的是:這裏所指的該broker上的partition,代表的就是某一個topic的其中一個partition分區的leader
            b、至於已該broker上的replica(其他partition的備份)是不關心的.
    5、決定該Partition的新Leader。如果當前ISR中有至少一個Replica還幸存,則選擇其中一個作為新Leader,新的ISR則包含當前ISR中所有幸存的Replica。否則選擇該Partition中任意一個幸存的Replica作為新的Leader以及ISR(該場景下可能會有潛在的數據丟失)(ISR裏麵隻有一個replica,是被選為leader,因為隻有一個了)。如果該Partition的所有Replica都宕機了,則將新的Leader設置為-1(表示某個topic下麵的某一個partition分區已經完全損壞,該分區不能在使用)。
    6、將新的Leader,ISR和新的leader_epoch及controller_epoch寫入/brokers/topics/[topic]/partitions/[partition]/state。注意,該操作隻有Controller版本在3.1至3.3的過程中無變化時才會執行,否則跳轉到3.1。
    7、直接通過RPC(遠程網絡調用)向set_p相關的Broker發送LeaderAndISRRequest命令。Controller可以在一個RPC操作中發送多個命令從而提高效率。

二、創建/刪除Topic

    1、Controller在ZooKeeper的/brokers/topics節點上注冊Watch,一旦某個Topic被創建或刪除,則Controller會通過Watch得到新創建/刪除的Topic的Partition/Replica分配。
        1.1、這句話寫的真的很繞,通俗的說,就是controller在topics節點上注冊了一個哨兵,隻要topics發生改變其發生改變的topic已經啊topic下麵的partition和rpelica都會被controller知道.
    2、對於刪除Topic操作,Topic工具會將該Topic名字存於/admin/delete_topics。
        2.1、若delete.topic.enable為true,則Controller注冊在/admin/delete_topics上的Watch被fire,Controller通過回調向對應的Broker發送StopReplicaRequest(停止從;leader哪裏進行主從同步),
        2.2、若為false則Controller不會在/admin/delete_topics上注冊Watch,也就不會對該事件作出反應。
    3、對於創建Topic操作,Controller從/brokers/ids讀取當前所有可用的Broker列表,對於set_p中的每一個Partition:(為什麼是set_p?)
        3.1、分配給該Partition(分區)的所有Replica(備份)(稱為AR)中任選一個可用的Broker作為新的Leader,並將AR(備份列表)設置為新的ISR
            (因為該Topic是新創建的,所以AR中所有的Replica都沒有數據,可認為它們都是同步的,也即都在ISR中,任意一個Replica都可作為Leader)
    4、將新的Leader和ISR寫入/brokers/topics/[topic]/partitions/[partition]
    5、直接通過RPC向相關的Broker發送LeaderAndISRRequest。

三、Broker響應請求流程

      四個角色
        1、acceptor 請求接受這 
                Acceptor的主要職責是監聽並接受客戶端(請求發起方,包括但不限於Producer,Consumer,Controller,Admin Tool)的連接請求,並建立和客戶端的數據傳輸通道,然後為該客戶端指定一個Processor,至此它對該客戶端該次請求的任務就結束了,它可以去響應下一個客戶端的連接請求了
        2、Processor 請求中轉處理者                                 
            Processor主要負責從客戶端讀取數據並將響應返回給客戶端,它本身並不處理具體的業務邏輯,並且其內部維護了一個隊列來保存分配給它的所有SocketChannel。Processor的run方法會循環從隊列中取出新的SocketChannel並將其SelectionKey.OP_READ注冊到selector上,然後循環處理已就緒的讀(請求)和寫(響應)。Processor讀取完數據後,將其封裝成Request對象並將其交給RequestChannel。
        3、RequestChannel 中轉站
            RequestChannel是Processor和KafkaRequestHandler交換數據的地方,它包含一個隊列requestQueue用來存放Processor加入的Request,KafkaRequestHandler會從裏麵取出Request來處理;同時它還包含一個respondQueue,用來存放KafkaRequestHandler處理完Request後返還給客戶端的Response。
        4、KafkaRequestHandler 業務處理器
            循環從RequestChannel中取Request並交給kafka.server.KafkaApis處理具體的業務邏輯。
    Processor會通過processNewResponses方法依次將requestChannel中responseQueue保存的Response取出,並將對應的SelectionKey.OP_WRITE事件注冊到selector上。當selector的select方法返回時,對檢測到的可寫通道,調用write方法將Response返回給客戶端

四、LeaderAndIsrRequest響應過程

    對於收到的LeaderAndIsrRequest,Broker主要通過ReplicaManager的becomeLeaderOrFollower處理,流程如下:
        1、校驗求中的controllerEpoch是否小於當前最新的controllerEpoch,小於則表示該條請求已經過期(已經有新的controller被選出) return 錯誤 否則繼續
        2、對於請求中partitionStateInfos中的每一個元素,即((topic, partitionId), partitionStateInfo):
        3、檢查每日一個元素的partitionStateInfo中的leader時間戳,是否小於當前broker所有用的該partition的leader時間戳小,小於則已過期,需要吧錯誤代碼存入Response中,並且continue檢查下一個元素,否則繼續
        4、檢查該元素的partitionStatrInfo中的broker_id 是否包含本broker_id,若是不包含,則記錄一個error或者wore,然後continue下一個元素 若是包含 ,則繼續
            在一開始搭建HA的時候已經確定好了每一個topic的partition(分區)置於那一部分broker之中,也就是有一個Replica list,所以,要是該list中本就不包含本台broker的id則說明這個分區不應該出現在這個broker上
            同樣的也會有一份replica list 來決定每一個partition的備份群歸屬地,所以需要進行校
        5、該partition及partitionStateInfo存入一個名為partitionState的HashMap中
        6、篩選出partitionState中Leader與當前Broker ID相等的所有記錄存入partitionsTobeLeader中,其它記錄存入partitionsToBeFollower中 區分主從
        7、若partitionsTobeLeader不為空,則對其執行makeLeaders方。
        8、若partitionsToBeFollower不為空,則對其執行makeFollowers方法。
        9、若highwatermak線程還未啟動,則將其啟動,並將hwThreadInitialized設為true。
        10、關閉所有Idle狀態的Fetcher。
        11、LeaderAndIsrRequest處理過程如下圖所示

五、Broker啟動過程

      Broker啟動後首先根據其ID在ZooKeeper的/brokers/idszonde下創建臨時子節點(Ephemeral node),創建成功後Controller的ReplicaStateMachine注冊其上的Broker Change Watch會被fire,從而通過回調KafkaController.onBrokerStartup方法完成以下步驟:
      向所有新啟動的Broker發送UpdateMetadataRequest,其定義如下。
      將新啟動的Broker上的所有Replica設置為OnlineReplica狀態,同時這些Broker會為這些Partition啟動high watermark線程。
      通過partitionStateMachine觸發OnlinePartitionStateChange。

六、Controller Failover

      Controller也需要Failover。每個Broker都會在Controller Path (/controller)上注冊一個Watch。當前Controller失敗時,對應的Controller Path會自動消失(因為它是Ephemeral Node),此時該Watch被fire,所有“活”著的Broker都會去競選成為新的Controller(創建新的Controller Path),但是隻會有一個競選成功(這點由ZooKeeper保證)。競選成功者即為新的Leader,競選失敗者則重新在新的Controller Path上注冊Watch。因為ZooKeeper的Watch是一次性的,被fire一次之後即失效,所以需要重新注冊。
      Broker成功競選為新Controller後會觸發KafkaController.onControllerFailover方法,並在該方法中完成如下操作:
        1、讀取並增加Controller Epoch。
        2、在ReassignedPartitions Patch(/admin/reassign_partitions)上注冊Watch。
        3、在PreferredReplicaElection Path(/admin/preferred_replica_election)上注冊Watch。
        4、通過partitionStateMachine在Broker Topics Patch(/brokers/topics)上注冊Watch。
        5、若delete.topic.enable設置為true(默認值是false),則partitionStateMachine在Delete Topic Patch(/admin/delete_topics)上注冊Watch。
        6、通過replicaStateMachine在Broker Ids Patch(/brokers/ids)上注冊Watch。
        7、初始化ControllerContext對象,設置當前所有Topic,“活”著的Broker列表,所有Partition的Leader及ISR等。
        8、啟動replicaStateMachine和partitionStateMachine。
        9、將brokerState狀態設置為RunningAsController。
        10、將每個Partition的Leadership信息發送給所有“活”著的Broker。
        11、若auto.leader.rebalance.enable配置為true(默認值是true),則啟動partition-rebalance線程。
          12、若delete.topic.enable設置為true且Delete Topic Patch(/admin/delete_topics)中有值,則刪除相應的Topic。

七、Partition重新分配

    管理工具發出重新分配Partition請求後,會將相應信息寫到/admin/reassign_partitions上,而該操作會觸發ReassignedPartitionsIsrChangeListener,從而通過執行回調函數KafkaController.onPartitionReassignment來完成以下操作:
        1、將ZooKeeper中的AR(Current Assigned Replicas)更新為OAR(Original list of replicas for partition) + RAR(Reassigned replicas)。
        2、強製更新ZooKeeper中的leader epoch,向AR中的每個Replica發送LeaderAndIsrRequest。
        3、將RAR - OAR中的Replica設置為NewReplica狀態。
        4、等待直到RAR中所有的Replica都與其Leader同步。
        5、將RAR中所有的Replica都設置為OnlineReplica狀態。
        6、將Cache中的AR設置為RAR。
        7、若Leader不在RAR中,則從RAR中重新選舉出一個新的Leader並發送LeaderAndIsrRequest。若新的Leader不是從RAR中選舉而出,則還要增加ZooKeeper中的leader epoch。
        8、將OAR - RAR中的所有Replica設置為OfflineReplica狀態,該過程包含兩部分。第一,將ZooKeeper上ISR中的OAR - RAR移除並向Leader發送LeaderAndIsrRequest從而通知這些Replica已經從ISR中移除;第二,向OAR -                       RAR中的Replica發送StopReplicaRequest從而停止不再分配給該Partition的Replica。
        9、將OAR - RAR中的所有Replica設置為NonExistentReplica狀態從而將其從磁盤上刪除。
        10、將ZooKeeper中的AR設置為RAR。
        11、刪除/admin/reassign_partition。
        12、注意:最後一步才將ZooKeeper中的AR更新,因為這是唯一一個持久存儲AR的地方,如果Controller在這一步之前crash,新的Controller仍然能夠繼續完成該過程。

八、Follower從Leader Fetch數據

    Follower通過向Leader發送FetchRequest獲取消息             
        每個Fetch請求都要指定最大等待時間和最小獲取字節數,以及由TopicAndPartition和PartitionFetchInfo構成的Map。實際上,Follower從Leader數據和Consumer從Broker Fetch數據,都是通過FetchRequest請求完成,所以在FetchRequest結構中,其中一個字段是clientID,並且其默認值是ConsumerConfig.DefaultClientId
    Leader收到Fetch請求後,Kafka通過KafkaApis.handleFetchRequest響應該請求,響應過程如下
        1、replicaManager根據請求讀出數據存入dataRead中。
        2、如果該請求來自Follower則更新其相應的LEO(log end offset)以及相應Partition的High Watermark
        3、根據dataRead算出可讀消息長度(單位為字節)並存入bytesReadable中。
        4、滿足下麵4個條件中的1個,則立即將相應的數據返回
            4.1、Fetch請求不希望等待,即fetchRequest.macWait <= 0
            4.2、Fetch請求不要求一定能取到消息,即fetchRequest.numPartitions <= 0,也即requestInfo為空
            4.3、有足夠的數據可供返回,即bytesReadable >= fetchRequest.minBytes
            4.4、讀取數據時發生異常
        5、若不滿足以上4個條件,FetchRequest將不會立即返回,並將該請求封裝成DelayedFetch。檢查該DeplayedFetch是否滿足,若滿足則返回請求,否則將該請求加入Watch列表

這是我對篇博客的理解,大部分是一樣的.但是我加了自己學習的批注(括號或者小點)

最後更新:2017-11-15 15:05:07

  上一篇:go  Yann LeCun說是時候放棄概率論了,因果關係才是理解世界的基石
  下一篇:go  拿著錘子找釘子,數字芯片領導者比特大陸進軍人工智能