《KAFKA官方文檔》入門指南(二)
把功能組合起來
消息的傳輸,存儲和流處理的組合看似不尋常卻是Kafka作為流處理平台的關鍵。
像HDFS分布式文件係統,允許存儲靜態文件進行批量處理。像這樣的係統允許存儲和處理過去的曆史數據。
傳統的企業消息係統允許處理您訂閱後才抵達的消息。這樣的係統隻能處理將來到達的數據。
Kafka結合了這些功能,這種結合對Kafka作為流應用平台以及數據流處理的管道至關重要。
通過整合存儲和低延遲訂閱,流處理應用可以把過去和未來的數據用相同的方式處理。這樣一個單獨的應用程序,不但可以處理曆史的,保存的數據,當它到達最後一條記錄不會停止,繼續等待處理未來到達的數據。這是泛化了的的流處理的概念,包括了批處理應用以及消息驅動的應用。
同樣,流數據處理的管道結合實時事件的訂閱使人們能夠用Kafka實現低延遲的管道; 可靠的存儲數據的能力使人們有可能使用它傳輸一些重要的必須保證可達的數據。可以與一個定期加載數據的線下係統集成,或者與一個因為維護長時間下線的係統集成。流處理的組件能夠保證轉換(處理)到達的數據。
有關Kafka提供的保證,API和功能的更多信息,看其餘文件。
下麵描述了一些使用Apache Kafka™的流行用例。更多的關於這些領域實踐的概述,參考這個博客。
Kafka能夠很好的替代傳統的消息中間件。消息中間件由於各種原因被使用(解耦數據的生產和消費,緩衝未處理的消息等)。相較於大多數消息處理係統,Kafka有更好的吞吐量,內置分區,副本複製和容錯性,使其成為大規模消息處理應用的理想解決方案。
根據我們的經驗消息的使用通常具有相對低的吞吐量,但可能需要端到端的低延遲,以及高可靠性的保證,這種低延遲和可靠性的保證恰恰是Kafka能夠提供的。
在這一領域Kafka是能夠和傳統的消息係統相媲美的,例如ActiveMQ或 RabbitMQ。
最初的用例是用Kafka重建一個用戶活動跟蹤管道使之作為一組實時發布 – 訂閱的數據源。這意味著網站活動(網頁瀏覽,搜索,或其他可能的操作)被當作一組中心主題發布,每種活動被當作一個主題。這些數據源(feeds)可被一係列的應用訂閱,包括實時處理,實時監測,加載到Hadoop係統或離線數據倉庫係統進行離線處理和報告。
活動追蹤通常會產生巨大的數據量,因為每個用戶頁麵的瀏覽都會產生很多的活動消息。
Kafka通常用於監測數據的處理。這涉及從分布式應用程序聚集統計數據,生產出集中的運行數據源feeds(以便訂閱)。
許多人用Kafka作為日誌聚合解決方案的替代品。日誌聚合通常從服務器收集物理日誌文件,並把它們放在一個集中的地方(文件服務器或HDFS)進行處理。Kafka抽象了文件的詳細信息,把日誌或事件數據的簡潔抽象作為消息流傳輸。這為低時延的處理提供支持,而且更容易支持多個數據源和分布式的數據消費。相比集中式的日誌處理係統,Scribe or Flume,Kafka提供同樣良好的性能,而且因為副本備份提供了更強的可靠性保證和更低的端到端延遲。
Kafka的流數據管道在處理數據的時候包含多個階段,其中原始輸入數據從Kafka主題被消費然後匯總,加工,或轉化成新主題用於進一步的消費或後續處理。例如,用於推薦新聞文章的數據流處理管道可能從RSS源抓取文章內容,並將其發布到“文章”主題; 進一步的處理可能是標準化或刪除重複數據,然後發布處理過的文章內容到一個新的話題; 最後的處理階段可能會嚐試推薦這個內容給用戶。這樣的數據流處理管道基於各個主題創建了實時數據數據流程圖。從版本0.10.0.0開始,Apache Kafka加入了輕量級的但功能強大的流處理庫Kafka Streams ,Kafka Streams支持如上所述的數據處理。除了Kafka Streams,可以選擇的開源流處理工具包括 Apache Storm and Apache Samza.
Event Sourcing
Event sourcing 是一種應用程序設計風格,是按照時間順序記錄的狀態變化的序列。Kafka的非常強大的存儲日誌數據的能力使它成為構建這種應用程序的極好的後端選擇。
Commit Log
Kafka可以為分布式係統提供一種外部提交日誌(commit-log)服務。日誌有助於節點之間複製數據,並作為一種數據重新同步機製用來恢複故障節點的數據。Kafka的log compaction 功能有助於支持這種用法。Kafka在這種用法中類似於Apache BookKeeper 項目。
本教程假設你從零開始,沒有現成的Kafka或ZooKeeper數據。由於Kafka控製台腳本在Unix基礎的和Windows平台上的不同,在Windows平台上使用bin\windows\,而不是bin/,並修改腳本擴展為.bat。
下載0.10.2.0釋放和un-tar它。
> tar -xzf kafka_2.11-0.10.2.0.tgz > cd kafka_2.11-0.10.2.0
Kafka使用ZooKeeper的,所以你需要先啟動ZooKeeper的服務器,如果你還沒有,您可以使用Kafka包裝裏的方便腳本來得到一個快速和汙染的單節點的ZooKeeper實例。
> bin/zookeeper-server-start.sh config/zookeeper.properties [2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig) ...
現在啟動Kafka服務器:
> bin/kafka-server-start.sh config/server.properties [2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties) [2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties) ...
讓我們創建一個名為“test”主題,隻有一個分區,隻有一個副本:
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
現在我們可以看到,如果我們運行的列表主題命令話題:
> bin/kafka-topics.sh --list --zookeeper localhost:2181 test
除了手動創建主題,你還可以配置你的代理服務器(broker),當一個不存在的主題被發布的時候它能自動創建相應的主題。
Kafka帶有一個命令行客戶端,獲取從文件或來自標準輸入的輸入,並作為消息發送到Kafka集群。默認情況下,每一行將被作為單獨的消息發送。
運行生產者腳本,然後輸入一些信息到控製台發送到服務器。
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test This is a message This is another message
Kafka也有一個命令行消費者,將收到的消息輸出到標準輸出。
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning This is a message This is another message
如果你在不同的終端上運行上麵的命令,那麼你現在應該能看到從生產者終端輸入的消息會出現在消費者終端。
所有的命令行工具都有其他選項; 不帶參數運行命令將顯示更加詳細的使用信息。
到目前為止,我們已經運行了單個代理的服務器,但是這沒有樂趣。對於Kafka,一個代理是隻有一個單節點的集群,因此多代理集群隻是比開始多了一些代理實例外,沒有什麼太大的變化。但隻是為了感受一下,我們的集群擴展到三個節點(所有的節點還是在本地機器上)。
首先,我們為每個經紀人做一個配置文件(在Windows上使用copy命令來代替):
> cp config/server.properties config/server-1.properties > cp config/server.properties config/server-2.properties
現在,編輯這些新文件和設置以下屬性:
config/server-1.properties: broker.id=1 listeners=PLAINTEXT://:9093 log.dir=/tmp/kafka-logs-1 config/server-2.properties: broker.id=2 listeners=PLAINTEXT://:9094 log.dir=/tmp/kafka-logs-2
該broker.id屬性是集群中的每個節點的唯一和永久的名字。我們要重寫端口和日誌目錄,因為我們都在同一台機器上運行這些代理,我們要防止經紀人在同一端口上注冊或覆蓋彼此的數據。
我們已經有Zookeeper服務和我們的單個節點服務,所以我們隻需要啟動兩個新節點:
> bin/kafka-server-start.sh config/server-1.properties & ... > bin/kafka-server-start.sh config/server-2.properties & ...
現在,創建一個新的具有三個的副本因子的主題:
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
好了,現在我們有一個集群,但是如何才能知道哪個代理節點在做什麼?要查看運行“describe topics”命令:
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs: Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
下麵是輸出的解釋。第一行給出了所有分區的摘要,每個附加的行提供了一個分區的信息。由於我們隻有一個分區,所以這個主題隻有一行。
- “Leader”,負責指定分區所有讀取和寫入的節點。每個節點將是一部分隨機選擇的分區中的領導者。
- “Replicas”是此分區日誌的節點列表集合,不管這些節點是否是領導者或者隻是還活著(不在in-sync狀態)。
- “ISR”是一組”in-sync” 節點列表的集合。這個列表包括目前活著並跟leader保持同步的replicas,Isr 是Replicas的子集。
請注意,在我的例子節點1是該主題的唯一分區中的leader。
我們可以運行相同的命令看看我們創建原來的話題的狀態:
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test Topic:test PartitionCount:1 ReplicationFactor:1 Configs: Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
所以毫不奇怪,原來的話題沒有副本,隻有我們創建它時的唯一的服務器0。
讓我們發布一些消息到我們新的話題:
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic ... my test message 1 my test message 2 ^C
現在讓我們來消費這些消息:
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic ... my test message 1 my test message 2 ^C
現在,讓我們測試容錯性。代理1是領導者,讓我們殺死它:
> ps aux | grep server-1.properties 7564 ttys002 0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.8/Home/bin/java... > kill -9 7564
在Windows上使用:
> wmic process get processid,caption,commandline | find "java.exe" | find "server-1.properties" java.exe java -Xmx1G -Xms1G -server -XX:+UseG1GC ... build\libs\kafka_2.10-0.10.2.0.jar" kafka.Kafka config\server-1.properties 644 > taskkill /pid 644 /f
領導權已經切換到備機中的一個節點上去了,節點1不再在同步中的副本集(in-sync replica set)中:
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs: Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 1,2,0 Isr: 2,0
但消息仍然是可用於消費,即使是原來負責寫任務的領導者已經不在了:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic^C
最後更新:2017-05-18 20:34:39