当前位置: 首页 > news >正文

网站的建设与维护网络与智能媒体设计 干什么?

网站的建设与维护,网络与智能媒体设计 干什么?,外国 网站模板,四川网站seo1. 问题引入 消息从发送#xff0c;到消费者接收#xff0c;会经理多个过程#xff1a; 其中的每一步都可能导致消息丢失#xff0c;常见的丢失原因包括#xff1a; 发送时丢失#xff1a; 生产者发送的消息未送达exchange消息到达exchange后未到达queue MQ宕机…1. 问题引入 消息从发送到消费者接收会经理多个过程 其中的每一步都可能导致消息丢失常见的丢失原因包括 发送时丢失 生产者发送的消息未送达exchange消息到达exchange后未到达queue MQ宕机queue将消息丢失consumer接收到消息后未消费就宕机 针对这些问题RabbitMQ分别给出了解决方案 生产者确认机制mq持久化消费者确认机制失败重试机制 2. 生产者消息确认 RabbitMQ提供了publisher confirm机制来避免消息发送到MQ过程中丢失。这种机制必须给每个消息指定一个唯一ID。消息发送到MQ以后会返回一个结果给发送者表示消息是否处理成功。 返回结果有两种方式 publisher-confirm发送者确认 消息成功投递到交换机返回ack消息未投递到交换机返回nack publisher-return发送者回执 消息投递到交换机了但是没有路由到队列。返回ACK及路由失败原因。 注意 2.1 修改配置 首先修改publisher服务中的application.yml文件添加下面的内容 spring:rabbitmq:publisher-confirm-type: correlatedpublisher-returns: truetemplate:mandatory: true 说明 publish-confirm-type开启publisher-confirm这里支持两种类型 simple同步等待confirm结果直到超时correlated异步回调定义ConfirmCallbackMQ返回结果时会回调这个ConfirmCallback publish-returns开启publish-return功能同样是基于callback机制不过是定义ReturnCallbacktemplate.mandatory定义消息路由失败时的策略。true则调用ReturnCallbackfalse则直接丢弃消息 2.2 定义Return回调 每个RabbitTemplate只能配置一个ReturnCallback因此需要在项目加载时配置 修改publisher服务添加一个 import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.context.annotation.Configuration;Slf4j Configuration public class CommonConfig implements ApplicationContextAware {Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {// 获取RabbitTemplateRabbitTemplate rabbitTemplate applicationContext.getBean(RabbitTemplate.class);// 设置ReturnCallbackrabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) - {// 投递失败记录日志log.info(消息发送失败应答码{}原因{}交换机{}路由键{},消息{},replyCode, replyText, exchange, routingKey, message.toString());// 如果有业务需要可以重发消息});} }2.3 定义ConfirmCallback ConfirmCallback可以在发送消息时指定因为每个业务处理confirm成功或失败的逻辑不一定相同。 在publisher服务的测试类中定义一个单元测试方法 public void testSendMessage2SimpleQueue() throws InterruptedException {// 1.消息体String message hello, spring amqp!;// 2.全局唯一的消息ID需要封装到CorrelationData中CorrelationData correlationData new CorrelationData(UUID.randomUUID().toString());// 3.添加callbackcorrelationData.getFuture().addCallback(result - {if(result.isAck()){// 3.1.ack消息成功log.debug(消息发送成功, ID:{}, correlationData.getId());}else{// 3.2.nack消息失败log.error(消息发送失败, ID:{}, 原因{},correlationData.getId(), result.getReason());}},ex - log.error(消息发送异常, ID:{}, 原因{},correlationData.getId(),ex.getMessage()));// 4.发送消息rabbitTemplate.convertAndSend(task.direct, task, message, correlationData);// 休眠一会儿等待ack回执Thread.sleep(2000); }3. 消息持久化 生产者确认可以确保消息投递到RabbitMQ的队列中但是消息发送到RabbitMQ以后如果突然宕机也可能导致消息丢失。 要想确保消息在RabbitMQ中安全保存必须开启消息持久化机制。 交换机持久化队列持久化消息持久化 3.1 交换机持久化 RabbitMQ中交换机默认是非持久化的mq重启后就丢失。 SpringAMQP中可以通过代码指定交换机持久化 Bean public DirectExchange simpleExchange(){// 三个参数交换机名称、是否持久化、当没有queue与其绑定时是否自动删除return new DirectExchange(simple.direct, true, false); }事实上默认情况下由SpringAMQP声明的交换机都是持久化的。 可以在RabbitMQ控制台看到持久化的交换机都会带上D的标示 3.2 队列持久化 RabbitMQ中队列默认是非持久化的mq重启后就丢失。 SpringAMQP中可以通过代码指定交换机持久化 Bean public Queue simpleQueue(){// 使用QueueBuilder构建队列durable就是持久化的return QueueBuilder.durable(simple.queue).build(); }事实上默认情况下由SpringAMQP声明的队列都是持久化的。 可以在RabbitMQ控制台看到持久化的队列都会带上D的标示 3.3 消息持久化 利用SpringAMQP发送消息时可以设置消息的属性MessageProperties指定delivery-mode 1非持久化2持久化 用Java代码指定 默认情况下SpringAMQP发出的任何消息都是持久化的不用特意指定。 4. 消费者消息确认 RabbitMQ是阅后即焚机制RabbitMQ确认消息被消费者消费后会立刻删除。 而RabbitMQ是通过消费者回执来确认消费者是否成功处理消息的消费者获取消息后应该向RabbitMQ发送ACK回执表明自己已经处理消息。 设想这样的场景 1RabbitMQ投递消息给消费者2消费者获取消息后返回ACK给RabbitMQ3RabbitMQ删除消息4消费者宕机消息尚未处理 这样消息就丢失了。因此消费者返回ACK的时机非常重要。 而SpringAMQP则允许配置三种确认模式 •manual手动ack需要在业务代码结束后调用api发送ack。 •auto自动ack由spring监测listener代码是否出现异常没有异常则返回ack抛出异常则返回nack •none关闭ackMQ假定消费者获取消息后会成功处理因此消息投递后立即被删除 由此可知 none模式下消息投递是不可靠的可能丢失auto模式类似事务机制出现异常时返回nack消息回滚到mq没有异常返回ackmanual自己根据业务情况判断什么时候该ack 一般我们都是使用默认的auto即可。 4.1 none模式 修改consumer服务的application.yml文件添加下面内容 spring:rabbitmq:listener:simple:acknowledge-mode: none # 关闭ack修改consumer服务的SpringRabbitListener类中的方法模拟一个消息处理异常 RabbitListener(queues simple.queue) public void listenSimpleQueue(String msg) {log.info(消费者接收到simple.queue的消息【{}】, msg);// 模拟异常System.out.println(1 / 0);log.debug(消息处理完成); }测试可以发现当消息处理抛异常时消息依然被RabbitMQ删除了。 4.2 auto模式 再次把确认机制修改为auto: spring:rabbitmq:listener:simple:acknowledge-mode: auto # 关闭ack在异常位置打断点再次发送消息程序卡在断点时可以发现此时消息状态为unack未确定状态 抛出异常后因为Spring会自动返回nack所以消息恢复至Ready状态并且没有被RabbitMQ删除 5. 消费失败重试机制 当消费者出现异常后消息会不断requeue重入队到队列再重新发送给消费者然后再次异常再次requeue无限循环导致mq的消息处理飙升带来不必要的压力 怎么办呢 5.1 本地重试 我们可以利用Spring的retry机制在消费者出现异常时利用本地重试而不是无限制的requeue到mq队列。 修改consumer服务的application.yml文件添加内容 spring:rabbitmq:listener:simple:retry:enabled: true # 开启消费者失败重试initial-interval: 1000 # 初识的失败等待时长为1秒multiplier: 1 # 失败的等待时长倍数下次等待时长 multiplier * last-intervalmax-attempts: 3 # 最大重试次数stateless: true # true无状态false有状态。如果业务中包含事务这里改为false重启consumer服务重复之前的测试。可以发现 在重试3次后SpringAMQP会抛出异常AmqpRejectAndDontRequeueException说明本地重试触发了查看RabbitMQ控制台发现消息被删除了说明最后SpringAMQP返回的是ackmq删除消息了 结论 开启本地重试时消息处理过程中抛出异常不会requeue到队列而是在消费者本地重试重试达到最大次数后Spring会返回ack消息会被丢弃 5.2 失败策略 在之前的测试中达到最大重试次数后消息会被丢弃这是由Spring内部机制决定的。 在开启重试模式后重试次数耗尽如果消息依然失败则需要有MessageRecovery接口来处理它包含三种不同的实现 RejectAndDontRequeueRecoverer重试耗尽后直接reject丢弃消息。默认就是这种方式 ImmediateRequeueMessageRecoverer重试耗尽后返回nack消息重新入队 RepublishMessageRecoverer重试耗尽后将失败消息投递到指定的交换机 比较优雅的一种处理方案是RepublishMessageRecoverer失败后将消息投递到一个指定的专门存放异常消息的队列后续由人工集中处理。 1在consumer服务中定义处理失败消息的交换机和队列 Bean public DirectExchange errorMessageExchange(){return new DirectExchange(error.direct); } Bean public Queue errorQueue(){return new Queue(error.queue, true); } Bean public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with(error); }2定义一个RepublishMessageRecoverer关联队列和交换机 Bean public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate, error.direct, error); }完整代码 import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.retry.MessageRecoverer; import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer; import org.springframework.context.annotation.Bean;Configuration public class ErrorMessageConfig {Beanpublic DirectExchange errorMessageExchange(){return new DirectExchange(error.direct);}Beanpublic Queue errorQueue(){return new Queue(error.queue, true);}Beanpublic Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with(error);}Beanpublic MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate, error.direct, error);} }6. 总结 如何确保RabbitMQ消息的可靠性 开启生产者确认机制确保生产者的消息能到达队列开启持久化功能确保消息未消费前在队列中不会丢失开启消费者确认机制为auto由spring确认消息处理成功后完成ack开启消费者失败重试机制并设置MessageRecoverer多次重试失败后将消息投递到异常交换机交由人工处理
http://www.tj-hxxt.cn/news/229097.html

相关文章:

  • 电子商务网站开发基本流程图北京网站设计开发公司
  • 视频教学互动网站建设企业展示设计公司
  • 微网站开发制作微信多账号管理系统
  • 南宁建站免费模板绍兴网站建设专业的公司4000-262-
  • 建设官方网站请示建设银行网站如何查询开户行
  • 证书兼职的正规平台哪里有网站创建设计SEO优化象客
  • 教人做辐射4mod的网站wordpress跟换域名
  • 禅城建网站哪个网站跨境电商做的最好
  • 网站设计跟网页制作网站建设启动大会
  • 门户网站时代网站手机客户端如何开发
  • 个人注册域名可以做网站么如何免费网络营销推广
  • 网站的优化策略南宁网站建设云尚网络
  • 大连营销型网站建设crm管理系统架构
  • 怎样到国外做合法网站法网站wordpress 最新教程视频
  • 做视频网站需要流媒体吗微信公众号制作平台
  • 阳信网站建设免费企业wordpress主题
  • aspx网站架设教程有口碑的坪山网站建设
  • 西部数码网站管理助手搭建织梦seo网络搜索引擎优化
  • 免费word文档模板下载网站天津网站优化流程
  • 佛山做网站-准度科技公司旅游网站改版方案
  • 专用车网站建设哪家好国内虚拟主机WordPress
  • 专门做视频的网站吗网站seo优化总结
  • 珍爱网建设网站的目的搜索引擎营销方法
  • 实实通信的视频网站怎么做uc网页浏览器网页版
  • 个人网站建设培训wordpress id开发者
  • 有了云服务器怎么建设网站中国工商黄页
  • 常用的网站建设技术有什么软件坦桑尼亚网站域名后缀
  • 如何免费建立官方网站正规淘宝店铺交易平台
  • 网站可以给pdf做笔记名者观看网站
  • 郑州网站建设伟置做网站资料准备