閱讀745 返回首頁    go 阿裏雲 go 技術社區[雲棲]


《KAFKA官方文檔》入門指南(四)

1.4生態係統

除了Kafka的主要版本之外,還有很多應用集成了Kafka工具。該生態係統頁麵中列出的許多工具,包括流處理係統,Hadoop的集成,監控和部署工具。

1.5從以前版本升級

0.8.40.9.x0.10.0.x0.10.1.x升級到0.10.2.0

0.10.2.0的有線協議有變化。通過下麵的推薦滾動升級計劃,你能保證在升級過程中無需停機。但是,請在升級之前查看0.10.2.0版本顯著的變化

從0.10.2版本開始,Java客戶端(生產者和消費者)已獲得與舊版本代理服務器溝通的能力。版本0.10.2客戶可以跟0.10.0版或更新版本的代理溝通。但是,如果你的代理比0.10.0老,你必須在升級客戶端之前升級Kafka集群中的所有代理服務器(Broker)。版本0.10.2代理支持0.8.x和更新的客戶端。

對於滾動升級:

  1. 更新所有代理服務器上的server.properties文件,添加以下屬性:
    • inter.broker.protocol.version = CURRENT_KAFKA_VERSION(例如0.8.2,0.9.0,0.10.0或0.10.1)。
    • log.message.format.version = CURRENT_KAFKA_VERSION(參見升級後的潛在性能的影響了解此配置做什麼的詳細信息。)
  2. 逐一升級代理:關閉代理,更新代碼,並重新啟動。
  3. 一旦整個群集升級成功,通過編輯inter.broker.protocol.version將其設置為0.10.2的協議版本。
  4. 如果您以前的消息格式為0.10.0,改變log.message.format.version至0.10.2(這是一個無效操作,因為0.10.0,0.10.1和0.10.2的消息格式相同)。如果您以前的消息格式版本低於0.10.0,不要改變log.message.format.version – 這個參數隻能在所有的消費者都已經升級到0.10.0.0或更高版本之後改動。
  5. 逐一重新啟動代理服務器使新協議版本生效。
  6. 如果這時log.message.format.version仍比0.10.0低,等到所有的消費者都已經升級到0.10.0或更高版本,然後更改每個代理服務器的log.message.format.version到0.10.2,然後逐一重新啟動。

注意:如果你願意接受宕機,你可以簡單地把所有的代理服務器關閉,更新代碼,然後重新啟動他們。他們將默認使用新的協議。

注:改變協議版本並重新啟動可以在代理服務器升級之後的任何時間做,沒有必要必須立刻就做。

升級0.10.1版本的Kafka流應用

  • 從0.10.1升級您的流應用程序到0.10.2不需要升級代理。0.10.2 Kafka流應用程序可以連接到0.10.2和0.10.1代理(但無法連接到 0.10.0的代理)。
  • 你需要重新編譯代碼。隻是替換Kafka流的jar文件將無法正常工作,這破壞你的應用程序。
  • 如果您使用自定義(即用戶實現的)的時間戳提取,則需要更新此代碼,因為TimestampExtractor接口改變了。
  • 如果您注冊了自定義指標,您將需要更新此代碼,因為StreamsMetric接口被改變了。
  • 0.10.2 流 API的變化更多的細節。

0.10.2.1顯著的變化

  • 對於StreamsConfig類的兩個配置的默認值的修改提高了Kafka流應用的彈性。內部Kafka流生產者retries默認值從0變化到10,內部Kafka流消費者max.poll.interval.ms 缺省值從300000到改變Integer.MAX_VALUE。

0.10.2.0顯著的變化

  • 在Java客戶端(生產者和消費者)已獲得與舊版本代理溝通的能力。版本0.10.2客戶端可以跟0.10.0版或更新版本的代理溝通。請注意,某些功能在跟就代理溝通的時候不可用或被限製了。
  • 在Java消費者中有幾種方法現在可能拋出InterruptException如果調用線程被中斷。請參閱KafkaConsumer的Javadoc,對這種變化有一個更深入的解釋。
  • Java的消費者現在被恰當關閉。默認情況下,消費者會等待30秒才能完成掛起的請求。一個帶有timeout參數的新的API已添加到KafkaConsumer去控製最大等待時間。
  • 用逗號分隔的多個正則表達式可以傳遞多個Java消費者給MirrorMaker–whitelist選擇。這使得與MirrorMaker使用老Scala消費者時的行為一致。
  • 從0.10.1升級您的流應用程序0.10.2不需要代理服務器升級。Kafka 0.10.2流應用程序可以連接到0.10.2和0.10.1代理(但無法連接到0.10.0代理)。
  • Zookeeper的依賴從流API中刪除。流API現在使用Kafka協議來管理內部主題,而不是直接修改動物園管理員的主題。這消除了需要直接訪問Zookeeper的特權,而“StreamsConfig.ZOOKEEPER_CONFIG”也不需要在流應用被設置。如果Kafka集群是安全認證的,流應用程序必須具備必要的安全權限才可以創建新的主題。
  • 一些新的參數,包括“security.protocol”, “connections.max.idle.ms”, “retry.backoff.ms”, “reconnect.backoff.ms”和“request.timeout.ms”添加到StreamsConfig類。如果用戶需要設置這些,要注意這些默認值。欲了解更多詳情,請參閱3.5Kafka流CONFIGS
  • 該offsets.topic.replication.factor代理的配置現在在主題生產中強製使用。直到集群的大小符合這個複製因子要求,否則,主題的生產將失敗,返回GROUP_COORDINATOR_NOT_AVAILABLE錯誤。

新的協議版本

  • KIP-88:OffsetFetchRequest v2支持偏移檢索所有的主題,如果topics數組設置為null。
  • KIP-88:OffsetFetchResponse V2引入了頂級error_code域。
  • KIP-103:UpdateMetadataRequest v3引入一個listener_name字段到end_points數組中的元素。
  • KIP-108:CreateTopicsRequest V1引入了一個validate_only參數。
  • KIP-108:CreateTopicsResponse V1引入了error_message到數組topic_errors的元素。

0.8.40.9.x版本或0.10.0.X升級到0.10.1.0

0.10.1.0有線協議發生了變化。通過下麵的推薦滾動升級計劃,能保證在升級過程中無需停機。但是,請注意在升級之前仔細閱讀0.10.1.0潛在的重大更改
注意:由於新協議的引入,它是升級你的客戶端之前請先完成Kafka集群的升級(即0.10.1.x客戶端僅支持0.10.1.x或更高版本的代理,但0.10.1.x的代理可以支持舊版本客戶端)。

對於滾動升級:

  1. 更新所有代理上的server.properties文件,並添加以下屬性:
    • inter.broker.protocol.version = CURRENT_KAFKA_VERSION(例如0.8.2.0,0.9.0.0或0.10.0.0)。
    • log.message.format.version = CURRENT_KAFKA_VERSION(參見升級後的潛在性能的影響對於此配置做什麼的詳細信息。)
  2. 升級代理服務器一次一個:關閉代理,更新代碼,並重新啟動。
  3. 一旦整個群集升級完成,通過編輯inter.broker.protocol.version並將其設置為0.10.1.0的協議版本。
  4. 如果您以前的消息格式為0.10.0,改變log.message.format.version至0.10.1(這是一個無效操作,如果0.10.0和0.10.1兩個協議的消息格式相同)。如果您以前的消息格式版本低於0.10.0,不要改變log.message.format.version — 這個參數隻能在所有的消費者都已經升級到0.10.0.0或更高版本之後修改。
  5. 逐一重新啟動代理,新版本協議生效。
  6. 如果log.message.format.version仍比0.10.0低,等到所有的消費者都已經升級到0.10.0或更高版本,然後更改log.message.format.version到0.10.1,逐一重新啟動代理服務器。

注意:如果你願意接受宕機,你可以簡單地把所有的代理服務器關閉,更新代碼,然後重新啟動他們。他們將默認使用新的協議。

注:改變協議版本並重新啟動可以在代理服務器升級之後的任何時間做,沒有必要必須立刻就做。

0.10.1.0的重大更改

  • 日誌保留時間不再基於日誌段的最後修改時間。相反,它會基於日誌段裏擁有最大的時間戳的消息。
  • 日誌滾動時間不再取決於日誌段創建時間。相反,它現在是基於消息的時間戳。進一步來說,如果日誌段中第一個消息的時間戳是T,當一個新的消息具有的時間戳大於或等於T + log.roll.m,該日誌將被覆蓋。
  • 0.10.0的打開文件的處理程序將增加〜33%,因為每個日誌段增加的時間索引文件。
  • 時間索引和偏移索引共享相同的索引大小的配置。因為時間索引條目大小是1.5倍偏移索引條目的大小。用戶可能需要增加log.index.size.max.bytes以避免潛在的頻繁的日誌滾動。
  • 由於增加的索引文件,在某些代理服務器上具有大量的日誌段(例如> 15K),代理啟動期間日誌加載過程可能很長。根據我們的實驗,num.recovery.threads.per.data.dir設置為1可減少日誌裝載時間。

升級0.10.0Kafka流應用

  • 從0.10.0升級您的流應用程序到0.10.1確實需要一個代理的升級,因為Kafka 0.10.1的流應用程序隻能連接到0.10.1代理。
  • 有幾個API的變化不向後兼容(參見流API在0.10.1的變化有詳細介紹)。因此,你需要更新和重新編譯代碼。隻是交換了Kafka流庫的jar文件將無法正常工作,並會破壞你的應用程序。

0.10.1.0顯著的變化

  • 新的Java消費者不是beta版了,我們推薦它做新的應用開發。老Scala消費者仍然支持,但他們會在未來的版本中將會棄用,並將在未來的主版本中刪除。
  • 在使用像MirrorMaker和控製台消費者新建消費者的過程中–new-consumer/ –new.consumer開關不再被需要; 一個簡單地使用是通過一個Kafka代理去連接,而不是Zookeeper的合集。此外,控製台消費者去連接舊版本的消費者已被棄用,並將在未來的主版本中刪除。
  • Kafka集群現在可以通過一個集群ID被唯一標識。其會在一個代理升級到0.10.1.0時自動生成。集群ID經由kafka.server可用:type= KafkaServer,name= ClusterId metric ,它是所述元數據響應的一部分。串行器,客戶端攔截器和度量報告可以通過實現ClusterResourceListener接口接收集群ID。
  • BrokerState “RunningAsController”(值4)已被刪除。由於一個bug,代理在轉換狀態之前隻會簡單的這種狀態下,因此去除的影響應該很小。一種推薦的檢測方法是一個給定的代理的控製器是由kafka.controller實現:type=KafkaController,name=ActiveControllerCount metric。
  • 新的Java消費者現在可以允許用戶通過時間戳在分區上搜索偏移量(offset)。
  • 新的Java消費者現在可以從後台線程支持心跳檢查。有一個新的配置 max.poll.interval.ms,它控製消費者會主動離開組(5分鍾默認情況下)之前輪詢調用的最大時間。配置的值 request.timeout.ms必須始終大於max.poll.interval.ms因為這是一個JoinGroup請求可以在服務器上被阻止到消費者被負載均衡之前的最長時間.所以我們可以改變默認值為剛好超過5分鍾。最後,默認值session.timeout.ms已調整到10秒,默認值max.poll.records已更改為500。
  • 當授權者和用戶沒有說明某個主題的授權,代理將不再返回TOPIC_AUTHORIZATION_FAILED給請求,因為這會泄漏主題名稱。相反,UNKNOWN_TOPIC_OR_PARTITION錯誤代碼將被返回。使用Kafka生產者和消費者通常會在收到未知的主題錯誤時自動重試,這可能會導致意外的超時或延遲。如果你懷疑這種情況發生了,你可以查看客戶端的log去檢查。
  • 獲取返回有默認的大小限製(消費者50 MB和副本的複製10 MB)。現有的每個分區的限製也適用(消費者和副本複製為1 MB)。請注意,這些限製都不是絕對最大值,在下一個要點有解釋。
  • 消費者和副本可以繼續進行,如果發現一個消息大於返回/分區大小的限製。更具體地,如果在非空的分區上提取的第一個消息比任一個或兩個限值大,仍然會被返回。
  • 重載的構造函數加入到kafka.api.FetchRequest和kafka.javaapi.FetchRequest允許調用者指定分區順序(因為順序在V3是很重要的)。先前存在的構造函數被棄用,在發送請求以避免饑餓問題之前,分區會被洗牌。
  • 轉載自 並發編程網 - ifeve.com

最後更新:2017-05-18 20:36:04

  上一篇:go  Clojure世界:日誌管理——clojure.tools.logging
  下一篇:go  Clojure世界: STM的統計