Replication工具
所有工具都在$KAFKA_HOME/bin目錄下
kafka-topics.sh
該工具可用於創建、刪除、修改、查看某個Topic,也可用於列出所有Topic。另外,該工具還可修改某個Topic的以下配置。
unclean.leader.election.enable
delete.retention.ms
segment.jitter.ms
retention.ms
flush.ms
segment.bytes
flush.messages
segment.ms
retention.bytes
cleanup.policy
segment.index.bytes
min.cleanable.dirty.ratio
max.message.bytes
file.delete.delay.ms
min.insync.replicas
index.interval.bytes
Replica Verification Tool
kafka-replica-verification.sh
該工具用來驗證所指定的一個或多個Topic下每個Partition對應的所有Replica是否都同步。可通過topic-white-list這一參數指定所需要驗證的所有Topic,支持正則表達式。
Preferred Replica Leader Election Tool
用途
有了Replication機製後,每個Partition可能有多個備份。某個Partition的Replica列表叫作AR(Assigned Replicas),AR中的第一個Replica即為“Preferred Replica”。創建一個新的Topic或者給已有Topic增加Partition時,Kafka保證Preferred Replica被均勻分布到集群中的所有Broker上。理想情況下,Preferred Replica會被選為Leader。以上兩點保證了所有Partition的Leader被均勻分布到了集群當中,這一點非常重要,因為所有的讀寫操作都由Leader完成,若Leader分布過於集中,會造成集群負載不均衡。但是,隨著集群的運行,該平衡可能會因為Broker的宕機而被打破,該工具就是用來幫助恢複Leader分配的平衡。
事實上,每個Topic從失敗中恢複過來後,它默認會被設置為Follower角色,除非某個Partition的Replica全部宕機,而當前Broker是該Partition的AR中第一個恢複回來的Replica。因此,某個Partition的Leader(Preferred Replica)宕機並恢複後,它很可能不再是該Partition的Leader,但仍然是Preferred Replica。
原理
1. 在ZooKeeper上創建/admin/preferred_replica_election節點,並存入需要調整Preferred Replica的Partition信息。
2. Controller一直Watch該節點,一旦該節點被創建,Controller會收到通知,並獲取該內容。
3. Controller讀取Preferred Replica,如果發現該Replica當前並非是Leader並且它在該Partition的ISR中,Controller向該Replica發送LeaderAndIsrRequest,使該Replica成為Leader。如果該Replica當前並非是Leader,且不在ISR中,Controller為了保證沒有數據丟失,並不會將其設置為Leader。
用法
kafka-preferred-replica-election.sh --zookeeper localhost:2181
在包含8個Broker的Kafka集群上,創建1個名為topic1,replication-factor為3,Partition數為8的Topic,使用如下命令查看其Partition/Replica分布。
kafka-topics.sh --describe --topic topic1 --zookeeper localhost:2181
查詢結果如下圖所示,從圖中可以看到,Kafka將所有Replica均勻分布到了整個集群,並且Leader也均勻分布。
手動停止部分Broker,topic1的Partition/Replica分布如下圖所示。從圖中可以看到,由於Broker 1/2/4都被停止,Partition 0的Leader由原來的1變為3,Partition 1的Leader由原來的2變為5,Partition 2的Leader由原來的3變為6,Partition 3的Leader由原來的4變為7。
再重新啟動ID為1的Broker,topic1的Partition/Replica分布如下。可以看到,雖然Broker 1已經啟動(Partition 0和Partition5的ISR中有1),但是1並不是任何一個Parititon的Leader,而Broker 5/6/7都是2個Partition的Leader,即Leader的分布不均衡——一個Broker最多是2個Partition的Leader,而最少是0個Partition的Leader。
運行該工具後,topic1的Partition/Replica分布如下圖所示。由圖可見,除了Partition 1和Partition 3由於Broker 2和Broker 4還未啟動,所以其Leader不是其Preferred Repliac外,其它所有Partition的Leader都是其Preferred Replica。同時,與運行該工具前相比,Leader的分配更均勻——一個Broker最多是2個Parittion的Leader,最少是1個Partition的Leader。
啟動Broker 2和Broker 4,Leader分布與上一步相比並未變化,如下圖所示。
再次運行該工具,所有Partition的Leader都由其Preferred Replica承擔,Leader分布更均勻——每個Broker承擔1個Partition的Leader角色。
除了手動運行該工具使Leader分配均勻外,Kafka還提供了自動平衡Leader分配的功能,該功能可通過將auto.leader.rebalance.enable設置為true開啟,它將周期性檢查Leader分配是否平衡,若不平衡度超過一定閾值則自動由Controller嚐試將各Partition的Leader設置為其Preferred Replica。檢查周期由leader.imbalance.check.interval.seconds指定,不平衡度閾值由leader.imbalance.per.broker.percentage指定。
Kafka Reassign Partitions Tool
用途
該工具的設計目標與Preferred Replica Leader Election Tool有些類似,都旨在促進Kafka集群的負載均衡。不同的是,Preferred Replica Leader Election隻能在Partition的AR範圍內調整其Leader,使Leader分布均勻,而該工具還可以調整Partition的AR。
Follower需要從Leader Fetch數據以保持與Leader同步,所以僅僅保持Leader分布的平衡對整個集群的負載均衡來說是不夠的。另外,生產環境下,隨著負載的增大,可能需要給Kafka集群擴容。向Kafka集群中增加Broker非常簡單方便,但是對於已有的Topic,並不會自動將其Partition遷移到新加入的Broker上,此時可用該工具達到此目的。某些場景下,實際負載可能遠小於最初預期負載,此時可用該工具將分布在整個集群上的Partition重裝分配到某些機器上,然後可以停止不需要的Broker從而實現節約資源的目的。
需要說明的是,該工具不僅可以調整Partition的AR位置,還可調整其AR數量,即改變該Topic的replication factor。
原理
該工具隻負責將所需信息存入ZooKeeper中相應節點,然後退出,不負責相關的具體操作,所有調整都由Controller完成。
1. 在ZooKeeper上創建/admin/reassign_partitions節點,並存入目標Partition列表及其對應的目標AR列表。
2. Controller注冊在/admin/reassign_partitions上的Watch被fire,Controller獲取該列表。
3. 對列表中的所有Partition,Controller會做如下操作:
* 啟動RAR - AR中的Replica,即新分配的Replica。(RAR = Reassigned Replicas, AR = Assigned Replicas)
* 等待新的Replica與Leader同步
* 如果Leader不在RAR中,從RAR中選出新的Leader
* 停止並刪除AR - RAR中的Replica,即不再需要的Replica
* 刪除/admin/reassign_partitions節點
用法
該工具有三種使用模式
* generate模式,給定需要重新分配的Topic,自動生成reassign plan(並不執行)
* execute模式,根據指定的reassign plan重新分配Partition
* verify模式,驗證重新分配Partition是否成功
下麵這個例子將使用該工具將Topic的所有Partition重新分配到Broker 4/5/6/7上,步驟如下:
1. 使用generate模式,生成reassign plan
指定需要重新分配的Topic ({"topics":[{"topic":"topic1"}],"version":1}),並存入/tmp/topics-to-move.json文件中,然後執行如下命令
kafka-reassign-partitions.sh --zookeeper localhost:2181
--topics-to-move-json-file /tmp/topics-to-move.json
--broker-list "4,5,6,7" --generate
結果如下圖所示
使用execute模式,執行reassign plan
將上一步生成的reassignment plan存入/tmp/reassign-plan.json文件中,並執行
kafka-reassign-partitions.sh --zookeeper localhost:2181
--reassignment-json-file /tmp/reassign-plan.json --execute
此時,ZooKeeper上/admin/reassign_partitions節點被創建,且其值與/tmp/reassign-plan.json文件的內容一致。使用verify模式,驗證reassign是否完成
執行verify命令
kafka-reassign-partitions.sh --zookeeper localhost:2181
--reassignment-json-file /tmp/reassign-plan.json --verify
結果如下所示,從圖中可看出topic1的所有Partititon都根據reassign plan重新分配成功。
接下來用Topic Tool再次驗證。
kafka-topics.sh --zookeeper localhost:2181 --describe --topic topic1
結果如下圖所示,從圖中可看出topic1的所有Partition都被重新分配到Broker 4/5/6/7,且每個Partition的AR與reassign plan一致。
需要說明的是,在使用execute之前,並不一定要使用generate模式自動生成reassign plan,使用generate模式隻是為了方便。事實上,某些場景下,generate模式生成的reassign plan並不一定能滿足需求,此時用戶可以自己設置reassign plan。
State Change Log Merge Tool
用途
該工具旨在從整個集群的Broker上收集狀態改變日誌,並生成一個集中的格式化的日誌以幫助診斷狀態改變相關的故障。每個Broker都會將其收到的狀態改變相關的的指令存於名為state-change.log的日誌文件中。某些情況下,Partition的Leader election可能會出現問題,此時我們需要對整個集群的狀態改變有個全局的了解從而診斷故障並解決問題。該工具將集群中相關的state-change.log日誌按時間順序合並,同時支持用戶輸入時間範圍和目標Topic及Partition作為過濾條件,最終將格式化的結果輸出。
用法
kafka-run-class.sh kafka.tools.StateChangeLogMerger
--logs /opt/kafka_2.11-0.8.2.1/logs/state-change.log
--topic topic1 --partitions 0,1,2,3,4,5,6,7
最後更新:2017-11-16 15:35:04