kafka詳解三:開發Kafka應用
一、整體看一下Kafka
我們知道,Kafka係統有三大組件:Producer、Consumer、broker 。

producers 生產(produce)消息(message)並推(push)送給brokers,consumers從brokers把消息提取(pull)出來消費(consume)。
二、開發一個Producer應用
Producers用來生產消息並把產生的消息推送到Kafka的Broker。Producers可以是各種應用,比如web應用,服務器端應用,代理應用以及log係統等等。當然,Producers現在有各種語言的實現比如Java、C、Python等。
我們先看一下Producer在Kafka中的角色:

2.1.kafka Producer 的 API
Kafka中和producer相關的API有三個類
- Producer:最主要的類,用來創建和推送消息
- KeyedMessage:定義要發送的消息對象,比如定義發送給哪個topic,partition key和發送的內容等。
- ProducerConfig:配置Producer,比如定義要連接的brokers、partition class、serializer class、partition key等
2.2下麵我們就寫一個最簡單的Producer:產生一條消息並推送給broker
package bonree.producer; import java.util.Properties; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; /******************************************************************************* * BidPlanStructForm.java Created on 2014-7-8 * Author: <a href=mailto:wanghouda@126.com>houda</a> * @Title: SimpleProducer.java * @Package bonree.producer * Description: * Version: 1.0 ******************************************************************************/ public class SimpleProducer { private static Producer<Integer,String> producer; private final Properties props=new Properties(); public SimpleProducer(){ //定義連接的broker list props.put("metadata.broker.list", "192.168.4.31:9092"); //定義序列化類(Java對象傳輸前要序列化) props.put("serializer.class", "kafka.serializer.StringEncoder"); producer = new Producer<Integer, String>(new ProducerConfig(props)); } public static void main(String[] args) { SimpleProducer sp=new SimpleProducer(); //定義topic String topic="mytopic"; //定義要發送給topic的消息 String messageStr = "send a message to broker "; //構建消息對象 KeyedMessage<Integer, String> data = new KeyedMessage<Integer, String>(topic, messageStr); //推送消息到broker producer.send(data); producer.close(); } }
三、開發一個consumer應用
Consumer是用來消費Producer產生的消息的,當然一個Consumer可以是各種應用,如可以是一個實時的分析係統,也可以是一個數據倉庫或者是一個基於發布訂閱模式的解決方案等。Consumer端同樣有多種語言的實現,如Java、C、Python等。
我們看一下Consumer在Kafka中的角色:

3.1.kafka Producer 的 API
Kafka和Producer稍微有些不同,它提供了兩種類型的API
- high-level consumer API:提供了對底層API的抽象,使用起來比較簡單
- simple consumer API:允許重寫底層API的實現,提供了更多的控製權,當然使用起來也複雜一些
由於是第一個應用,我們這部分使用high-level API,它的特點每消費一個message自動移動offset值到下一個message,關於offset在後麵的部分會單獨介紹。與Producer類似,和Consumer相關的有三個主要的類:
- KafkaStream:這裏麵返回的就是Producer生產的消息
- ConsumerConfig:定義要連接zookeeper的一些配置信息(Kafka通過zookeeper均衡壓力,具體請查閱見麵幾篇文章),比如定義zookeeper的URL、group id、連接zookeeper過期時間等。
- ConsumerConnector:負責和zookeeper進行連接等工作
3.2下麵我們就寫一個最簡單的Consumer:從broker中消費一個消息
package bonree.consumer; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; /******************************************************************************* * Created on 2014-7-8 Author: <a * href=mailto:wanghouda@126.com>houda</a> * @Title: SimpleHLConsumer.java * @Package bonree.consumer Description: Version: 1.0 ******************************************************************************/ public class SimpleHLConsumer { private final ConsumerConnector consumer; private final String topic; public SimpleHLConsumer(String zookeeper, String groupId, String topic) { Properties props = new Properties(); //定義連接zookeeper信息 props.put("zookeeper.connect", zookeeper); //定義Consumer所有的groupID,關於groupID,後麵會繼續介紹 props.put("group.id", groupId); props.put("zookeeper.session.timeout.ms", "500"); props.put("zookeeper.sync.time.ms", "250"); props.put("auto.commit.interval.ms", "1000"); consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props)); this.topic = topic; } public void testConsumer() { Map<String, Integer> topicCount = new HashMap<String, Integer>(); //定義訂閱topic數量 topicCount.put(topic, new Integer(1)); //返回的是所有topic的Map Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount); //取出我們要需要的topic中的消息流 List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic); for (final KafkaStream stream : streams) { ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator(); while (consumerIte.hasNext()) System.out.println("Message from Single Topic :: " + new String(consumerIte.next().message())); } if (consumer != null) consumer.shutdown(); } public static void main(String[] args) { String topic = "mytopic"; SimpleHLConsumer simpleHLConsumer = new SimpleHLConsumer("192.168.4.32:2181", "testgroup", topic); simpleHLConsumer.testConsumer(); } }
四、運行查看結果
先啟動服務器端相關進程:
- 運行zookeeper:[root@localhost kafka-0.8]# bin/zookeeper-server-start.sh config/zookeeper.properties
- 運行Kafkabroker:[root@localhost kafka-0.8]# bin/kafka-server-start.sh config/server.properties
再運行我們寫的應用
- 運行剛才寫的SimpleHLConsumer 類的main函數,等待生產者生產消息
- 運行SimpleProducer的main函數,生產消息並push到broker
結果:運行完SimpleProducer後在SimpleHLConsumer的控製台即可看到生產者生產的消息:“send a message to broker”。
最後更新:2017-04-03 05:39:09