当前位置: 首页 > news >正文

wordpress手机网站插件十大正规兼职平台

wordpress手机网站插件,十大正规兼职平台,网站轮播怎么做,网站上那些兼职网页怎么做文章目录 前言#xff1a;消息的可靠性问题一、生产者消息的确认1.1 生产者确认机制1.2 实现生产者消息的确认1.3 验证生产者消息的确认 二、消息的持久化2.1 演示消息的丢失2.2 声明持久化的交换机和队列2.3 发送持久化的消息 三、消费者消息的确认3.1 配置消费者消息确认3.2… 文章目录 前言消息的可靠性问题一、生产者消息的确认1.1 生产者确认机制1.2 实现生产者消息的确认1.3 验证生产者消息的确认 二、消息的持久化2.1 演示消息的丢失2.2 声明持久化的交换机和队列2.3 发送持久化的消息 三、消费者消息的确认3.1 配置消费者消息确认3.2 演示 none 模式3.3 演示 auto 模式 四、消息消费失败的重试机制4.1 本地重试机制4.2 失败消息的处理策略 前言消息的可靠性问题 在现代分布式应用程序中消息队列扮演了至关重要的角色允许系统中的各个组件之间进行异步通信。这种通信模式提供了高度的灵活性和可伸缩性但也引入了一系列的挑战其中最重要的之一是消息的可靠性。 首先让我们来了解一下在消息队列中消息从生产者发送到交换机再到队列最后到消费者有哪些情况会导致消息的丢失 发送时丢失 生产者发送的消息未送达交换机消息到达交换机后未到达队列 MQ 宕机队列中的消息会丢失 消费者接收到消息后未消费就宕机了。 确保消息队列的可靠性是分布式系统中不可或缺的一部分因此我们需要采取措施来应对这些挑战。为了解决上述消息可靠性问题RabbitMQ提供了一系列的机制和最佳实践以确保消息在整个传递过程中得到妥善处理和保护。 本文将深入探讨如何应对这些挑战介绍消息队列中的关键概念并详细讨论 RabbitMQ 提供的解决方案包括生产者消息的确认、消息的持久化、消费者消息的确认以及消息消费失败的重试机制。这些措施将有助于确保消息队列在应用程序中的可靠性和稳定性。 一、生产者消息的确认 1.1 生产者确认机制 RabbitMQ 提供了 publisher confirm 机制这是一种用于解决消息发送过程中可能出现的丢失问题的机制。当消息发送到 RabbitMQ 后系统会返回一个结果给消息的发送者以指示消息的处理状态。这个结果有两种可能的值 publisher-confirm发送者确认 消息成功投递到交换机系统返回 ack确认。消息未能成功投递到交换机系统返回 nack未确认。 publisher-return发送者回执 消息成功投递到交换机但是没有成功路由到队列系统返回 ACK同时提供路由失败的原因。 这个确认机制的目的是确保消息在发送到消息队列后发送者能够获得有关消息处理状态的明确反馈从而可以采取适当的措施例如重发消息或记录失败信息。 需要注意的是为了实现这一机制需要为每条消息设置一个全局唯一的标识符以便区分不同的消息避免在确认过程中出现冲突。 例如下图所示 确保消息生产者能够获得有关消息状态的反馈是确保消息可靠性的关键一步因为它有助于解决消息可能在发送期间丢失的问题。这是构建可靠的消息队列系统中的重要组成部分。 1.2 实现生产者消息的确认 下面将通过一个 Java 的 Spring Boot 项目来演示如何实现生产者消息的确认。这个项目的结构如下 这个项目有两个模块其中 consumer 负责对消息的消费而 publisher 负责发送消息。下面是在 publisher 模块中实现消息确认的具体步骤 在 publisher 服务中的 application.yml 文件中添加如下配置 spring:rabbitmq:publisher-confirm-type: correlatedpublisher-returns: truetemplate:mandatory: true对这个配置的详细说明 publish-confirm-type开启 publisher-confirm 功能这里支持两种类型 simple同步等待 confirm 结果直到超时correlated异步回调定义ConfirmCallbackMQ返回结果时会回调这个 ConfirmCallback。 publish-returns开启 publish-return 功能同样是基于 callback 机制不过是定义 ReturnCallbacktemplate.mandatory定义消息路由失败时的策略。true则调用 ReturnCallbackfalse则直接丢弃消息。 给 RabbitTemplate 配置 ReturnCallback Configuration Slf4j public class CommonConfig implements ApplicationContextAware {Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {// 获取 RabbitTemplate 对象RabbitTemplate rabbitTemplate applicationContext.getBean(RabbitTemplate.class);// 配置 ReturnCallBackrabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) - {// 记录日志log.error(消息发送到队列失败响应码{}失败原因{}交换机{}路由 Key{}消息{},replyCode, replyText, exchange, routingKey, message.toString());// 如果有需要接下来可以重发消息或者执行其他通知逻辑});} }由于每个 RabbitTemplate 只能配置一个 ReturnCallback并且 RabbitTemplate 在Spring 中是一个全局对象因此需要在项目启动过程中配置。 上述代码就是一个 Spring Boot 的配置类通常用于在项目启动时配置一些全局的设置。在这个配置类中实现了 ApplicationContextAware 接口用于获取 Spring 应用上下文ApplicationContext对象。主要作用是配置 RabbitMQ 的 ReturnCallback以处理消息发送到队列失败的情况。 发送消息指定消息的 ID以及消息的 ConfirmCallback Test public void testSendMessage2SimpleQueue() throws InterruptedException {String routingKey simple.test;// 1. 准备消息String message hello, spring amqp!;// 2. 准备 CorrelationDate// 2.1.消息IDCorrelationData correlationData new CorrelationData(UUID.randomUUID().toString());// 2.2.准备 ConfirmCallbackcorrelationData.getFuture().addCallback(confirm - {// 消息发送成功// 判断结果if(confirm ! null confirm.isAck()){// ACKlog.debug(消息投递到交换机成功消息 ID: {}, correlationData.getId());} else {// NACKlog.error(消息投递到交换机失败消息 ID: {}, correlationData.getId());}}, throwable - {// 发送失败// 记录日志log.error(消息发送失败, throwable);// 重发消息...});// 3. 发送消息rabbitTemplate.convertAndSend(amq.topic, routingKey, message, correlationData); }这是一个 Java 测试方法用于发送消息到 RabbitMQ 队列并指定消息的 ID 以及 ConfirmCallback确认回调。以下是对这段代码的详细解释 testSendMessage2SimpleQueue: 这是一个测试方法用于演示如何发送消息到名为 “simple.test” 的 RabbitMQ 队列。 String routingKey simple.test;: 定义了消息的路由键这是用于将消息路由到特定队列的关键。 准备消息将要发送的消息内容存储在 message 变量中。 准备 CorrelationData CorrelationData 用于关联消息的 ID。使用 UUID.randomUUID().toString() 生成一个全局唯一的消息 ID。 准备 ConfirmCallback CorrelationData.getFuture().addCallback(confirm - { ... }, throwable - { ... }) 定义了 ConfirmCallback该回调会在消息的发送状态发生变化时触发。在 ConfirmCallback 中判断了消息是否成功投递到交换机 如果 confirm 不为 null 且 confirm.isAck() 为 true则表示消息成功到达交换机记录一条成功的日志。否则如果消息未成功到达交换机则记录一条失败的日志。 在 throwable 回调中处理了发送失败的情况记录了失败的日志可以在这里添加重发消息或其他失败处理逻辑。 发送消息 使用 rabbitTemplate.convertAndSend(amq.topic, routingKey, message, correlationData); 发送消息到 RabbitMQ。参数包括交换机名称、路由键、消息内容和关联的 CorrelationData。 这段代码演示了如何发送消息并在消息状态变化时使用 ConfirmCallback 处理消息的确认情况。通过关联消息 ID 和 ConfirmCallback可以确保消息的可靠性根据确认情况采取适当的措施。 1.3 验证生产者消息的确认 下面通过可以运行上述测试代码来查看生产者的消息确认情况 正常发送消息 直接执行测试方法可以发现消息成功投递到交换机 发送消息失败 此时将交换机的名称改成一个错误不存在的 然后再次执行测试方法 发现此时消息投递到交换机失败说明此时返回的是 NACK并且提示了错误的原因是找不到名为 aamq.topic的交换机。 成功发送消息但是路由失败 此时将交换机的名称修改回来但是将路由 Key 修改成错误的 然后执行测试方法 通过输出的日志可以发现消息成功投递到了交换机但是由于路由 Key 不正确导致路由不到 simple,queue从而触发调用了上文配置的ReturnCallback。 二、消息的持久化 在通过上文的生产者消息确认机制之后确保了消息能够正确的发送到队列中但是这并不意味着消息就安全了。因为 RabbitMQ 默认是内存储存的如果出现了 RabbitMQ 宕机的情况那么此时队列中的消息还是会丢失。要确保消息能够真正的安全我们还需要实现消息的持久化。 2.1 演示消息的丢失 例如现在 simple.queue 中存在 3 条消息 这些消息是通过 RabbitMQ 自带的交换机 amp.topic 进行转发的 然后我们重启一下 RabbitMQ 服务看一看队列中的消息是否还存在 此时我们重新服务 RabbitMQ 的控制台发现连 simple.queue 都消失了 但是RabbitMQ自带的 amp.topic 交换机还存在 说明这个交换机是持久化储存的如果仔细观察可以发现这些所有的交换机的 Features 都带有一个 D 即持久化 Durable。 因此要让我们自己创建的队列或者交换机也能持久存在就可以否选上 Durable 这个选项 2.2 声明持久化的交换机和队列 通过上文我们知道了可以在 RabbitMQ 的控制台创建交换机和队列的时候可以勾选 Durable 来达到持久化的目的但是如果使用代码来创建持久化的交换机和队列呢下面我将使用 Java 代码来演示这个过程 由于消费者comsumer在启动的时候可以帮我们创建交换机和队列因此将交换机和队列的声明交给 consumer 来完成。 声明持久化的交换机 Configuration public class CommonConfig {Beanpublic DirectExchange simpleDirect(){// DirectExchange的构造方法有三个参数交换机名称、是否持久化、当没有 queue 与其绑定时是否自动删除return new DirectExchange(simple.direct, true, false);} }声明持久化的队列持久化 Configuration public class CommonConfig {// ...Beanpublic Queue simpleQueue(){// 使用QueueBuilder构建队列其中使用 durable 方法就是持久化的return QueueBuilder.durable(simple.queue).build();} }当完成了上面两步之后我们可以启动 consumer 服务 此时我们发现成功创建了simple.direct交换机和 simple.queue 队列并且它们都是持久的。然后停止consumer 服务在 RabbitMQ 的控制台中向 simple.queue 添加一条消息 然后再次重启 RabbitMQ 服务发现刚才创建的交换机和队列都还在但是消息却没有了 因为我刚才添加的是非持久化的消息 2.3 发送持久化的消息 同样在控制台添加消息的时候可以设置消息的持久化和非持久化下面让我来演示然后在使用 Java 代码发送持久化的消息 Test public void testDurableMessage() {// 1. 准备消息Message message MessageBuilder.withBody(hello, simple.queue.getBytes(StandardCharsets.UTF_8)).setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();// 2. 发送消息rabbitTemplate.convertAndSend(simple.queue, message); }在发送持久化的消息需要使用MessageBuilder来构建消息其中withBody用于指定消息体setDeliveryMode用来设置消息的发送类型可以是持久化的也可以是非持久化的build 与构建消息。 完成上述代码之后我们可以执行这个测试方法 查看 RabbitMQ 的控制台发现成功发送了消息并且其中的 delivery_mode 为 2代表的就是持久化 再次重启 RabbitMQ 服务 此时发现刚才的消息并没有丢失至此我们就完成了持久化消息的发送进一步确保了消息的可靠性。另外其实在使用 Spring AMQP 创建的交换机队列和发送的消息都是持久化的。 三、消费者消息的确认 3.1 配置消费者消息确认 RabbitMQ 同样也支持消费者确认机制即当消费者处理消息后可以向 MQ 发送 ack 回执当 MQ 收到 ack 回执后才会删除该消息。而Spring AMQP 则允许配置三种确认模式 manual在代码中手动 ack需要在业务代码结束后调用Spring AMQP 提供的 API 发送 ack但是这种情况存在代码侵入的问题。auto基于 AOP 自动发送 ack由 Spring 监测 listener 代码是否出现异常没有异常则返回 ack抛出异常则返回 nacknone关闭 ackMQ 假定消费者获取消息后会成功处理因此消息投递后立即被删除。 实现消费者的确认机制的方式就是是修改application.yml文件添加下面配置 spring:rabbitmq:listener:simple:prefetch: 1acknowledge-mode: auto # none关闭ackmanual手动ackauto自动 ack3.2 演示 none 模式 此时我们将消费者的确认模式改为 none 消息处理逻辑 Slf4j Component public class SpringRabbitListener {RabbitListener(queues simple.queue)public void listenSimpleQueue(String msg) {System.out.println(消费者接收到simple.queue的消息【 msg 】);// 模拟异常System.out.println(1/0);log.info(消费者处理消息成功);} }在这里使用 System.out.println(1/0)来模拟异常的产生。 此时在 simple.queue 中存在一条消息 然后我们将断点设置到如下位置 调试运行可以发现在 none 模式下只有消费者接收到了消息RabbitMQ 就会立即删除队列中的消息。 在这种none模式下队列中的消息并不可靠当消费者消费消息失败的时候不应该理解删除而是应该重新发送或者采取其他措施来保证消息的可靠性。 3.3 演示 auto 模式 接下来让我们演示一下 auto 模式 同样在simple.queue中准备一条消息 然后调试运行刚才的代码 此时发现consumer成功接收到了消息 并且此时 simple.queue 中消息的状态变成了 Unacked 如果此时放行代码发现消费者还是会继续接收到这条消息 此时如果取消断点并放开代码会发现此时的消费者就会一直死循环的接收到这条消息。 通过上面的演示可以发现尽管在 auto 模式下保证了消息的不丢失但是此时如果消费者出现了异常就会死循环的接收并尝试处理同一条消息。面对这个问题还需要采取其他措施来进行处理例如下文消费者消费失败的重试机制。 四、消息消费失败的重试机制 4.1 本地重试机制 当消费者出现异常后消息会不断 requeue重新入队到队列再重新发送给消费者然后再次异常再次 requeue无限循环导致 MQ的消息处理的压力大大提高给 MQ 服务器带来不必要的压力 我们可以利用 Spring 的 retry 机制在消费者出现异常时利用本地重试而不是无限制的 requeue 到 MQ 队列使用这个重试机制需要在 application.yml 添加如下配置 spring:rabbitmq:listener:simple:retry:enabled: true # 开启消费者失败重试initial-interval: 1000 # 初识的失败等待时长为1秒multiplier: 1 # 失败的等待时长倍数下次等待时长 multiplier * last-intervalmax-attempts: 3 # 最大重试次数stateless: true # true无状态false 有状态。如果业务中包含事务这里改为 false完成了上面的配置之后再次重启 consumer 发现消费者在本地重试了三次最终还是失败然后就放弃重试并且simple.queue 中的消息也删除了。 4.2 失败消息的处理策略 在开启重试模式后重试次数耗尽如果消息依然失败则需要有 MessageRecoverer 接口来处理它包含三种不同的实现 RejectAndDontRequeueRecoverer重试次数耗尽后直接 reject丢弃消息这是默认采取的方式ImmediateRequeueMessageRecoverer重试次数耗尽后返回 nack消息重新入队RepublishMessageRecoverer重试耗尽后将失败消息投递到指定的交换机。 下面演示一下 RepublishMessageRecoverer 处理模式 首先定义接收失败消息的交换机、队列及其绑定关系 Bean public DirectExchange errorMessageExchange() {return new DirectExchange(error.direct); }Bean public Queue errorQueue() {return new Queue(error.queue, true); }Bean public Binding errorBinding() {return BindingBuilder.bind(errorQueue()).to(errorMessageExchange()).with(error); } 然后定义RepublishMessageRecoverer Bean public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate) {return new RepublishMessageRecoverer(rabbitTemplate, error.direct, error); }当我们注册了 RepublishMessageRecoverer Bean 对象之后就会自动覆盖 Spring 提供的默认的 RejectAndDontRequeueRecoverer 的 Bean 对象。 当完成了上述的所有配置之后首先在 simple.queue 中准备一条消息然后再启动 consumer 最终发现处理失败的消息最终转发到了 error.queue 队列了。
http://www.tj-hxxt.cn/news/225067.html

相关文章:

  • 做画册好的国外网站推荐安亭公司网站建设
  • 域名没过期 网站打不开怎么办黄页88可信吗
  • 最新获取网站访客qq接口网站导航栏动效怎么做的
  • 浏览器网站大全免费wordpress实用插件
  • 网站建设 核算专业做网站电话
  • iis网站筛选器被挂马网站建站 在线制作
  • 仿淘宝网站制作高端的丹阳网站建设
  • 常用的做网站的工具都有哪些网页游戏推广网站怎么做
  • 地产广告设计网站做全景的网站
  • 微信怎么做一些微网站网站建设要懂哪些技术
  • 西宁做网站_君博相约怎么做网站规划
  • 北京市建设局网站秦皇岛网络
  • 网站搭建与网站建设wordpress post link
  • 手机平板购物网站的设计背景新余企业网站建设
  • 晋中做网站安装wordpress
  • 搭建网站什么意思网站页面链接怎么做的
  • 南京网站公司seo搜索引擎优化名词解释
  • Editplus做网站网络营销课程总结ppt
  • 网站标题字体免费logo设计网站推荐
  • wap网站价格济南网站建设专业
  • 外贸网站外链怎么做开发网站赚钱
  • 建设银行个人网站显示不了铁岭建设网站
  • 网盘做网站建筑培训网排行榜
  • 纸箱 技术支持 东莞网站建设新媒体包不包括公司网站的建设
  • 为什么网站打开老是提示建设中北京优质网站制作
  • 企业类网站模板平面设计师需要会什么软件
  • 做网站的教科书福州市住房和城乡建设部网站
  • 网站正在建设中 模版八年级学生做的简易网站
  • 南昌门户网站wordpress 备案号插件
  • 南京网站做的好的公司表白网站制作器