Kafka Producer攔截器
Kafka中的攔截器(Interceptor)是0.10.x.x版本引入的一個功能,一共有兩種:Kafka Producer端的攔截器和Kafka Consumer端的攔截器。本篇主要講述的是Kafka Producer端的攔截器,它主要用來對消息進行攔截或者修改,也可以用於Producer的Callback回調之前進行相應的預處理。
使用Kafka Producer端的攔截器非常簡單,主要是實現ProducerInterceptor接口,此接口包含4個方法:
1. ProducerRecord onSend(ProducerRecord record):Producer在將消息序列化和分配分區之前會調用攔截器的這個方法來對消息進行相應的操作。一般來說最好不要修改消息ProducerRecord的topic、key以及partition等信息,如果要修改,也需確保對其有準確的判斷,否則會與預想的效果出現偏差。比如修改key不僅會影響分區的計算,同樣也會影響Broker端日誌壓縮(Log Compaction)的功能。
2. void onAcknowledgement(RecordMetadata metadata, Exception exception):在消息被應答(Acknowledgement)之前或者消息發送失敗時調用,優先於用戶設定的Callback之前執行。這個方法運行在Producer的IO線程中,所以這個方法裏實現的代碼邏輯越簡單越好,否則會影響消息的發送速率。
3. void close():關閉當前的攔截器,此方法主要用於執行一些資源的清理工作。
4. configure(Map configs):用來初始化此類的方法,這個是ProducerInterceptor接口的父接口Configurable中的方法。
一般情況下隻需要關注並實現onSend或者onAcknowledgement方法即可。下麵我們來舉個案例,通過onSend方法來過濾消息體為空的消息以及通過onAcknowledgement方法來計算發送消息的成功率。
public class ProducerInterceptorDemo implements ProducerInterceptor<String,String> {
private volatile long sendSuccess = 0;
private volatile long sendFailure = 0;
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
if(record.value().length()<=0)
return null;
return record;
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
if (exception == null) {
sendSuccess++;
} else {
sendFailure ++;
}
}
@Override
public void close() {
double successRatio = (double)sendSuccess / (sendFailure + sendSuccess);
System.out.println("[INFO] 發送成功率="+String.format("%f", successRatio * 100)+"%");
}
@Override
public void configure(Map<String, ?> configs) {}
}
自定義的ProducerInterceptorDemo類實現之後就可以在Kafka Producer的主程序中指定,示例代碼如下:
public class ProducerMain {
public static final String brokerList = "localhost:9092";
public static final String topic = "hidden-topic";
public static void main(String[] args) throws ExecutionException, InterruptedException {
Properties properties = new Properties();
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("bootstrap.servers", brokerList);
properties.put("interceptor.classes", "com.hidden.producer.ProducerInterceptorDemo");
Producer<String, String> producer = new KafkaProducer<String, String>(properties);
for(int i=0;i<100;i++) {
ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topic, "msg-" + i);
producer.send(producerRecord).get();
}
producer.close();
}
}
Kafka Producer不僅可以指定一個攔截器,還可以指定多個攔截器以形成攔截鏈,這個攔截鏈會按照其中的攔截器的加入順序一一執行。比如上麵的程序多添加一個攔截器,示例如下:
properties.put("interceptor.classes", "com.hidden.producer.ProducerInterceptorDemo,com.hidden.producer.ProducerInterceptorDemoPlus");
這樣Kafka Producer會先執行攔截器ProducerInterceptorDemo,之後再執行ProducerInterceptorDemoPlus。
有關interceptor.classes參數,在kafka 1.0.0版本中的定義如下:
NAME | DESCRIPTION | TYPE | DEFAULT | VALID VALUES | IMPORTANCE |
---|---|---|---|---|---|
interceptor.calssses | A list of classes to use as interceptors. Implementing the org.apache.kafka.clients.producer.ProducerInterceptor interface allows you to intercept (and possibly mutate) the records received by the producer before they are published to the Kafka cluster. By default, there no interceptors. | list | null | low |
PS:消息中間件(Kafka、RabbitMQ)交流可加微信:hiddenzzh
歡迎支持筆者新書:《RabbitMQ實戰指南》以及歡迎關注微信公眾號:Kafka技術專欄。
最後更新:2017-11-15 20:05:09
上一篇:
網站定位有多重要?直接決定企業網絡營銷效果!
下一篇:
阿裏雲競價實例發布,GPU雲服務器低至1折
提升Android應用開發性能的十大要點
金融安全資訊精選 2017年第七期:Equifax 泄漏 1.43 億用戶數據,Struts2 REST插件遠程執行命令漏洞全麵分析,阿裏雲護航金磚五國大會
Sql Server substring(expression, start, length)函數
三個小技巧,讓客戶管理變簡單
通過實例模擬ASP.NET MVC的Model綁定的機製:集合+字典
基於MaxCompute平台進行機器學習並展示結果
PL/SQL學習筆記(四)
杭城上演阿裏巴巴“春運”大片……
OSS異常流量排查及防護
聊聊 Laravel 5.5 的 「自動發現」和此刻心情!