网站一般费用,建设网络平台 请示,关于网站制作的论文,南京自助建站模板#x1f44f;作者简介#xff1a;大家好#xff0c;我是爱吃芝士的土豆倪#xff0c;24届校招生Java选手#xff0c;很高兴认识大家#x1f4d5;系列专栏#xff1a;Spring源码、JUC源码、Kafka原理#x1f525;如果感觉博主的文章还不错的话#xff0c;请#x1f44… 作者简介大家好我是爱吃芝士的土豆倪24届校招生Java选手很高兴认识大家系列专栏Spring源码、JUC源码、Kafka原理如果感觉博主的文章还不错的话请三连支持一下博主哦博主正在努力完成2023计划中源码溯源一探究竟联系方式nhs19990716加我进群大家一起学习一起进步一起对抗互联网寒冬 文章目录 安装部署安装 zookeeper 集群安装 kafka 集群 Kafka 运维监控Kafka-Eagle 简介Kafka-Eagle 安装启动 KafkaEagle访问 web 界面 命令行工具概述topic 管理操作kafka-topics查看 topic 列查看 topic 状态信息创建 topic删除 topic增加分区数动态配置 topic 参数 生产者kafka-console-producer消费者kafka-console-consumer消费组 消费位移的记录配置管理 kafka-config动态配置 topic 参数kafka是如何做到可以动态修改配置的 安装部署
安装 zookeeper 集群
配置zookeeper集群的核心就是以下每个zookeeper都要有
vi zoo.cfg
dataDir/opt/apps/data/zkdata
server.1doitedu01:2888:3888
server.2doitedu02:2888:3888
server.3doitedu03:2888:3888安装 kafka 集群
核心操作如下
vi server.properties#为依次增长的0、1、2、3、4集群中唯一 id
broker.id0# 数据存储的⽬录
# 每一个broker都把自己所管理的数据存储在自己的本地磁盘
log.dirs/opt/data/kafkadata#底层存储的数据日志留存时长默认 7 天
log.retention.hours168#底层存储的数据日志留存量默认 1G
log.retention.bytes1073741824#指定 zk 集群地址
zookeeper.connectdoitedu01:2181,doitedu02:2181,doitedu03:2181查看框架兼容的版本查看依赖的pom / 要去查看对应框架的源码然后进入查看
Kafka 运维监控
kafka 自身并没有集成监控管理系统因此对 kafka 的监控管理比较不便好在有大量的第三方监控管 理系统来使用比如Kafka Eagle
Kafka-Eagle 简介 Kafka-Eagle 安装
官方文档地址https://docs.kafka-eagle.org/
其中修改最核心的配置
######################################
# multi zookeeper kafka cluster list
# Settings prefixed with kafka.eagle. will be deprecated, use efak. instead
####################################### 给kafka集群起一个名字
efak.zk.cluster.aliascluster1
# 告诉它kafka集群所连接的zookeeper在哪里
cluster1.zk.listdoit01:2181,doit02:2181,doit03:2181######################################
# broker size online list
######################################
# 告诉集群broker服务器有多少台
cluster1.efak.broker.size3# 还需要一种数据库可以针对性的选择######################################
# kafka sqlite jdbc driver address 本地的嵌入式数据库
######################################
efak.driverorg.sqlite.JDBC
efak.urljdbc:sqlite:/opt/data/kafka-eagle/db/ke.db
efak.usernameroot
efak.passwordwww.kafka-eagle.org######################################
# kafka mysql jdbc driver address
######################################
#efak.drivercom.mysql.cj.jdbc.Driver
#efak.urljdbc:mysql://127.0.0.1:3306/ke?useUnicodetruecharacterEncodingUTF-8zeroDateTimeBehavior
convertToNull
#efak.usernameroot
#efak.password123456如上数据库选择的是 sqlite需要手动创建所配置的 db 文件存放目录/opt/data/kafka-eagle/db/
如果数据库选择的是 mysql则要放一个 mysql 的 jdbc 驱动 jar 包到 kafka-eagle 的 lib 目录中启动 KafkaEagle 访问 web 界面 初始是一个快速看板快速得到一些最要紧的信息。
启动先启动zookeeper再kafka
kafka默认对客户端暴漏的连接端口是 9092、zookeeper默认暴漏的端口是2181 显示信息比如说有多少分区、分布的均匀率、数据倾斜的比例、Leader倾斜的比例等参数。 Kafka-Eagle的意义
1.对生产力的提升。
2.监控Kafka集群的手段。
命令行工具
概述
Kafka 中提供了许多命令行工具位于$KAFKA HOME/bin 目录下用于管理集群的变更。
命令描述kafka-console-consumer.sh用于消费消息kafka-console-producer.sh用于生产消息kafka-topics.sh用于管理主题kafka-server-stop.sh用于关闭 Kafka 服务kafka-server-start.sh用于启动 Kafka 服务kafka-configs.sh用于配置管理kafka-consumer-perf-test.sh用于测试消费性能kafka-producer-perf-test.sh用于测试生产性能kafka-dump-log.sh用于查看数据日志内容kafka-preferred-replica-election.sh用于优先副本的选举kafka-reassign-partitions.sh用于分区重分配
topic 管理操作kafka-topics
查看 topic 列
bin/kafka-topics.sh --list --zookeeper doit01:2181查看 topic 状态信息
1查看 topic 详细信息
bin/kafka-topics.sh --zookeeper doitedu01:2181 --describe --topic topic-doit29从上面的结果中可以看出topic 的分区数量以及每个分区的副本数量Configs:compression.typegzip 代表着我们topic的数据是压缩的。
创建 topic
./kafka-topics.sh --zookeeper doitedu01:2181 --create --replication-factor 3
--partitions 3 --topic test参数解释
--replication-factor 副本数量
--partitions 分区数量
--topic topic 名称本方式副本的存储位置是系统自动决定的
手动指定分配方案分区数副本数存储位置
bin/kafka-topics.sh --create --topic tpc-1 --zookeeper doitedu01:2181
--replica-assignment 0:1:3,1:2:6该 topic将有如下 partition
partition0 所在节点 broker0、broker1、broker3
partition1 所在节点 broker1、broker2、broker6而顺序的不同有可能导致leader位置的不同删除 topic
bin/kafka-topics.sh --zookeeper doitedu01:2181 --delete --topic test删除 topicserver.properties 中需要一个参数处于启用状态 delete.topic.enable true
使用 kafka-topics .sh 脚本删除主题的行为本质上只是在 ZooKeeper 中的 /admin/delete_topics 路径 下建一个与待删除主题同名的节点以标记该主题为待删除的状态。然后由 Kafka 控制器异步完成。
增加分区数
kafka-topics.sh --zookeeper doit01:2181 --alter --topic doitedu-tpc2 --partitions 3Kafka 只支持增加分区不支持减少分区(增加分区不涉及历史数据的合并是一个轻量级的操作而减少分区必然涉及到历史数据的转移合并代价太大)
原因是减少分区代价太大数据的转移日志段拼接合并如果真的需要实现此功能则完全可以重新创建一个分区数较小的主题然后将现有主题中的消息按 照既定的逻辑复制过去
动态配置 topic 参数
通过管理命令可以为已创建的 topic 增加、修改、删除 topic level 参数
添加/修改 指定 topic 的配置参数
kafka-topics.sh --alter --topic tpc2 --config compression.typegzip --zookeeper doit01:2181
# --config compression.typegzip 修改或添加参数配置
# --add-config compression.typegzip 添加参数配置
# --delete-config compression.type 删除配置参topic 配置参数文档地址 https://kafka.apache.org/documentation/#topicconfigs
当然其核心的配置参数有蛮多的
broker的配置参数consumer的配置参数producer的配置参数topic的配置参数
生产者kafka-console-producer
bin/kafka-console-producer.sh --broker-list doitedu01:9092 --topic testhello word
kafka
nihao其实存在着一些思考和问题比如我们根本不知道到底是不是写进去了那么我们应该怎么办
消费者kafka-console-consumer
消费者在消费的时候需要指定要订阅的主题还可以指定消费的起始偏移量
起始偏移量的指定策略有 3 种
earliest 从最早的开始消费
latest 从最新的开始消费
指定的 offset 分区号偏移量 从你指定的位置开始消费
从之前所记录的偏移量开始消费
kafka 的 topic 中的消息是有序号的序号叫消息偏移量而且消息的偏移量是在各个 partition 中
独立维护的在各个分区内都是从 0 开始递增编号消费消息从开始的开始消费
bin/kafka-console-consumer.sh --bootstrap-server doitedu01:9092 --from-beginning
--topic test但是会存在一种情况比如说 先生产了很多消息进集群中然后开始消费的话可能不会保证有序因为数据是存储在不同的分区中的消费者在消费的时候是先把一个分区的数据消费完然后再去消息其他分区。所以这也就导致了全局顺序不一致的情况。 如果不加 --from-beginning 默认从最新的开始消费 当再次执行消费者的时候会返回0条因为已经没有最新的了已经存在的都叫老数据了。
如果此时还想让消费者 消费到数据那就去生产新的数据。
指定要消费的分区和要消费的起始 offset
bin/kafka-console-consumer.sh --bootstrap-server
doitedu01:9092,doitedu02:9092,doitedu03:9092 --topic doit14 --offset 2 --partition 0在这里其实要明白的一个点就是生产者把数据写入topic的时候默认是把数据在多个分区间轮询写入。
每一个消息都有一个序号对应的消息的序号offset递增都是每个分区内管理的消息的offset在topic中并不会有全局的递增号。所以offset是在各个分区内独立维护的那么也就意味着每个分区中都有offset0的消息
消费组
消费组是 kafka 为了提高消费并行度的一种机制
如果只有一个消费者那么就会是这样 消费者轮询消费对应的分区。
而如果topic中数据量太大而你需要多个并行处理任务去处理topic中的数据那么就需要消费组。
消费组内的各个消费者之间分担数据读取任务的最小单位是分区。
同一个分区只会被消费组内某一个消费者来负责读取。 而如果出现消费者组 中消费者 大于 分区数那么就会剩下来。 在kafka的底层逻辑中任何一个消费者都有自己所属的组
组和组之间没有任何关系大家都可以消费到目标topic中所有的数据但是组内的各个消费者就只能读到自己所分配的 分区 如何让多个消费者组成一个组 就是让这些消费者的 groupId 相同即可
KAFKA 中的消费组可以动态增减消费者 而且消费组中的消费者数量发生任意变动都会重新分配分区消费任务
消费位移的记录
kafka 的消费者可以记录自己所消费到的消息偏移量记录的这个偏移量就叫消费位移
记录这个消费到的位置作用就在于消费者重启后可以接续上一次消费到位置来继续往后面消费
其实讲白了就是为了断点续传。 例如上图 消费者A正在读突然消费者组里新增了一个消费者那么这个程序读的进程会被中断先停重新分配一下分区然后再来。
kafka消费者是有这个功能的它会自己去记消费到那条消息了万一消费者崩了重启也知道从哪里继续消费。
其消费的本质是按照组来记偏移量整个偏移量组内共享并不是按照单个消费者来记毕竟消费者组里的消费者可以动态收缩
如果此时又来了一个新的消费者组来消费 topic那么就没有对应的偏移量。
有一个比较经典的问题
如果我们消费一个数据已经读到了但是还没有来得及更新偏移量正要更新偏移量的时候崩溃了那么此时重启之后会发生什么
此时就会被重复消费。
该模式主要是 先读后记如果是先记后读可能连读都读不到
但是还有另外的一个情况就是可能重复的数据不止一条。
比如 消费了好几条再记录一次去读一批然后去更新偏移量。
kafka的消费者去读取数据是消费者主动向broker去请求拉取而不是broker服务器来推送具体拉取多少条是有参数配置的
如果拉取的速度比进行的速度要快的话那么消费者就经常的处于饥饿的状态如果进来的速度比我拉取的要快那么就会造成数据大量的积压。 如果在不同机器启动同一个消费者组里的消息者还是能够共享偏移量的
是因为偏移量数据并没有存储在本地磁盘上在0.11.x之前消费者确实是把自己消费到的位置消费位移记录到zk上之后是记录在kafka的一个内部的topic中 __consumer_offsets。
类似于mysql也是一样的逻辑内部也有一些系统内部表 通过指定 formatter 工具类来对__consumer_offsets 主题中的数据进行解析
bin/kafka-console-consumer.sh --bootstrap-server doitedu01:9092 --topic __consumer_offsets --formatter
kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatterconsumer去记录偏移量的时候不是 读到一批数据就记录一次也不是记录一次后再去读数据而是周期性定期去提交当前的位移。
如果真的发生了更新那就去改数字没发生更新就和原来一样。周期性 5s 去提交当前的位移
其实也可以从上面的记录中就可以看到kafka的消费者记录新的消费位移并不是去修改上一次的而是重新记录追加新记录像日志一样。
所以就像之前说的kafka的配置文件里面数据存储目录不叫data.dirs 而是叫 log.dirskafka之所以把自己的数据存储目录称之为 log目录是因为他底层存储数据的特性类似于 “日志” 数据只能不断追加。
这种日志也不能删除只能将超过日期的日志进行截断留下的各个消费者组的都有。当然针对这一点因为消费者组再启动消费的时候是可以显示指定起始偏移量也就是说可以忽略之前所记录的偏移量。
如果需要获取某个特定 consumer-group 的消费偏移量信息则需要计算该消费组的偏移量记录所在 分区 Math.abs(groupID.hashCode()) % numPartitions __consumer_offsets 的分区数为50
配置管理 kafka-config
kafka-configs.sh 脚本是专门用来进行动态参数配置操作的这里的操作是运行状态修改原有的配置 如此可以达到动态变更的目的
动态配置的参数会被存储在 zookeeper 上因而是持久生效的 可用参数的查阅地址 https://kafka.apache.org/documentation/#configuration
kafka-configs.sh 脚本包含变更 alter、查看 describe 这两种指令类型 kafka-configs. sh 支持主题、 broker 、用户和客户端这 4 个类型的配置。
kafka-configs.sh 脚本使用 entity-type 参数来指定操作配置的类型并且使 entity-name 参数来指定操 作配置的名称。
比如查看 topic 的配置可以按如下方式执行
bin/kafka-configs.sh --zookeeper doit01:2181 --describe --entity-type topics --entity-name tpc_2比如查看 broker 的动态配置可以按如下方式执行
bin/kafka-configs.sh --describe --entity-type brokers --entity-name 0 --zookeeper doit01:2181entity-type 和 entity-name 的对应关系 示例添加 topic 级别参数
bin/kafka-configs.sh --zookeeper doit:2181 --alter --entity-type topics --entity-name tpc22 --add-config
cleanup.policycompact,max.message.bytes10000使用 kafka-configs.sh 脚本来变更 alter 配置时会在 ZooKeeper 中创建一个命名形式为 /config//的节点并将变更的配置写入这个节点
示例添加 broker
kafka-configs.sh --entity-type brokers --entity-name 0 --alter --add-config log.flush.interval.ms1000
--bootstrap-server doit01:9092,doit02:9092,doit03:9092动态配置 topic 参数
通过管理命令可以为已创建的 topic 增加、修改、删除 topic level 参
添加/修改 指定 topic 的配置参数
kafka-topics.sh --topic doitedu-tpc2 --alter --config compression.typegzip --zookeeper doit01:2181如果利用 kafka-configs.sh 脚本来对 topic、producer、consumer、broker 等进行参数动态
添加、修改配置参数
bin/kafka-configs.sh --zookeeper doitedu01:2181 --entity-type topics --entity-name tpc_1
--alter --add-config compression.typegzip删除配置参数
bin/kafka-configs.sh --zookeeper doitedu01:2181 --entity-type topics --entity-name tpc_1
--alter --delete-config compression.typekafka是如何做到可以动态修改配置的
Kafka之所以能够动态配置是因为它设计时考虑到了在运行时动态更改配置的需求。Kafka的配置信息存储在Zookeeper中而不是像传统的配置文件那样静态地存储在本地磁盘上。这样一来当需要更改配置时只需要在Zookeeper上修改对应的配置节点Kafka会自动检测到变化并按照新的配置进行运行。
Kafka实现动态配置的原理是基于Zookeeper的Watcher机制。当Kafka启动时会将配置信息存储在Zookeeper的一个特定目录下并且通过Watcher监听该目录的变化。当配置信息发生变化时Zookeeper会通知KafkaKafka会重新加载新的配置并应用到运行中的服务。
以下是一个简化的伪代码示例展示了Kafka动态配置的实现原理
# Kafka启动时初始化配置
def initialize_config():config load_config_from_zookeeper() # 从Zookeeper加载配置apply_config(config) # 应用配置# 从Zookeeper加载配置
def load_config_from_zookeeper():config_data zookeeper.get(/kafka/config) # 从Zookeeper获取配置数据return parse_config(config_data) # 解析配置数据# 解析配置数据
def parse_config(config_data):# 将配置数据解析为可用的配置对象config Config()config.load_from_dict(config_data)return config# 应用配置
def apply_config(config):# 根据配置更新Kafka的运行时参数update_kafka_config(config)# 监听配置变化
def watch_config_changes():while True:changes zookeeper.watch(/kafka/config) # 监听配置目录config parse_config(changes) # 解析配置变化apply_config(config) # 应用配置# 修改配置
def modify_config(config_changes):zookeeper.set(/kafka/config, config_changes) # 更新Zookeeper上的配置数据# Kafka启动时初始化配置
initialize_config()# 启动监听配置变化的线程
start_thread(watch_config_changes)# 修改配置的示例
modify_config(new_config_changes)