846
王者榮耀
《KAFKA官方文檔》第三章:快速入門(一)
快速入門
- 翻譯者:kimmking@163.com
- 原文:kafka.apache.org/quickstart
本教程假設讀者完全從零開始,電腦上沒有已經存在的Kafka和Zookeeper環境。以下內容需要注意的是:因為在類Unix平台和Windows平台上的Kafka控製腳本不同,在Windows平台上,需要使用路徑\bin\windows
代替/bin
,腳本擴展名改為.bat
。
第一步:下載kafka
下載Kafka 0.10.2.0版本 並解壓:
>tar -xzf kafka_2.11-0.10.2.0.tgz
>cd kafka_2.11-0.10.2.0
第二步:啟動kafka服務端
Kafka中使用了Zookeeper,所以我們需要先啟動一個Zookeeper服務端。我們可以使用kafka中已經打包好的腳本方便的完成這個操作,快遞啟動一個單節點的Zookeeper實例。
>bin/zookeeper-server-start.sh config/zookeeper.properties
[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
…
然後啟動kafka服務端:
>bin/kafka-server-start.sh config/server.properties
[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
…
第三步:創建主題
現在我們創建一個單一分區(partition)並且隻有單一複製(replica)的主題,名字叫test
:
>bin/kafka-topics.sh –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 –topic test
我們可以使用如下命令列出主題列表:
>bin/kafka-topics.sh –list –zookeeper localhost:2181
test
當然,我們也可以通過在服務端(broker)配置自動創建主題的選項,這樣當有消息發送到一個不存在的主題時係統會自動創建它。
第四步:發送消息
Kafka自帶了一個命令行工具,它可以從一個文件或標準輸入流發送消息到Kafka集群。默認情況下,每一行內容將被當做一個單獨的消息。 運行以下生產者腳本,然後通過控製台輸入一些字符,即可作為消息發送到服務端:
>bin/kafka-console-producer.sh –broker-list localhost:9092 –topic test
This is a message
This is another message
第五步:啟動消費者
Kafka同樣也提供了一個消費者腳本,它可以消費掉消息並輸出到命令行標準輸出流(STDOUT):
>bin/kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic test –from-beginning
This is a message
This is another message
如果我們在不同的終端窗口運行如上的兩個命令,這時就可以在消息生產者窗口輸入內容,然後在消費者窗口看到它。
所有的命令行工具都有額外的參數,運行命令時不帶任何參數即可顯示出參數信息詳情。
第六步:啟動一個多broker集群
到目前為止,我們隻啟動了一個單broker。對於Kafka來說,一個單broker也是一個集群,隻不過集群的大小是1。其實我們啟動一個多broker集群的話,並不會複雜多少。現在我們來嚐試一下,如何在同一個機器上啟動3個broker節點的集群。
首先,我們為每一個broker創建一個配置文件(Windows上使用copy命令代替cp)。
>cp config/server.properties config/server-1.properties
>cp config/server.properties config/server-2.properties
按如下內容編輯各個配置文件:
“` config/server-1.properties: broker.id=1 listeners=PLAINTEXT://:9093 log.dir=/tmp/kafka-logs-1
config/server-2.properties: broker.id=2 listeners=PLAINTEXT://:9094 log.dir=/tmp/kafka-logs-2 “`
其中broker.id屬性是每個節點在集群中唯一的名字。端口和日誌存儲目錄則由於我們這幾個節點都在同一台機器上啟動而必須要修改。 前麵的步驟裏我們已經有了一個啟動好的單節點kafka和Zookeeper,現在我們隻需要啟動這兩個新配置的節點:
>bin/kafka-server-start.sh config/server-1.properties &
>bin/kafka-server-start.sh config/server-2.properties &
現在我們創建一個複製因子為3的新主題my-replicated-topic:
>bin/kafka-topics.sh –create –zookeeper localhost:2181 –replication-factor 3 –partitions 1 –topic my-replicated-topic
創建完成,但是怎麼才能知道主題被創建在整個集群中的哪個broker上了呢?事實上我們可以使用如下顯示主題描述信息的命令:
>bin/kafka-topics.sh –describe –zookeeper localhost:2181 –topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic:my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
此時會輸出主題的描述信息:第一行給出了所有分區信息的摘要,接下來的每一行則給出具體的每一個分區信息。因為我們前麵創建的這個主題隻有一個分區,所以就隻展示了一行。
“leader”節點 “leader”節點負責響應給定節點的所有讀寫操作。每個節點都可能成為所有分區中一個隨機選擇分區的leader。 “replicas”是複製當前分區的節點列表,無論這些節點是不是leader、是不是可用。 “isr”是目前處於同步狀態的replicas集合。它是replicas列表的子集,其中僅包含當前可用並且與leader同步的節點。 注意上述例子中,編號為1的節點是這個隻有一個分區的主題的leader。
我們可以在最開始創建的主題上運行同樣的命令,看看會發生什麼:
>bin/kafka-topics.sh –describe –zookeeper localhost:2181 –topic test
Topic:test PartitionCount:1 ReplicationFactor:1 Configs:
Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
結果是很明顯的:原來的主題在編號為0的服務端上,它是我們創建的這個集群上唯一的服務端,沒有複製節點(replicas)。
現在我們來發布一些消息到新創建的主題:
>bin/kafka-console-producer.sh –broker-list localhost:9092 –topic my-replicated-topic
my test message 1
my test message 2
^C
然後消費這些消息:
>bin/kafka-console-consumer.sh –bootstrap-server localhost:9092 –from-beginning –topic my-replicated-topic
my test message 1
my test message 2
^C
接著我們來測試一下容錯性。編號為1的broker現在是leader,我們把它kill掉:
>ps aux | grep server-1.properties
7564 ttys002 0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.8/Home/bin/java…
>kill -9 7564
在Windows上使用:
On Windows use:
>wmic process get processid,caption,commandline | find “java.exe” | find “server-1.properties”
java.exe java -Xmx1G -Xms1G -server -XX:+UseG1GC … build\libs\kafka_2.10-0.10.2.0.jar” kafka.Kafka config\server-1.properties 644
taskkill /pid 644 /f
主節點直接切換到其中的一個從節點,並且編號為1的節點不再位於同步複製節點集合了:
>bin/kafka-topics.sh –describe –zookeeper localhost:2181 –topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replated-topic Partition: 0 Leader: 2 Replicas: 1,2,0 Isr: 2,0
但是消息現在仍然對消費者可用,盡管負責處理寫消息的主節點已經宕掉了:
>bin/kafka-console-consumer.sh –bootstrap-server localhost:9092 –from-beginning –topic my-replicated-topic
my test message 1
my test message 2
^C
第七步:使用Kafka連接器(Kafka Connect)導入導出數據
使用控製台讀寫數據固然方便,但是有時我們還是希望從其他數據源導入數據,或者導出數據到其他係統。很多時候我們可以無需通過編寫集成代碼,僅僅使用Kafka連接器就可以實現數據導入導出。
Kafka連接器是一個用於從Kafka導入導出數據的工具。它可以通過擴展實現自定義邏輯,或者直接與外部係統交互。在本教程我們將展示如何簡單的使用Kafka連接器,實現從一個文件導入數據到Kafka主題,以及從Kafka主題導出數據到一個文件。
首先,我們創建一個測試用的文本文件:
>echo -e “foo\nbar” > test.txt
然後我們在單機模式啟動兩個連接器,即它們運行在同一個本地進程。這裏我們使用3個配置文件作為參數。第一個文件是針對Kafka連接器進程的通用配置,包含連接到的Kafka Broker和數據的序列化格式。後麵的每一個文件代表一個連接器。它們每個都包含一個唯一的連接器名稱,要實例化的連接器類型和此連接器需要的其他配置。
>bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
這些示例配置文件都包含在Kafka中,它們使用前麵步驟啟動的本地集群配置,創建兩個連接器:第一個作為源連接器,從一個文本文件讀取數據然後將每一行作為一個消息寫入到指定的Kafka主題;第二個作為接收端連接器,從一個Kafka主題讀取消息,並將每條消息作為一行寫入指定的文本文件。
啟動過程中我們可以看到一些日誌信息,其中包括哪些連接器被實例化了。一旦Kafka連接器進程啟動,源連接器就開始從test.txt文件讀取信息,然後把每一行內容作為一個消息發送到名為connect-test的主題;接收端連接器就開始從connect-test主題讀取消息,然後把每一個消息內容作為一行寫入test.sink.txt文件。我們可以查看這個文件的內容來驗證這些經過整個消息管道傳遞的數據:
>cat test.sink.txt
foo
bar
消息數據被Kafka存儲在connect-test主題中,這樣我們也可以在控製台啟動一個消費者來查看主題裏的消息(或者使用自定義的消費代碼來處理):
>bin/kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic connect-test –from-beginning
{“schema”:{“type”:”string”,”optional”:false},”payload”:”foo”} {“schema”:{“type”:”string”,”optional”:false},”payload”:”bar”}
…
這些連接器會持續的處理數據,因此我們可以通過添加數據到輸入文件,然後看到消息通過整個管道: The connectors continue to process data, so we can add data to the file and see it move through the pipeline:
>echo “Another line” >> test.txt
然後我們可以看到這行數據在消費者所在的控製台以及接收端文件裏出現。
最後更新:2017-05-18 20:31:34