《kafka中文手冊》 (Kafka Chinese Manual) - Quick Start (Part 2)

Putting the Pieces Together

This combination of messaging, storage, and stream processing may seem unusual, but it is essential to Kafka’s role as a streaming platform.

A distributed file system like HDFS allows storing static files for batch processing. Effectively, a system like this allows storing and processing historical data from the past.

A traditional enterprise messaging system allows processing future messages that will arrive after you subscribe. Applications built in this way process future data as it arrives.

Kafka combines both of these capabilities, and the combination is critical both for Kafka usage as a platform for streaming applications as well as for streaming data pipelines.

By combining storage and low-latency subscriptions, streaming applications can treat both past and future data the same way. That is, a single application can process historical, stored data, but rather than ending when it reaches the last record it can keep processing as future data arrives. This is a generalized notion of stream processing that subsumes batch processing as well as message-driven applications.
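
As a rough illustration, the console consumer that ships with Kafka can replay a topic from its first record and then keep waiting for new records, so stored history and live data are handled by the same command (a minimal sketch; the topic name my-topic and the broker address are placeholders):

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --from-beginning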

Likewise, for streaming data pipelines the combination of subscription to real-time events makes it possible to use Kafka for very low-latency pipelines; but the ability to store data reliably makes it possible to use it for critical data where the delivery of data must be guaranteed, or for integration with offline systems that load data only periodically or may go down for extended periods of time for maintenance. The stream processing facilities make it possible to transform data as it arrives.

For more information on the guarantees, APIs, and capabilities Kafka provides, see the rest of the documentation.

1.2 Use Cases

Here is a description of a few of the popular use cases for Apache Kafka™. For an overview of a number of these areas in action, see this blog post.

Messaging

Kafka works well as a replacement for a more traditional message broker. Message brokers are used for a variety of reasons (to decouple processing from data producers, to buffer unprocessed messages, etc.). In comparison to most messaging systems Kafka has better throughput, built-in partitioning, replication, and fault-tolerance, which makes it a good solution for large scale message processing applications.

In our experience, messaging uses are often comparatively low-throughput, but may require low end-to-end latency and often depend on the strong durability guarantees Kafka provides.
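
As a small sketch of the durability side, a topic meant for such messages can simply be created with more than one replica, so an acknowledged message survives the loss of a broker (the topic name orders is a placeholder, and the replication factor of 3 assumes a three-broker cluster like the one set up later in the quick start):

> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic orders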

In this domain Kafka is comparable to traditional messaging systems such as ActiveMQ or RabbitMQ.

Website Activity Tracking

The original use case for Kafka was to be able to rebuild a user activity tracking pipeline as a set of real-time publish-subscribe feeds. This means site activity (page views, searches, or other actions users may take) is published to central topics, with one topic per activity type. These feeds are available for subscription for a range of use cases including real-time processing, real-time monitoring, and loading into Hadoop or offline data warehousing systems for offline processing and reporting.
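
For example, with one topic per activity type, a tracking pipeline might use separate topics for page views and searches (the topic names below are only illustrative; the commands mirror the topic-creation step in the quick start):

> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic page-views
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic searches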

Activity tracking is often very high volume, as many activity messages are generated for each user page view.

Metrics

Kafka is often used for operational monitoring data. This involves aggregating statistics from distributed applications to produce centralized feeds of operational data.

Log Aggregation

Many people use Kafka as a replacement for a log aggregation solution. Log aggregation typically collects physical log files off servers and puts them in a central place (a file server or HDFS perhaps) for processing. Kafka abstracts away the details of files and gives a cleaner abstraction of log or event data as a stream of messages. This allows for lower-latency processing and easier support for multiple data sources and distributed data consumption. In comparison to log-centric systems like Scribe or Flume, Kafka offers equally good performance, stronger durability guarantees due to replication, and much lower end-to-end latency.
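
As a minimal sketch of this idea, an existing log file can be piped into a topic with the console producer, since it reads from standard input (the file path and the topic name app-logs are placeholders):

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic app-logs < /var/log/myapp/app.log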

Stream Processing

Many users of Kafka process data in processing pipelines consisting of multiple stages, where raw input data is consumed from Kafka topics and then aggregated, enriched, or otherwise transformed into new topics for further consumption or follow-up processing. For example, a processing pipeline for recommending news articles might crawl article content from RSS feeds and publish it to an “articles” topic; further processing might normalize or deduplicate this content and publish the cleansed article content to a new topic; a final processing stage might attempt to recommend this content to users. Such processing pipelines create graphs of real-time data flows based on the individual topics. Starting in 0.10.0.0, a light-weight but powerful stream processing library called Kafka Streams is available in Apache Kafka to perform such data processing as described above. Apart from Kafka Streams, alternative open source stream processing tools include Apache Storm and Apache Samza.
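
For a quick look at Kafka Streams, the distribution ships with a small word-count example that consumes one topic, aggregates counts, and writes them to another topic; it can be launched with the class-runner script (the class name below is the demo bundled with the 0.10.x releases, and the input and output topics it expects are described in the Kafka Streams documentation):

> bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo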

Event Sourcing

Event sourcing is a style of application design where state changes are logged as a time-ordered sequence of records. Kafka’s support for very large stored log data makes it an excellent backend for an application built in this style.
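
One practical detail for this style is retention: an event-sourced topic usually needs to keep its full history rather than expire old records. A hedged sketch, assuming your Kafka version accepts -1 to disable time-based retention at the topic level (the topic name account-events is a placeholder):

> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic account-events --config retention.ms=-1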

Commit Log

Kafka can serve as a kind of external commit-log for a distributed system. The log helps replicate data between nodes and acts as a re-syncing mechanism for failed nodes to restore their data. The log compaction feature in Kafka helps support this usage. In this usage Kafka is similar to the Apache BookKeeper project.
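
For example, a changelog topic used this way would typically be created with log compaction enabled, so that Kafka retains at least the latest record for each key (the topic name state-changelog is a placeholder and the replication factor assumes a three-broker cluster; cleanup.policy=compact is the topic-level setting that enables compaction):

> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic state-changelog --config cleanup.policy=compact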

1.3 Quick Start

This tutorial assumes you are starting fresh and have no existing Kafka™ or ZooKeeper data. Since Kafka console scripts are different for Unix-based and Windows platforms, on Windows platforms use bin\windows\ instead of bin/, and change the script extension to .bat.

Step 1: Download the code

Download the 0.10.1.0 release and un-tar it.

> tar -xzf kafka_2.11-0.10.1.0.tgz
> cd kafka_2.11-0.10.1.0

Step 2: Start the server

Kafka uses ZooKeeper, so you need to first start a ZooKeeper server if you don’t already have one. You can use the convenience script packaged with Kafka to get a quick-and-dirty single-node ZooKeeper instance.

> bin/zookeeper-server-start.sh config/zookeeper.properties
[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...

Now start the Kafka server:

> bin/kafka-server-start.sh config/server.properties
[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
...

Step 3: Create a topic

Let’s create a topic named “test” with a single partition and only one replica:

> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

We can now see that topic if we run the list topic command:

> bin/kafka-topics.sh --list --zookeeper localhost:2181
test

Alternatively, instead of manually creating topics you can also configure your brokers to auto-create topics when a non-existent topic is published to.
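
This behaviour is controlled by broker settings in server.properties; a hedged sketch of the relevant keys (the values shown are only illustrative defaults for auto-created topics):

config/server.properties:
    auto.create.topics.enable=true
    num.partitions=1
    default.replication.factor=1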

Step 4: Send some messages

Kafka comes with a command line client that will take input from a file or from standard input and send it out as messages to the Kafka cluster. By default, each line will be sent as a separate message.
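
For instance, instead of typing messages interactively you can redirect a file into the producer, and each line of the file becomes one message (messages.txt is a placeholder file name):

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test < messages.txt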

Run the producer and then type a few messages into the console to send to the server.

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message

Step 5: Start a consumer

Kafka also has a command line consumer that will dump out messages to standard output.

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message

If you have each of the above commands running in a different terminal then you should now be able to type messages into the producer terminal and see them appear in the consumer terminal.

All of the command line tools have additional options; running the command with no arguments will display usage information documenting them in more detail.
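
For example, running one of the tools without any arguments prints its usage summary:

> bin/kafka-topics.sh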

Step 6: Setting up a multi-broker cluster

So far we have been running against a single broker, but that’s no fun. For Kafka, a single broker is just a cluster of size one, so nothing much changes other than starting a few more broker instances. But just to get a feel for it, let’s expand our cluster to three nodes (still all on our local machine).

First we make a config file for each of the brokers (on Windows use the copy command instead):

> cp config/server.properties config/server-1.properties
> cp config/server.properties config/server-2.properties

Now edit these new files and set the following properties:

config/server-1.properties:
    broker.id=1
    listeners=PLAINTEXT://:9093
    log.dir=/tmp/kafka-logs-1

config/server-2.properties:
    broker.id=2
    listeners=PLAINTEXT://:9094
    log.dir=/tmp/kafka-logs-2

The broker.id property is the unique and permanent name of each node in the cluster. We have to override the port and log directory only because we are running these all on the same machine, and we want to keep the brokers from all trying to register on the same port or overwrite each other’s data.

We already have ZooKeeper and our single node started, so we just need to start the two new nodes:

> bin/kafka-server-start.sh config/server-1.properties &
...
> bin/kafka-server-start.sh config/server-2.properties &
...

Now create a new topic with a replication factor of three:

> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

Okay, but now that we have a cluster, how can we know which broker is doing what? To see that, run the “describe topics” command:

> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic	PartitionCount:1	ReplicationFactor:3	Configs:
	Topic: my-replicated-topic	Partition: 0	Leader: 1	Replicas: 1,2,0	Isr: 1,2,0

Here is an explanation of the output. The first line gives a summary of all the partitions; each additional line gives information about one partition. Since we have only one partition for this topic there is only one line.

  • “leader” is the node responsible for all reads and writes for the given partition. Each node will be the leader for a randomly selected portion of the partitions.
  • “replicas” is the list of nodes that replicate the log for this partition regardless of whether they are the leader or even if they are currently alive.
  • “isr” is the set of “in-sync” replicas. This is the subset of the replicas list that is currently alive and caught up to the leader.

Note that in my example node 1 is the leader for the only partition of the topic.

We can run the same command on the original topic we created to see where it is:

> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic:test	PartitionCount:1	ReplicationFactor:1	Configs:
	Topic: test	Partition: 0	Leader: 0	Replicas: 0	Isr: 0

So there is no surprise there: the original topic has no replicas and is on server 0, the only server in our cluster when we created it.

Let’s publish a few messages to our new topic:

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
...
my test message 1
my test message 2
^C

Now let’s consume these messages:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
^C

Now let’s test out fault-tolerance. Broker 1 was acting as the leader, so let’s kill it:

> ps aux | grep server-1.properties
7564 ttys002    0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.8/Home/bin/java...
> kill -9 7564

On Windows use:

> wmic process get processid,caption,commandline | find "java.exe" | find "server-1.properties"
java.exe    java  -Xmx1G -Xms1G -server -XX:+UseG1GC ... build\libs\kafka_2.10-0.10.1.0.jar"  kafka.Kafka config\server-1.properties    644
> taskkill /pid 644 /f

Leadership has switched to one of the slaves and node 1 is no longer in the in-sync replica set:

> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic	PartitionCount:1	ReplicationFactor:3	Configs:
	Topic: my-replicated-topic	Partition: 0	Leader: 2	Replicas: 1,2,0	Isr: 2,0

But the messages are still available for consumption even though the leader that took the writes originally is down:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
^C

Reposted from 並發編程網 (ifeve.com).
