Kafka动态配置实现原理解析计算机j

问题导读

ApacheKafka在全球各个领域各大公司获得广泛使用,得益于它强大的功能和不断完善的生态。其中Kafka动态配置是一个比较高频好用的功能,下面我们就来一探究竟。

动态配置是如何设计的?动态配置优先级是怎样的?Broker初始化是如何读取配置的?动态配置支持哪些特性功能?动态配置如何使用呢?前言介绍

Kafka初始开源的几个版本,当broker初始化启动时,所有配置信息只能从server.properties静态文件读取,以后不再发生任何更改,随着Kafka逐步迭代,在线业务对稳定性和个性化要求越来越突出,需要能支持在线修改功能动态生效的需求应运而生。例如:按照topic维度清理数据,依据clientid限流,根据用户名称进行访问权限控制等等。目前Kafka最新版本支持以下几类动态配置。

发展历程

动态配置文件发展历程:

kafka在0.8.0对topic的管理功能分布在三个shell中,它们分别是kafka-list-topic.sh、kafka-create-topic.sh、kafka-delete-topic.sh、kafka-add-partitions.sh,后来社区考虑到topic管理功能过于分散,到了0.8.1版本有关topic所有功能收敛到kafka-topics.sh中。0.8.0中只有topic的创建、删除和列表及添加分区功能,到了0.8.1开始支持topics动态配置了。

0.9.0.0开始支持client(producer和consumer)客户端配额限流支持,确保不因为某个或少数几个topic的客户端占满了broker带宽资源和磁盘IO资源,影响其他客户端的正常读写,导致集群内主从同步也受到影响。这个功能对确保系统SLA大有好处,通过服务降级,保证写/生产不受影响,降低或暂停读/消费流量更容易解决系统资源瓶颈。

0.9.0版本动态配置与topic管理分离,为了保持向下兼容kafka-topics.sh依然包含操作topic动态配置功能,新增kafka-configs.sh支持clients和topics动态配置功能,所以kafka-topics.sh和kafka-configs.sh任意一个都可以修改topic动态配置

0.10.1.0版本新增支持users和brokers动态配置功能,user动态配置用于访问资源的权限控制,提升集群的访问和数据安全性,例如:用户对读/写/创建/删除等操作和API、topic、group资源访问控制。broker动态配置,在不用重启及影响服务运行情况下,broker级别功能实现动态生效,例如:副本注册复制速率、磁盘内挂载点间数据迁移速率、网络请求的线程数、处理请求的I/O线程数等等全局参数等等。

0.10.1.0~2.3.1版本都支持topics、clients、users、brokers四类型动态配置的11种粒度配置对象,只是配置模块和属性字段有增减与调整。

动态配置设计原理

用户使用kafka-configs.sh脚本,根据格式和参数规范要求,ConfigCommand类进行相关逻辑处理、json格式和内容校验,生成notificationjson,写入到序列化持久节点上,zk路径为xxx/config/changes/config_change_seqNo,节点名称为config_change_seqNo,其中seqNo从1开始的自增序号。kafka集群中所有broker通过监听zk上xxx/config/changes的children变化,每次获得比当前内存中last_seqNo大的seqNo的json内容,从中读取entity_type/entity_name相对路径,由此判断如何从xxx/config/topics

clients

users

brokers四种类型中读取哪个配置路径。同一个Broker在操作过程中任何时刻只能串行读写一种类型的配置,多种配置需要串行操作。

各个角色的作用:

kafka-config.sh:负责写dynamicconfig和notification,写顺序上图有先后标志。

broker:负责监听xxx/config/changes子节点变化和读取entity_type/entity_name路径节点上内容

zk:负责存储notification和dynamicconfig及下发配置给相应的broker

notificationjson内容:

V10.10.0.1及以前版本有效

V20.10.1.0~2.3.1当前最新版本都有效

以上不管是version1还是version2,本质上没有变化。都是通过entity_type/entity_name获得entity_path的zk相对路径,全路径为xxx/config/entityType/entityName,具体请看如下详图

entity_type=topics

clients

users

brokers

entity_name=topicName

clientId

userId

(brokerId

default)

当entity_type为brokers时,brokerId为broker编号与自己的server.properties对应,只对某个broker生效。“default”指对所有broker生效。而entity_type为topics

clients

users对所有broker都生效。通过以上entity_type/entity_name六种组合成六个zk相对路径。

topics和clients组合原理一样,但users和brokers却略有不同,他们各自有2个组合,除了普通组合还有复合组合,两种类型组合在一起,例如users有users与clients组合,zk路径为users/user/clients/clientId;brokers动态配置非常实用,不需要重启就能动态更改任意数量brokers配置,更改所有brokers为xxx/brokers/default

四类动态配置11种zk相对路径,根据11种zk相对路径可以读取11种粒度配置对象dynamicconfig。

default说明:某种类型下所有作用域生效,例如xxx/clients/default和xxx/brokers/default就是集群内所有Allclients和集群内所有Allbrokers配置都会生效,其他同理。

dynamicconfig内容示例:

entity_type/entity_name=topics/topic_name=topics/finalTest

entity_type/entity_name=clients/clientId=clients/camusall

entity_type/entity_name=brokers/allbrokers=brokers/default

entity_type/entity_name=brokers/brokerId与allbrokers的配置形成完全一样,只是作用域范围不同而已,此处省略。

写配置格式校验

如果写入配置不进行规范校验,broker就会读取处理过程中,就会卡住或阻塞,影响服务运行稳定性。所以配置校验至关重要,校验规则如下:

配置参数格式必须合法,否则报错不予接收输入配置项进行校验,输入参数必须是kafka包含的配置项,否则过滤掉config_change_seqNo生成规则

kafka-configs.sh脚本每成功执行一次,在zk上就创建一个新的seqNode节点(即/xxx/config/changes/config_change_seqNo),seqNode是zk的持久顺序节点(PERSISTENT_SEQUENTIAL),它的组成是seqNode=seqNodePrefix+seqNodeSuffix,config_change_固定为seqNode的前缀,seqNodeSuffix=seqNo为seqNode的后缀,seqNo是10位数字的序列号,这个序列号后缀是自增的,由zk服务端自动生成和维护,每次事务请求成功就加1,它与MySQL的自增id原理一样。

config_change_seqno清除规则

集群经过长期运行积累,xxx/config/changes下会留存大量历史节点,如果不及时清理,会有以下影响:

大量无用的seqNode进行传输,会增加网络带宽负担占用zk服务端内存及存储资源Kafka会做大量无效判断和计算综上所述,因此必须要及时清除无用的seqNode集合,清除公式步骤如下:

当broker监听到notification变化回调时,记录系统时间。获取xxx/config/changes下所有子节点,读取每个seqNode的创建时间系统时间减去seqNode创建时间,如果时间差值大于过期时间,即changeExpirationMs,就会被删除changeExpirationMs默认为15分钟,可由broker配置。config_change_seqno判断处理

前面提到每当触发回调处理,seqNode节点创建时间过期15分钟会被删除,删除条件是触发才会被执行,如果长时间不创建就可能有少数几个seqNode一直保留。如果短时间内(15分钟内)创建大量seqNode,又不会立即被删除,只有等到下次触发达到条件才行,那怎么判断哪些会被处理呢?broker缓存中维护一个变量lastExecutedSeqNo,它负责保存执行历史中seqNode最大顺序号,所以每当触发回调获取seqNodeSet列表时,都能轻易判断出哪些需要处理计算,也会同步更新lastExecutedSeqNo。

notification作用

通知broker有新的动态配置产生,读取相应的动态配置不用监听大量四种类型配置下子节点,每个broker只需要监听一个notification节点即可,高效且性能也高大大减少broker监听数量,如果像controller监听/xxx/partitions/[0-N]/state一样,监听数量就是四种类型配置zk路径乘以broker数量了静态与动态的配置及优先级

静态与动态区别。静态配置是Broker内置默认配置和静态配置文件server.properties,broker启动前可以任意修改,启动后不可修改。动态配置是broker启动运行后,可以在线更新生效,偷偷说一句离线也可以改,就是不生效而已。

配置优先级。以上4个图包含4类型配置既有动态也有静态,那优先级如何呢?动态配置优先级高于静态配置。如上图1、2、4,环越小优先级越高,对于动态配置来说,修改配置作用域范围越小优先级越高,反之亦然。优先级最高的,会逐级覆盖相同配置项。

当broker启动时。读取顺序依次为broker内置默认配置,broker静态配置文件,动态配置。当配置项相同时,高优先级覆盖次优先的,其他依次类推

图示说明:上图1、2、3、4中,其中图1、2、4中环数字表示配置优先级关系,数字从1~5表示优先级从高到低。图3为两级关联。Users和Clients组合实现配置管理,这两者组合用于客户端配额限流,Users与Clients就像两级目录一样,一个User可以包含一个、多个clientId或所有clientId。图4中既有优先级关系也有配置参数包含关系,topics类型配置是brokers类型配置的子集,brokers除了包含topic配置外还有DynamicThreadPool、DynamicListenerConfig、DynamicConnectionQuota、LogCleaner配置。

如何使用动态配置

使用脚本kafka-configs.sh和kafka-topics.sh,kafka-configs.sh支持四种类型,kafka-topics.sh仅支持topics类型使用Apache-kafka官方提供的java版本客户端API调用直接写zk实现,具体遵循如上写notification和dynamic_config规范如想更深入了解kafka-configs.sh用法,请查看

总结

Kafka配置参数分为静态配置和动态配置,静态分为内置默认与外置用户配置,用户配置优先于内置配置动态配置为4类型11个zk相对路径,即11种粒度配置对象生效,同类型作用域范围越小优先级越高动态配置优先级比静态配置优先级高,动态配置中Users与Clients可以组合配置从设计原理中了解config_change_seqNo生成规则写上文中理解了写Notification的作用,从而理解什么场景会适合使用zk中持久顺序节点(PERSISTENT_SEQUENTIAL)注意事项:以上配置解析,是基于Kafka-2.3.1版本分析




转载请注明:http://www.aierlanlan.com/cyrz/1757.html