阅读68 返回首页    go 阿里云 go 技术社区[云栖]


《KAFKA官方文档》入门指南(二)

把功能组合起来

消息的传输,存储和流处理的组合看似不寻常却是Kafka作为流处理平台的关键。

像HDFS分布式文件系统,允许存储静态文件进行批量处理。像这样的系统允许存储和处理过去的历史数据

传统的企业消息系统允许处理您订阅后才抵达的消息。这样的系统只能处理将来到达的数据。

Kafka结合了这些功能,这种结合对Kafka作为流应用平台以及数据流处理的管道至关重要。

通过整合存储和低延迟订阅,流处理应用可以把过去和未来的数据用相同的方式处理。这样一个单独的应用程序,不但可以处理历史的,保存的数据,当它到达最后一条记录不会停止,继续等待处理未来到达的数据。这是泛化了的的流处理的概念,包括了批处理应用以及消息驱动的应用。

同样,流数据处理的管道结合实时事件的订阅使人们能够用Kafka实现低延迟的管道; 可靠的存储数据的能力使人们有可能使用它传输一些重要的必须保证可达的数据。可以与一个定期加载数据的线下系统集成,或者与一个因为维护长时间下线的系统集成。流处理的组件能够保证转换(处理)到达的数据。

有关Kafka提供的保证,API和功能的更多信息,看其余文件

1.2使用案例

下面描述了一些使用Apache Kafka™的流行用例。更多的关于这些领域实践的概述,参考这个博客

消息

Kafka能够很好的替代传统的消息中间件。消息中间件由于各种原因被使用(解耦数据的生产和消费,缓冲未处理的消息等)。相较于大多数消息处理系统,Kafka有更好的吞吐量,内置分区,副本复制和容错性,使其成为大规模消息处理应用的理想解决方案。

根据我们的经验消息的使用通常具有相对低的吞吐量,但可能需要端到端的低延迟,以及高可靠性的保证,这种低延迟和可靠性的保证恰恰是Kafka能够提供的。

在这一领域Kafka是能够和传统的消息系统相媲美的,例如ActiveMQ或 RabbitMQ

网站活动跟踪

最初的用例是用Kafka重建一个用户活动跟踪管道使之作为一组实时发布 – 订阅的数据源。这意味着网站活动(网页浏览,搜索,或其他可能的操作)被当作一组中心主题发布,每种活动被当作一个主题。这些数据源(feeds)可被一系列的应用订阅,包括实时处理,实时监测,加载到Hadoop系统或离线数据仓库系统进行离线处理和报告。

活动追踪通常会产生巨大的数据量,因为每个用户页面的浏览都会产生很多的活动消息。

测量

Kafka通常用于监测数据的处理。这涉及从分布式应用程序聚集统计数据,生产出集中的运行数据源feeds(以便订阅)。

日志聚合

许多人用Kafka作为日志聚合解决方案的替代品。日志聚合通常从服务器收集物理日志文件,并把它们放在一个集中的地方(文件服务器或HDFS)进行处理。Kafka抽象了文件的详细信息,把日志或事件数据的简洁抽象作为消息流传输。这为低时延的处理提供支持,而且更容易支持多个数据源和分布式的数据消费。相比集中式的日志处理系统,Scribe or Flume,Kafka提供同样良好的性能,而且因为副本备份提供了更强的可靠性保证和更低的端到端延迟。

流处理

Kafka的流数据管道在处理数据的时候包含多个阶段,其中原始输入数据从Kafka主题被消费然后汇总,加工,或转化成新主题用于进一步的消费或后续处理。例如,用于推荐新闻文章的数据流处理管道可能从RSS源抓取文章内容,并将其发布到“文章”主题; 进一步的处理可能是标准化或删除重复数据,然后发布处理过的文章内容到一个新的话题; 最后的处理阶段可能会尝试推荐这个内容给用户。这样的数据流处理管道基于各个主题创建了实时数据数据流程图。从版本0.10.0.0开始,Apache Kafka加入了轻量级的但功能强大的流处理库Kafka Streams ,Kafka Streams支持如上所述的数据处理。除了Kafka Streams,可以选择的开源流处理工具包括 Apache Storm and Apache Samza.

Event Sourcing

Event sourcing 是一种应用程序设计风格,是按照时间顺序记录的状态变化的序列。Kafka的非常强大的存储日志数据的能力使它成为构建这种应用程序的极好的后端选择。

Commit Log

Kafka可以为分布式系统提供一种外部提交日志(commit-log)服务。日志有助于节点之间复制数据,并作为一种数据重新同步机制用来恢复故障节点的数据。Kafka的log compaction 功能有助于支持这种用法。Kafka在这种用法中类似于Apache BookKeeper 项目。

1.3快速开始

本教程假设你从零开始,没有现成的Kafka或ZooKeeper数据。由于Kafka控制台脚本在Unix基础的和Windows平台上的不同,在Windows平台上使用bin\windows\,而不是bin/,并修改脚本扩展为.bat。

1步:下载代码

下载0.10.2.0释放和un-tar它。

> tar -xzf kafka_2.11-0.10.2.0.tgz
> cd kafka_2.11-0.10.2.0

2步:启动服务器

Kafka使用ZooKeeper的,所以你需要先启动ZooKeeper的服务器,如果你还没有,您可以使用Kafka包装里的方便脚本来得到一个快速和污染的单节点的ZooKeeper实例。

> bin/zookeeper-server-start.sh config/zookeeper.properties
[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...

现在启动Kafka服务器:

> bin/kafka-server-start.sh config/server.properties
[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
...

3步:创建一个话题

让我们创建一个名为“test”主题,只有一个分区,只有一个副本:

> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

现在我们可以看到,如果我们运行的列表主题命令话题:

> bin/kafka-topics.sh --list --zookeeper localhost:2181
test

除了手动创建主题,你还可以配置你的代理服务器(broker),当一个不存在的主题被发布的时候它能自动创建相应的主题。

4步:发送一些消息

Kafka带有一个命令行客户端,获取从文件或来自标准输入的输入,并作为消息发送到Kafka集群。默认情况下,每一行将被作为单独的消息发送。

运行生产者脚本,然后输入一些信息到控制台发送到服务器。

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message

5步:启动消费者

Kafka也有一个命令行消费者,将收到的消息输出到标准输出。

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message

如果你在不同的终端上运行上面的命令,那么你现在应该能看到从生产者终端输入的消息会出现在消费者终端。

所有的命令行工具都有其他选项; 不带参数运行命令将显示更加详细的使用信息。

6步:设置多代理群集

到目前为止,我们已经运行了单个代理的服务器,但是这没有乐趣。对于Kafka,一个代理是只有一个单节点的集群,因此多代理集群只是比开始多了一些代理实例外,没有什么太大的变化。但只是为了感受一下,我们的集群扩展到三个节点(所有的节点还是在本地机器上)。

首先,我们为每个经纪人做一个配置文件(在Windows上使用copy命令来代替):

> cp config/server.properties config/server-1.properties
> cp config/server.properties config/server-2.properties

现在,编辑这些新文件和设置以下属性:

config/server-1.properties:
    broker.id=1
    listeners=PLAINTEXT://:9093
    log.dir=/tmp/kafka-logs-1

config/server-2.properties:
    broker.id=2
    listeners=PLAINTEXT://:9094
    log.dir=/tmp/kafka-logs-2

该broker.id属性是集群中的每个节点的唯一和永久的名字。我们要重写端口和日志目录,因为我们都在同一台机器上运行这些代理,我们要防止经纪人在同一端口上注册或覆盖彼此的数据。

我们已经有Zookeeper服务和我们的单个节点服务,所以我们只需要启动两个新节点:

> bin/kafka-server-start.sh config/server-1.properties &
...
> bin/kafka-server-start.sh config/server-2.properties &
...

现在,创建一个新的具有三个的副本因子的主题:

> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

好了,现在我们有一个集群,但是如何才能知道哪个代理节点在做什么?要查看运行“describe topics”命令:

> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic	PartitionCount:1	ReplicationFactor:3	Configs:
	Topic: my-replicated-topic	Partition: 0	Leader: 1	Replicas: 1,2,0	Isr: 1,2,0

下面是输出的解释。第一行给出了所有分区的摘要,每个附加的行提供了一个分区的信息。由于我们只有一个分区,所以这个主题只有一行。

  • “Leader”,负责指定分区所有读取和写入的节点。每个节点将是一部分随机选择的分区中的领导者。
  • “Replicas”是此分区日志的节点列表集合,不管这些节点是否是领导者或者只是还活着(不在in-sync状态)。
  • “ISR”是一组”in-sync” 节点列表的集合。这个列表包括目前活着并跟leader保持同步的replicas,Isr 是Replicas的子集。

请注意,在我的例子节点1是该主题的唯一分区中的leader。

我们可以运行相同的命令看看我们创建原来的话题的状态:

> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic:test	PartitionCount:1	ReplicationFactor:1	Configs:
	Topic: test	Partition: 0	Leader: 0	Replicas: 0	Isr: 0

所以毫不奇怪,原来的话题没有副本,只有我们创建它时的唯一的服务器0。

让我们发布一些消息到我们新的话题:

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
...
my test message 1
my test message 2
^C

现在让我们来消费这些消息:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
^C

现在,让我们测试容错性。代理1是领导者,让我们杀死它:

> ps aux | grep server-1.properties
7564 ttys002    0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.8/Home/bin/java...
> kill -9 7564

在Windows上使用:

> wmic process get processid,caption,commandline | find "java.exe" | find "server-1.properties"
java.exe    java  -Xmx1G -Xms1G -server -XX:+UseG1GC ... build\libs\kafka_2.10-0.10.2.0.jar"  kafka.Kafka config\server-1.properties    644
> taskkill /pid 644 /f

领导权已经切换到备机中的一个节点上去了,节点1不再在同步中的副本集(in-sync replica set)中:

> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic	PartitionCount:1	ReplicationFactor:3	Configs:
	Topic: my-replicated-topic	Partition: 0	Leader: 2	Replicas: 1,2,0	Isr: 2,0

但消息仍然是可用于消费,即使是原来负责写任务的领导者已经不在了:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic^C

转载自 并发编程网 - ifeve.com

最后更新:2017-05-18 20:34:39

  上一篇:go  《KAFKA官方文档》入门指南(三)
  下一篇:go  Storm源码浅析之topology的提交