服装网站建设的需求,wordpress 插件漏洞扫描,网页图片转换成word文档,wordpress柚子皮5.31目录
一、生产者的初始化流程
二、生产者到缓冲队列的流程
三、Sender拉取数据到Kafka流程
四、消费者初始化
五、主题订阅原理
六、消费者抓取数据原理
七、消费者组初始化
八、消费者组消费流程
九、提交offset原理 一、生产者的初始化流程
首先获取事务id和客户端…目录
一、生产者的初始化流程
二、生产者到缓冲队列的流程
三、Sender拉取数据到Kafka流程
四、消费者初始化
五、主题订阅原理
六、消费者抓取数据原理
七、消费者组初始化
八、消费者组消费流程
九、提交offset原理 一、生产者的初始化流程
首先获取事务id和客户端id用到事物必须要事物id不然报错每个生产者都需要唯一标识客户端id监控kafka相关情况的JmxReporter配置然后获取分区器如果用户有自定义的就读取配置的如果没有配置就用默认分区器然后key和value进行序列化然后就读取自定义拦截器可以定义多个拦截器组成拦截器链然后初始化控制单条日志的大小默认是1m缓冲区大小默认32m创建内存池缓存队列初始化批次大小默认16k压缩相关处理默认是none重试间隔时间默认100ms连接kafka集群获取元数据才能知道要发送到哪个分区创建sender线程会有个创建sender的方法sender线程负责拉取缓冲队列消息到Kafka在方法里面会定义缓存请求的个数默认5个然后请求超时的时间然后创建一个网络请求客户端对象会传入刚刚的参数还有客户端id重试时间发送缓冲区的大小128和接受缓冲区的大小32还有acks等配置。sender继承了Runnbale接口然后会new个sender线程出来用上面这些参数然后返回。sender放到后台启动sender线程
二、生产者到缓冲队列的流程
在执行到拦截器的时候就要调用一个onSend方法如果有多个拦截器每个拦截器都会走一次这个方法这个方法就是拦截器对数据加工的然后获取元数据要根据主题的分区放到对应的缓存队列序列化相关操作key和value的序列化和压缩分区操作如果指定了分区直接分配到指定分区没有指定就会根据分区器进行分配没有指定key就会粘性分区处理如果批次大小和活着时间到了不然就一直是那个满足才能创建新队列用如果指定key就根据key到hashcode进分区数取模保证序列化和压缩后数据大小能够传输他去读取配置的消息最大值和缓冲区大小如果有超过的抛异常向缓存队列里面追加数据获取或者创建一个队列按照分区然后尝试添加数据一般不成功因为还没申请内存然后根据16k和现在压缩后的总大小取最大值申请内存就申请这个大小内存池分配内存然后sender线程拿走就了会释放内存。如果批次大小满了或者有了新的批次需要创建就唤醒sender线程把缓冲队列的数据拉取过去。
三、Sender拉取数据到Kafka流程
事务相关操作获取元数据信息为了知道发到哪个分区判断32m缓存是否准备好先获取队列的信息先判断内存队列有没有数据判断leader是不是空如果没有目标那还是会抛出异常如果批次大小或时间满足一个条件就会发送。把所有请求按照节点为单位来发送请求这样一台机器只需要建立一次连接封装了个request然后通过网络客户端把数据发送过去然后服务端还是通过网络客户端获取结果
四、消费者初始化
消费者组平衡获取消费者组id和客户端id设置请求服务端等待时间默认30秒重试时间默认100毫秒拦截器链相关处理key和value的反序列化判断offset从什么位置开始消费获取消费者元数据重试时间、是否允许访问系统主题默认false是否允许自动创建topic主题默认true连接Kafka集群创建网络客户端对象连接重试时间默认50ms最大重试时间1s发送缓冲区128kb和接受缓冲区64kb大小指定消费者分区分配策略创建coordinator对象设置自动提交offset时间默认5s配置抓取数据的参数最少抓取多少最大一次抓取多少等
五、主题订阅原理
传入要订阅的主题如果为null直接抛出异常注册负载均衡监听器如果消费者组中有节点挂了要通知其他消费者按照主题自动订阅进行分配
六、消费者抓取数据原理
他首先先初始化消费者组和队列然后回调消息会到缓冲队列然后去队列抓取数据最多一次500条然后抓取后拦截器开始处理数据
七、消费者组初始化
先判断coordinator不为null那就说明为消费者组如果没有指定分区分配策略会抛出异常判断coordinator是否准备好他会循环创建查找coordinator的请求并发送并获取服务器返回到结果
他这整个消费者组初始化就是判断coordinator有没有准备好
八、消费者组消费流程
他会用判断coordinator是不是空是的话就等待他上来先去队列拉取数据一般是拉取不到的他先构造请求的入参最少一次抓多少最多抓多少超时时间等待然后调用send他送后返回future通过回调获取数据的他会循环遍历数据获取分区获取分区的数据如果有数据就放到消息队列里面然后就调用从队列拉取数据的方法拉取然后他有大小限制最大500他会循环一波一波拉取过去然后放到拦截器走加工操作
九、提交offset原理
同步提交找到coordinator然后调用commitOffset进行发送然后不停dowhile循环调用发送提交请求然后等待回调获取结果一直循环到成功为止。异步提交他还是用coordinator去提交但是他不等待结果他new了个监听等待结果。