国外做化学申报的网站,个人与公司网站备案,做企业网站项目,高端网站建设设计公司目录 一、简介1.1 背景1.2 定义1.3 如何查看确认/未确认的消息数#xff1f; 二、消息确认机制的分类2.1 消息发送确认1#xff09;ConfirmCallback方法2#xff09;ReturnCallback方法3#xff09;代码实现方式一#xff1a;统一配置a.配置类a.生产者c.消费者d.测试结果 … 目录 一、简介1.1 背景1.2 定义1.3 如何查看确认/未确认的消息数 二、消息确认机制的分类2.1 消息发送确认1ConfirmCallback方法2ReturnCallback方法3代码实现方式一统一配置a.配置类a.生产者c.消费者d.测试结果 4代码实现方式二单独配置 2.2 消息接收确认1basicAck() 方法2basicReject() 方法3basicNack() 方法4代码实现a.配置方式一代码配置b.配置方式二配置文件c.生产者d.消费者e.测试结果 当我们在项目中引入了新的中间件之后数据的风险性就要多一层考虑。那么RabbitMQ 的消息是怎么知道有没有被消费者消费的呢生产者又怎么确保自己发送成功了呢这些问题将在文章中进行解答。 一、简介
1.1 背景
在 MQ 中消费者和生产者并不直接进行通信生产者只负责把消息发送到队列消费者只负责从队列获取消息。
消费者从队列 获取到消息后这条消息就不在队列中了。如果此时消费者所在的信道 因为网络中断没有消费到那这条消息就 被永远地丢失了。所以我们希望等待消费者 成功消费掉这条消息之后再删除消息。生产者向交换机 发送消息后也 不能保证消息准确发送过去了消息就像 石沉大海 一样所以 发送消息也需要进行消息确认。
1.2 定义
为了保证消息从队列可靠地到达消费者RabbitMQ 提供了 消息确认机制Message Acknowledgement。
消费者在订阅队列时可以指定 autoAck 参数
autoAckfalseRabbitMQ 会 等待消费者显式地回复确认信号 后才从内存或磁盘中移除消息实际上时先打上删除标记之后再删除。autoAcktrueRabbitMQ 会 自动把发送出去的消息置为确认然后内存或磁盘中删除而 不管消费者是否真正地消费到了这些消息。
采用消息确认机制后只要设置 autoAck 参数为 false消费者就有足够的时间处理消息任务不用担心处理消息过程中消费者进程挂掉后消息丢失的问题因为 RabbitMQ 会一直等待持有消息知道消费者显式调用 Basic.Ack 命令为止。
对于 RabbitMQ 服务器端而言当 autoAck 参数为 false 时队列中的消息分成了两部分
一部分是 等待投递给消费者的消息另一部分是 已经投递给消费者但是还没有收到消费者确认信号的消息。 如果 RabbitMQ 服务器端 一直没有收到消费者的确认信息并且 消费此消息的消费者已经断开连接则服务器端会安排 该消息重新进入队列等待投递给下一个消费者也可能还是原来的那个消费者。
RabbitMQ 不会为未确认的消息设置过期时间它 判断此消息是否需要重新投递给消费者的唯一依据是该消息连接是否已经断开这个设计的原因是 RabbitMQ 允许消费者消费一条消息的时间可以很久很久。
1.3 如何查看确认/未确认的消息数
RabbitMQ 的 Web 管理平台上可以看到当前队列中的 “Ready” 状态和 “Unacknowledged” 状态的消息数
Read 状态 等待投递给消费者的消息数。Unacknowledged 状态 已经投递给消费者但是未收到确认信号的消息树。 二、消息确认机制的分类
RabbitMQ 消息确认机制分为两大类
消息发送确认又分为 生产者到交换机的确认交换机到队列的确认。 消息接收确认。
2.1 消息发送确认
RabbitMQ 的消息发送确认有两种实现方式ConfirmCallback 方法、ReturnCallback 方法。
1ConfirmCallback方法
ConfirmCallback 是一个回调接口用于确认消息否是到达交换机中。
配置方式
spring.rabbitmq.publisher-confirm-typecorrelated它有三个值
none禁用发布确认模式默认值。correlated发布消息成功到交换机后触发回调方法。simple经测试有两种效果一是和 correlated 一样会触发回调方法二是在发布消息成功后使用 rabbitTemplate 调用 waitForConfirm 或 waitForConfirmsOrDie方法等待 broker 节点返回发送结果根据返回结果来判定下一步的逻辑。要注意的是 waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel则接下来无法发送消息到 broker。
2ReturnCallback方法
ReturnCallback 也是一个回调接口用于确认消息是否在交换机中路由到了队列。
该方法可以不使用因为交换机和队列是在代码里面绑定的如果消息成功投递到 Broker 后几乎不存在绑定队列失败除非代码写错了。
配置方式
spring.rabbitmq.publisher-returnstrue3代码实现方式一统一配置
a.配置类
RabbitDirectConfig.java
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** p Title RabbitDirectConfig* p Description 直连交换机配置* Direct Exchange是RabbitMQ默认的交换机模式也是最简单的模式根据key全文匹配去寻找队列。** author ACGkaka* date 2023/1/12 15:09*/
Slf4j
Configuration
public class RabbitDirectConfig {public static final String DIRECT_EXCHANGE_NAME TEST_DIRECT_EXCHANGE;public static final String DIRECT_ROUTING_NAME TEST_DIRECT_ROUTING;public static final String DIRECT_QUEUE_NAME TEST_DIRECT_QUEUE;Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate new RabbitTemplate();rabbitTemplate.setConnectionFactory(connectionFactory);// 设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数rabbitTemplate.setMandatory(true);//设置message序列化方法rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());// 设置消息发送到交换机(Exchange)回调rabbitTemplate.setConfirmCallback((correlationData, ack, cause) - {if (ack) {log.info(【INFO】消息发送到交换机(Exchange)成功, 相关数据: {}, correlationData);} else {log.error(【ERROR】消息发送到交换机(Exchange)失败, 错误原因: {}, 相关数据: {}, cause, correlationData);}});// 设置消息发送到队列(Queue)回调经测试只有失败才会调用rabbitTemplate.setReturnsCallback((returnedMessage) - {log.error(【ERROR】消息发送到队列(Queue)失败响应码: {}, 响应信息: {}, 交换机: {}, 路由键: {}, 消息内容: {},returnedMessage.getReplyCode(), returnedMessage.getReplyText(), returnedMessage.getExchange(), returnedMessage.getRoutingKey(), returnedMessage.getMessage());});return rabbitTemplate;}/*** 消息监听-反序列化*/Beanpublic RabbitListenerContainerFactory? rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setMessageConverter(new Jackson2JsonMessageConverter());return factory;}/*** 队列命名testDirectQueue** return 队列*/Beanpublic Queue testDirectQueue() {// durable:是否持久化默认是false持久化队列会被存储在磁盘上当消息代理重启时仍然存在暂存队列当前连接有效// exclusive:默认false只能被当前创建的连接使用而且当连接关闭后队列即被删除。此参考优先级高于durable。// autoDelete:是否自动删除当没有生产者或消费者使用此队列该队列会自动删除。// 一般设置一下队列的持久化就好其余两个默认falsereturn new Queue(DIRECT_QUEUE_NAME, true);}/*** Direct交换机命名testDirectExchange* return Direct交换机*/BeanDirectExchange testDirectExchange() {return new DirectExchange(DIRECT_EXCHANGE_NAME, true, false);}/*** 绑定 将队列和交换机绑定并设置用于匹配键testDirectRouting* return 绑定*/BeanBinding bindingDirect() {return BindingBuilder.bind(testDirectQueue()).to(testDirectExchange()).with(DIRECT_ROUTING_NAME);}
}a.生产者
SendMessageController.java
import com.demo.config.RabbitDirectConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;/*** p Title SendMessageController* p Description 推送消息接口** author ACGkaka* date 2023/1/12 15:23*/
Slf4j
RestController
public class SendMessageController {/*** 使用 RabbitTemplate这提供了接收/发送等方法。*/Autowiredprivate RabbitTemplate rabbitTemplate;GetMapping(/sendDirectMessage)public String sendDirectMessage() {String messageId String.valueOf(UUID.randomUUID());String messageData Hello world.;String createTime LocalDateTime.now().format(DateTimeFormatter.ofPattern(yyyy-MM-dd HH:mm:ss));MapString, Object map new HashMap();map.put(messageId, messageId);map.put(messageData, messageData);map.put(createTime, createTime);// 将消息携带绑定键值TEST_DIRECT_ROUTING发送到交换机TEST_DIRECT_EXCHANGErabbitTemplate.convertAndSend(RabbitDirectConfig.DIRECT_EXCHANGE_NAME, RabbitDirectConfig.DIRECT_ROUTING_NAME, map);return OK;}}c.消费者
DirectReceiver.java
import com.demo.config.RabbitDirectConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Map;/*** p Title DirectReceiver* p Description 直连交换机监听类** author ACGkaka* date 2023/1/12 15:59*/
Slf4j
Component
public class DirectReceiver {RabbitListener(queues RabbitDirectConfig.DIRECT_QUEUE_NAME)public void process(MapString, Object testMessage) {System.out.println(DirectReceiver消费者收到消息 testMessage.toString());}}d.测试结果
成功发送时执行结果 交换机错误时执行结果 路由键错误时执行结果 4代码实现方式二单独配置
除了在配置类里面统一设置回调方法外还可以在每次推送消息到队列时手动使用 CorrelationData 指定回调方法。
GetMapping(/sendDirectMessage2)
public String sendDirectMessage2() {String messageId String.valueOf(UUID.randomUUID());String messageData Hello world.;String createTime LocalDateTime.now().format(DateTimeFormatter.ofPattern(yyyy-MM-dd HH:mm:ss));MapString, Object map new HashMap();map.put(messageId, messageId);map.put(messageData, messageData);map.put(createTime, createTime);//生成唯一标识CorrelationData correlationData new CorrelationData(messageId);//不管成功失败都会调用confirm或者throwable,这是异步调用correlationData.getFuture().addCallback(confirm - {// 设置消息发送到交换机(Exchange)回调if (confirm ! null confirm.isAck()) {log.info(【INFO】发送成功ACKmsgId: {}, message: {}, correlationData.getId(), map);} else {log.error(【ERROR】发送失败NACKmsgId: {}, message: {}, correlationData.getId(), map);}},throwable - {//发生错误链接mq异常mq未打开等...报错回调System.out.println(发送失败throwable throwable , id: correlationData.getId());});// 将消息携带绑定键值TEST_DIRECT_ROUTING发送到交换机TEST_DIRECT_EXCHANGErabbitTemplate.convertAndSend(RabbitDirectConfig.DIRECT_EXCHANGE_NAME, RabbitDirectConfig.DIRECT_ROUTING_NAME, map, correlationData);return OK;
}2.2 消息接收确认
消费者确认发生在 监听队列的消费者处理业务失败如发生了异常、不符合要求的数据等。这些场景就 需要我们手动处理消息比如重新发送消息或者丢弃消息。
RabbitMQ 的 消息确认机制ACK 默认是自动确认的。自动确认会 在消息发送给消费者后立即确认但 存在丢失消息的可能。如果消费端消费逻辑抛出了异常假如我们使用了事务的回滚也只是保证了数据的一致性消息还是丢失了。也就是消费端没有处理成功这条消息那么就相当于丢失了消息。
消息的确认模式有三种
AcknowledgeMode.NONE自动确认。默认AcknowledgeMode.AUTO根据情况确认。AcknowledgeMode.MANUAL手动确认。推荐
消费者收到消息后手动调用 Channel 的 basicAck()/basicReject()/basicNack() 方法后RabbitMQ 收到消息后才认为本次投递完成。
basicAck()用于确认当前消息。basicReject()用于拒绝当前消息可以自定义是否重回队列。basicNack()用于批量拒绝消息这是 AMPQ 0-9-1 的 RabbitMQ 扩展。
1basicAck() 方法
basicAck() 方法 用于确认当前消息Channel 类中的方法定义如下
void basicAck(long deliveryTag, boolean multiple) throws IOException;参数说明
long deliveryTag 当一个消费者向 RabbitMQ 注册后会建立起一个 Channel。RabbitMQ 会用 basic.deliver 方法向消费者推送消息这个方法携带了一个 deliveryTag它代表了 RabbitMQ 向该 Channel 投递的这条消息的唯一标识ID是一个单调递增的正整数deliveryTag 的范围仅限于当前 Channel。boolean multiple 是否批处理一般为 false当该参数为 true 时则可以一次性确认 deliveryTag 小于等于传入值的所有消息。
2basicReject() 方法
basicReject() 方法 用于明确拒绝当前的消息。RabbitMQ 在 2.0.0 版本开始引入Channel 类中的方法定义如下
void basicReject(long deliveryTag, boolean requeue) throws IOException;参数说明
long deliveryTag 当一个消费者向 RabbitMQ 注册后会建立起一个 Channel。RabbitMQ 会用 basic.deliver 方法向消费者推送消息这个方法携带了一个 deliveryTag它代表了 RabbitMQ 向该 Channel 投递的这条消息的唯一标识ID是一个单调递增的正整数deliveryTag 的范围仅限于当前 Channel。boolean requeue 是否重新放回队列。 如果参数为 true则 RabbitMQ 会重新将这条消息存入队列以便发送给下一个订阅的消费者。如果参数为 false则 RabbitMQ 会立即把消息从队列中移除不会把它发送给新的消费者。
3basicNack() 方法
basicNack() 方法 用于批量拒绝消息。由于 basicReject() 方法一次只能拒绝一条消息如果想批量拒绝消息则可以使用 basicNack() 方法。Channel 类中的方法定义如下
参数说明
long deliveryTag 当一个消费者向 RabbitMQ 注册后会建立起一个 Channel。RabbitMQ 会用 basic.deliver 方法向消费者推送消息这个方法携带了一个 deliveryTag它代表了 RabbitMQ 向该 Channel 投递的这条消息的唯一标识ID是一个单调递增的正整数deliveryTag 的范围仅限于当前 Channel。boolean multiple 是否批处理一般为 false当该参数为 true 时则可以一次性确认 deliveryTag 小于等于传入值的所有消息。boolean requeue 是否重新放回队列。 如果参数为 true则 RabbitMQ 会重新将这条消息存入队列以便发送给下一个订阅的消费者。如果参数为 false则 RabbitMQ 会立即把消息从队列中移除不会把它发送给新的消费者。
4代码实现
a.配置方式一代码配置
如果我们之前配置了 Jackson2JsonMessageConverter.java 的序列化方式那么我们可以接着指定消费方的消息确认模式为 AcknowledgeMode.MANUL。
/*** 消息监听配置*/
Bean
public RabbitListenerContainerFactory? rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory new SimpleRabbitListenerContainerFactory();// 设置连接工厂factory.setConnectionFactory(connectionFactory);// 设置消息确认模式factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);// 设置反序列化factory.setMessageConverter(new Jackson2JsonMessageConverter());return factory;
}b.配置方式二配置文件
我们可以直接在 application.yml 中进行如下配置
# 确认模式默认auto自动确认manual手动确认
spring.rabbitmq.listener.simple.acknowledge-modemanual注意 yaml中指定的是消费端容器的默认配置如果我们在代码中有自定义注入 RabbitListenerContainerFactory 示例之后还需要使用默认配置需要在代码中进行设置如下所示 Autowired
private SimpleRabbitListenerContainerFactoryConfigurer configurer;/*** 消息监听配置*/
Bean
public RabbitListenerContainerFactory? rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory new SimpleRabbitListenerContainerFactory();// 设置连接工厂factory.setConnectionFactory(connectionFactory);// 采用yaml中的配置configurer.configure(factory, connectionFactory);// 设置反序列化factory.setMessageConverter(new Jackson2JsonMessageConverter());return factory;
}c.生产者
SendMessageController.java
import com.demo.config.RabbitDirectConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;/*** p Title SendMessageController* p Description 推送消息接口** author ACGkaka* date 2023/1/12 15:23*/
Slf4j
RestController
public class SendMessageController {/*** 使用 RabbitTemplate这提供了接收/发送等方法。*/Autowiredprivate RabbitTemplate rabbitTemplate;GetMapping(/sendDirectMessage)public String sendDirectMessage() {String messageId String.valueOf(UUID.randomUUID());String messageData Hello world.;String createTime LocalDateTime.now().format(DateTimeFormatter.ofPattern(yyyy-MM-dd HH:mm:ss));MapString, Object map new HashMap();map.put(messageId, messageId);map.put(messageData, messageData);map.put(createTime, createTime);// 将消息携带绑定键值TEST_DIRECT_ROUTING发送到交换机TEST_DIRECT_EXCHANGErabbitTemplate.convertAndSend(RabbitDirectConfig.DIRECT_EXCHANGE_NAME, RabbitDirectConfig.DIRECT_ROUTING_NAME, map);return OK;}}d.消费者
DirectReceiver.java
import com.demo.config.RabbitDirectConfig;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.IOException;
import java.util.Map;/*** p Title DirectReceiver* p Description 直连交换机监听类** author ACGkaka* date 2023/1/12 15:59*/
Slf4j
Component
public class DirectReceiver {RabbitListener(queues RabbitDirectConfig.DIRECT_QUEUE_NAME)public void process(MapString, Object testMessage, Message message, Channel channel) throws IOException {try {log.info(DirectReceiver消费者收到消息: {}, testMessage.toString());// 手动答应消费完成从队列中删除该消息channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);} catch (Exception e) {log.error(DirectReceiver消费者消费失败原因: {}, e.getMessage(), e);// 手动答应消费完成从队列中删除该消息不重回队列channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);}}}e.测试结果
场景一消费者进行手动确认生产者推送2条消息
可以看到生产者推送2条消息后立马被消费了。 场景二消费者不进行手动确认生产者推送2条消息
虽然消费者消费完毕但是由于没有进行手动确认所以2条消息会一直处于 Unacked 状态直到消费者下线。 关闭 SpringBoot 程序消费者下线后消息由 Unacked 状态转为 Ready 状态等待下一个消费者上线后重新进行消费。 整理完毕完结撒花~ 参考地址
1.RabbitMQ(4):消息确认机制详解https://juejin.cn/post/7029232312197840904
2.RabbitMQ消息确认机制ACKhttps://blog.csdn.net/pan_junbiao/article/details/112956537
3.RabbitMQ高级https://blog.csdn.net/hnhroot/article/details/125921527
4.关于rabbitMQ在yml配置手动ack不生效重复答应的问题https://blog.csdn.net/love_Saber_Archer/article/details/109111088