《KAFKA官方文檔》入門指南(四)
除了Kafka的主要版本之外,還有很多應用集成了Kafka工具。該生態係統頁麵中列出的許多工具,包括流處理係統,Hadoop的集成,監控和部署工具。
從0.8.4,0.9.x,0.10.0.x或0.10.1.x升級到0.10.2.0
0.10.2.0的有線協議有變化。通過下麵的推薦滾動升級計劃,你能保證在升級過程中無需停機。但是,請在升級之前查看0.10.2.0版本顯著的變化。
從0.10.2版本開始,Java客戶端(生產者和消費者)已獲得與舊版本代理服務器溝通的能力。版本0.10.2客戶可以跟0.10.0版或更新版本的代理溝通。但是,如果你的代理比0.10.0老,你必須在升級客戶端之前升級Kafka集群中的所有代理服務器(Broker)。版本0.10.2代理支持0.8.x和更新的客戶端。
對於滾動升級:
- 更新所有代理服務器上的server.properties文件,添加以下屬性:
- inter.broker.protocol.version = CURRENT_KAFKA_VERSION(例如0.8.2,0.9.0,0.10.0或0.10.1)。
- log.message.format.version = CURRENT_KAFKA_VERSION(參見升級後的潛在性能的影響了解此配置做什麼的詳細信息。)
- 逐一升級代理:關閉代理,更新代碼,並重新啟動。
- 一旦整個群集升級成功,通過編輯inter.broker.protocol.version將其設置為0.10.2的協議版本。
- 如果您以前的消息格式為0.10.0,改變log.message.format.version至0.10.2(這是一個無效操作,因為0.10.0,0.10.1和0.10.2的消息格式相同)。如果您以前的消息格式版本低於0.10.0,不要改變log.message.format.version – 這個參數隻能在所有的消費者都已經升級到0.10.0.0或更高版本之後改動。
- 逐一重新啟動代理服務器使新協議版本生效。
- 如果這時log.message.format.version仍比0.10.0低,等到所有的消費者都已經升級到0.10.0或更高版本,然後更改每個代理服務器的log.message.format.version到0.10.2,然後逐一重新啟動。
注意:如果你願意接受宕機,你可以簡單地把所有的代理服務器關閉,更新代碼,然後重新啟動他們。他們將默認使用新的協議。
注:改變協議版本並重新啟動可以在代理服務器升級之後的任何時間做,沒有必要必須立刻就做。
- 從0.10.1升級您的流應用程序到0.10.2不需要升級代理。0.10.2 Kafka流應用程序可以連接到0.10.2和0.10.1代理(但無法連接到 0.10.0的代理)。
- 你需要重新編譯代碼。隻是替換Kafka流的jar文件將無法正常工作,這破壞你的應用程序。
- 如果您使用自定義(即用戶實現的)的時間戳提取,則需要更新此代碼,因為TimestampExtractor接口改變了。
- 如果您注冊了自定義指標,您將需要更新此代碼,因為StreamsMetric接口被改變了。
- 0.10.2 流 API的變化有更多的細節。
- 對於StreamsConfig類的兩個配置的默認值的修改提高了Kafka流應用的彈性。內部Kafka流生產者retries默認值從0變化到10,內部Kafka流消費者max.poll.interval.ms 缺省值從300000到改變Integer.MAX_VALUE。
- 在Java客戶端(生產者和消費者)已獲得與舊版本代理溝通的能力。版本0.10.2客戶端可以跟0.10.0版或更新版本的代理溝通。請注意,某些功能在跟就代理溝通的時候不可用或被限製了。
- 在Java消費者中有幾種方法現在可能拋出InterruptException如果調用線程被中斷。請參閱KafkaConsumer的Javadoc,對這種變化有一個更深入的解釋。
- Java的消費者現在被恰當關閉。默認情況下,消費者會等待30秒才能完成掛起的請求。一個帶有timeout參數的新的API已添加到KafkaConsumer去控製最大等待時間。
- 用逗號分隔的多個正則表達式可以傳遞多個Java消費者給MirrorMaker–whitelist選擇。這使得與MirrorMaker使用老Scala消費者時的行為一致。
- 從0.10.1升級您的流應用程序0.10.2不需要代理服務器升級。Kafka 0.10.2流應用程序可以連接到0.10.2和0.10.1代理(但無法連接到0.10.0代理)。
- Zookeeper的依賴從流API中刪除。流API現在使用Kafka協議來管理內部主題,而不是直接修改動物園管理員的主題。這消除了需要直接訪問Zookeeper的特權,而“StreamsConfig.ZOOKEEPER_CONFIG”也不需要在流應用被設置。如果Kafka集群是安全認證的,流應用程序必須具備必要的安全權限才可以創建新的主題。
- 一些新的參數,包括“security.protocol”, “connections.max.idle.ms”, “retry.backoff.ms”, “reconnect.backoff.ms”和“request.timeout.ms”添加到StreamsConfig類。如果用戶需要設置這些,要注意這些默認值。欲了解更多詳情,請參閱3.5Kafka流CONFIGS。
- 該offsets.topic.replication.factor代理的配置現在在主題生產中強製使用。直到集群的大小符合這個複製因子要求,否則,主題的生產將失敗,返回GROUP_COORDINATOR_NOT_AVAILABLE錯誤。
- KIP-88:OffsetFetchRequest v2支持偏移檢索所有的主題,如果topics數組設置為null。
- KIP-88:OffsetFetchResponse V2引入了頂級error_code域。
- KIP-103:UpdateMetadataRequest v3引入一個listener_name字段到end_points數組中的元素。
- KIP-108:CreateTopicsRequest V1引入了一個validate_only參數。
- KIP-108:CreateTopicsResponse V1引入了error_message到數組topic_errors的元素。
從0.8.4,0.9.x版本或0.10.0.X升級到0.10.1.0
0.10.1.0有線協議發生了變化。通過下麵的推薦滾動升級計劃,能保證在升級過程中無需停機。但是,請注意在升級之前仔細閱讀0.10.1.0潛在的重大更改。
注意:由於新協議的引入,它是升級你的客戶端之前請先完成Kafka集群的升級(即0.10.1.x客戶端僅支持0.10.1.x或更高版本的代理,但0.10.1.x的代理可以支持舊版本客戶端)。
對於滾動升級:
- 更新所有代理上的server.properties文件,並添加以下屬性:
- inter.broker.protocol.version = CURRENT_KAFKA_VERSION(例如0.8.2.0,0.9.0.0或0.10.0.0)。
- log.message.format.version = CURRENT_KAFKA_VERSION(參見升級後的潛在性能的影響對於此配置做什麼的詳細信息。)
- 升級代理服務器一次一個:關閉代理,更新代碼,並重新啟動。
- 一旦整個群集升級完成,通過編輯inter.broker.protocol.version並將其設置為0.10.1.0的協議版本。
- 如果您以前的消息格式為0.10.0,改變log.message.format.version至0.10.1(這是一個無效操作,如果0.10.0和0.10.1兩個協議的消息格式相同)。如果您以前的消息格式版本低於0.10.0,不要改變log.message.format.version — 這個參數隻能在所有的消費者都已經升級到0.10.0.0或更高版本之後修改。
- 逐一重新啟動代理,新版本協議生效。
- 如果log.message.format.version仍比0.10.0低,等到所有的消費者都已經升級到0.10.0或更高版本,然後更改log.message.format.version到0.10.1,逐一重新啟動代理服務器。
注意:如果你願意接受宕機,你可以簡單地把所有的代理服務器關閉,更新代碼,然後重新啟動他們。他們將默認使用新的協議。
注:改變協議版本並重新啟動可以在代理服務器升級之後的任何時間做,沒有必要必須立刻就做。
- 日誌保留時間不再基於日誌段的最後修改時間。相反,它會基於日誌段裏擁有最大的時間戳的消息。
- 日誌滾動時間不再取決於日誌段創建時間。相反,它現在是基於消息的時間戳。進一步來說,如果日誌段中第一個消息的時間戳是T,當一個新的消息具有的時間戳大於或等於T + log.roll.m,該日誌將被覆蓋。
- 0.10.0的打開文件的處理程序將增加〜33%,因為每個日誌段增加的時間索引文件。
- 時間索引和偏移索引共享相同的索引大小的配置。因為時間索引條目大小是1.5倍偏移索引條目的大小。用戶可能需要增加log.index.size.max.bytes以避免潛在的頻繁的日誌滾動。
- 由於增加的索引文件,在某些代理服務器上具有大量的日誌段(例如> 15K),代理啟動期間日誌加載過程可能很長。根據我們的實驗,num.recovery.threads.per.data.dir設置為1可減少日誌裝載時間。
- 從0.10.0升級您的流應用程序到0.10.1確實需要一個代理的升級,因為Kafka 0.10.1的流應用程序隻能連接到0.10.1代理。
- 有幾個API的變化不向後兼容(參見流API在0.10.1的變化有詳細介紹)。因此,你需要更新和重新編譯代碼。隻是交換了Kafka流庫的jar文件將無法正常工作,並會破壞你的應用程序。
- 新的Java消費者不是beta版了,我們推薦它做新的應用開發。老Scala消費者仍然支持,但他們會在未來的版本中將會棄用,並將在未來的主版本中刪除。
- 在使用像MirrorMaker和控製台消費者新建消費者的過程中–new-consumer/ –new.consumer開關不再被需要; 一個簡單地使用是通過一個Kafka代理去連接,而不是Zookeeper的合集。此外,控製台消費者去連接舊版本的消費者已被棄用,並將在未來的主版本中刪除。
- Kafka集群現在可以通過一個集群ID被唯一標識。其會在一個代理升級到0.10.1.0時自動生成。集群ID經由kafka.server可用:type= KafkaServer,name= ClusterId metric ,它是所述元數據響應的一部分。串行器,客戶端攔截器和度量報告可以通過實現ClusterResourceListener接口接收集群ID。
- BrokerState “RunningAsController”(值4)已被刪除。由於一個bug,代理在轉換狀態之前隻會簡單的這種狀態下,因此去除的影響應該很小。一種推薦的檢測方法是一個給定的代理的控製器是由kafka.controller實現:type=KafkaController,name=ActiveControllerCount metric。
- 新的Java消費者現在可以允許用戶通過時間戳在分區上搜索偏移量(offset)。
- 新的Java消費者現在可以從後台線程支持心跳檢查。有一個新的配置 max.poll.interval.ms,它控製消費者會主動離開組(5分鍾默認情況下)之前輪詢調用的最大時間。配置的值 request.timeout.ms必須始終大於max.poll.interval.ms因為這是一個JoinGroup請求可以在服務器上被阻止到消費者被負載均衡之前的最長時間.所以我們可以改變默認值為剛好超過5分鍾。最後,默認值session.timeout.ms已調整到10秒,默認值max.poll.records已更改為500。
- 當授權者和用戶沒有說明某個主題的授權,代理將不再返回TOPIC_AUTHORIZATION_FAILED給請求,因為這會泄漏主題名稱。相反,UNKNOWN_TOPIC_OR_PARTITION錯誤代碼將被返回。使用Kafka生產者和消費者通常會在收到未知的主題錯誤時自動重試,這可能會導致意外的超時或延遲。如果你懷疑這種情況發生了,你可以查看客戶端的log去檢查。
- 獲取返回有默認的大小限製(消費者50 MB和副本的複製10 MB)。現有的每個分區的限製也適用(消費者和副本複製為1 MB)。請注意,這些限製都不是絕對最大值,在下一個要點有解釋。
- 消費者和副本可以繼續進行,如果發現一個消息大於返回/分區大小的限製。更具體地,如果在非空的分區上提取的第一個消息比任一個或兩個限值大,仍然會被返回。
- 重載的構造函數加入到kafka.api.FetchRequest和kafka.javaapi.FetchRequest允許調用者指定分區順序(因為順序在V3是很重要的)。先前存在的構造函數被棄用,在發送請求以避免饑餓問題之前,分區會被洗牌。
-
轉載自 並發編程網 - ifeve.com
最後更新:2017-05-18 20:36:04