解析Kafka High Available 二
Broker Failover過程
一、Controller對Broker failure的處理過程
1、假設隻有一台broker宕機
2、Controller在ZooKeeper的/brokers/ids節點上注冊Watch。一旦有Broker宕機,其在ZooKeeper對應的Znode會自動被刪除,ZooKeeper會fire Controller注冊的Watch,Controller即可獲取最新的幸存的Broker列表。
3、Controller決定set_p,該集合包含了宕機的Broker上的所有Partition分區
3.1、set_p包含了, 所有topic下麵分布在該broker上的partition分區leader
4、對set_p中的每一個Partition:
4.1、從/brokers/topics/[topic]/partitions/[partition]/state讀取某一個Partition的ISR(容災備份分區列表)。
a、在這裏需要注意的一個問題的是:這裏所指的該broker上的partition,代表的就是某一個topic的其中一個partition分區的leader
b、至於已該broker上的replica(其他partition的備份)是不關心的.
5、決定該Partition的新Leader。如果當前ISR中有至少一個Replica還幸存,則選擇其中一個作為新Leader,新的ISR則包含當前ISR中所有幸存的Replica。否則選擇該Partition中任意一個幸存的Replica作為新的Leader以及ISR(該場景下可能會有潛在的數據丟失)(ISR裏麵隻有一個replica,是被選為leader,因為隻有一個了)。如果該Partition的所有Replica都宕機了,則將新的Leader設置為-1(表示某個topic下麵的某一個partition分區已經完全損壞,該分區不能在使用)。
6、將新的Leader,ISR和新的leader_epoch及controller_epoch寫入/brokers/topics/[topic]/partitions/[partition]/state。注意,該操作隻有Controller版本在3.1至3.3的過程中無變化時才會執行,否則跳轉到3.1。
7、直接通過RPC(遠程網絡調用)向set_p相關的Broker發送LeaderAndISRRequest命令。Controller可以在一個RPC操作中發送多個命令從而提高效率。
二、創建/刪除Topic
1、Controller在ZooKeeper的/brokers/topics節點上注冊Watch,一旦某個Topic被創建或刪除,則Controller會通過Watch得到新創建/刪除的Topic的Partition/Replica分配。
1.1、這句話寫的真的很繞,通俗的說,就是controller在topics節點上注冊了一個哨兵,隻要topics發生改變其發生改變的topic已經啊topic下麵的partition和rpelica都會被controller知道.
2、對於刪除Topic操作,Topic工具會將該Topic名字存於/admin/delete_topics。
2.1、若delete.topic.enable為true,則Controller注冊在/admin/delete_topics上的Watch被fire,Controller通過回調向對應的Broker發送StopReplicaRequest(停止從;leader哪裏進行主從同步),
2.2、若為false則Controller不會在/admin/delete_topics上注冊Watch,也就不會對該事件作出反應。
3、對於創建Topic操作,Controller從/brokers/ids讀取當前所有可用的Broker列表,對於set_p中的每一個Partition:(為什麼是set_p?)
3.1、分配給該Partition(分區)的所有Replica(備份)(稱為AR)中任選一個可用的Broker作為新的Leader,並將AR(備份列表)設置為新的ISR
(因為該Topic是新創建的,所以AR中所有的Replica都沒有數據,可認為它們都是同步的,也即都在ISR中,任意一個Replica都可作為Leader)
4、將新的Leader和ISR寫入/brokers/topics/[topic]/partitions/[partition]
5、直接通過RPC向相關的Broker發送LeaderAndISRRequest。
三、Broker響應請求流程
四個角色
1、acceptor 請求接受這
Acceptor的主要職責是監聽並接受客戶端(請求發起方,包括但不限於Producer,Consumer,Controller,Admin Tool)的連接請求,並建立和客戶端的數據傳輸通道,然後為該客戶端指定一個Processor,至此它對該客戶端該次請求的任務就結束了,它可以去響應下一個客戶端的連接請求了
2、Processor 請求中轉處理者
Processor主要負責從客戶端讀取數據並將響應返回給客戶端,它本身並不處理具體的業務邏輯,並且其內部維護了一個隊列來保存分配給它的所有SocketChannel。Processor的run方法會循環從隊列中取出新的SocketChannel並將其SelectionKey.OP_READ注冊到selector上,然後循環處理已就緒的讀(請求)和寫(響應)。Processor讀取完數據後,將其封裝成Request對象並將其交給RequestChannel。
3、RequestChannel 中轉站
RequestChannel是Processor和KafkaRequestHandler交換數據的地方,它包含一個隊列requestQueue用來存放Processor加入的Request,KafkaRequestHandler會從裏麵取出Request來處理;同時它還包含一個respondQueue,用來存放KafkaRequestHandler處理完Request後返還給客戶端的Response。
4、KafkaRequestHandler 業務處理器
循環從RequestChannel中取Request並交給kafka.server.KafkaApis處理具體的業務邏輯。
Processor會通過processNewResponses方法依次將requestChannel中responseQueue保存的Response取出,並將對應的SelectionKey.OP_WRITE事件注冊到selector上。當selector的select方法返回時,對檢測到的可寫通道,調用write方法將Response返回給客戶端
四、LeaderAndIsrRequest響應過程
對於收到的LeaderAndIsrRequest,Broker主要通過ReplicaManager的becomeLeaderOrFollower處理,流程如下:
1、校驗求中的controllerEpoch是否小於當前最新的controllerEpoch,小於則表示該條請求已經過期(已經有新的controller被選出) return 錯誤 否則繼續
2、對於請求中partitionStateInfos中的每一個元素,即((topic, partitionId), partitionStateInfo):
3、檢查每日一個元素的partitionStateInfo中的leader時間戳,是否小於當前broker所有用的該partition的leader時間戳小,小於則已過期,需要吧錯誤代碼存入Response中,並且continue檢查下一個元素,否則繼續
4、檢查該元素的partitionStatrInfo中的broker_id 是否包含本broker_id,若是不包含,則記錄一個error或者wore,然後continue下一個元素 若是包含 ,則繼續
在一開始搭建HA的時候已經確定好了每一個topic的partition(分區)置於那一部分broker之中,也就是有一個Replica list,所以,要是該list中本就不包含本台broker的id則說明這個分區不應該出現在這個broker上
同樣的也會有一份replica list 來決定每一個partition的備份群歸屬地,所以需要進行校
5、該partition及partitionStateInfo存入一個名為partitionState的HashMap中
6、篩選出partitionState中Leader與當前Broker ID相等的所有記錄存入partitionsTobeLeader中,其它記錄存入partitionsToBeFollower中 區分主從
7、若partitionsTobeLeader不為空,則對其執行makeLeaders方。
8、若partitionsToBeFollower不為空,則對其執行makeFollowers方法。
9、若highwatermak線程還未啟動,則將其啟動,並將hwThreadInitialized設為true。
10、關閉所有Idle狀態的Fetcher。
11、LeaderAndIsrRequest處理過程如下圖所示
五、Broker啟動過程
Broker啟動後首先根據其ID在ZooKeeper的/brokers/idszonde下創建臨時子節點(Ephemeral node),創建成功後Controller的ReplicaStateMachine注冊其上的Broker Change Watch會被fire,從而通過回調KafkaController.onBrokerStartup方法完成以下步驟:
向所有新啟動的Broker發送UpdateMetadataRequest,其定義如下。
將新啟動的Broker上的所有Replica設置為OnlineReplica狀態,同時這些Broker會為這些Partition啟動high watermark線程。
通過partitionStateMachine觸發OnlinePartitionStateChange。
六、Controller Failover
Controller也需要Failover。每個Broker都會在Controller Path (/controller)上注冊一個Watch。當前Controller失敗時,對應的Controller Path會自動消失(因為它是Ephemeral Node),此時該Watch被fire,所有“活”著的Broker都會去競選成為新的Controller(創建新的Controller Path),但是隻會有一個競選成功(這點由ZooKeeper保證)。競選成功者即為新的Leader,競選失敗者則重新在新的Controller Path上注冊Watch。因為ZooKeeper的Watch是一次性的,被fire一次之後即失效,所以需要重新注冊。
Broker成功競選為新Controller後會觸發KafkaController.onControllerFailover方法,並在該方法中完成如下操作:
1、讀取並增加Controller Epoch。
2、在ReassignedPartitions Patch(/admin/reassign_partitions)上注冊Watch。
3、在PreferredReplicaElection Path(/admin/preferred_replica_election)上注冊Watch。
4、通過partitionStateMachine在Broker Topics Patch(/brokers/topics)上注冊Watch。
5、若delete.topic.enable設置為true(默認值是false),則partitionStateMachine在Delete Topic Patch(/admin/delete_topics)上注冊Watch。
6、通過replicaStateMachine在Broker Ids Patch(/brokers/ids)上注冊Watch。
7、初始化ControllerContext對象,設置當前所有Topic,“活”著的Broker列表,所有Partition的Leader及ISR等。
8、啟動replicaStateMachine和partitionStateMachine。
9、將brokerState狀態設置為RunningAsController。
10、將每個Partition的Leadership信息發送給所有“活”著的Broker。
11、若auto.leader.rebalance.enable配置為true(默認值是true),則啟動partition-rebalance線程。
12、若delete.topic.enable設置為true且Delete Topic Patch(/admin/delete_topics)中有值,則刪除相應的Topic。
七、Partition重新分配
管理工具發出重新分配Partition請求後,會將相應信息寫到/admin/reassign_partitions上,而該操作會觸發ReassignedPartitionsIsrChangeListener,從而通過執行回調函數KafkaController.onPartitionReassignment來完成以下操作:
1、將ZooKeeper中的AR(Current Assigned Replicas)更新為OAR(Original list of replicas for partition) + RAR(Reassigned replicas)。
2、強製更新ZooKeeper中的leader epoch,向AR中的每個Replica發送LeaderAndIsrRequest。
3、將RAR - OAR中的Replica設置為NewReplica狀態。
4、等待直到RAR中所有的Replica都與其Leader同步。
5、將RAR中所有的Replica都設置為OnlineReplica狀態。
6、將Cache中的AR設置為RAR。
7、若Leader不在RAR中,則從RAR中重新選舉出一個新的Leader並發送LeaderAndIsrRequest。若新的Leader不是從RAR中選舉而出,則還要增加ZooKeeper中的leader epoch。
8、將OAR - RAR中的所有Replica設置為OfflineReplica狀態,該過程包含兩部分。第一,將ZooKeeper上ISR中的OAR - RAR移除並向Leader發送LeaderAndIsrRequest從而通知這些Replica已經從ISR中移除;第二,向OAR - RAR中的Replica發送StopReplicaRequest從而停止不再分配給該Partition的Replica。
9、將OAR - RAR中的所有Replica設置為NonExistentReplica狀態從而將其從磁盤上刪除。
10、將ZooKeeper中的AR設置為RAR。
11、刪除/admin/reassign_partition。
12、注意:最後一步才將ZooKeeper中的AR更新,因為這是唯一一個持久存儲AR的地方,如果Controller在這一步之前crash,新的Controller仍然能夠繼續完成該過程。
八、Follower從Leader Fetch數據
Follower通過向Leader發送FetchRequest獲取消息
每個Fetch請求都要指定最大等待時間和最小獲取字節數,以及由TopicAndPartition和PartitionFetchInfo構成的Map。實際上,Follower從Leader數據和Consumer從Broker Fetch數據,都是通過FetchRequest請求完成,所以在FetchRequest結構中,其中一個字段是clientID,並且其默認值是ConsumerConfig.DefaultClientId
Leader收到Fetch請求後,Kafka通過KafkaApis.handleFetchRequest響應該請求,響應過程如下
1、replicaManager根據請求讀出數據存入dataRead中。
2、如果該請求來自Follower則更新其相應的LEO(log end offset)以及相應Partition的High Watermark
3、根據dataRead算出可讀消息長度(單位為字節)並存入bytesReadable中。
4、滿足下麵4個條件中的1個,則立即將相應的數據返回
4.1、Fetch請求不希望等待,即fetchRequest.macWait <= 0
4.2、Fetch請求不要求一定能取到消息,即fetchRequest.numPartitions <= 0,也即requestInfo為空
4.3、有足夠的數據可供返回,即bytesReadable >= fetchRequest.minBytes
4.4、讀取數據時發生異常
5、若不滿足以上4個條件,FetchRequest將不會立即返回,並將該請求封裝成DelayedFetch。檢查該DeplayedFetch是否滿足,若滿足則返回請求,否則將該請求加入Watch列表
這是我對篇博客的理解,大部分是一樣的.但是我加了自己學習的批注(括號或者小點)
最後更新:2017-11-15 15:05:07