单页网站后台订单系统,长沙网络推广平台,wordpress 视差模板,温州网页设计公司文章目录 基础客户端版本消息生产者消息消费者踩坑 客户端属性分析消费者分组消费机制生产者拦截器消息序列化消息分区路由机制生产者消息缓存机制发送应答机制生产者消息幂等性生产者消息事务 客户端流程总结 基础客户端版本
导入依赖
propertiesproject.build.… 文章目录 基础客户端版本消息生产者消息消费者踩坑 客户端属性分析消费者分组消费机制生产者拦截器消息序列化消息分区路由机制生产者消息缓存机制发送应答机制生产者消息幂等性生产者消息事务 客户端流程总结 基础客户端版本
导入依赖
propertiesproject.build.sourceEncodingUTF-8/project.build.sourceEncodingmaven.compiler.source8/maven.compiler.sourcemaven.compiler.target8/maven.compiler.targetspring-boot.version2.3.12.RELEASE/spring-boot.versionfastjson.version2.0.51/fastjson.version!--我服务器安装的kafka版本是3.4.0 所以最好和安装版本对应--kafka.version3.4.0/kafka.version
/propertiesdependencygroupIdorg.apache.kafka/groupIdartifactIdkafka-clients/artifactIdversion${kafka.version}/version
/dependency消息生产者
package com.hs.kfk.basic;import org.apache.kafka.clients.producer.*;import java.util.Properties;
import java.util.concurrent.ExecutionException;/*** Description: 基本版本的消息生产者发送的消息就是一个简单是String* Author 胡尚* Date: 2024/8/7 18:33*/
public class BasicProducer {/*** 定义kafka服务端地址*/// private final static String BOOTSTRAP_SERVER 192.168.75.61:9092,192.168.75.62:9092,192.168.75.63:9092;private final static String BOOTSTRAP_SERVER worker1:9092,worker2:9092,worker3:9092;/*** 生产者往哪个topic中发送消息*/private final static String TOPIC_NAME disTopic;public static void main(String[] args) throws ExecutionException, InterruptedException {// 设置发送者相关的属性Properties properties new Properties();// 设置kafka端口properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);// 配置key的序列化类 去找org.apache.kafka.common.serialization.Serializer接口的实现类properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer);// 配置value的序列化类properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer);// 创建一个消息生产者对象ProducerString, String producer new KafkaProducer(properties);for (int i 0; i 5; i) {// 构建一条消息 构造方法中的传参是String topic, K key, V value// 这里的key和value的泛型要和上方定义的序列化类型匹配上ProducerRecordString, String record new ProducerRecord(TOPIC_NAME, Integer.toString(i), MyProducer i);// 发送消息// 发送消息方式一单向发送不关心服务端的应答, 仅仅把消息发送给服务器
// producer.send(record);// 发送消息方式二 同步发送get()获取服务端应答消息前会阻塞当前线程。RecordMetadata recordMetadata producer.send(record).get();String topic recordMetadata.topic();int partition recordMetadata.partition();long offset recordMetadata.offset();System.out.println(topic: topic \tpartition: partition \toffset: offset );// 发送消息方式三异步发送消息发送后不阻塞服务端有应答后会触发回调函数
// producer.send(record, new Callback() {
// Override
// public void onCompletion(RecordMetadata metadata, Exception exception) {
// // 使用 RecordMetadata 对象做相应的操作
// if (exception ! null){
// // 消息发送失败 处理逻辑
// }
// }
// });}// 消息生产者 调用close()方法producer.close();}
}控制台日志
SLF4J: Failed to load class org.slf4j.impl.StaticLoggerBinder.
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
topic:disTopic partition:0 offset: 4
topic:disTopic partition:3 offset: 4
topic:disTopic partition:0 offset: 5
topic:disTopic partition:3 offset: 5
topic:disTopic partition:1 offset: 2消息消费者
package com.hs.kfk.basic;import org.apache.kafka.clients.consumer.*;import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;/*** Description: 基础班的消息消费方* Author 胡尚* Date: 2024/8/7 19:07*/
public class BasicConsumer {/*** 定义kafka服务端地址*/// private final static String BOOTSTRAP_SERVER 192.168.75.61:9092,192.168.75.62:9092,192.168.75.63:9092;private final static String BOOTSTRAP_SERVER worker1:9092,worker2:9092,worker3:9092;/*** 生产者往哪个topic中发送消息*/private final static String TOPIC_NAME disTopic;/*** 消费者组名*/private final static String CONSUMER_GROUP test;public static void main(String[] args) {// 设置消费者相关的属性Properties properties new Properties();// 设置kafka端口properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);// 每个消费者需要指定一个消费者组properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP);// 配置key的序列化类 去找org.apache.kafka.common.serialization.Deserializer接口的实现类properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer);// 配置value的序列化类properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer);// 创建一个消息消费者对象// 这里的key和value的泛型要和上方定义的序列化类型匹配上ConsumerString, String consumer new KafkaConsumer(properties);// 消费者可以订阅多个topicconsumer.subscribe(Arrays.asList(TOPIC_NAME));while (true) {// 消费者拉取消息 100毫秒超时时间// 这里一次拉取的是一批消息// 这里的key和value的泛型要和上方定义的序列化类型匹配上ConsumerRecordsString, String consumerRecords consumer.poll(Duration.ofMillis(100));// 处理消息for (ConsumerRecordString, String record : consumerRecords) {int partition record.partition();long offset record.offset();String topic record.topic();String key record.key();String message record.value();System.out.println(topic: topic \tpartition: partition \toffset: offset \tkey: key \tmessage: message);}// 提交offset消息就不会重复消费//同步提交表示必须等到offset提交完毕再去消费下一批数据。consumer.commitSync();// 异步提交表示发送完提交offset请求后就开始消费下一批数据了。不用等到Broker的确认。// consumer.commitAsync();}}
}控制台日志
SLF4J: Failed to load class org.slf4j.impl.StaticLoggerBinder.
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
topic:disTopic partition:3 offset: 4 key: 1 message: MyProducer1
topic:disTopic partition:3 offset: 5 key: 3 message: MyProducer3
topic:disTopic partition:1 offset: 2 key: 4 message: MyProducer4
topic:disTopic partition:0 offset: 4 key: 0 message: MyProducer0
topic:disTopic partition:0 offset: 5 key: 2 message: MyProducer2踩坑
问题消息生成者发送消息一直阻塞发送不出去kafka也接收不到消息
生产者方面的代码
// 直接调单机测试 一个ip
private final static String BOOTSTRAP_SERVER 192.168.75.61:9092;
private final static String TOPIC_NAME disTopic;public static void main(String[] args) throws ExecutionException, InterruptedException {Properties properties new Properties();// 设置kafka端口properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);// 配置key的序列化类 去找org.apache.kafka.common.serialization.Serializer接口的实现类properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringSerializer);// 配置value的序列化类properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringSerializer);// 创建一个消息生产者对象ProducerString, String producer new KafkaProducer(properties);for (int i 0; i 5; i) {// 构建一条消息 构造方法中的传参是String topic, K key, V valueProducerRecordString, String record new ProducerRecord(TOPIC_NAME, Integer.toString(i), MyProducer i);// 发送消息方式二 同步发送get()获取服务端应答消息前会阻塞当前线程。RecordMetadata recordMetadata producer.send(record).get();String topic recordMetadata.topic();int partition recordMetadata.partition();long offset recordMetadata.offset();System.out.println(topic: topic \tpartition: partition \toffset: offset );}// 消息生产者 调用close()方法producer.close();
} 问题截图 检查kafka服务是否正常手动启动生产者 能正常发送消息并且Topic也存在 检查windows能不能连接linux服务器使用telnet能连通网络没问题 跟踪源码看到发送消息时Cluster对象中的确是保存了我kafka集群相关的节点信息进一步确定网络没问题java代码都能访问通 进入到发送消息的方法这里也执行了 进入get()方法进行阻塞 消息也未发送至kafka因为消费方这里还有一个窗口 问题具体的解决
我三台 linux服务器中各自配置了域名解析
vi /etc/hosts192.168.75.61 worker1
192.168.75.62 worker2
192.168.75.63 worker3这里和搭建kafka集群时配置文件中中的配置没关系
而下图中也是直接返回的域名信息 接下来就在我windows上也配置了一个域名映射C:\Windows\System32\drivers\etc\hosts
192.168.75.61 worker1
192.168.75.62 worker2
192.168.75.63 worker3现在就能正常发送接收消息了
从下图中可以发现消费者也正常了Topic中4个partition其中两个分给了java应用两个分给了控制台窗口的消费者 客户端属性分析
消费者分组消费机制
消费者消费消息时需要通过group.id参数来指定消费者组源码中的描述如下
// 全类名org.apache.kafka.clients.CommonClientConfigs org.apache.kafka.clients.consumer.ConsumerConfig// 标识此消费者所属的消费者组的唯一字符串。如果消费者使用codesubscribe(topic)code或基于kafka的偏移量管理策略来使用组管理功能则需要此属性。
public static final String GROUP_ID_CONFIG group.id;// 我们最好为每一个组成员设置一个固定的instanceId这个参数通常可以用来减少Kafka不必要的rebalance。
// 如果不设置每一次消费者变动或者是网络问题kafka都会为多个partition和consumer实例进行负载均衡
// partition和consumer实例进行绑定如三方最近的一张图片所示
public static final String GROUP_INSTANCE_ID_CONFIG group.instance.id;我们通过消费者订阅某一个Topic就需要group.id这个参数
ConsumerString, String consumer new KafkaConsumer(properties);
// 消费者可以订阅多个topic
consumer.subscribe(Arrays.asList(TOPIC_NAME));大致流程如下图所示partition和某个Group中的consumer实例进行绑定 我们还可以基于kafka的偏移量管理策略的基础上自己对offset偏移量进行扩展管理。
如果仅仅基于kafka来管理当前消费者组对某个partition的消费offset可能存在一些问题因为这个offset是消费者通过下面两个方法调用kafka服务端再进行改变
//同步提交表示必须等到offset提交完毕再去消费下一批数据。
consumer.commitSync();
// 异步提交表示发送完提交offset请求后就开始消费下一批数据了。不用等到Broker的确认。
consumer.commitAsync();这个关键的offset偏移量是保存在Broker中的但是却是由“不靠谱”的客户端来主导推进的。Kafka服务端有以下的一些机制来保证服务端的稳定性。 如果客户端消费消息了一直不调用上方的commit方法岂不是broker中的offset一直得不到推进 kafka提供了一种自动提交的参数 // 如果为true则消费者的偏移量将在后台定期提交
public static final String ENABLE_AUTO_COMMIT_CONFIG enable.auto.commit;如果客户端瞎指定一个offset就往kafka服务端发请求这个offset在broker中根本就不存在 /**当Kafka中没有初始偏移量或者当前偏移量在服务器上不存在时该怎么办(例如因为数据已被删除):earliest:自动将偏移量重置为最早的偏移量latest:自动将偏移量重置为最近的偏移量none:如果没有找到消费者组的先前偏移量则向消费者抛出异常anything else:向消费者抛出异常
*/
public static final String AUTO_OFFSET_RESET_CONFIG auto.offset.reset;消费者消费消息Broker不会一直等待消费者的提交如果消费者长时间不提交Broker就会认为这个消费者挂了此时就会把这个partition中的消息往同组的其他消费者进行投递 消息重复消费问题
消费者业务处理时间较长此时消费者正常处理消息的过程中Broker端就已经等不下去了认为这个消费者处理失败了。这时就会往同组的其他消费者实例投递消息这就造成了消息重复处理。
所有我们可以换一种思路将Offset从Broker端抽取出来放到第三方存储比如Redis里自行管理。这样就可以自己控制用业务的处理进度推进Offset往前更新。
我们在消费消息之前判断当前消息的offset是否 redis中保存的offset如果是那么就表示这一条消息已经被消费过了就不要去消费了
redis中的key可以是消费者组名 Topic partition 组成 value就是offset
伪代码如下
// 每执行一条消息都更新一次redis中的offset
public class RedisConsumer {private final static String BOOTSTRAP_SERVER worker1:9092,worker2:9092,worker3:9092;private final static String TOPIC_NAME disTopic;private final static String CONSUMER_GROUP test;public static void main(String[] args) {// 设置消费者相关的属性Properties properties new Properties();properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP);properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer);properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer);// 创建消费者ConsumerString, String consumer new KafkaConsumer(properties);consumer.subscribe(Arrays.asList(TOPIC_NAME));while (true) {// 消费者拉取消息ConsumerRecordsString, String consumerRecords consumer.poll(Duration.ofMillis(100));// 处理消息for (ConsumerRecordString, String record : consumerRecords) {int partition record.partition();long offset record.offset();String topic record.topic();String redisKey redisKeyPrefex topic partition CONSUMER_GROUP;String key record.key();String message record.value();// TODO 通过redisKey 查询redis中的offset如果 redisOffset offset 就表示重复消费了// TODO 消费消息// TODO 存入redis中 redisKey offset}//异步提交。消费业务多时异步提交有可能造成消息重复消费通过Redis中的Offset就可以过滤掉这一部分重复的消息。consumer.commitAsync();}}
}改进一些的版本按照partition来每次去处理一个partition中的消息一个partition中的消息处理完成后就更新一次redis中的offset
while (true) {// 消费者拉取消息ConsumerRecordsString, String consumerRecords consumer.poll(Duration.ofMillis(100));// 先获取所有的partition然后每次取partition中的消息SetTopicPartition partitions consumerRecords.partitions();partitions.forEach(partition - {String redisKey redisKeyPrefex partition.topic() partition.partition() CONSUMER_GROUP;// TODO 根据redisKey 从redis中 获取这个消费者组下 这个partition对应的offset// 获取当前partition中所有的消息ListConsumerRecordString, String records consumerRecords.records(partition);for (ConsumerRecordString, String record : records) {long offset record.offset();// TODO 如果 redisOffset offset 就表示重复消费了// TODO 消费消息}// 获取当前partition 多个消息中的最后一个offsetlong offset records.get(records.size() - 1).offset();// TODO 根据redisKey将上方的offset存入redis中});//异步提交。消费业务多时异步提交有可能造成消息重复消费通过Redis中的Offset就可以过滤掉这一部分重复的消息。consumer.commitAsync();
}继续改进添加线程池去消费消息
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.RedisTemplate;import javax.annotation.Resource;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;/*** 基于Redis管理消费者Offset,防止消息重复处理。* 伪代码就不去具体实现了。*/
public class RedisConsumer {Resourceprivate RedisTemplate redisTemplate;Logger logger LoggerFactory.getLogger(RedisConsumer.class);//计算密集型任务private final static int CORES 2* Runtime.getRuntime().availableProcessors();public static final String REDIS_PREFEX myoffset_;private volatile boolean IF_SLEEP false;private volatile boolean RUNNING true;private final ThreadPoolExecutor executorService;private String servers;private String topic;private String group;private final KafkaConsumerString,String consumer;public RedisConsumer(String servers,String topic,String group){this.servers servers;this.topic topic;this.group group;executorService (ThreadPoolExecutor) Executors.newFixedThreadPool(CORES,new ThreadFactory(){private final AtomicInteger threadNumber new AtomicInteger();Overridepublic Thread newThread(Runnable r) {return new Thread(null,r,RedisConsumer_threadNumber.getAndIncrement());}});Properties props new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 45000);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer);props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);//props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1);props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 64 * 1024);props.put(ConsumerConfig.GROUP_ID_CONFIG,group);consumer new KafkaConsumer(props);consumer.subscribe(Arrays.asList(topic.split(,)));}public void doTask(){try{while (RUNNING){try{if(!IF_SLEEP){ConsumerRecordsString, String records consumer.poll(Duration.ofSeconds(1));records.partitions().forEach(partition -{//从redis获取偏移量String redisKafkaOffset redisTemplate.opsForHash().get(REDIS_PREFEX partition.topic(), partition.partition()).toString();long redisOffset (nullredisKafkaOffset||.equals(redisKafkaOffset))?-1:Long.valueOf(redisKafkaOffset);ListConsumerRecordString, String partitionRecords records.records(partition);logger.info([pull partition] topic:{}, partition:{}, records size:{}, partition.topic(),partition.partition(), partitionRecords.size());partitionRecords.forEach(record -{//redis记录的偏移量kafka实际的偏移量表示已经消费过了则丢弃。if(redisOffset record.offset()){logger.error([pool discard] group id:{}, offset:{}, redisOffset:{} ,value:{}, group, record.offset(), redisOffset, record.value());return;}executorService.execute(()-{doMessage(record.topic(),record.value());});});//保存Redis偏移量long saveRedisOffset partitionRecords.get(partitionRecords.size()-1).offset();redisTemplate.opsForHash().put(REDIS_PREFEX partition.topic(), partition.partition(),saveRedisOffset);});//异步提交。消费业务多时异步提交有可能造成消息重复消费通过Redis中的Offset就可以过滤掉这一部分重复的消息。consumer.commitAsync();}}catch (Throwable e) {logger.warn([consumer exception] {}, e);}}}catch (Throwable e) {logger.warn([huge exception] to finish. {}, e);} finally {executorService.shutdown();try {executorService.awaitTermination(5, TimeUnit.SECONDS);logger.warn([wait finish] RedisConsumer time beyond {}., 5);} catch (InterruptedException e) {logger.warn([wait finish exception] RedisConsumer e:{}., e);}executorService.shutdownNow();consumer.close();logger.warn([finish consumer] topic:{}, groupId:{}., topic, group);}}//实际处理请求。通常可以交给子实现类去做。private void doMessage(String topic,String value){System.out.println([deal message] topic : topic ; value value);}
}生产者拦截器
生产者拦截机制允许客户端在生产者在消息发送到Kafka集群之前对消息进行拦截甚至可以修改消息内容。
// 我们自定义的拦截器需要实现org.apache.kafka.clients.producer.ProducerInterceptor接口默认情况下是没有拦截器的
public static final String INTERCEPTOR_CLASSES_CONFIG interceptor.classes;自定义一个拦截器
public class MyInterceptor implements ProducerInterceptor {Overridepublic ProducerRecord onSend(ProducerRecord record) {System.out.println(send(ProducerRecord, Callback)方法在key和value被序列化和分配分区partition之前调用);// 我们可以对record做相应的处理return record;}Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {System.out.println(当发送到服务器的记录已被确认/发送记录在发送到服务器之前失败/KafkaProducer.send()抛出异常时也会调用。);}Overridepublic void close() {System.out.println(连接关闭时会被调用);}Overridepublic void configure(MapString, ? configs) {System.out.println(整理配置项);System.out.println(config start);for (Map.EntryString, ? entry : configs.entrySet()) {System.out.println(entry.key:entry.getKey() /t entry.value: entry.getValue());}System.out.println(config end);}
}然后在生产者中指定拦截器类多个拦截器类用逗号隔开
Properties properties new Properties();
properties.setProperty(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, com.hs.kfk.intercepter.MyInterceptor);测试发送消息控制台打印结果如下
整理配置项
config start
entry.key:interceptor.classes entry.value: com.hs.kfk.intercepter.MyInterceptor
entry.key:bootstrap.servers entry.value: 192.168.75.61:9092,192.168.75.62:9092,192.168.75.63:9092
entry.key:value.serializer entry.value: org.apache.kafka.common.serialization.StringSerializer
entry.key:key.serializer entry.value: org.apache.kafka.common.serialization.StringSerializer
entry.key:client.id entry.value: producer-1
config endsend(ProducerRecord, Callback)方法在key和value被序列化和分配分区partition之前调用
当发送到服务器的记录已被确认/发送记录在发送到服务器之前失败/KafkaProducer.send()抛出异常时也会调用。
topic:disTopic partition:0 offset: 11send(ProducerRecord, Callback)方法在key和value被序列化和分配分区partition之前调用
当发送到服务器的记录已被确认/发送记录在发送到服务器之前失败/KafkaProducer.send()抛出异常时也会调用。
topic:disTopic partition:3 offset: 11send(ProducerRecord, Callback)方法在key和value被序列化和分配分区partition之前调用
当发送到服务器的记录已被确认/发送记录在发送到服务器之前失败/KafkaProducer.send()抛出异常时也会调用。
topic:disTopic partition:0 offset: 12send(ProducerRecord, Callback)方法在key和value被序列化和分配分区partition之前调用
当发送到服务器的记录已被确认/发送记录在发送到服务器之前失败/KafkaProducer.send()抛出异常时也会调用。
topic:disTopic partition:3 offset: 12send(ProducerRecord, Callback)方法在key和value被序列化和分配分区partition之前调用
当发送到服务器的记录已被确认/发送记录在发送到服务器之前失败/KafkaProducer.send()抛出异常时也会调用。
topic:disTopic partition:1 offset: 5连接关闭时会被调用消息序列化
之前的入门案例中就用到了这两个参数
// 消息生产者方 全类名 org.apache.kafka.clients.producer.ProducerConfig
// Serializer class for key that implements the codeorg.apache.kafka.common.serialization.Serializer/code interface.
public static final String KEY_SERIALIZER_CLASS_CONFIG key.serializer;// Serializer class for value that implements the codeorg.apache.kafka.common.serialization.Serializer/code interface.
public static final String VALUE_SERIALIZER_CLASS_CONFIG value.serializer;// 消息消费者方 全类名 org.apache.kafka.clients.consumer.ConsumerConfig
// Deserializer class for key that implements the codeorg.apache.kafka.common.serialization.Deserializer/code interface.
public static final String KEY_DESERIALIZER_CLASS_CONFIG key.deserializer;// Deserializer class for value that implements the codeorg.apache.kafka.common.serialization.Deserializer/code interface.
public static final String VALUE_DESERIALIZER_CLASS_CONFIG value.deserializer;消息生产者再发送消息时就是通过这里指定的key 和 value的序列化类对我们指定的消息keyvalue和 进行序列化转换为二进制数组进行网络传输。
如果生产者没有指定消息的key那么Kafka默认按照轮训的方式选择该消息应该发送到哪一个partition中如果指定了key就会可以的hash再去选择partition
消息消费方拉取消息后就会通过指定的key 和 value的反序列化类将字节数组转换为原始类型 在大部分的场景下传输String就已经能够满足业务需求了当然也可以自己定制序列化与反序列化类。
因为最终传输的数据是字节数组对于一个POJO类型的对象我们就可以分为定长的基础类型和不定长的引用类型来分别处理不定长类型我们可以先保存该数据的实际长度再保存该数据。比如User类
public class User {private Long uId;private String username;private Integer age;
}创建一个序列化类
package com.hs.kfk.serializer;import org.apache.kafka.common.serialization.Serializer;import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;/*** Description: User序列化类* Author 胡尚* Date: 2024/8/8 12:41*/
public class UserSerializer implements SerializerUser {Overridepublic byte[] serialize(String topic, User data) {// 简单粗暴的方式 直接转json// byte[] bytes JSON.toJSON(data).toString().getBytes(StandardCharsets.UTF_8);// 传输效率更高的方式byte[] userNameBytes data.getUsername().getBytes(StandardCharsets.UTF_8);// id:Long8字节 4字节保存不定长长度 username不定长长度 age int 4字节int cap 8 4 userNameBytes.length 4;ByteBuffer byteBuffer ByteBuffer.allocate(cap);byteBuffer.putLong(data.getuId());byteBuffer.putInt(userNameBytes.length);byteBuffer.put(userNameBytes);byteBuffer.putInt(data.getAge());return byteBuffer.array();}
}反序列化类
package com.hs.kfk.serializer;import org.apache.kafka.common.serialization.Deserializer;import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;/*** Description: User对象的反序列化类按位取值* Author 胡尚* Date: 2024/8/8 12:52*/
public class UserDeserializer implements DeserializerUser {Overridepublic User deserialize(String topic, byte[] data) {// 简单粗暴的序列化
// return JSON.parseObject(data, User.class);ByteBuffer byteBuffer ByteBuffer.wrap(data);long uid byteBuffer.getLong();int userNameSize byteBuffer.getInt();String username new String(byteBuffer.get(data, 8 4, userNameSize).array(), StandardCharsets.UTF_8).trim();int age byteBuffer.getInt();return new User(uid, username, age);}
}消息生产者和消费者再指定相应的序列号与反序列化类
// 消息生产者 配置value的序列化类
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, com.hs.kfk.serializer.UserSerializer);// 消息消费者 配置value的反序列化类
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, com.hs.kfk.serializer.UserDeserializer);消息分区路由机制
两个问题
producer生产消息是如何根据key选择partition的consumer消费者端是否也提供了选择partition的机制。 首先是producer端
public static final String PARTITIONER_CLASS_CONFIG partitioner.class;如果我们想自己实现partition选择策略的话就需要实现org.apache.kafka.clients.producer.Partitioner接口
如下图所示Kafka提供了三种实现RoundRobinPartitionerDefaultPartitioner和UniformStickyPartitioner Kafka默认机制是给一个生产者分配了一个分区后会尽可能一直使用这个分区。直到该分区至少产生String BATCH_SIZE_CONFIG batch.size(默认16KB)。它的工作策略是: 如果没有指定分区但存在一个键则根据键的散列选择一个分区 如果没有分区或键则选择在至少向分区生成batch.size字节后在往其他partition发送 RoundRobinPartitioner是在各个Partition中进行轮询发送这种方式没有考虑到消息大小以及各个Broker性能差异用得比较少。 consumer消费者端
public static final String PARTITION_ASSIGNMENT_STRATEGY_CONFIG partition.assignment.strategy;如果想自定义则需要实现ConsumerPartitionAssignor接口或者是继承AbstractPartitionAssignor抽象类
Kafka默认提供的消费者的分区分配策略
range策略 比如一个Topic有10个Partiton(partition 0~9) 一个消费者组下有三个Consumer(consumer13)。Range策略就会将分区03分给一个Consumer46给一个Consumer79给一个Consumer。round-robin策略轮询分配策略可以理解为在Consumer中一个一个轮流分配分区。比如0369分区给一个Consumer147分区给一个Consumer然后258给一个Consumersticky策略粘性策略。 在开始分区时尽量保持分区的分配均匀。比如按照Range策略分分区的分配尽可能的与上一次分配的保持一致。比如在range分区的情况下第三个Consumer的服务宕机了那么按照sticky策略就会保持consumer1和consumer2原有的分区分配情况。然后将consumer3分配的7~9分区尽量平均的分配到另外两个consumer上。这种粘性策略可以很好的保持Consumer的数据稳定性。
默认采用的是RangeAssignorCooperativeStickyAssignor分配策略 生产者消息缓存机制
消息并不是一条一条的往Kafka服务端发送的producer端存在一个高速缓存将消息集中到缓存中后批量进行发送。
其中涉及到了RecordAccumulator和Sender
int batchSize Math.max(1, config.getInt(ProducerConfig.BATCH_SIZE_CONFIG));
this.accumulator new RecordAccumulator(......);其中RecordAccumulator就是生产者这边的消息累加器它会针对每一个partition维护一个Dqueue双端队列每个Dequeue里会放入若干个ProducerBatch数据。生产者生产的消息经过分区路由机制后会被分配到对应的Dqueue中的某一个ProducerBatch中
// 发送消息缓冲总内存默认32M
public static final String BUFFER_MEMORY_CONFIG buffer.memory;
// ProducerBatch大小默认16KB前文消息分区路由机制中也涉及到了该参数
public static final String BATCH_SIZE_CONFIG batch.size;
// 如果生产消息速度 缓存发送消息给服务器速度那么生产者将阻塞 MAX_BLOCK_MS_CONFIG 后抛异常 默认60秒
public static final String MAX_BLOCK_MS_CONFIG max.block.ms;每一个消息生产者都有一个 Sender发送消息的线程该线程将RecordAccumulator中的Dqueue中的ProducerBatch发送给Kafka。
Sender什么时候会从RecordAccumulator中取消息Sender读取ProducerBatch后以Broker为key放入队列中队列能放多少ProducerBatch
Sender只会从RecordAccumulator中获取内存达到String BATCH_SIZE_CONFIG batch.size大小的ProducerBatch当然也有可能消息生产频率不高比较长时间都达不到batch.sizeSender也不会一直等待最多等待String LINGER_MS_CONFIG linger.ms时长就会去将ProducerBatch中的消息读取出来。linger.ms默认值是0表示不会有等待时间基本上生产一条消息就发一条消息。
Sender读取ProducerBatch后以Broker为key缓存到一个对应的队列当中。这些队列当中的消息就称为InflightRequest。接下来这些Inflight就会发往Kafka对应的Broker中直到收到Broker的ack应答才会从队列中移除。这些队列也并不会无限缓存最多缓存String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION max.in.flight.requests.per.connection; 默认5
// Sender 等待ProducerBatch这一批消息的最大时长
public static final String LINGER_MS_CONFIG linger.ms;// 保存未ack确定批量消息的最大个数默认是5该没配置项必须1 并且必须开启幂等性如果开启幂等性该配置项的取值范围是(1,5]
public static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION max.in.flight.requests.per.connection;最后Sender会通过其中的一个Selector组件完成与Kafka的IO请求并接收Kafka的响应。
// org.apache.kafka.clients.producer.KafkaProducer#doSend
if (result.batchIsFull || result.newBatchCreated) {log.trace(Waking up the sender since topic {} partition {} is either full or getting a new batch, record.topic(), appendCallbacks.getPartition());this.sender.wakeup();
}这一章就涉及到了两个知识点Broker的ack应答、消息的幂等性 发送应答机制
Producer生产的消息发送到Broker后Broker会发送一个ack应答给producerproducer才确认这一批ProducerBatch消息发送成功。
这其中涉及到了下面这个参数
public static final String ACKS_CONFIG acks;acks0 producer 将不等待Kafka的ack应答producer也就拿不到partition等数据offset还一直返回-1 acks1 表示Leader partition将消息写入到了本地即可不需等待其他follower partition的应答。缺点是Leader在同步该条消息之前宕机了那么这条消息就不会同步到其他Follower上 acksall 或者 acks-1 等待整个partition副本集的ack应答。 这里可以和Kafka服务端的一个参数配合使用min.insync.replicas控制Leader Partition在完成多少个Partition的消息写入后往Producer返回响应。 这个参数需要再config/server.properties文件中指定 生产者消息幂等性
上方生产者消息缓存机制中下面这个参数必须开启幂等性idempotence
// 保存未ack确定消息的最大个数默认是5该没配置项必须1 并且必须开启幂等性如果开启幂等性该配置项的取值范围是(1,5]
public static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION max.in.flight.requests.per.connection;我们生产者发送应答机制acks1或-1时就一定要等待Broker端的应答如下图所示存在两次网络io一次发送消息一次ack应答
如果因为网络问题producer没有接收到Broker的ack应答它就认为这批消息发送失败了就重新发送默认重试次数String RETRIES_CONFIG retries 默认值是Integer.MAX。。那么Broker端应该如何不重复保存多条消息呢? 这就涉及到了幂等性问题了 幂等性相关的参数如下
// 值为 true 或 false
// 如果要开启幂等性那么生产者消息缓存机制中的 max.in.flight.requests.per.connection 5 并且 重试次数retries0 并且 应答机制中的acks必须为all
public static final String ENABLE_IDEMPOTENCE_CONFIG enable.idempotence;如果没有设置冲突配置则默认启用幂等性。如果设置了冲突的配置并且幂等性没有显式启用则幂等性被禁用。即显示开启的幂等性又有冲突配置就抛异常 分布式数据传递过程中的三个数据语义
at-leatst-once 最少一次at-most-once 最多一次exactly-once 精确一次
案例介绍
你往银行中存100元数据存入MQ时这一步不能出现消息丢失不然你肯定不满意如果没有收到服务端的ack应答就一直重试直到收到ack应答这就是at-leatst-once 最少一次你只存了100元肯定不能有多条数据多了银行肯定不满意这就是at-most-once 最多一次最终为了让双方都满意也就必须是exactly-once 精确一次才行。
再对应到Kafka中acks0这就保证了at-most-once 最多一次acks1或-1这就保证了at-leatst-once 最少一次可是acks就只有一个不管这么设置都不行所以最终通过幂等性才能保证exactly-once 精确一次 Kafka保证消息幂等性的概念
PID。为每一个producer都生成一个PID这个PID对producer是不可见的Sequence Number。对于每一个PID也就是producer针对Topic下的partition都会维护一个Sequence Number。初始值0当要往同一个partition发送消息时1。 producer每次发送消息都会携带PID和Sequence Number。SN。Broker会针对PID, Sequence Number 维护一个序列号SN。Broker接收到消息时就会先进行比较 Sequence Number SN 1 才会去进行消息保存的逻辑并且对应SN1。如果Sequence Number SN 1 就表示消息重复发送重新应答即可如果Sequence Number SN 1就表示中间有消息丢失给producer抛异常OutOfOrderSequenceException 生产者消息事务
消息的幂等性只能保证一个producer往一个partition写入消息的幂等性。而我们从生产者消息缓存机制中可知producer每次发送的是一批消息ProducerBatch。而这一批消息的key是不同的也就是说这里会往多个partition中发送消息而多个partition是会分布在不同的Broker中的那么现在就需要producer和多个Broker都要保证消息幂等性。
进而就引申出来消息事务的概念。Kafka中消息事务相关的api方法如下所示
// 1 初始化事务
void initTransactions();
// 2 开启事务
void beginTransaction() throws ProducerFencedException;
// 3 提交事务
void commitTransaction() throws ProducerFencedException;
// 4 放弃事务类似于回滚事务的操作
void abortTransaction() throws ProducerFencedException;案例
package com.hs.kfk.transaction;import org.apache.kafka.clients.producer.*;import java.util.Properties;
import java.util.concurrent.ExecutionException;/*** Description: 消息事务机制* Author 胡尚* Date: 2024/8/7 18:33*/
public class TransactionProducer {private final static String BOOTSTRAP_SERVER 192.168.75.61:9092,192.168.75.62:9092,192.168.75.63:9092;private final static String TOPIC_NAME disTopic;public static void main(String[] args) {Properties properties new Properties();properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer);properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer);// 指定一个消息事务id, 可以随意指定properties.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, 123);// 创建一个消息生产者对象ProducerString, String producer new KafkaProducer(properties);// 初始化事务 并 开启事务producer.initTransactions();producer.beginTransaction();try {for (int i 0; i 5; i) {// 异步发送消息当发送到id3的消息时 抛异常ProducerRecordString, String record new ProducerRecord(TOPIC_NAME, Integer.toString(i), MyProducer i);producer.send(record);if (i 3) {throw new NullPointerException();}}// 提交事务producer.commitTransaction();} catch (Exception e) {// 回滚事务System.out.println(出异常啦);producer.abortTransaction();} finally {producer.close();}}
}我们可以进行测试此时再启动一个消费者它也是不会消费到 i0 i1 i2 这三条消息的。这一批消息都回滚了。 一个事务id只会对应一个PID 如果当前一个Producer的事务没有提交而另一个新的Producer保持相同的TransactionId这时旧的生产者会立即失效无法继续发送消息。跨会话事务对齐如果某个Producer实例异常宕机了事务没有被正常提交。那么新的TransactionId相同的Producer实例会对旧的事务进行补齐。保证旧事务要么提交要么终止。这样新的Producer实例就可以以一个正常的状态开始工作。
生产者的事务消息机制保证了Producer发送消息的安全性但是他并不保证已经提交的消息就一定能被所有消费者消费。 客户端流程总结
高清在线流程图