阅读189 返回首页    go 阿里云 go 技术社区[云栖]


《KAFKA官方文档》入门指南(三)

7步:使用Kafka连接导入/导出数据

从控制台写入数据和写回控制台是一个很方便入门的例子,但你可能想用Kafka使用其他来源的数据或导出Kafka的数据到其他系统。相对于许多系统需要编写定制集成的代码,您可以使用Kafka连接到系统去导入或导出数据。

Kafka Connect是包括在Kafka中一个工具,用来导入导出数据到Kafka。它是connectors的一个可扩展工具,其执行定制逻辑,用于与外部系统交互。在这个快速入门,我们将看到如何使用Kafka Connect做一些简单的连接器从一个文件导入数据到Kafka的主题,和将主题数据导出到一个文件。

首先,我们需要创建一些原始数据来开始测试:

> echo -e "foo\nbar" > test.txt

接下来,我们将启动两个运行在独立模式的连接器,这意味着他们在一个单一的,局部的,专用的进程中运行。我们提供三个配置文件作为参数。第一始终是Kafka连接过程中的公共配置,如要连接到的Kafka的代理服务器的配置和数据的序列化格式的配置。剩余的每个配置文件用来创建指定的连接器。这些文件包括一个唯一的连接器名称,需要实例化的连接器类,还有创建该连接器所需的其他配置。

> bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties

用这些Kafka的示例配置文件,使用前面已经启动的本地群集的默认配置,建立两个连接器:第一是一个源连接器,其从输入文件中读取每行的内容,发布到的Kafka主题和第二个是一个sink连接器负责从Kafka主题读取消息,生产出的消息按行输出到文件。

在启动过程中,你会看到一些日志信息,包括一些表明该连接器被实例化的信息。一旦Kafka Connect进程已经开始,源连接器应该开始从test.txt读取每行的消息,并将其生产发布到主题connect-test,而sink连接器应该从主题connect-test读取消息,并将其写入文件test.sink.txt。我们可以通过检查输出文件的内容来验证数据都已通过整个管道输送:

> cat test.sink.txt
foo
bar

请注意,数据被存储在Kafka主题的connect-test中,所以我们也可以运行控制台消费者消费主题中的数据(或使用定制的消费者代码来处理它):

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
...

连接器不停的处理数据,因此我们可以将数据添加到该文件,并能看到数据通过管道移动:

> echo "Another line" >> test.txt

您应该看到一行消息出现在控制台消费者的控制台和sink文件中。

8步:使用Kafka Streams处理数据

Kafka Streams 是Kafka的客户端库, 用来做实时流处理和分析存储在Kafka代理服务器的数据。该快速入门例子将演示如何运行这个流应用库。这里是要点WordCountDemo的示例代码(转换为方便阅读的Java 8 lambda表达式)。

// Serializers/deserializers (serde) for String and Long types
final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();

// Construct a `KStream` from the input topic ""streams-file-input", where message values
// represent lines of text (for the sake of this example, we ignore whatever may be stored
// in the message keys).
KStream<String, String> textLines = builder.stream(stringSerde, stringSerde, "streams-file-input");

KTable<String, Long> wordCounts = textLines
    // Split each text line, by whitespace, into words.
    .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))

    // Group the text words as message keys
    .groupBy((key, value) -> value)

    // Count the occurrences of each word (message key).
    .count("Counts")

// Store the running counts as a changelog stream to the output topic.
wordCounts.to(stringSerde, longSerde, "streams-wordcount-output");

它实现了单词计数算法,计算输入文本中一个单词的出现次数。然而,与其他单词计数的算法不同,其他的算法一般都是对有界数据进行操作,该算法演示应用程序的表现略有不同,因为他可以被设计去操作无限的,无界的流数据。和操作有界数据的算法相似,它是一个有状态的算法,可以跟踪和更新单词的计数。然而,因为它必须承担潜在的无界输入数据的处理,它会周期性地输出其当前状态和结果,同时继续处理更多的数据,因为它无法知道他有没有处理完“所有”的输入数据。

作为第一步骤,我们将准备好输入到Kafka主题的数据,随后由Kafka Streams应用程序进行处理。

> echo -e "all streams lead to kafka\nhello kafka streams\njoin kafka summit" > file-input.txt

或在Windows上:

> echo all streams lead to kafka> file-input.txt
> echo hello kafka streams>> file-input.txt
> echo|set /p=join kafka summit>> file-input.txt

接下来,我们使用控制台生产者把输入的数据发送到主题名streams-file-input 的主题上,其内容从STDIN一行一行的读取,并一行一行的发布到主题,每一行的消息都有一个空键和编码后的字符串(在实践中,当应用程序将启动并运行后,流数据很可能会持续流入Kafka):

> bin/kafka-topics.sh --create \
            --zookeeper localhost:2181 \
            --replication-factor 1 \
            --partitions 1 \
            --topic streams-file-input
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-file-input < file-input.txt

现在,我们可以运行单词计数应用程序来处理输入数据:

> bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo

演示应用程序将从输入主题streams-file-input读取数据,对读取的消息的执行单词计数算法,并且持续写入其当前结果到输出主题streams-wordcount-output。因此,除了写回Kafka的日志条目,不会有任何的STDOUT输出。该演示将运行几秒钟,与典型的流处理应用不同,演示程序会自动终止。

现在,我们通过读取输出主题的输出得到单词计数演示程序的结果:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
            --topic streams-wordcount-output \
            --from-beginning \
            --formatter kafka.tools.DefaultMessageFormatter \
            --property print.key=true \
            --property print.value=true \
            --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
            --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

下面的数据会被输出到控制台:

all     1
lead    1
to      1
hello   1
streams 2
join    1
kafka   3
summit  1

这里,第一列是java.lang.String类型的消息健,而第二列是java.lang.Long型消息值。注意,这里的输出其实是数据更新的连续流,每个数据记录(上面的例子里的每行的输出)都有一个单词更新后的数目值,例如“Kafka”作为键的记录。对于具有相同键的多个记录,每个后面的记录都是前一个记录的更新。

下面的两个图说明什么发生在幕后的过程。第一列显示的当前状态的变化,用KTable<String, Long>来统计单词出现的数目。第二列显示KTable状态更新导致的发生变化的记录,这个变化的记录被发送到输出Kafka主题streams-wordcount-output

首先, “all streams lead to kafka”这样一行文本正在被处理。当新的单词被处理的时候,KTable会增加一个新的表项(以绿色背景高亮显示),并有相应的变化记录发送到下游KStream。

当第二行“hello kafka streams”被处理的时候,我们观察到,现有的KTable中的表项第一次被更新(这里: 单词 “kafka” 和 “streams”)。再次,改变的记录被发送到输出话题。

以此类推(我们跳过的第三行是如何被处理的插图)。这就解释了为什么输出主题有我们上面例子显示的内容,因为它包含了完整的更改记录。

跳出这个具体的例子我们从整体去看, Kafka流利用表和日志变化(changelog)流之间的二元性(here: 表= the KTable, 日志变化流 = the downstream KStream):你可以发布的每一个表的变化去一个流,如果你从开始到结束消费了整个的日志变化(changelog)流,你可以重建表的内容。

现在,你可以写更多的输入信息到streams-file-input主题,并观察更多的信息加入到了 streams-wordcount-output主题,反映了更新后的单词数目(例如,使用上述的控制台生产者和控制台消费者)。

您可以通过Ctrl-C 停止控制台消费者。


转载自 并发编程网 - ifeve.com

最后更新:2017-05-18 20:36:02

  上一篇:go  storm常见问题解答
  下一篇:go  《KAFKA官方文档》入门指南(二)