【Kafka】Consumer配置
從0.9.0.0開始,下麵是消費者的配置。
名稱 | 描述 | 類型 | 默認值 |
---|---|---|---|
bootstrap.servers | 消費者初始連接kafka集群時的地址列表。不管這邊配置的什麼地址,消費者會使用所有的kafka集群服務器。消費者會通過這些地址列表,找到所有的kafka集群機器。 | list | |
key.deserializer | 實現了Deserializer的key的反序列化類 | class | |
value.deserializer | 實現了Deserializer的value的反序列化類 | class | |
fetch.min.bytes | 每次請求,kafka返回的最小的數據量。如果數據量不夠,這個請求會等待,直到數據量到達最小指標時,才會返回給消費者。如果設置大於1,會提高kafka的吞吐量,但是會有額外的等待期的代價。 | int | 1 |
group.id | 標識這台消費者屬於那個消費組。如果消費者通過訂閱主題來實現組管理功能,或者使用基於kafka的偏移量管理策略,這個配置是必須的。 | string | "" |
heartbeat.interval.ms | 使用kafka集群管理工具時,消費者協調器之間的預計心跳時間。心跳的作用是確保消費者的session是活躍的,同時當新的機器加入集群或有機器掛掉的情況下觸發再平衡操作。這個配置必須小於heartbeat.interval.ms,而且應該不大於這個值的1/3。為了控製正常的負載均衡的預期時間,這個值可以設置的更小。 | int | 3000 |
max.partition.fetch.bytes | kafka集群每個分區一次返回的最大數據量。一次請求的最大內存使用量應該等於#partitions * max.partition.fetch.bytes。這個值必須與kafka集群允許的最大消息數據量差不多大小,否則可能生產者發送了一個消息,大於消費者配置的值。這種情況下,消費者可能會在獲取那條消息時堵住。 | int | 1048576 |
session.timeout.ms | 使用kafka集群管理工具時檢測失敗的超時時間。如果在session超時時間範圍內,沒有收到消費者的心跳,broker會把這個消費者置為失效,並觸發消費者負載均衡。因為隻有在調用poll方法時才會發送心跳,更大的session超時時間允許消費者在poll循環周期內處理消息內容,盡管這會有花費更長時間檢測失效的代價。如果想控製消費者處理消息的時間,還可以參考max.poll.records。注意這個值的大小應該在group.min.session.timeout.ms和group.max.session.timeout.ms範圍內。 | int | 30000 |
ssl.key.password | 私鑰存儲文件的私鑰密碼。可選配置。 | password | null |
ssl.keystore.location | 私鑰存儲文件的路徑。可選配置,並且可用來作為客戶端的雙向認證。 | string | null |
ssl.keystore.password | 私鑰存儲文件的存儲密碼。可選配置,並且隻有ssl.keystore.location配置的情況下才需要配置。 | password | null |
ssl.truststore.location | 信任秘鑰文件路徑。 | string | null |
ssl.truststore.password | 信任秘鑰文件密碼。 | password | null |
auto.offset.reset | 當kafka的初始偏移量沒了,或者當前的偏移量不存在的情況下,應該怎麼辦?下麵有幾種策略:earliest(將偏移量自動重置為最初的值)、latest(自動將偏移量置為最新的值)、none(如果在消費者組中沒有發現前一個偏移量,就向消費者拋出一個異常)、anything else(向消費者拋出異常) | string | latest |
connections.max.idle.ms | 配置時間後,關閉空閑的連接 | long | 540000 |
enable.auto.commit | 如果設為true,消費者的偏移量會定期在後台提交。 | boolean | true |
exclude.internal.topics | 內部主題(比如偏移量)是否需要暴露給消費者。如果設為true,獲取內部主題消息的途徑就是訂閱他們。 | boolean | true |
max.poll.records | 一次poll調用返回的最大消息數量。 | int | 2147483647 |
partition.assignment.strategy | 使用組管理時,客戶端使用的分區策略的類名,根據這個策略來進行消費分區。 | list | [org.apache.kafka.clients.consumer.RangeAssignor] |
receive.buffer.bytes | SO_RCVBUF讀取數據使用的內存大小。 | int | 65536 |
request.timeout.ms | 這個配置控製一次請求響應的最長等待時間。如果在超時時間內未得到響應,kafka要麼重發這條消息,要麼超過重試次數的情況下直接置為失敗。 | int | 40000 |
最後更新:2017-08-31 14:32:26