我的网站突然找不到网页了,安徽省合肥市建设局网站,一个好网站建设,抖音开放平台登录入口官网文章目录 整体架构元数据更新 整体架构
消息在真正发往Kafka之前#xff0c;有可能需要经历拦截器#xff08;Interceptor#xff09;、序列化器#xff08;Serializer#xff09;和分区器#xff08;Partitioner#xff09;等一系列的作用#xff0c;那么在此之后又会… 文章目录 整体架构元数据更新 整体架构
消息在真正发往Kafka之前有可能需要经历拦截器Interceptor、序列化器Serializer和分区器Partitioner等一系列的作用那么在此之后又会发生什么呢下面我们来看一下生产者客户端的整体架构如图所示。 整个生产者客户端由两个线程协调运行这两个线程分别为主线程和Sender线程发送线程。在主线程中由KafkaProducer创建消息然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累加器RecordAccumulator也称为消息收集器中。Sender线程负责从RecordAccumulator中获取消息并将其发送到Kafka中。
RecordAccumulator主要用来缓存消息以便Sender线程可以批量发送进而减少网络传输的资源消耗以提升性能。RecordAccumulator缓存的大小可以通过生产者客户端参数buffer.memory配置默认值为33554432B即32MB。如果生产者发送消息的速度超过发送到服务器的速度则会导致生产者空间不足这个时候KafkaProducer的send0方法调用要么被阻塞要么抛出异常这个取决于参数max.b1ock.ms的配置此参数的默认值为60000即60秒。
主线程中发送过来的消息都会被追加到RecordAccumulator的某个双端队列Deque中在RecordAccumulator的内部为每个分区都维护了一个双端队列队列中的内容就是ProducerBatch即Deque。消息写入缓存时追加到双端队列的尾部Sender读取消息时从双端队列的头部读取。注意ProducerBatch不是ProducerRecordProducerBatch中可以包含一至多个ProducerRecord。通俗地说ProducerRecord是生产者中创建的消息而ProducerBatch是指一个消息批次ProducerRecord会被包含在ProducerBatch中这样可以使字节的使用更加紧漆。与此同时将较小的ProducerRecord拼漆成一个较大的ProducerBatch也可以减少网络请求的次数以提升整体的吞吐量。ProducerBatch和消息的具体格式有关。如果生产者客户端需要向很多分区发送消息则可以将buffer.memory参数适当调大以增加整体的吞吐量。
消息在网络上都是以字节Byte的形式传输的在发送之前需要创建一块内存区域来保存对应的消息。在Kafka生产者客户端中通过java.io.ByteBuffer实现消息内存的创建和释放。不过频繁的创建和释放是比较耗费资源的在RecordAccumulator的内部还有一个BufferPool它主要用来实现ByteBuffer的复用以实现缓存的高效利用。不过BufferPool只针对特定大小的ByteBuffer进行管理而其他大小的ByteBuffer不会缓存进BufferPool中这个特定的大小由batch.size参数来指定默认值为16384B即16KB。我们可以适当地调大batch.size参数以便多缓存一些消息。
ProducerBatch的大小和batch.size参数也有着密切的关系。当一条消息ProducerRecord流入RecordAccumulator时会先寻找与消息分区所对应的双端队列如果没有则新建再从这个双端队列的尾部获取一个ProducerBatch如果没有则新建查看ProducerBatch中是否还可以写入这个ProducerRecord如果可以则写入如果不可以则需要创建一个新的ProducerBatch。在新建ProducerBatch时评估这条消息的大小是否超过batch.size参数的大小如果不超过那么就以batch.size参数的大小来创建ProducerBatch这样在使用完这段内存区域之后可以通过BufferPool的管理来进行复用如果超过那么就以评估的大小来创建ProducerBatch这段内存区域不会被复用。
Sender从RecordAccumulator中获取缓存的消息之后会进一步将原本分区Deque的保存形式转变成NodeListProducerBatch的形式其中Node表示Kafka集群的broker节点。对于网络连接来说生产者客户端是与具体的broker节点建立的连接也就是向具体的broker节点发送消息而并不关心消息属于哪一个分区而对于KafkaProducer的应用逻辑而言我们只关注向哪个分区中发送哪些消息所以在这单需要做一个应用逻辑层面到网络IO层面的转换。
在转换成NodeListProducerBatch的形式之后Sender还会进一步封装成NodeRequest的形式这样就可以将Request请求发往各个Node了这里的Request是指Kafka的各种协议请求对于消息发送而言就是指具体的ProduceRequest。
请求在从Sender线程发往Kafka之前还会保存到InFlightRequests中InFlightRequests保存对象的具体形式为MapNodeIdDeque它的主要作用是缓存了已经发出去但还没有收到响应的请求NodeId是一个String类型表示节点的id编号。与此同时InFlightRequests还提供了许多管理类的方法并且通过配置参数还可以限制每个连接也就是客户端与Node之间的连接最多缓存的请求数。这个配置参数为max.in.flight.requests.per.connection默认值为5即每个连接最多只能缓存5个未响应的请求超过该数值之后就不能再向这个连接发送更多的请求了除非有缓存的请求收到了响应Response。通过比较Deque的size与这个参数的大小来判断对应的Node中是否已经堆积了很多未响应的消息如果真是如此那么说明这个Node节点负载较大或网络连接有问题再继续向其发送请求会增大请求超时的可能。
元数据更新
上面提及的InFlightRequests还可以获得leastLoadedNode即所有Node中负载最小的那一个。这里的负载最小是通过每个Node在InFlightRequests中还未确认的请求决定的未确认的请求越多则认为负载越大。对于图中的InFlightRequests来说图中展示了三个节点Node0、Node1和Node2很明显Node1的负载最小。也就是说Node1为当前的leastLoadedNodec选择leastLoadedNode发送请求可以使它能够尽快发出避免因网络拥塞等异常而影响整体的进度。leastLoadedNode的概念可以用于多个应用场合比如元数据请求、消费者组播协议的交互。
我们只知道主题的名称对于其他一些必要的信息却一无所知。KafkaProducer要将此消息追加到指定主题的某个分区所对应的leader副本之前首先需要知道主题的分区数量然后经过计算得出或者直接指定目标分区之后KafkaProducer需要知道目标分区的leader副本所在的broker节点的地址、端口等信息才能建立连接最终才能将消息发送到Kafka在这一过程中所需要的信息都属于元数据信息。
在上面的讲解中我们了解了bootstrap.servers参数只需要配置部分broker节点的地址即可不需要配置所有broker节点的地址因为客户端可以自己发现其他broker节点的地址这一过程也属于元数据相关的更新操作。与此同时分区数量及leader副本的分布都会动态地变化客户端也需要动态地捕捉这些变化。
元数据是指Kafka集群的元数据这些元数据具体记录了集群中有哪些主题这些主题有哪些分区每个分区的leader副本分配在哪个节点上follower副本分配在哪些节点上哪些副本在AR、ISR等集合中集群中有哪些节点控制器节点又是哪一个等信息。
当客户端中没有需要使用的元数据信息时比如没有指定的主题信息或者超过metadata.max.age.ms时间没有更新元数据都会引起元数据的更新操作。客户端参数metadata.max.age.ms的默认值为300000即5分钟。元数据的更新操作是在客户端内部进行的对客户端的外部使用者不可见。当需要更新元数据时会先挑选出leastLoadedNode然后向这个Node发送MetadataRequest请求来获取具体的元数据信息。这个更新操作是由Sender线程发起的在创建完MetadataRequest之后同样会存入InFlightRequests之后的步骤就和发送消息时的类似。元数据虽然由Sender线程负责更新但是主线程也需要读取这些信息这里的数据同步通过synchronized和final关键字来保障。