苏州网站关键词优化推广,沈阳不孕不育医院前三名,深圳猎头公司,高端网页设计欣赏科技感前言
在项目中#xff0c;引入了RabbitMQ这一中间件#xff0c;必然也需要在业务中增加对数据安全性的一层考虑#xff0c;来保证RabbitMQ消息的可靠性#xff0c;否则一个个消息丢失可能导致整个业务的数据出现不一致等问题#xff0c;对系统带来巨大的影响#xff0c;…前言
在项目中引入了RabbitMQ这一中间件必然也需要在业务中增加对数据安全性的一层考虑来保证RabbitMQ消息的可靠性否则一个个消息丢失可能导致整个业务的数据出现不一致等问题对系统带来巨大的影响消息的可靠性可以主要在三个方面去考虑生产者消息确认消费者消息确认消息持久化这篇文件说明生产者消息确认的。 一、消息确认流程图 由图可知消息确认是分为生产者确认和消费者确认的生产者和MQ之间的消息确认机制为生产者消息确认MQ和消费者之间的消息确认机制为消费者消息确认
消息丢失的情景有三种情况
发送消息过程中出现网络问题生产者以为发送成功但MQ没有收到需要生产者消息确认接收到消息后由于MQ服务器宕机或重启等原因消息默认存在内存中导致消息丢失需要消息持久化消费者接收到消息后处理消息出错没有完成消息的处理但是自动返回ack这时候需要开启手动确认模式消费者消息确认 二、生产者消息确认
RabbitMQ提供了publisher confirm机制来避免消息投递到MQ过程中丢失。这种机制下每个message都必须要有一个独一无二的ID来区分开不同的消息避免ack消息确认参数冲突。每当消息发送到MQ成功后MQ都会返回一个结果给生产者以保证生产者消息确认。在生产者消息确认时又有两种返回结果方式通常两个都要实现来确保消息投递可靠性分别为publisher-confirm和publisher-return以下作出说明。
1、publisher-confirm发送者确认
消息成功投递到交换机返回ack
消息未投递到交换机返回nack
2、publisher-return发送者回执
消息投递到交换机了但是没有路由到队列。返回ACK及路由失败原因。 三、代码实现
1、配置文件
spring:rabbitmq:host: localhostport: 5672username: guestpassword: guest#确认消息已发送到交换机(Exchange)publisher-confirm-type: correlated#确认消息已发送到队列(Queue)publisher-returns: true
publish-confirm-type有三个值
none禁用发布确认模式是默认值simple同步等待confirm结果直到超时correlated异步回调定义ConfirmCallbackMQ返回结果时会回调这个ConfirmCallback
publisher-returns开启消息失败回调回调函数ReturnCallback
2、配置ConfirmCallback函数和ReturnCallback函数
/*** 生产者消息回调配置类*/
Configuration
Slf4j
public class ProviderCallBackConfig {Resourceprivate CachingConnectionFactory cachingConnectionFactory;Beanpublic RabbitTemplate rabbitTemplate() {RabbitTemplate rabbitTemplate new RabbitTemplate(cachingConnectionFactory);// 当mandatory设置为true时若exchange根据自身类型和消息routingKey无法找到一个合适的queue存储消息//那么broker会调用basic.return方法将消息返还给生产者。// 当mandatory设置为false时出现上述情况broker会直接将消息丢弃。rabbitTemplate.setMandatory(true);/*** TODO RabbitMQ生产者发送消息确认回调解决消息可靠性问题* 消息确认回调确认消息是否到达broker* data消息唯一标识* ack确认结果* cause失败原因*/rabbitTemplate.setConfirmCallback((data, ack, cause) - {if (ack) {//消息发送成功后更新数据库消息状态等逻辑log.info(消息发送至exchange成功------消息唯一标识: {}, 确认状态: {}, 造成原因: {},data, ack, cause);} else {//信息发送失败打印日志后可以根据业务选择是否重发消息log.info(消息发送至exchange失败------消息唯一标识: {}, 确认状态: {}, 造成原因: {}, data, ack, cause);}});/*** TODO RabbitMQ生产者发送消息失败回调解决消息可靠性问题* message 消息* replyCode 回应码* replyText 回应信息* exchange 交换机* routingKey 路由键*/rabbitTemplate.setReturnsCallback((res) - {//若发送失败打印错误信息然后可以根据业务选择重发消息log.error(消息发送至queue失败--------res: {}, JSON.toJSONString(res));});return rabbitTemplate;}}
到这里生产者推送消息的消息确认调用回调函数已经完毕。 可以看到上面写了两个回调函数一个叫 ConfirmCallback 一个叫 RetrunCallback 那么以上这两种回调函数都是在什么情况会触发呢
先从总体的情况分析推送消息存在3种情况
①消息推送到server但是在server里找不到交换机 ②消息推送到server找到交换机了但是没找到队列 ③消息推送成功
①消息推送到server但是在server里找不到交换机 写个测试接口把消息推送到名为‘non-existent-exchange’的交换机上这个交换机是没有创建没有配置的
GetMapping(/testProviderMessageBack)ApiOperation(value 测试生产者消息回调)ApiOperationSupport(order 5)public String testProviderMessageBack() {CorrelationData data new CorrelationData();data.setId(111);rabbitTemplate.convertAndSend(non-existent-exchange, TestDirectRouting, 测试生产者消息回调,data);return ok;}
调用接口查看项目的控制台输出情况原因里面有说没有找到交换机non-existent-exchange
结论 ①这种情况触发的是 ConfirmCallback 回调函数 消息发送至exchange失败------ 消息唯一标识: CorrelationData [id111], 确认状态: false, 造成原因: channel error; protocol method: #methodchannel.close(reply-code404, reply-textNOT_FOUND - no exchange non-existent-exchange in vhost /, class-id60, method-id40) ②消息推送到server找到交换机了但是没找到队列 这种情况就是需要新增一个交换机但是不给这个交换机绑定队列我来简单地在DirectRabitConfig里面新增一个直连交换机名叫‘lonelyDirectExchange’但没给它做任何绑定配置操作 BeanDirectExchange lonelyDirectExchange() {return new DirectExchange(lonelyDirectExchange);}
然后写个测试接口把消息推送到名为‘lonelyDirectExchange’的交换机上这个交换机是没有任何队列配置的
GetMapping(/testProviderMessageBack2)ApiOperation(value 测试生产者消息回调2)ApiOperationSupport(order 6)public String testProviderMessageBack2() {CorrelationData data new CorrelationData();data.setId(222);rabbitTemplate.convertAndSend(lonelyDirectExchange, TestLonelyDirectRouting, 测试生产者消息回调2,data);return ok;} 消息发送至exchange成功------ 消息唯一标识: CorrelationData [id222], 确认状态: true, 造成原因: null 消息发送至queue失败-------- res: {exchange:lonelyDirectExchange,message:{body:5rWL6KV55Sf5Lqn6ICF5raI5oGv5Zue6LCDMg,messageProperties:{contentEncoding:UTF-8,contentLength:0,contentType:text/plain,deliveryTag:0,finalRetryForMessageWithNoId:false,headers:{spring_returned_message_correlation:222},lastInBatch:false,priority:0,projectionUsed:false,publishSequenceNumber:0,receivedDeliveryMode:PERSISTENT}},replyCode:312,replyText:NO_ROUTE,routingKey:TestLonelyDirectRouting} 这种情况下两个函数都被调用了
消息是推送成功到交换机了的所以ConfirmCallback对消息确认情况是true 而在RetrunCallback回调函数的打印参数里面可以看到在路由分发给队列的时候找不到队列所以报了错误 NO_ROUTE 。 结论②这种情况触发的是 ConfirmCallback和RetrunCallback两个回调函数。
③消息推送成功 那么测试下按照正常调用之前消息推送的接口就行就调用下 /sendDirectMessage接口可以看到控制台输出
结论这种情况触发的是 ConfirmCallback 回调函数。