怎么做网站添加二维码,男科医院咨询免费,工程公司,wordpress域名重定向前言
Spring Kafka 无缝集成了 Spring Boot、Spring Framework 及其生态系统中的其他项目#xff0c;如 Spring Cloud。通过与 Spring Boot 的自动配置结合#xff0c;开发者可以快速启动和配置 Kafka 相关的功能。无需编写大量样板代码即可实现 Kafka 的生产和消费功能如 Spring Cloud。通过与 Spring Boot 的自动配置结合开发者可以快速启动和配置 Kafka 相关的功能。无需编写大量样板代码即可实现 Kafka 的生产和消费功能在公司内部微服务项目中大量使用了该框架以下是个人的一些记录。
笔者环境springboot-2.7.14, java version “1.8.0_401” 一、Spring-kafka是什么
Spring Kafka 是 Spring 框架的一个项目旨在简化与 Apache Kafka 的集成。它提供了一组用于与 Kafka 通信的高级抽象和便利功能使开发人员可以更轻松地在 Spring 应用程序中使用 Kafka 进行消息传递。
1. 什么是 Apache Kafka
Apache Kafka 是一个分布式流处理平台广泛用于构建实时数据管道和流应用程序。它具有以下几个关键功能
发布和订阅记录流类似于消息队列或企业消息传递系统。存储记录流存储数据流容错。处理记录流实时或批处理方式处理数据流。
2. 什么是 Spring Kafka
Spring Kafka 是 Spring 框架的一个子项目提供了用于与 Kafka 集成的便捷工具和抽象。Spring Kafka 包含在 Spring Boot 项目中使得 Kafka 消息处理在 Spring 应用程序中变得更加容易和直观。
3. Spring Kafka 的主要功能
KafkaTemplate用于发送消息的模板类。 KafkaListener用于消费消息的注解支持自动并发和分区分配。 KafkaMessageListenerContainer低级别的消息监听容器提供了更大的灵活性。 事务支持支持与 Kafka 的事务集成确保消息的原子性和一致性。 错误处理支持各种错误处理机制包括重试、死信队列等。
4. 典型使用场景
消息驱动的微服务使用 Kafka 作为微服务之间的消息传递中介。实时数据处理处理流式数据如日志收集、监控和分析。事件溯源使用Kafka 记录所有事件支持事件溯源和审计。数据集成连接不同的数据源和目标系统实现数据集成和同步。
二、如何使用进行消息消费
只需要简单四步步即可引入依赖配置消费者配置文件配置 kafka configuration , 配置消费者监听器
1. 引入maven依赖 dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependencydependencygroupIdorg.springframework.kafka/groupIdartifactIdspring-kafka/artifactId/dependency2. 配置文件
以下仅用于参考
server:port: 8080
logging:level:root: info
spring:datasource:url: jdbc:mysql://localhost:3306/testusername: rootpassword: rootdriver-class-name: com.mysql.cj.jdbc.Driver# 开启事务管理transaction:management:enabled: truekafka:bootstrap-servers: localhost:9092# 消费监听接口监听的主题不存在时默认会报错listener:missing-topics-fatal: falsetemplate:default-topic: eventsproducer:# 当retris为0时produce不会重复。retirs重发此时repli节点完全成为leader节点不会产生消息丢失。retries: 0#procedure要求leader在考虑完成请求之前收到的确认数用于控制发送记录在服务端的持久化其值可以为如下#acks 0 如果设置为零则生产者将不会等待来自服务器的任何确认该记录将立即添加到套接字缓冲区并视为已发送。在这种情况下无法保证服务器已收到记录并且重试配置将不会生效因为客户端通常不会知道任何故障为每条记录返回的偏移量始终设置为-1。#acks 1 这意味着leader会将记录写入其本地日志但无需等待所有副本服务器的完全确认即可做出回应在这种情况下如果leader在确认记录后立即失败但在将数据复制到所有的副本服务器之前则记录将会丢失。#acks all 这意味着leader将等待完整的同步副本集以确认记录这保证了只要至少一个同步副本服务器仍然存活记录就不会丢失这是最强有力的保证这相当于acks -1的设置。#可以设置的值为all, -1, 0, 1acks: 1# 指定消息key和消息体的编解码方式key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializer# 每次批量发送消息的数量,produce积累到一定数据一次发送#batch-size: 16384# produce积累数据一次发送缓存大小达到buffer.memory就发送数据#buffer-memory: 33554432consumer:# 指定默认消费者group id -- 由于在kafka中同一组中的consumer不会读取到同一个消息依靠groud.id设置组名group-id: events-group-1# smallest和largest才有效如果smallest重新0开始读取如果是largest从logfile的offset读取。一般情况下我们都是设置smallestauto-offset-reset: earliest# enable.auto.commit:true -- 设置自动提交offsetenable-auto-commit: true#如果enable.auto.commit为true则消费者偏移自动提交给Kafka的频率以毫秒为单位默认值为5000。auto-commit-interval: 100# 指定消息key和消息体的编解码方式key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerproperties:spring.json.value.default.type: com.tan.kafka.model.Event# 配置消费主题和消费者组2
kafka:topic: events_testgroup-2: events-group-2group-3: events-group-3group-5: events-group-5
里面的注释已经非常清晰了相信都能看懂。
3. 配置kafka 消费者配置并使用注解启用 EnableKafka
Configuration
Slf4j
public class KafkaConsumerConfig {/*** 创建一个Kafka消费者工厂用于生产特定配置的Kafka消费者。* param consumerFactory* return*/Beanpublic ConcurrentKafkaListenerContainerFactoryString, Event kafkaListenerContainerFactory(ConsumerFactoryString, Event consumerFactory) {ConcurrentKafkaListenerContainerFactoryString, Event factory new ConcurrentKafkaListenerContainerFactory();factory.setConsumerFactory(consumerFactory);log.info(KafkaListenerContainerFactory created.);return factory;}/*** 创建一个Kafka消费者工厂用于生产特定配置的Kafka消费者。* 这个方法的主要作用是配置消费者的各项参数包括连接服务器、消费者组ID、序列化器等* 以便消费者能够正确地从Kafka主题中消费消息。** return ConsumerFactoryString, Event 返回一个配置好的消费者工厂用于创建字符串键和Event值的消费者实例。*/Beanpublic ConsumerFactoryString, Event consumerFactory() {// 初始化配置属性映射用于设置消费者的配置参数。MapString, Object configProps new HashMap();// 配置Kafka服务器的连接地址和端口。configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, localhost:9092);// 配置消费者的组ID用于标识消费者属于哪个消费组。configProps.put(ConsumerConfig.GROUP_ID_CONFIG, events-group);// 配置键和值的反序列化器这里使用StringDeserializer和JsonDeserializer。// StringDeserializer用于反序列化键JsonDeserializer用于反序列化值并且信任所有包设置默认值类型为Event类。configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);configProps.put(JsonDeserializer.TRUSTED_PACKAGES, *);configProps.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Event.class);// 使用配置属性映射创建并返回一个默认的Kafka消费者工厂。return new DefaultKafkaConsumerFactory(configProps);}
}
⚠️ 这里的EnableKafka 必不可少因为它的主要作用是启用Kafka相关的注解例如 KafkaListener。当你在Spring Boot应用程序中使用 EnableKafka 注解时Spring会扫描你的项目找到所有标注了 KafkaListener 的方法并创建相应的Kafka监听器容器来处理Kafka消息。
4. 配置监听器
/*** 同主题配置不同的消费组测试每个消费者都可以获取到相同的消息* 同主题一个分区只能被多个相同消费组中的一个消费者消费*/
Service
public class EventConsumer {Value(${kafka.topic})public String topic;Value(${kafka.group-2})public String groupId;/*** 写法 1指定具体消费者 topic,消费者组这里消费者组可以覆盖消费者工厂中配置的消费者组** param event*/KafkaListener(topics events, groupId events-group-1)public void consume(Event event) {System.out.println(Consumed event: event);}/*** 写法 2使用 SpELSpring Expression Language进行动态属性解析* Spring 使用 SpEL 来解析注解中的表达式。在 Spring Kafka 中* __listener 提供了一种方式来引用当前监听器 bean 的上下文从而可以动态地访问和配置监听器的属性。* * param message*/KafkaListener(topics #{__listener.topic}, groupId #{__listener.groupId})public void consume2(Event message) {System.out.println(Consumed message: message);}/*** 写法 3指定不同的消费者组** param event*/KafkaListener(topics ${kafka.topic}, groupId ${kafka.group-3})public void consume3(Event event) {System.out.println(Consumer group-3 consumed event: event);}/*** 写法 4* 1. 设置 ID 方便启动、和停止消费者* 2.可以指定 autoStartup 属性为 false手动启动消费者,默认情况下是 true* param event*/KafkaListener(id eventListener, topics events_test, groupId default, autoStartup true)public void consume5(Event event) {System.out.println([event-listener] consumed event: event);}
}
注意 __listener 特殊引用 __listener 是一个指向当前 Kafka 监听器的特殊 bean 引用。 通过使用 #{__listener.propertyName} 的方式可以访问当前监听器 bean 的属性和方法。
以上不同的写法都可以根据需要可以直接写消费者相关信息也可以通过配置文件变量等方式获取选择适合自己的即可。一般用配置变量的方式。
启动 springboot项目然后发送消息即可查看消息是否已经消费到了。
三、如何控制消费者启动和暂停
上文中我们可以在消费者的监听中加入autoStartup 来控制监听器是否在启动的时候开始进行消费监听当然这种方式不够灵活我们可以写一个接口在需要的时候进行启动和暂停以下是个示例。
编写控制器 import com.tan.kafka.service.KafkaListenerControlService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;/*** Description 控制消费者是否启动消费或者暂停* Author zhengqiang.tan* Date 2024/6/16 09:39*/
RestController
RequestMapping(/kafka)
public class KafkaController {Autowiredprivate KafkaListenerControlService kafkaListenerControlService;PostMapping(/pause)public String pause() {kafkaListenerControlService.pauseListener();return Paused;}PostMapping(/resume)public String resume() {kafkaListenerControlService.resumeListener();return Resumed;}
}编写服务类 /*** Kafka监听器控制服务*/
Service
Slf4j
public class KafkaListenerControlService {Autowiredprivate KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;PostConstructpublic void checkListeners() {log.info(KafkaListenerControlService.checkListeners);for (String listenerId : kafkaListenerEndpointRegistry.getListenerContainerIds()) {MessageListenerContainer container kafkaListenerEndpointRegistry.getListenerContainer(listenerId);if (container ! null) {System.out.println(Registered Kafka listener: listenerId);} else {System.out.println(No Kafka listener found with id: listenerId);}}}/*** 暂停指定名称的Kafka监听器。* p* 本方法旨在提供一种方式来暂停应用程序中特定的Kafka监听器。这可能在需要临时停止处理新消息* 例如进行系统维护或升级时的场景下非常有用。* 这里使用 eventListener 仅用于测试。** see KafkaListenerEndpointRegistry#getListenerContainer(String) 用于获取指定名称的监听器容器* see MessageListenerContainer#pause() 用于暂停监听器容器使其停止接收和处理新消息*/public void pauseListener() {// 通过监听器端点注册表获取名为eventListener的监听器容器MessageListenerContainer container kafkaListenerEndpointRegistry.getListenerContainer(eventListener);// 检查容器是否为空如果非空则调用其pause方法暂停监听器if (container ! null) {container.pause();log.info(Paused Kafka listener,id {}, container.getListenerId());}}/*** 恢复指定名称的Kafka监听器。* 此方法用于暂停后的监听器重新启动以便它可以继续处理消息。* 它首先从注册表中获取名为eventListener的监听器容器* 然后检查容器是否为空。如果容器存在则调用其resume方法来恢复监听器的运行。* 最后记录一条信息表明监听器已恢复。** see KafkaListenerEndpointRegistry#getListenerContainer(String)* see MessageListenerContainer#resume()*/public void resumeListener() {// 从注册表中获取名为eventListener的监听器容器MessageListenerContainer container kafkaListenerEndpointRegistry.getListenerContainer(eventListener);// 检查容器是否存在如果存在则恢复容器的运行if (container ! null) {container.resume();// 记录恢复操作的信息包括监听器的IDlog.info(Resumed Kafka listener,id {}, container.getListenerId());}}
}
通过以上方式实测中符合预期先暂停然后发送消息后未进行消费等到 resume之后继续消费。 如下所示
curl --location --request POST http://localhost:8080/kafka/resume \
--header User-Agent: Apifox/1.0.0 (https://apifox.com) \
--header Content-Type: application/json \
--header Accept: */* \
--header Host: localhost:8080 \
--header Connection: keep-alive \
--data-raw 四、如何保证消费者启停过程的线程安全问题
如果pauseListener()方法被多个线程同时访问可能会出现竞态条件或导致未定义的行为。尽管在这个特定的代码片段中不容易直接判断线程安全性问题。因此我在这里进行了完善确保多线程并发访问下的安全性以上写法太过繁琐可以参考如下代码方式。
Slf4j
RestController
RequestMapping(/consumer)
public class KafkaController2 {private final ReentrantLock lock new ReentrantLock();Autowiredprivate KafkaListenerEndpointRegistry registry;/*** 开启消费者监听线程安全版本* param listenerName*/GetMapping(/start/{listenerName})public void start(PathVariable String listenerName) {try {lock.lock();if (!isListenerRunning(listenerName)) {log.info(start {} ... , listenerName);startListener(listenerName);}resumeListener(listenerName);log.info(resume {} is done. , listenerName);} finally {lock.unlock();}}/*** 暂停指定名称的监听器。这有助于提高代码的灵活性和可维护性* 允许在不修改代码的情况下暂停不同的监听器。** param listenerName 监听器的名称。*/GetMapping(/pause/{listenerName})public void pauseListener(PathVariable String listenerName) {if (registry null) {log.error(Registry is not available.);return;}try {MessageListenerContainer listenerContainer registry.getListenerContainer(listenerName);if (listenerContainer null) {log.warn(No listener container found with name: listenerName);return;}log.info(Attempting to pause listener: listenerName);listenerContainer.pause();log.info(listenerName pause is done.);} catch (Exception e) {log.error(Failed to pause listener listenerName : e.getMessage(), e);}}Asyncprotected void startListener(String listenerName) {try {Objects.requireNonNull(registry.getListenerContainer(listenerName)).start();} catch (Exception e) {log.error(Failed to start the eventListener, e);}}Asyncprotected void resumeListener(String listenerName) {try {Objects.requireNonNull(registry.getListenerContainer(listenerName)).resume();} catch (Exception e) {log.error(Failed to resume the eventListener, e);}}private boolean isListenerRunning(String listenerName) {try {return Objects.requireNonNull(registry.getListenerContainer(listenerName)).isRunning();} catch (Exception e) {log.error(Error checking if the eventListener is running, e);return false;}}
}
最终我在测试完后结果是符合预期的先暂停然后持续发送消息观察有无收到消息然后在重新启动监听即可。
curl --location --request GET http://localhost:8080/consumer/pause/eventListener 其他
批量消息发送脚本以下是个参考
#!/bin/bash
cd /Users/mac/apps/kafka
# 主题名称
TOPICevents_test# Kafka 主机和端口
BROKERlocalhost:9092# 生产随机消息的数量只需修改这里进行控制发送消息数量
NUM_MESSAGES1# 生成一个 5 位随机数
generate_random_number() {RANDOM_NUMBER$(shuf -i 10000-99999 -n 1)echo $RANDOM_NUMBER
}# 生成随机消息并发送到 Kafka 主题
for i in $(seq 1 $NUM_MESSAGES); doID$(date %s) # 生成Long类型的唯一IDMSG_ID$(uuidgen)MESSAGERandom message $(generate_random_number)JSON_MESSAGE{\id\:\$ID\, \msgId\:\$MSG_ID\, \message\:\$MESSAGE\}echo $JSON_MESSAGE | bin/kafka-console-producer.sh --broker-list $BROKER --topic $TOPIC
doneecho Produced $NUM_MESSAGES JSON messages to the topic $TOPIC.总结
以上内容介绍了使用 spring和 kafka集成进行消费的使用过程对消费者的配置和以及对消费者的启动和暂停实践详细介绍了配置各个步骤。不足之处未提及异常处理批量消费消费者动态扩缩提升消费能力的问题这些后面在陆续补充。
如果觉得有用不妨点赞关注一波下期再会