阅读745 返回首页    go 人物


《KAFKA官方文档》入门指南(四)

1.4生态系统

除了Kafka的主要版本之外,还有很多应用集成了Kafka工具。该生态系统页面中列出的许多工具,包括流处理系统,Hadoop的集成,监控和部署工具。

1.5从以前版本升级

0.8.40.9.x0.10.0.x0.10.1.x升级到0.10.2.0

0.10.2.0的有线协议有变化。通过下面的推荐滚动升级计划,你能保证在升级过程中无需停机。但是,请在升级之前查看0.10.2.0版本显着的变化

从0.10.2版本开始,Java客户端(生产者和消费者)已获得与旧版本代理服务器沟通的能力。版本0.10.2客户可以跟0.10.0版或更新版本的代理沟通。但是,如果你的代理比0.10.0老,你必须在升级客户端之前升级Kafka集群中的所有代理服务器(Broker)。版本0.10.2代理支持0.8.x和更新的客户端。

对于滚动升级:

  1. 更新所有代理服务器上的server.properties文件,添加以下属性:
    • inter.broker.protocol.version = CURRENT_KAFKA_VERSION(例如0.8.2,0.9.0,0.10.0或0.10.1)。
    • log.message.format.version = CURRENT_KAFKA_VERSION(参见升级后的潜在性能的影响了解此配置做什么的详细信息。)
  2. 逐一升级代理:关闭代理,更新代码,并重新启动。
  3. 一旦整个群集升级成功,通过编辑inter.broker.protocol.version将其设置为0.10.2的协议版本。
  4. 如果您以前的消息格式为0.10.0,改变log.message.format.version至0.10.2(这是一个无效操作,因为0.10.0,0.10.1和0.10.2的消息格式相同)。如果您以前的消息格式版本低于0.10.0,不要改变log.message.format.version – 这个参数只能在所有的消费者都已经升级到0.10.0.0或更高版本之后改动。
  5. 逐一重新启动代理服务器使新协议版本生效。
  6. 如果这时log.message.format.version仍比0.10.0低,等到所有的消费者都已经升级到0.10.0或更高版本,然后更改每个代理服务器的log.message.format.version到0.10.2,然后逐一重新启动。

注意:如果你愿意接受宕机,你可以简单地把所有的代理服务器关闭,更新代码,然后重新启动他们。他们将默认使用新的协议。

注:改变协议版本并重新启动可以在代理服务器升级之后的任何时间做,没有必要必须立刻就做。

升级0.10.1版本的Kafka流应用

  • 从0.10.1升级您的流应用程序到0.10.2不需要升级代理。0.10.2 Kafka流应用程序可以连接到0.10.2和0.10.1代理(但无法连接到 0.10.0的代理)。
  • 你需要重新编译代码。只是替换Kafka流的jar文件将无法正常工作,这破坏你的应用程序。
  • 如果您使用自定义(即用户实现的)的时间戳提取,则需要更新此代码,因为TimestampExtractor接口改变了。
  • 如果您注册了自定义指标,您将需要更新此代码,因为StreamsMetric接口被改变了。
  • 0.10.2 流 API的变化更多的细节。

0.10.2.1显着的变化

  • 对于StreamsConfig类的两个配置的默认值的修改提高了Kafka流应用的弹性。内部Kafka流生产者retries默认值从0变化到10,内部Kafka流消费者max.poll.interval.ms 缺省值从300000到改变Integer.MAX_VALUE。

0.10.2.0显着的变化

  • 在Java客户端(生产者和消费者)已获得与旧版本代理沟通的能力。版本0.10.2客户端可以跟0.10.0版或更新版本的代理沟通。请注意,某些功能在跟就代理沟通的时候不可用或被限制了。
  • 在Java消费者中有几种方法现在可能抛出InterruptException如果调用线程被中断。请参阅KafkaConsumer的Javadoc,对这种变化有一个更深入的解释。
  • Java的消费者现在被恰当关闭。默认情况下,消费者会等待30秒才能完成挂起的请求。一个带有timeout参数的新的API已添加到KafkaConsumer去控制最大等待时间。
  • 用逗号分隔的多个正则表达式可以传递多个Java消费者给MirrorMaker–whitelist选择。这使得与MirrorMaker使用老Scala消费者时的行为一致。
  • 从0.10.1升级您的流应用程序0.10.2不需要代理服务器升级。Kafka 0.10.2流应用程序可以连接到0.10.2和0.10.1代理(但无法连接到0.10.0代理)。
  • Zookeeper的依赖从流API中删除。流API现在使用Kafka协议来管理内部主题,而不是直接修改动物园管理员的主题。这消除了需要直接访问Zookeeper的特权,而“StreamsConfig.ZOOKEEPER_CONFIG”也不需要在流应用被设置。如果Kafka集群是安全认证的,流应用程序必须具备必要的安全权限才可以创建新的主题。
  • 一些新的参数,包括“security.protocol”, “connections.max.idle.ms”, “retry.backoff.ms”, “reconnect.backoff.ms”和“request.timeout.ms”添加到StreamsConfig类。如果用户需要设置这些,要注意这些默认值。欲了解更多详情,请参阅3.5Kafka流CONFIGS
  • 该offsets.topic.replication.factor代理的配置现在在主题生产中强制使用。直到集群的大小符合这个复制因子要求,否则,主题的生产将失败,返回GROUP_COORDINATOR_NOT_AVAILABLE错误。

新的协议版本

  • KIP-88:OffsetFetchRequest v2支持偏移检索所有的主题,如果topics数组设置为null。
  • KIP-88:OffsetFetchResponse V2引入了顶级error_code域。
  • KIP-103:UpdateMetadataRequest v3引入一个listener_name字段到end_points数组中的元素。
  • KIP-108:CreateTopicsRequest V1引入了一个validate_only参数。
  • KIP-108:CreateTopicsResponse V1引入了error_message到数组topic_errors的元素。

0.8.40.9.x版本或0.10.0.X升级到0.10.1.0

0.10.1.0有线协议发生了变化。通过下面的推荐滚动升级计划,能保证在升级过程中无需停机。但是,请注意在升级之前仔细阅读0.10.1.0潜在的重大更改
注意:由于新协议的引入,它是升级你的客户端之前请先完成Kafka集群的升级(即0.10.1.x客户端仅支持0.10.1.x或更高版本的代理,但0.10.1.x的代理可以支持旧版本客户端)。

对于滚动升级:

  1. 更新所有代理上的server.properties文件,并添加以下属性:
    • inter.broker.protocol.version = CURRENT_KAFKA_VERSION(例如0.8.2.0,0.9.0.0或0.10.0.0)。
    • log.message.format.version = CURRENT_KAFKA_VERSION(参见升级后的潜在性能的影响对于此配置做什么的详细信息。)
  2. 升级代理服务器一次一个:关闭代理,更新代码,并重新启动。
  3. 一旦整个群集升级完成,通过编辑inter.broker.protocol.version并将其设置为0.10.1.0的协议版本。
  4. 如果您以前的消息格式为0.10.0,改变log.message.format.version至0.10.1(这是一个无效操作,如果0.10.0和0.10.1两个协议的消息格式相同)。如果您以前的消息格式版本低于0.10.0,不要改变log.message.format.version — 这个参数只能在所有的消费者都已经升级到0.10.0.0或更高版本之后修改。
  5. 逐一重新启动代理,新版本协议生效。
  6. 如果log.message.format.version仍比0.10.0低,等到所有的消费者都已经升级到0.10.0或更高版本,然后更改log.message.format.version到0.10.1,逐一重新启动代理服务器。

注意:如果你愿意接受宕机,你可以简单地把所有的代理服务器关闭,更新代码,然后重新启动他们。他们将默认使用新的协议。

注:改变协议版本并重新启动可以在代理服务器升级之后的任何时间做,没有必要必须立刻就做。

0.10.1.0的重大更改

  • 日志保留时间不再基于日志段的最后修改时间。相反,它会基于日志段里拥有最大的时间戳的消息。
  • 日志滚动时间不再取决于日志段创建时间。相反,它现在是基于消息的时间戳。进一步来说,如果日志段中第一个消息的时间戳是T,当一个新的消息具有的时间戳大于或等于T + log.roll.m,该日志将被覆盖。
  • 0.10.0的打开文件的处理程序将增加〜33%,因为每个日志段增加的时间索引文件。
  • 时间索引和偏移索引共享相同的索引大小的配置。因为时间索引条目大小是1.5倍偏移索引条目的大小。用户可能需要增加log.index.size.max.bytes以避免潜在的频繁的日志滚动。
  • 由于增加的索引文件,在某些代理服务器上具有大量的日志段(例如> 15K),代理启动期间日志加载过程可能很长。根据我们的实验,num.recovery.threads.per.data.dir设置为1可减少日志装载时间。

升级0.10.0Kafka流应用

  • 从0.10.0升级您的流应用程序到0.10.1确实需要一个代理的升级,因为Kafka 0.10.1的流应用程序只能连接到0.10.1代理。
  • 有几个API的变化不向后兼容(参见流API在0.10.1的变化有详细介绍)。因此,你需要更新和重新编译代码。只是交换了Kafka流库的jar文件将无法正常工作,并会破坏你的应用程序。

0.10.1.0显着的变化

  • 新的Java消费者不是beta版了,我们推荐它做新的应用开发。老Scala消费者仍然支持,但他们会在未来的版本中将会弃用,并将在未来的主版本中删除。
  • 在使用像MirrorMaker和控制台消费者新建消费者的过程中–new-consumer/ –new.consumer开关不再被需要; 一个简单地使用是通过一个Kafka代理去连接,而不是Zookeeper的合集。此外,控制台消费者去连接旧版本的消费者已被弃用,并将在未来的主版本中删除。
  • Kafka集群现在可以通过一个集群ID被唯一标识。其会在一个代理升级到0.10.1.0时自动生成。集群ID经由kafka.server可用:type= KafkaServer,name= ClusterId metric ,它是所述元数据响应的一部分。串行器,客户端拦截器和度量报告可以通过实现ClusterResourceListener接口接收集群ID。
  • BrokerState “RunningAsController”(值4)已被删除。由于一个bug,代理在转换状态之前只会简单的这种状态下,因此去除的影响应该很小。一种推荐的检测方法是一个给定的代理的控制器是由kafka.controller实现:type=KafkaController,name=ActiveControllerCount metric。
  • 新的Java消费者现在可以允许用户通过时间戳在分区上搜索偏移量(offset)。
  • 新的Java消费者现在可以从后台线程支持心跳检查。有一个新的配置 max.poll.interval.ms,它控制消费者会主动离开组(5分钟默认情况下)之前轮询调用的最大时间。配置的值 request.timeout.ms必须始终大于max.poll.interval.ms因为这是一个JoinGroup请求可以在服务器上被阻止到消费者被负载均衡之前的最长时间.所以我们可以改变默认值为刚好超过5分钟。最后,默认值session.timeout.ms已调整到10秒,默认值max.poll.records已更改为500。
  • 当授权者和用户没有说明某个主题的授权,代理将不再返回TOPIC_AUTHORIZATION_FAILED给请求,因为这会泄漏主题名称。相反,UNKNOWN_TOPIC_OR_PARTITION错误代码将被返回。使用Kafka生产者和消费者通常会在收到未知的主题错误时自动重试,这可能会导致意外的超时或延迟。如果你怀疑这种情况发生了,你可以查看客户端的log去检查。
  • 获取返回有默认的大小限制(消费者50 MB和副本的复制10 MB)。现有的每个分区的限制也适用(消费者和副本复制为1 MB)。请注意,这些限制都不是绝对最大值,在下一个要点有解释。
  • 消费者和副本可以继续进行,如果发现一个消息大于返回/分区大小的限制。更具体地,如果在非空的分区上提取的第一个消息比任一个或两个限值大,仍然会被返回。
  • 重载的构造函数加入到kafka.api.FetchRequest和kafka.javaapi.FetchRequest允许调用者指定分区顺序(因为顺序在V3是很重要的)。先前存在的构造函数被弃用,在发送请求以避免饥饿问题之前,分区会被洗牌。
  • 转载自 并发编程网 - ifeve.com

最后更新:2017-05-18 20:36:04

  上一篇:go  Clojure世界:日志管理——clojure.tools.logging
  下一篇:go  Clojure世界: STM的统计