当前位置: 首页 > news >正文

我的世界做封面网站北京门头沟山洪暴发

我的世界做封面网站,北京门头沟山洪暴发,wordpress 推荐环境,wordpress换域名搬家持续学习持续更新中… 守破离 【雷丰阳-谷粒商城 】【分布式高级篇-微服务架构篇】【22】【RabbitMQ】 Message Queue 消息队列异步处理应用解耦流量控制 消息中间件概念RabbitMQ概念MessagePublisherExchangeQueueBindingConnectionChannelConsumerVirtual HostBroker图… 持续学习持续更新中… 守破离 【雷丰阳-谷粒商城 】【分布式高级篇-微服务架构篇】【22】【RabbitMQ】 Message Queue 消息队列异步处理应用解耦流量控制 消息中间件概念RabbitMQ概念MessagePublisherExchangeQueueBindingConnectionChannelConsumerVirtual HostBroker图示 安装RabbitMQ运行机制AMQP 中的消息路由Exchange类型练习 RabbitMQ整合AmqpAdmin 管理组件RabbitTemplate 消息发送处理组件监听消息注意 RabbitMQ消息确认机制-可靠抵达发送端—ConfirmCallback发送端—ReturnCallback发送端—代码配置消费端 消息的TTLTime To Live什么是死信延时队列延时队列的实现如何保证消息可靠性消息丢失消息重复消息积压 MQ对比参考 Message Queue 消息队列 异步处理 应用解耦 这样不管库存系统的接口会不会发生改变订单系统都不关心 流量控制 把用户请求流量存到消息队列中后台服务根据它自身的处理能力去来进行消费处理不会导致后台服务宕机 消息中间件概念 大多应用中可通过消息服务中间件来提升系统异步通信、扩展解耦能力消息服务中两个重要概念消息代理message broker和目的地destination 当消息发送者发送消息以后将由消息代理接管消息代理保证消息传递到指定目的地。 消息队列主要有两种形式的目的地 队列queue点对点消息通信point-to-point主题topic发布publish/订阅subscribe消息通信 点对点式 消息发送者发送消息消息代理将其放入一个队列中消息接收者从队列中获取消息内容消息读取后被移出队列消息只有唯一的发送者和接收者很多人都可以监听队列但是消息谁抢到就是谁的 发布订阅式 发送者发送消息到主题多个接收者订阅者监听订阅这个主题那么就会在消息到达时同时收到消息订阅者都会收到消息 JMSJava Message ServiceJAVA消息服务基于JVM消息代理的规范。ActiveMQ、HornetMQ是JMS实现 AMQPAdvanced Message Queuing Protocol高级消息队列协议也是一个消息代理的规范兼容JMS RabbitMQ是AMQP的实现 RabbitMQ概念 RabbitMQ是一个由erlang开发的AMQP(Advanved Message Queue Protocol)的开源实现。 Message 消息消息是不具名的它由消息头和消息体组成。消息体是不透明的而消息头则由一系列的可选属性组成 这些属性包括routing-key路由键、priority相对于其他消息的优先权、delivery-mode指出该消息可能需要持久性存储等。 Publisher 消息的生产者也是一个向交换器发布消息的客户端应用程序。 Exchange 交换器用来接收生产者发送的消息并将这些消息路由给服务器中的队列。 Exchange有4种类型direct(默认)fanout, topic, 和headers不同类型的Exchange转发消息的策略有所区别 Queue 消息队列用来保存消息直到发送给消费者。它是消息的容器也是消息的终点。一个消息可投入一个或多个队列。消息一直 在队列里面等待消费者连接到这个队列将其取走。 Binding 绑定用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则所以可以将交 换器理解成一个由绑定构成的路由表。 Exchange 和Queue的绑定可以是多对多的关系。 Connection 网络连接比如一个TCP连接。 客户端和消息中间件之间一直保持一个长连接。1个客户端只会建立1条连接 长连接的好处客户端宕机或下线该长连接就会断开RabbitMQ就会感知到就不会再继续派发消息可以防止大面积消息丢失问题。 Channel 1条Connection上建立多个Channel收发数据通过Channel进行 信道多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内的虚拟连接AMQP 命令都是通过信道 发出去的不管是发布消息、订阅队列还是接收消息这些动作都是通过信道完成。 因为对于操作系统来说建立和销毁 TCP 都 是非常昂贵的开销所以引入了信道的概念以复用一条 TCP 连接。 Consumer 消息的消费者表示一个从消息队列中取得消息的客户端应用程序。 Virtual Host 虚拟主机表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个Vhost 本质上就是一个 mini 版的 RabbitMQ 服务器拥有自己的队列、交换器、绑定、配置和权限机制。VHost 是 AMQP 概念的基础必须在连接时指定RabbitMQ 默认的 vhost 是 / 。 虚拟主机跟虚拟主机之间是相互隔离的。可以用于生产环境和开发环境隔离等 类比RabbitMQ类似一个多租户系统各个用户相互隔离。每一个 RabbitMQ 服务器都能创建 虚拟的消息服务器称之为 虚拟主机virtual host简称 vhost。vhost 本质上是一个独立的小型 RabbitMQ 服务器vhost 可避免队列和交换器等命名冲突vhost 之间是绝对隔离的 Broker 表示消息队列服务器实体 图示 安装 docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:managementdocker update rabbitmq --restartalways https://www.rabbitmq.com/networking.html4369, 25672 (Erlang发现集群端口)5672, 5671 (AMQP端口)15672 (web管理后台端口)61613, 61614 (STOMP协议端口)1883, 8883 (MQTT协议端口) RabbitMQ运行机制 AMQP 中的消息路由 AMQP 中消息的路由过程和 Java 开发者熟悉的 JMS 存在一些差别AMQP 中增加了 Exchange 和Binding 的角色。生产者把消息发布到 Exchange 上消息最终到达队列并被消费者接收而 Binding 决定交换器的消息应该发送到那个队列。 Exchange类型 Exchange分发消息时根据类型的不同分发策略有区别目前共四种类型direct、fanout、topic、headers 。headers 匹配 AMQP 的消息 header 而不是路由键headers 交换器和 direct 交换器完全一致但性能差很多目前几乎用不到了所以直接看另外三种类型 消息中的路由键routing key如果和 Binding 中的 binding key 一致 交换器就将消息发到对应的队列中。 路由键与队列名完全匹配如果一个队列绑定到交换机要求路由键为“dog”则只转发 routing key 标记为“dog”的消息不会转发“dog.puppy”也不会转发“dog.guard” 等等。它是完全匹配的模式。 每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去。fanout 交换器不处理路由键只是简单的将队列绑定到交换器上每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上。很像子网广播每台子网内的主机都获得了一份复制的消息。 fanout 类型转发消息是最快的。 topic 交换器通过模式匹配分配消息的路由键属性将路由键和某个模式进行匹配此时队列需要绑定到一个模式上。 它将路由键和绑定键的字符串切分成单词这些单词之间用点隔开。 它同样也会识别两个通配符符号“#”和符号“*”#匹配0个或多个单词*匹配1个单词。 练习 RabbitMQ整合 dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId /dependencyspring.rabbitmq.host192.168.56.10 # 都有默认配置 #spring.rabbitmq.port5672 #spring.rabbitmq.virtual-host/ #spring.rabbitmq.usernameguest #spring.rabbitmq.passwordguest/*** 使用RabbitMQ* 1、引入amqp场景启动器RabbitAutoConfiguration 就会自动生效** 2、给容器中自动配置了* RabbitTemplate、AmqpAdmin、CachingConnectionFactory、RabbitMessagingTemplate;* 所有的属性都是 spring.rabbitmq* ConfigurationProperties(prefix spring.rabbitmq)* public class RabbitProperties** 3、给配置文件中配置 spring.rabbitmq 信息** 4、EnableRabbit: EnableXxxxx开启功能** 5、监听消息使用RabbitListener必须有EnableRabbit* RabbitListener: 类方法上监听哪些队列即可* RabbitHandler配合RabbitListener标在方法上重载区分不同的消息*/EnableRabbitConfiguration public class MyRabbitConfig {/*** 使用JSON序列化机制进行消息转换*/Beanpublic MessageConverter messageConverter() {return new Jackson2JsonMessageConverter();}AmqpAdmin 管理组件 Testpublic void createExchange() {/*** DirectExchange(String name, boolean durable, boolean autoDelete, MapString, Object arguments)*/Exchange directExchange new DirectExchange(hello-java-exchange, true, false);amqpAdmin.declareExchange(directExchange);log.info(Exchange[{}]创建成功, hello-java-exchange);}Testpublic void createQueue() {/*** public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, MapString, Object arguments)*/Queue queue new Queue(hello-java-queue, true, false, false);amqpAdmin.declareQueue(queue);log.info(Queue[{}]创建成功, hello-java-queue);}Testpublic void createBinding() {/*** String destination【目的地】,* DestinationType destinationType【目的地类型】,* String exchange【交换机】,* String routingKey【路由键】,* MapString, Object arguments【自定义参数】** 将exchange指定的交换机和destination目的地进行绑定使用routingKey作为指定的路由键*/Binding binding new Binding(hello-java-queue, Binding.DestinationType.QUEUE, hello-java-exchange, hello.java, null);amqpAdmin.declareBinding(binding);log.info(Binding[{}]创建成功, hello-java-binding);}RabbitTemplate 消息发送处理组件 Testpublic void sendMessageTest() {//1、发送消息如果发送的消息是个对象我们会使用序列化机制将对象写出去。对象必须实现SerializableString msg Hello World!;rabbitTemplate.convertAndSend(hello-java-exchange, hello.java, msg);// 2、发送的对象类型的消息可以自动转成一个json【自定义MessageConverter即可】 // OrderReturnReasonEntity reasonEntity new OrderReturnReasonEntity(); // reasonEntity.setId(1L); // reasonEntity.setCreateTime(new Date()); // reasonEntity.setName(哈哈- 666); // rabbitTemplate.convertAndSend(hello-java-exchange, hello.java, // reasonEntity);log.info(消息发送完成{});}CorrelationData消息的唯一ID【可以放在数据库或者Redis中方便之后判断哪些消息没有可靠抵达】 RestController public class RabbitController {AutowiredRabbitTemplate rabbitTemplate;GetMapping(/sendOneMq)public String sendOneMq(RequestParam(value num, defaultValue 10) Integer num) {OrderReturnReasonEntity reasonEntity new OrderReturnReasonEntity();reasonEntity.setId(1L);reasonEntity.setCreateTime(new Date());reasonEntity.setName(哈哈); // CorrelationData消息的唯一ID【可以放在数据库或者Redis中方便之后判断哪些消息没有可靠抵达】rabbitTemplate.convertAndSend(hello-java-exchange, hello.java, reasonEntity, new CorrelationData(UUID.randomUUID().toString()));return ok;}监听消息 RabbitListener RabbitHandler 重载区分不同的消息 GetMapping(/sendMq)public String sendMq(RequestParam(value num, defaultValue 10) Integer num) {for (int i 0; i num; i) {if (i % 2 0) {OrderReturnReasonEntity reasonEntity new OrderReturnReasonEntity();reasonEntity.setId(1L);reasonEntity.setCreateTime(new Date());reasonEntity.setName(哈哈- i);rabbitTemplate.convertAndSend(hello-java-exchange, hello.java, reasonEntity, new CorrelationData(UUID.randomUUID().toString()));} else {OrderEntity entity new OrderEntity();entity.setOrderSn(UUID.randomUUID().toString());rabbitTemplate.convertAndSend(hello-java-exchange, hello22.java, entity, new CorrelationData(UUID.randomUUID().toString()));}}return ok;}RabbitListener(queues {hello-java-queue}) Service(orderItemService) public class OrderItemServiceImpl extends ServiceImplOrderItemDao, OrderItemEntity implements OrderItemService {RabbitHandlerpublic void receiveMessage(Message message,OrderReturnReasonEntity content,Channel channel) throws InterruptedException { // 接收到消息...(Body:{id:1,name:哈哈-666,sort:null,status:null,createTime:1720438632794} MessageProperties [headers{__TypeId__com.atguigu.gulimall.order.entity.OrderReturnReasonEntity}, contentTypeapplication/json, contentEncodingUTF-8, contentLength0, receivedDeliveryModePERSISTENT, priority0, redeliveredfalse, receivedExchangehello-java-exchange, receivedRoutingKeyhello.java, deliveryTag1, consumerTagamq.ctag-ltQ3IQ4H1lvLGqYNyEH3nQ, consumerQueuehello-java-queue])System.out.println(RabbitListener接收到消息... message); // System.out.println(接收到消息...contentcontent); // byte[] body message.getBody(); // //消息头属性信息 // MessageProperties properties message.getMessageProperties(); // Thread.sleep(3000); // System.out.println(消息处理完成content.getName());}RabbitHandlerpublic void receiveMessage2(OrderEntity content) throws InterruptedException {System.out.println(RabbitHandler接收到消息... content);}}RabbitListener写在方法上 Service(orderItemService) public class OrderItemServiceImpl extends ServiceImplOrderItemDao, OrderItemEntity implements OrderItemService {RabbitListener(queues {hello-java-queue})public void receiveMessage(OrderEntity content) throws InterruptedException {System.out.println(接收到消息... content);}}注意 Queue、Exchange、Binding可以 Bean 注入进去监听消息的方法可以有三种参数不分数量顺序Object content, Message message, Channel channelchannel可以用来拒绝或者签收消息 RabbitMQ消息确认机制-可靠抵达 保证消息不丢失可靠抵达可以使用事务消息这样性能会下降250倍为此引入确认机制 publisher confirmCallback 确认模式publisher returnCallback 未投递到 queue 退回模式consumer 手动 ack 机制 发送端—ConfirmCallback spring.rabbitmq.publisher-confirmstrue 在创建 connectionFactory 的时候设置 PublisherConfirms(true) 选项开启confirmcallback 。 CorrelationData用来表示当前消息唯一性。 消息只要被 broker 接收到就会执行 confirmCallback如果是 cluster 模式需要所有 broker 接收到才会调用confirmCallback。 被 broker 接收到只能表示 message 已经到达服务器并不能保证消息一定会被投递到目标 queue 里。所以需要用到接下来的 returnCallback 。 发送端—ReturnCallback spring.rabbitmq.publisher-returnstrue spring.rabbitmq.template.mandatorytrue confrim 模式只能保证消息到达 broker不能保证消息准确投递到目标 queue 里。在有些业务场景下我们需要保证消息一定要投递到目标 queue 里此时就需要用到return 退回模式。 这样如果消息未能投递到目标 queue 里将调用 returnCallback 可以记录下详细到投递数据定期的巡检或者自动纠错都需要这些数据。 发送端—代码配置 #服务端确认 # 开启消息正确抵达RabbitMQ确认 spring.rabbitmq.publisher-confirmstrue # 开启RabbitMQ中的消息正确的从交换机投递到队列确认 spring.rabbitmq.publisher-returnstrue # 只要消息抵达队列以异步方式优先回调ReturnCallback spring.rabbitmq.template.mandatorytrueConfiguration public class MyRabbitConfig {/*** 使用JSON序列化机制进行消息转换*/Beanpublic MessageConverter messageConverter() {return new Jackson2JsonMessageConverter();}private RabbitTemplate rabbitTemplate;PrimaryBeanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate new RabbitTemplate(connectionFactory);this.rabbitTemplate rabbitTemplate;rabbitTemplate.setMessageConverter(messageConverter());initRabbitTemplate();return rabbitTemplate;}/*** 定制RabbitTemplate* 做好消息确认机制publisherconsumer【手动ack】* 每一个发送的消息都在数据库做好记录。定期将失败的消息再次发送一遍** 发送端确认* 1、Broker收到消息就回调【P——B】* 1、spring.rabbitmq.publisher-confirmstrue* 2、设置确认回调ConfirmCallback* 2、消息没有正确抵达队列进行回调【E——Q】* 1、spring.rabbitmq.publisher-returnstrue* spring.rabbitmq.template.mandatorytrue* 2、设置确认回调ReturnCallback*** 消费端确认保证每个消息被正确消费此时broker才可以删除这个消息。* spring.rabbitmq.listener.simple.acknowledge-modemanual #手动签收* 默认是自动确认的只要消息接收到客户端会自动确认服务端就会移除这个消息* 问题* 我们收到很多消息如果自动回复给服务器ack当消息由于各种原因没有成功处理完成就会发生消息丢失* 消费者手动确认模式* 只要我们没有明确告诉MQ消息被签收消息就一直是unacked状态。* 即使Consumer宕机消息也不会丢失会重新变为Ready状态下一次有新的Consumer连接进来就发给他* 如何签收:* channel.basicAck(deliveryTag,false);业务成功签收* channel.basicNack(deliveryTag,false,true);业务失败拒签并让消息重新入队*/ // PostConstruct //MyRabbitConfig对象创建完成以后也就是构造器执行完成后执行这个方法public void initRabbitTemplate() {rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {/*** P —— B [Publisher —— Broker]** 只要消息抵达Broker这里的ack就为true* param correlationData 当前消息的唯一关联数据这个是消息的唯一id发布者发消息的时候传递的* param ack 消息是否成功抵达Broker* param cause 失败的原因*/Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {//Broker收到了:修改消息的状态——Broker接收到消息 // System.out.println(confirm...correlationData[ correlationData ]ack[ ack ]cause[ cause ]);}});rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {/*** E —— Q [Exchange —— Queue]** 只要消息没有正确投递给指定的队列就会触发这个失败回调* param message 投递失败的消息的详细信息* param replyCode 回复的状态码* param replyText 回复的文本内容* param exchange 当时这个消息发给了哪个交换机* param routingKey 当时这个消息指定的路由键*/Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {//报错了:修改数据库当前消息的状态——队列接收消息发生错误 // System.out.println(Fail Message[ message ]replyCode[ replyCode ]replyText[ replyText ]exchange[ exchange ]routingKey[ routingKey ]);}});}}消费端 消费者获取到消息成功处理可以回复Ack给Broker basic.ack用于肯定确认broker将移除此消息basic.nack用于否定确认可以指定broker是否丢弃此消息可以批量basic.reject用于否定确认可以指定broker是否丢弃此消息但不能批量 默认自动ack消息被消费者收到就会从broker的queue中移除 queue无消费者消息依然会被存储直到消费者消费。消费者收到消息默认会自动ack。但是如果无法确定此消息是否被处理完成 或者成功处理。我们可以开启手动ack模式spring.rabbitmq.listener.simple.acknowledge-modemanual 消息处理成功ack()接受下一个消息此消息broker就会移除消息处理失败nack()/reject()重新发送给其他人进行处理或者容错处理后ack或者丢弃消息一直没有调用ack/nack方法比如宕机/程序出异常broker认为此消息正在被处理不会投递给别人此时客户端断开MQ感知到后消息不会被broker移除会重新入队并投递给别人 # 客户端确认开启手动ack消息 spring.rabbitmq.listener.simple.acknowledge-modemanual/*** 消费端确认保证每个消息被正确消费此时broker才可以删除这个消息。* spring.rabbitmq.listener.simple.acknowledge-modemanual #手动签收* 默认是自动确认的只要消息接收到客户端会自动确认服务端就会移除这个消息* 问题* 我们收到很多消息如果自动回复给服务器ack当消息由于各种原因没有成功处理完成就会发生消息丢失* 消费者手动确认模式* 只要我们没有明确告诉MQ消息被签收消息就一直是unacked状态。* 即使Consumer宕机消息也不会丢失会重新变为Ready状态下一次有新的Consumer连接进来就发给他* 如何签收:* channel.basicAck(deliveryTag,false);业务成功签收* channel.basicNack(deliveryTag,false,true);业务失败拒签并让消息重新入队*/RabbitListener(queues {hello-java-queue})public void receiveMessage0(Message message,Channel channel) { // multiple是否批量 // requeuefalse 丢弃 requeuetrue 发回服务器让服务器重新入队该消息。// long deliveryTag, boolean multiple // channel.basicAck( deliveryTag, false); 只签收当前货物不批量签收// long deliveryTag, boolean multiple, boolean requeue // channel.basicNack( deliveryTag, false, true); 拒签当前货物【是否将该消息让MQ重新放入队列看自己的业务需求】// long deliveryTag, boolean requeue // channel.basicReject( deliveryTag, true); 拒签当前货物【是否将该消息重新放回MQ看自己的业务需求】//DeliveryTag在channel内按顺序自增long deliveryTag message.getMessageProperties().getDeliveryTag();System.out.println(deliveryTag deliveryTag);//签收货物消息非批量模式try {if (deliveryTag % 2 0) {//收货channel.basicAck(deliveryTag, false);System.out.println(签收了货物 message ...deliveryTag... deliveryTag);} else {//退货 requeuefalse 丢弃 requeuetrue 发回服务器服务器重新入队。channel.basicNack(deliveryTag, false, true); // channel.basicReject(deliveryTag, true);// channel.basicNack(deliveryTag, false, false);System.out.println(不签收货物 message ...deliveryTag... deliveryTag);}} catch (Exception e) {//网络中断签收信息未成功发送给Broker}}消息的TTLTime To Live 消息的TTL就是消息的存活时间。 RabbitMQ给消息设置 TTL 通过队列设置队列中的消息都有相同的过期时间给消息本身设置每条消息的 TTL 可以不同 如果队列设置了消息也设置了则最小的 TTL 生效。所以一个消息如果被路由到不同的队列中这个消息死亡的时间有可能不一样不同的队列设置。 消息在队列中生存时间一旦超过 TTL就会变成死信Dead Message 什么是死信 一个消息被Consumer拒收了并且reject方法的参数里requeue是false。也就是说不会被再次放在队列里被其他消费者使用。basic.reject/ basic.nackrequeuefalse消息的TTL到了消息过期了。没有人消费它队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由上 延时队列 当消息在一个队列中变成死信之后它能被发送到一个指定的交换机中这个交换机就是 DLX Dead Letter Exchange可称为死信交换机。 绑定在 DLX 上的队列就称为死信队列。 我们既可以控制消息在一段时间后变成死信又可以控制变成死信的消息被路由到某一个指定的交换机这个交换机又可以绑定队列去消费死信结合它们就可以实现一个延时队列 不推荐使用给消息设置过期时间这种方式实现延时队列 延时队列的实现 import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;import java.util.HashMap; import java.util.Map;Configuration public class MyMQConfig {//Bean BindingQueueExchange/*** 容器中的 BindingQueueExchange 都会自动创建RabbitMQ没有的情况* RabbitMQ中已有的话 Bean中声明属性发生了变化也不会覆盖*/Beanpublic Queue orderDelayQueue() {MapString,Object arguments new HashMap();/*** x-dead-letter-exchange: order-event-exchange* x-dead-letter-routing-key: order.release.order* x-message-ttl: 60000*/arguments.put(x-dead-letter-exchange,order-event-exchange);arguments.put(x-dead-letter-routing-key,order.release.order);arguments.put(x-message-ttl,60000);//String name, boolean durable, boolean exclusive, boolean autoDelete, MapString, Object argumentsreturn new Queue(order.delay.queue, true, false, false,arguments);}Beanpublic Queue orderReleaseOrderQueue() {return new Queue(order.release.order.queue, true, false, false);}Beanpublic Exchange orderEventExchange() {//String name, boolean durable, boolean autoDelete, MapString, Object argumentsreturn new TopicExchange(order-event-exchange,true,false);}Beanpublic Binding orderCreateOrderBinding() {//String destination, DestinationType destinationType, String exchange, String routingKey,// MapString, Object argumentsreturn new Binding(order.delay.queue,Binding.DestinationType.QUEUE,order-event-exchange,order.create.order,null);}Beanpublic Binding orderReleaseOrderBinding() {return new Binding(order.release.order.queue,Binding.DestinationType.QUEUE,order-event-exchange,order.release.order,null);}}import com.atguigu.gulimall.order.entity.OrderEntity; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.ResponseBody;import java.io.IOException; import java.util.Date; import java.util.UUID;Controller public class HelloController {AutowiredRabbitTemplate rabbitTemplate;RabbitListener(queues {order.release.order.queue})public void testListenOrderRelease(OrderEntity order, Message message, Channel channel) throws IOException {System.out.println(listenOrderRelease order : order);System.out.println(listenOrderRelease message : message);System.out.println(listenOrderRelease channel : channel);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}ResponseBodyGetMapping(/test/createOrder)public String createOrderTest() {//订单下单成功OrderEntity entity new OrderEntity();entity.setOrderSn(UUID.randomUUID().toString());entity.setModifyTime(new Date());//给MQ发送消息。rabbitTemplate.convertAndSend(order-event-exchange, order.create.order, entity);return ok;}}如何保证消息可靠性 消息丢失 消息丢失出现的原因 消息发送出去由于网络问题没有抵达MQ服务器(Broker) 发送者做好容错方法try-catch发送消息可能会网络失败失败后要有重试机制做好记录每个消息发送出去后都应该记录到数据库 消息抵达BrokerBroker要将消息写入磁盘持久化才算成功。此时Broker尚未持久化完成就宕机。 publisher必须加入确认回调机制回调后根据是否成功修改数据库消息的发送状态。做好定期重发定期去数据库扫描未成功的消息进行重发 自动ACK的状态下。消费者收到消息但没来得及处理完成消息宕机 一定开启手动ACK消费成功才移除失败或者没来得及处理就noAck并重新入队 防止消息丢失 做好消息确认机制publisherconsumer【手动ack】publisher将发送的消息都在数据库做好记录。定期从数据库扫描发送失败的消息将它们再次发送 比如给数据库创建如下表用来记录消息的发送状态 CREATE TABLE mq_message (message_id char(32) NOT NULL,content text,to_exchange varchar(255) DEFAULT NULL,routing_key varchar(255) DEFAULT NULL,class_type varchar(255) DEFAULT NULL,message_status int(1) DEFAULT 0 COMMENT 0-新建 1-已发送 2-错误抵达 3-已抵达,create_time datetime DEFAULT NULL,update_time datetime DEFAULT NULL,PRIMARY KEY (message_id) ) ENGINE InnoDBDEFAULT CHARSET utf8mb4消息重复 消息消费失败由于重试机制自动又将消息发送出去。【这种情况允许重复】成功消费手动ack时宕机或者网络原因等等消息由unack变为readyBroker又重新发送 将消费者的业务消费接口设计为幂等的即可。比如要解锁库存先判断状态使用防重表redis/mysql发送消息每一个都有业务的唯 一标识处理过就不用处理rabbitMQ的每一个消息都有redelivered字段可以获取是否是被重新投递过来的而不是第一次投递过来的Boolean redelivered message.getMessageProperties().getRedelivered(); 消息积压 消费者宕机积压消费者消费能力不足积压发送者发送流量太大 上线更多的消费者进行正常消费上线专门的消费服务将消息先批量取出来记录数据库离线慢慢处理 MQ对比 参考 雷丰阳: Java项目《谷粒商城》Java架构师 | 微服务 | 大型电商项目. 本文完感谢您的关注支持
http://www.tj-hxxt.cn/news/133033.html

相关文章:

  • 做网站后端要什么技术推广app赚佣金平台有哪些
  • 个人网站空间怎么做centum wordpress
  • 学什么专业可以做网站一个网站建立团队大概要多少钱
  • 建设网站用什么好家装设计师培训课程
  • 网站宣传wordpress书籍推荐
  • 网站关键词作用深圳搭建p2p网站
  • 科技网站 网站建设WordPress批量修改用户
  • 临沧市住房和城乡建设局门户网站青少年编程培训教育
  • 推荐十个国外网站购物网站开发价格
  • 建设厅网站上的信息采集表附近哪里需要招人
  • 泉州自助建站系统群晖配置wordpress 80端口
  • 智能建站加盟电话wordpress 获取页面
  • 企业在公司做的网站遇到的问题班级网站建设的系统概述
  • 网站全站搜索代码江门市智企互联网站建设
  • 周口规划建设局网站我的网站被黑了
  • 开o2o网站需要什么手续网站建设中搜索引擎
  • 局域网站建设模版校园网站建设报价
  • 深圳制作手机网站网站年费怎么做分录
  • 贵阳网站建设公司排行谷歌字体wordpress主题
  • 下载官方网站wordpress默认首页是什么
  • 一个虚拟主机可以放几个网站莆田个人仿牌外贸网站建设
  • 网站建设需要什么技能网店推广方式有哪些
  • 推广公司网站有哪些方式wordpress常用的插件
  • 大港油田建设网站重庆在线课程开放平台
  • 做新闻类网站还有市场吗0453牡丹江信息网官网
  • 上海技术做网站编程软件python下载
  • 用php做网站的新闻做个app商城类的要多少钱
  • 乐山做网站cms 类网站
  • 成都淮州新城建设投资有限公司网站注册深圳公司不在深圳经营
  • 做外贸网站怎么访问外国网站网站建设瀑布流