閱讀982 返回首頁    go 王者榮耀


《KAFKA官方文檔》第三章:快速入門(二)

第八步:使用Kafka流(Kafka Streams)處理數據

Kafka流是一個針對存儲於Kafka brokers上的數據進行實時流處理和分析的客戶端類庫。快速入門中的示例將展示如何使用這個類庫實現一個數據流處理應用。下麵是其中的WordCountDemo數單詞示例代碼片段(轉換成Java8的lambda表達式更便於閱讀)。

“` // 字符串和長整型的序列化器與反序列化器(serde) final Serde stringSerde = Serdes.String(); final Serde longSerde = Serdes.Long();

// 從streams-file-input主題構造一個KStream,消息值代表了文本的每一行(這裏我們忽略消息key中存儲的數據) KStream<String, String> textLines = builder.stream(stringSerde, stringSerde, “streams-file-input”);

KTable<String, Long> wordCounts = textLines // 按空格拆分每個文本行為多個單獨的詞 .flatMapValues(value -> Arrays.asList(value.toLowerCase().split(“\W+”)))

// 將單詞分組後作為消息的key
.groupBy((key, value) -> value)

// 統計每個單詞出現的次數(即消息的key)
.count("Counts")

// 將運行結果作為變更記錄流發送到輸出主題 wordCounts.to(stringSerde, longSerde, “streams-wordcount-output”); “`

上麵的代碼實現了數單詞算法(WordCount algorithm),即計算了輸入文本中每一個單詞出現的次數。不同於我們常見的計算給定數量文本的數單詞算法,這個示例被設計來操作一個無限的、不確定數據量的數據流。跟有界的情況類似,這是一個有狀態的算法,會跟蹤和更新單詞的數目。此算法必須假定輸入的數據是沒有邊界的,這樣因為不知道什麼時候處理完所有的數據,所以每當處理了新輸入的數據時,上述代碼會隨時輸出當前的狀態和處理結果。

我們先準備發送到Kafka主題的輸入數據,這些數據將被Kafka流程序依次處理。

>echo -e “all streams lead to kafka\nhello kafka streams\njoin kafka summit” > file-input.txt

在Windows上執行:

>echo all streams lead to kafka> file-input.txt

>echo hello kafka streams>> file-input.txt

>echo|set /p=join kafka summit>> file-input.txt

接著,我們控製台上使用生產者腳本將這些數據發送到一個叫streams-file-input的主題,此腳本會從標準輸入(STDIN)一行行的讀取數據,然後把每一行內容作為一個單獨的、key為null、值為字符串格式的Kafka消息,發送到這個主題(在實際應用中,隻要應用程序一直運行,數據就可以一直持續的流向Kafka):

>bin/kafka-topics.sh –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 –topic streams-file-input

>bin/kafka-console-producer.sh –broker-list localhost:9092 –topic streams-file-input < file-input.txt

此時我們可以運行數單詞示例程序來處理輸入數據:

>bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo

示例程序將從streams-file-input主題讀入數據,然後用數單詞算法計算每一個消息數據,持續將當前計算結果寫入到streams-wordcount-output主題。到目前為止,我們在任何命令行的標準輸出上看不到這些結果,因為這些結果數據都被寫入到Kafka輸出主題streams-wordcount-output了。這個示例程序也將會一直運行,不像常見的流處理程序會在處理完以後自動結束。

現在我們可以通過讀取Kafka輸出主題來查看數單詞示例程序的輸出:

>bin/kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic streams-wordcount-output –from-beginning –formatter kafka.tools.DefaultMessageFormatter –property print.key=true –property print.value=true –property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer –property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

可以看到在控製台上輸出如下:

all 1

lead 1

to 1

hello 1

streams 2

join 1

kafka 3

summit 1

第一列是java.lang.String類型的Kafka消息key,第二列是java.lang.Long類型的消息值。注意到輸出的實際上是一個持續更新的流,每一個數據記錄(上麵輸出的每一行)代表一個單詞(比如kafka)每次更新的計數。對於同一個key,會有多個記錄,每一個後麵的記錄代表之前記錄的更新結果。

下麵的兩幅圖展示了這個場景下到底發生了什麼。圖上的第一列展示了計算單詞出現次數的KTable<String, Long>的當前狀態演化。第二列展示了狀態更新導致的KTable變化記錄,也是被發送到Kafka輸出主題streams-wordcount-output中的數據。

“all stream lead to kafka”這行文本先被處理。隨著每一個新單詞作為一個表格中的新項(綠色背景高亮顯示)加入到KTable,KTable表格逐漸增長,同時相應的變化記錄被發送到下麵的KStream中。

當第二行文本“hello kafka streams”被處理後,我們可以看到與此同時在KTable中已經存在的項立即被更新(即kafak和streams)。同樣的變化記錄也被發送到輸出主題。

第三行處理也類似,我們暫且略去。這就解釋了為何輸出主題中的內容如上所,因為它包含了全部的變化記錄。

如果我們跳出這個具體的示例來看,Kafka流所做的事情就是表和變更記錄流之間的相互作用(這裏表指的是KTable,變更記錄流指的是下麵的KStream):表中的每一個變化記錄會發送到流中,當然如果我們從頭至尾的消費一個完整的變更記錄流,則可以重建這個表的全部內容。

現在我們可以寫入更多消息到stream-file-input主題,然後觀察這些消息被添加到streams-wordcount-output主題,這些消息反映出了更新的單詞計數(可以在控製台使用上麵提及的生產者腳本和消費者腳本來操作)。

最後我們可以在控製台使用Ctrl-C快捷鍵來結束消費者。

 

 

轉載自 並發編程網 - ifeve.com

最後更新:2017-05-18 20:31:37

  上一篇:go  找bug記(2)
  下一篇:go  《KAFKA官方文檔》第三章:快速入門(一)