域名过户后怎么做网站,做网站硬件工程是什么,网站怎么做支付接口,开发公司 网站建设价格提示#xff1a;文章写完后#xff0c;目录可以自动生成#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、对RabbitMQ管理界面深入了解1、在这个界面里面我们可以做些什么#xff1f; 二、编码练习#xff08;1#xff09;使用direct exchange(直连型交换机)文章写完后目录可以自动生成如何生成可参考右边的帮助文档 文章目录 前言一、对RabbitMQ管理界面深入了解1、在这个界面里面我们可以做些什么 二、编码练习1使用direct exchange(直连型交换机)2使用Topic Exchange 主题交换机。3使用Fanout Exchang 扇型交换机。 三、消息确认种类A消息发送确认B 消费接收确认方式一通过配置类的方式实现方式二通过yml配置来完成消费者确认 前言
该篇文章内容较多包括有RabbitMQ一些理论介绍provider消息推送实例consumer消息消费实例Direct、Topic、Fanout多种交换机的使用同时简单介绍对消息回调、手动确认等。 这里面的每一种使用都包含实际编码示例供大家理解共同进步如有不足。还请指教。
一、对RabbitMQ管理界面深入了解
装完rabbitMq启动MQ后本地浏览器输入http://ip:15672/ 看到一个简单后台管理界面 对于其中的一些具体指标的解释
Ready 待消费的消息总数。Unacked 待应答的消息总数。Total总数 ReadyUnacked。Publish producter pub消息的速率。Publisher confirm broker确认pub消息的速率。Deliver(manual ack) customer手动确认的速率。Deliver( auto ack) customer自动确认的速率。Consumer ack customer正在确认的速率。Redelivered 正在传递’redelivered’标志集的消息的速率。Get (manual ack) 响应basic.get而要求确认的消息的传输速率。Get (auto ack) 响应于basic.get而发送不需要确认的消息的速率。Return 将basic.return发送给producter的速率。Disk read queue从磁盘读取消息的速率。Disk write queue从磁盘写入消息的速率。
Connectionsclient的tcp连接的总数。 Channels通道的总数。 Exchange交换器的总数。 Queues队列的总数。 Consumers消费者的总数。
更详细的可见 版权声明本文为博主原创文章遵循 CC 4.0 BY-SA 版权协议转载请附上原文出处链接和本声明。 本文链接https://blog.csdn.net/qq_19343089/article/details/135724659
1、在这个界面里面我们可以做些什么
可以手动创建虚拟host创建用户分配权限创建交换机创建队列等等还有查看队列消息消费效率推送效率等等。
以上这些管理界面的操作在这篇暂时不做扩展描述我想着重介绍后面实例里会使用到的。
首先先介绍一个简单的一个消息推送到接收的流程提供一个简单的图 黄色的圈圈就是我们的消息推送服务将消息推送到 中间方框里面也就是 rabbitMq的服务器然后经过服务器里面的交换机、队列等各种关系后面会详细讲将数据处理入列后最终右边的蓝色圈圈消费者获取对应监听的消息。
常用的交换机有以下三种因为消费者是从队列获取信息的队列是绑定交换机的一般所以对应的消息推送/接收模式也会有以下几种
- Direct Exchange
直连型交换机根据消息携带的路由键将消息投递给对应队列。
大致流程有一个队列绑定到一个直连交换机上同时赋予一个路由键 routing key 。 然后当一个消息携带着路由值为X这个消息通过生产者发送给交换机时交换机就会根据这个路由值X去寻找绑定值也是X的队列。
- Fanout Exchange
扇型交换机这个交换机没有路由键概念就算你绑了路由键也是无视的。 这个交换机在接收到消息后会直接转发到绑定到它上面的所有队列。
- Topic Exchange
主题交换机这个交换机其实跟直连交换机流程差不多但是它的特点就是在它的路由键和绑定键之间是有规则的。 简单地介绍下规则
(星号) * 用来表示一个单词 (必须出现的) (井号) # 用来表示任意数量零个或多个单词 通配的绑定键是跟队列进行绑定的举个小例子 队列Q1 绑定键为 .TT. 队列Q2绑定键为 TT.# 如果一条消息携带的路由键为 A.TT.B那么队列Q1将会收到 如果一条消息携带的路由键为TT.AA.BB那么队列Q2将会收到
主题交换机是非常强大的为啥这么膨胀 当一个队列的绑定键为 “#”井号 的时候这个队列将会无视消息的路由键接收所有的消息。 当 * (星号) 和 # (井号) 这两个特殊字符都未在绑定键中出现的时候此时主题交换机就拥有的直连交换机的行为。 所以主题交换机也就实现了扇形交换机的功能和直连交换机的功能。
另外还有 Header Exchange 头交换机 Default Exchange 默认交换机Dead Letter Exchange 死信交换机这几个该篇暂不做讲述。
好了一些简单的介绍到这里为止 接下来我们来一起编码。
二、编码练习
本次实例教程需要创建2个springboot项目一个 rabbitmq-provider 生产者一个rabbitmq-consumer消费者。【补充说明我这里模块名称创建错了其中生产者我创建成了rabbitmq-consumer消费者我这里叫做 rabbitmq-consumer-true】
首先创建 rabbitmq-provider
pom.xml里用到的jar依赖
?xml version1.0 encodingUTF-8?
project xmlnshttp://maven.apache.org/POM/4.0.0 xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsdmodelVersion4.0.0/modelVersionparentgroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-parent/artifactIdversion2.6.3/versionrelativePath/ !-- lookup parent from repository --/parentgroupIdcom.atguigu.gulimall/groupIdartifactIdrabbitmq-consumer/artifactIdversion0.0.1-SNAPSHOT/versionnamerabbitmq-consumer/namedescriptionRabbitMQ生产者模块/descriptionurl/licenseslicense//licensesdevelopersdeveloper//developersscmconnection/developerConnection/tag/url//scmpropertiesjava.version1.8/java.version!-- spring-cloud.version2021.0.4/spring-cloud.version--spring-cloud.version2021.0.1/spring-cloud.version/propertiesdependenciesdependencygroupIdcom.atguigu.gulimall/groupIdartifactIdgulimall-common/artifactIdversion0.0.1-SNAPSHOT/versionexclusionsexclusionartifactIdservlet-api/artifactIdgroupIdjavax.servlet/groupId/exclusion/exclusions/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-test/artifactIdscopetest/scope/dependencydependencygroupIdorg.springframework.amqp/groupIdartifactIdspring-rabbit-test/artifactIdscopetest/scope/dependency/dependenciesbuildpluginsplugingroupIdorg.springframework.boot/groupIdartifactIdspring-boot-maven-plugin/artifactId/plugin/plugins/build/project
然后application.yml
server:port: 8021#数据源配置
spring:datasource:username: rootpassword: rooturl: jdbc:mysql://192.168.56.10:3306/gulimall_umsdriver-class-name: com.mysql.cj.jdbc.Driver#注册到注册中心cloud:nacos:discovery:server-addr: 127.0.0.1:8848application:name: rabbitmq-consumer#配置rabbitMq 服务器rabbitmq:host: 127.0.0.1port: 5672username: guestpassword: guest#虚拟host 可以不设置,使用server默认hostvirtual-host: /
# publisher-returns: true #确认消息已发送到队列(Queue) 这个在生产者模块配置 这个后期再配置这会还用不到
# publisher-confirm-type: correlated #确认消息已发送到交换机(Exchange) 这个在生产者模块配置 这个后期再配置这会还用不到logging:level:com.atguigu.gulimall: debug #调整product模块日志的输出模式是debug级别这样就能在控制台看到dao包下的输出日志了。
一定要注意 要注意 要注意 里面的virtual-host 是指RabbitMQ控制台中的下面的位置我理解是指你的队列和交换机在哪个分组下面可以为每一个项目创建单独的分组但是在此我没有单独创建直接放到了 / 下面 那么怎么建一个单独的host呢 假如我就是想给某个项目接入使用一个单独host顺便使用一个单独的账号就好像我文中配置的 root 这样。
其实也很简便
virtual-host的创建
账号user的创建
然后记得给账号分配权限指定使用某个virtual host 指定给自己刚刚为某个项目单独创建的virtual host。
其实还可以特定指定交换机使用权等等
1使用direct exchange(直连型交换机)
创建DirectRabbitConfig.java对于队列和交换机持久化以及连接使用设置在注释里有说明后面的不同交换机的配置就不做同样说明了
package com.atguigu.gulimall.rabbitmqconsumer.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.amqp.core.Queue;/*** 这里使用的是direct exchange(直连型交换机), 也就是交换机和队列是一对一关系* 模拟 rabbitmq-provider 生产者,这里模块名字写错了。这个是消息生产者** author: jd* create: 2024-06-24*/
Configuration
public class DirectRabbitConfig {// 声明需要使用的交换机/路由Key/队列的名称public static final String DEFAULT_EXCHANGE TestDirectExchange;public static final String DEFAULT_ROUTE TestDirectRouting;public static final String DEFAULT_QUEUE TestDirectQueue;// 声明交换机需要几个声明几个这里就一个Beanpublic DirectExchange directExchange(){return new DirectExchange(DEFAULT_EXCHANGE);}//创建队列//队列 起名TestDirectQueueBeanpublic Queue TestDirectQueue(){// durable:是否持久化,默认是false,持久化队列会被存储在磁盘上当消息代理重启时仍然存在暂存队列当前连接有效// exclusive:默认也是false只能被当前创建的连接使用而且当连接关闭后队列即被删除。此参考优先级高于durable// autoDelete:是否自动删除当没有生产者或者消费者使用此队列该队列会自动删除。// return new Queue(TestDirectQueue,true,true,false);//一般设置一下队列的持久化就好,其余两个就是默认falsereturn new Queue(DEFAULT_QUEUE,true);}//绑定交换机和队列并指定路由键//绑定 将队列和交换机绑定, 并设置用于匹配键TestDirectRoutingBinding bindingDirect(){return BindingBuilder.bind(TestDirectQueue()).to(directExchange()).with(DEFAULT_ROUTE);}/*** 这个是做什么用的 ,为了后面 生产者确认那找到交换机找不到队列用的* return*/BeanDirectExchange lonelyDirectExchange() {return new DirectExchange(lonelyDirectExchange);}}
然后写个简单的接口进行消息推送根据需求也可以改为定时任务等等具体看需求SendMessageController.java
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;/*** 模拟 rabbitmq-provider 生产者 这里模块名字写错了。这个是消息生产者一般消息的生产者会直接在业务层调用* 不会单独的搞一个消息生产者这里因为没有业务调用去调用这个MQ的生产者所以这里直接创建一个模块模拟消息生产者** 发送消息控制器MQ入消息的入口* //原文链接https://blog.csdn.net/qq_35387940/article/details/100514134* author: jd* create: 2024-06-24*/
RestController
public class SendMessageController {AutowiredRabbitTemplate rabbitTemplate; //使用RabbitTemplate,这提供了接收/发送等等方法/*** 通过postman发送消息给消息队列-直流交换机* return*/GetMapping(/sendDirectMessage)String sendDirectMessage(){String messageId String.valueOf(UUID.randomUUID());String messageData test message, hello!;String createTime LocalDateTime.now().format(DateTimeFormatter.ofPattern(yyyy-MM-dd HH:mm:ss));MapString,Object mapnew HashMap();map.put(messageId,messageId);map.put(messageData,messageData);
// map.put(messageData,666666);map.put(createTime,createTime);//将消息携带绑定键值TestDirectRouting 发送到交换机TestDirectExchangerabbitTemplate.convertAndSend(TestDirectExchange, TestDirectRouting, map);// //生产者发送字符串类型消息则后面的消息消费者也需要接受字符串类型的入参进行消费
// rabbitTemplate.convertAndSend(TestDirectExchange, TestDirectRouting, 77777);System.out.println(调用完毕);return ok;}}把rabbitmq-provider项目运行调用下接口: 因为我们目前还没弄消费者 rabbitmq-consumer消息没有被消费的我们去rabbitMq管理页面看看是否推送成功我这里发送了三次所以有三个消息积压了
再看看队列界面上的各个英文项代表什么意思可以自己查查哈对理解还是有帮助的
很好消息已经推送到rabbitMq服务器上面了。
接下来创建rabbitmq-consumer项目
pom.xml里的jar依赖
?xml version1.0 encodingUTF-8?
project xmlnshttp://maven.apache.org/POM/4.0.0 xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsdmodelVersion4.0.0/modelVersionparentgroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-parent/artifactIdversion2.6.3/versionrelativePath/ !-- lookup parent from repository --/parentgroupIdcom.atguigu.gulimall/groupIdartifactIdrabbitmq-consumer-true/artifactIdversion0.0.1-SNAPSHOT/versionnamerabbitmq-consumer-true/namedescriptionRabbitMQ消费者模块/descriptionurl/licenseslicense//licensesdevelopersdeveloper//developersscmconnection/developerConnection/tag/url//scmpropertiesjava.version1.8/java.version!-- spring-cloud.version2021.0.4/spring-cloud.version--spring-cloud.version2021.0.1/spring-cloud.version/propertiesdependenciesdependencygroupIdcom.atguigu.gulimall/groupIdartifactIdgulimall-common/artifactIdversion0.0.1-SNAPSHOT/versionexclusionsexclusionartifactIdservlet-api/artifactIdgroupIdjavax.servlet/groupId/exclusion/exclusions/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-test/artifactIdscopetest/scope/dependencydependencygroupIdorg.springframework.amqp/groupIdartifactIdspring-rabbit-test/artifactIdscopetest/scope/dependency/dependenciesbuildpluginsplugingroupIdorg.springframework.boot/groupIdartifactIdspring-boot-maven-plugin/artifactId/plugin/plugins/build/project
然后是 application.yml
server:port: 8022#数据源配置
spring:datasource:url: jdbc:mysql://192.168.56.10:3306/gulimall_umsusername: rootpassword: rootdriver-class-name: com.mysql.cj.jdbc.Driver#配置nacoscloud:nacos:discovery:server-addr: 127.0.0.1#配置服务名称application:name: rabbitmq-consumer-true# 配置rabbitMq 服务器#spring.application.namerabbitmq-consumer-truerabbitmq:host: 127.0.0.1port: 5672username: guestpassword: guest#虚拟host 可以不设置,使用server默认hostvirtual-host: /
# listener: #这个在测试消费多个消息的时候不能有下面这些配置否则只能消费一个消息后就不继续消费了
# simple:
# acknowledge-mode: manual #指定MQ消费者的确认模式是手动确认模式 这个在消费者者模块配置
# prefetch: 1 #一次只能消费一条消息 这个在消费者者模块配置#配置日志输出级别
logging:level:com.atguigu.gulimall: debug#配置日志级别然后一样创建DirectRabbitConfig.java消费者单纯的使用其实可以不用添加这个配置直接建后面的监听就好使用注解来让监听器监听对应的队列即可。配置上了的话其实消费者也是生成者的身份也能推送该消息。
package com.atguigu.gulimall.consumertrue.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 消费者配置类** 原文链接https://blog.csdn.net/qq_35387940/article/details/100514134* 创建DirectRabbitConfig.java 关于队列的配置只是消息的生产者中配置即可。这个消费者不用配置配置了的话就也可以当成生产者了* 消费者单纯的使用其实可以不用添加这个配置直接建后面的监听就好* 使用注解来让监听器监听对应的队列即可。配置上了的话其实消费者也是生成者的身份也能推送该消息。** author: jd* create: 2024-06-25*/
Configuration
public class DirectRabbitConfig {// 声明需要使用的交换机/路由Key/队列的名称public static final String DEFAULT_EXCHANGE TestDirectExchange;public static final String DEFAULT_ROUTE TestDirectRouting;public static final String DEFAULT_QUEUE TestDirectQueue;//队列 起名TestDirectQueueBeanpublic Queue TestDirectQueue() {return new Queue(DEFAULT_QUEUE,true);}//Direct交换机 起名TestDirectExchangeBeanDirectExchange TestDirectExchange() {return new DirectExchange(DEFAULT_EXCHANGE);}//绑定 将队列和交换机绑定, 并设置用于匹配键TestDirectRoutingBeanBinding bindingDirect() {return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with(DEFAULT_ROUTE);}}
然后是创建消息接收监听类RabbitMQListener.java
package com.atguigu.gulimall.consumertrue.listener;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Map;/*** 消息消费监听类* author: jd* create: 2024-06-25*/
Component
Slf4j
RabbitListener(queues TestDirectQueue)//监听的队列名称 TestDirectQueue
public class RabbitMQListener {/*** 当消息发送者发送的是Map的时候通过这个消息处理器进行处理* param testMessage*/RabbitHandler(isDefault true)public void process(Map testMessage) {System.out.println(RabbitMQListener消费者收到消息 : testMessage.toString());}/*** 当消息发送者发送的是String类型的时候用这个监听处理器去接受消息并处理* param testMessage*//* RabbitHandler(isDefault true)public void process(String testMessage) {System.out.println(DirectReceiver消费者收到消息 : testMessage);//正常开发中会在消费到消息之后开始做一些业务处理//模拟业务处理//业务开始String str testMessage --消费成功;System.out.println(业务处理完毕str);//业务结束}*/}
然后将rabbitmq-consumer-true项目运行起来可以看到把之前推送的那条消息消费下来了 然后可以再继续调用rabbitmq-consumer项目的推送消息接口可以看到消费者即时消费消息 消费下来了 那么直连交换机既然是一对一那如果咱们配置多台监听绑定到同一个直连交互的同一个队列会怎么样 消费的结果如下
可以看到是实现了轮询的方式对消息进行消费而且不存在重复消费。
2使用Topic Exchange 主题交换机。
在rabbitmq-consume项目里面创建TopicRabbitConfig.java
package com.atguigu.gulimall.rabbitmqconsumer.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 使用Topic Exchange 主题交换机。** author: jd* create: 2024-06-25*/
Configuration
public class TopicRabbitConfig {//设置绑定键public static final String man topic.man;public static final String woman topic.woman;public static final String TOPIC_EXCHANGE topicExchange;//创建队列/*** 第一个主题队列** return*/Beanpublic Queue firstQueue() {return new Queue(man);}/*** 第二个主题队列** return*/Beanpublic Queue secondQueue() {return new Queue(woman);}/*** 创建一个主题交换机** return TopicExchange*/BeanTopicExchange exchange() {return new TopicExchange(TOPIC_EXCHANGE);}/*** //将firstQueue和topicExchange绑定,而且绑定的键值为topic.man* //这样只要是消息携带的路由键是topic.man,才会分发到该队列** return*/BeanBinding bindingExchangeMessageForFirstQueue() {return BindingBuilder.bind(firstQueue()).to(exchange()).with(man);}/*** //将secondQueue和topicExchange绑定,而且绑定的键值为用上通配路由键规则topic.#* // 这样只要是消息携带的路由键是以topic.开头,都会分发到该队列** return*/BeanBinding bindingExchangeMessageForSecondQueue() {return BindingBuilder.bind(secondQueue()).to(exchange()).with(topic.#);}}
然后添加多2个接口用于推送消息到主题交换机 // 然后添加多2个接口用于推送消息到主题交换机找那个再主题交换机中通过设置的路由键来推送到主题为topic.man的队列中以供消费
// https://blog.csdn.net/qq_35387940/article/details/100514134/*** 用于向MQ发送携带topic.man路由键的消息* return*/GetMapping(/sendTopicMessageToMan)public String sendTopicMessageToMan(){String messageId String.valueOf(UUID.randomUUID());String messageData send topic message to man;String createTime LocalDateTime.now().format(DateTimeFormatter.ofPattern(yyyy-MM-dd HH:mm:ss));MapString,Object mapnew HashMap();map.put(QueueConstant.MESSAGE_ID,messageId);map.put(QueueConstant.MESSAGE_DATA,messageData);map.put(QueueConstant.MESSAGE_TIME,createTime);rabbitTemplate.convertAndSend(TopicRabbitConfig.TOPIC_EXCHANGE,TopicRabbitConfig.man,map);System.out.println(sendTopicMessageToMan() 执行成功);return sendTopicMessageToMan is ok;}/*** 用于向MQ发送携带topic.woman路由键的消息。 这样会在exchange中去找绑定中这个路由键绑定的队列并向其中进行转发* topic.# 这个是通用的绑定规则只要是携带着topic.开头的就会转发到绑定的这个队列中* https://blog.csdn.net/qq_35387940/article/details/100514134* return*/GetMapping(/sendTopicMessageToTotal)public String sendTopicMessageToTotal(){String messageId String.valueOf(UUID.randomUUID());String messageData send topic message to woman;String createTime LocalDateTime.now().format(DateTimeFormatter.ofPattern(yyyy-MM-dd HH:mm:ss));MapString,Object mapnew HashMap();map.put(QueueConstant.MESSAGE_ID,messageId);map.put(QueueConstant.MESSAGE_DATA,messageData);map.put(QueueConstant.MESSAGE_TIME,createTime);
// rabbitTemplate.convertAndSend(TopicRabbitConfig.TOPIC_EXCHANGE,TopicRabbitConfig.woman,map);rabbitTemplate.convertAndSend(TopicRabbitConfig.TOPIC_EXCHANGE,topic.woman1,map); //测试携带路由键符合topic.#的是否能转发到topic.woman的队列System.out.println(sendTopicMessageToTotal() 执行成功);return sendTopicMessageToTotal is ok;}
生产者这边已经完事先不急着运行在rabbitmq-consumer-true项目上创建TopicManListener.java
package com.atguigu.gulimall.consumertrue.listener;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Map;/**
主题交换机 监听topic.man队列* author: jd* create: 2024-06-25*/
Component
Slf4j
RabbitListener(queues topic.man)//监听的队列名称 TestDirectQueue
public class TopicManListener {RabbitHandlerpublic void process(Map testMessage) {System.out.println(TopicManListener主题消费者收到消息 : testMessage.toString());}}
再创建一个TopicTotalListener.java
package com.atguigu.gulimall.consumertrue.listener;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Map;/*** author: jd* create: 2024-06-25*/Component
Slf4j
RabbitListener(queues topic.woman)
public class TopicTotalListener {RabbitHandlerpublic void process(Map testMessage){System.out.println(TopicTotalListener主题消费者收到消息 : testMessage.toString());}
}
同样加主题交换机的相关配置TopicRabbitConfig.java消费者一定要加这个配置吗 不需要的其实理由在前面已经说过了。
package com.atguigu.gulimall.rabbitmqconsumer.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 使用Topic Exchange 主题交换机。** author: jd* create: 2024-06-25*/
Configuration
public class TopicRabbitConfig {//设置绑定键public static final String man topic.man;public static final String woman topic.woman;public static final String TOPIC_EXCHANGE topicExchange;//创建队列/*** 第一个主题队列** return*/Beanpublic Queue firstQueue() {return new Queue(man);}/*** 第二个主题队列** return*/Beanpublic Queue secondQueue() {return new Queue(woman);}/*** 创建一个主题交换机** return TopicExchange*/BeanTopicExchange exchange() {return new TopicExchange(TOPIC_EXCHANGE);}/*** //将firstQueue和topicExchange绑定,而且绑定的键值为topic.man* //这样只要是消息携带的路由键是topic.man,才会分发到该队列** return*/BeanBinding bindingExchangeMessageForFirstQueue() {return BindingBuilder.bind(firstQueue()).to(exchange()).with(man);}/*** //将secondQueue和topicExchange绑定,而且绑定的键值为用上通配路由键规则topic.#* // 这样只要是消息携带的路由键是以topic.开头,都会分发到该队列** return*/BeanBinding bindingExchangeMessageForSecondQueue() {return BindingBuilder.bind(secondQueue()).to(exchange()).with(topic.#);}}
然后把rabbitmq-consumerrabbitmq-consumer-true两个项目都跑起来先调用/sendTopicMessage1 接口
然后看消费者rabbitmq-consumer的控制台输出情况 TopicManReceiver监听队列1绑定键为topic.man TopicTotalReceiver监听队列2绑定键为topic.# 而当前推送的消息携带的路由键为topic.man
所以可以看到两个监听消费者receiver都成功消费到了消息因为这两个recevier监听的队列的绑定键都能与这条消息携带的路由键匹配上。 接下来调用接口/sendTopicMessage2: 然后看消费者rabbitmq-consumer的控制台输出情况 TopicManReceiver监听队列1绑定键为topic.man TopicTotalReceiver监听队列2绑定键为topic.# 而当前推送的消息携带的路由键为topic.woman
所以可以看到两个监听消费者只有TopicTotalReceiver成功消费到了消息。
3使用Fanout Exchang 扇型交换机。
同样地先在rabbitmq-provider项目上创建FanoutRabbitConfig.java
package com.atguigu.gulimall.rabbitmqconsumer.config;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 使用Fanout Exchang 扇型交换机* author: jd* create: 2024-06-25*/
Configuration
public class FanoutRabbitConfig {//队列名称public static final String FANOUT_QUEUE_A fanout.a;public static final String FANOUT_QUEUE_B fanout.b;public static final String FANOUT_QUEUE_C fanout.c;public static final String FANOUT_EXCHANGE fanout.exchange;//创建队列 FANOUT_QUEUE_ABeanpublic Queue queueA(){return new Queue(FANOUT_QUEUE_A,true);}//创建队列 FANOUT_QUEUE_BBeanpublic Queue queueB(){return new Queue(FANOUT_QUEUE_B);}//创建队列 FANOUT_QUEUE_CBeanpublic Queue queueC(){return new Queue(FANOUT_QUEUE_C);}//创建交换机Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange(FANOUT_EXCHANGE);}//绑定将多有的队列都绑定到这个交换机BeanBinding bindingExchangeA() {return BindingBuilder.bind(queueA()).to(fanoutExchange());}BeanBinding bindingExchangeB() {return BindingBuilder.bind(queueB()).to(fanoutExchange());}BeanBinding bindingExchangeC() {return BindingBuilder.bind(queueC()).to(fanoutExchange());}}
然后是写一个接口用于推送消息 /*** 发送消息给扇形交换机 扇型交换机* return*/GetMapping(/sendFanoutMessage)public String sendFanoutMessage(){String messageId String.valueOf(UUID.randomUUID());String messageData message: testFanoutMessage ;String createTime LocalDateTime.now().format(DateTimeFormatter.ofPattern(yyyy-MM-dd HH:mm:ss));MapString, Object map new HashMap();map.put(QueueConstant.MESSAGE_ID,messageId);map.put(QueueConstant.MESSAGE_DATA,messageData);map.put(QueueConstant.MESSAGE_TIME,createTime);rabbitTemplate.convertAndSend(FanoutRabbitConfig.FANOUT_EXCHANGE,null,map);System.out.println(sendFanoutMessage() 执行成功);return sendFanoutMessage is ok;}
接着在rabbitmq-consumer-true项目里加上消息消费类
FanoutReceiverA.java FanoutReceiverB.java FanoutReceiverC.java
package com.atguigu.gulimall.consumertrue.listener;import com.atguigu.gulimall.consumertrue.config.FanoutRabbitConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Map;/*** 扇形交换机-队列A的监听器,及监听到消息后的处理器* author: jd* create: 2024-06-25*/
Component
Slf4j
RabbitListener(queues FanoutRabbitConfig.FANOUT_QUEUE_A)
public class FanoutReceiverA {RabbitHandlerpublic void process(Map message){System.out.println(FanoutReceiverA消费者收到消息 : message.toString());}}
package com.atguigu.gulimall.consumertrue.listener;import com.atguigu.gulimall.consumertrue.config.FanoutRabbitConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Map;/*** 扇形交换机-队列B的监听器,及监听到消息后的处理器* author: jd* create: 2024-06-25*/
Component
Slf4j
RabbitListener(queues FanoutRabbitConfig.FANOUT_QUEUE_B)
public class FanoutReceiverB {RabbitHandlerpublic void process(Map message){System.out.println(FanoutReceiverB消费者收到消息 : message.toString());}
}
package com.atguigu.gulimall.consumertrue.listener;/*** author: jd* create: 2024-06-25*/import com.atguigu.gulimall.consumertrue.config.FanoutRabbitConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Map;/*** 扇形交换机-队列B的监听器,及监听到消息后的处理器* author: jd* create: 2024-06-25*/
Component
Slf4j
RabbitListener(queues FanoutRabbitConfig.FANOUT_QUEUE_C)
public class FanoutReceiverC {RabbitHandlerpublic void process(Map message){System.out.println(FanoutReceiverC消费者收到消息 : message.toString());}
}
然后加上扇型交换机的配置类FanoutRabbitConfig.java消费者真的要加这个配置吗 不需要的其实理由在前面已经说过了
package com.atguigu.gulimall.consumertrue.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 使用Fanout Exchang 扇型交换机* author: jd* create: 2024-06-25*/
Configuration
public class FanoutRabbitConfig {//队列名称public static final String FANOUT_QUEUE_A fanout.a;public static final String FANOUT_QUEUE_B fanout.b;public static final String FANOUT_QUEUE_C fanout.c;public static final String FANOUT_EXCHANGE fanout.exchange;//创建队列 FANOUT_QUEUE_ABeanpublic Queue queueA(){return new Queue(FANOUT_QUEUE_A,true);}//创建队列 FANOUT_QUEUE_BBeanpublic Queue queueB(){return new Queue(FANOUT_QUEUE_B);}//创建队列 FANOUT_QUEUE_CBeanpublic Queue queueC(){return new Queue(FANOUT_QUEUE_C);}//创建交换机Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange(FANOUT_EXCHANGE);}//绑定将多有的队列都绑定到这个交换机BeanBinding bindingExchangeA() {return BindingBuilder.bind(queueA()).to(fanoutExchange());}BeanBinding bindingExchangeB() {return BindingBuilder.bind(queueB()).to(fanoutExchange());}BeanBinding bindingExchangeC() {return BindingBuilder.bind(queueC()).to(fanoutExchange());}}
最后将rabbitmq-provider和rabbitmq-consumer项目都跑起来调用下接口/sendFanoutMessage 可以看到只要发送到 fanoutExchange 这个扇型交换机的消息 三个队列都绑定这个交换机所以三个消息接收类都监听到了这条消息。 到了这里其实三个常用的交换机的使用我们已经完毕了那么接下来我们继续讲讲消息的回调其实就是消息确认生产者推送消息成功消费者接收消息成功。
三、消息确认种类
RabbitMQ的消息确认有两种。
一种是消息发送确认。这种是用来确认生产者将消息发送给交换器交换器传递给队列的过程中消息是否成功投递。发送确认分为两步一是确认是否到达交换器二是确认是否到达队列。
第二种是消费接收确认。这种是确认消费者是否成功消费了队列中的消息。
消息确认的作用是什么
为了防止消息丢失。消息丢失分为发送丢失和消费者处理丢失相应的也有两种确认机制。
先来一起学习一下
A消息发送确认
在rabbitmq-consumer项目的application.yml文件上加上消息确认的配置项后
server:port: 8021#数据源配置
spring:datasource:username: rootpassword: rooturl: jdbc:mysql://192.168.56.10:3306/gulimall_umsdriver-class-name: com.mysql.cj.jdbc.Driver#注册到注册中心cloud:nacos:discovery:server-addr: 127.0.0.1:8848application:name: rabbitmq-consumer#配置rabbitMq 服务器rabbitmq:host: 127.0.0.1port: 5672username: guestpassword: guest#虚拟host 可以不设置,使用server默认hostvirtual-host: /publisher-returns: true #确认消息已发送到队列(Queue) 这个在生产者模块配置 这个后期再配置这会还用不到publisher-confirm-type: correlated #确认消息已发送到交换机(Exchange) 这个在生产者模块配置 这个后期再配置这会还用不到logging:level:com.atguigu.gulimall: debug #调整product模块日志的输出模式是debug级别这样就能在控制台看到dao包下的输出日志了。
然后是配置相关的消息确认回调函数RabbitConfig.java
package com.atguigu.gulimall.rabbitmqconsumer.config;import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 配置相关的消息确认回调函数RabbitConfig.java* https://blog.csdn.net/qq_35387940/article/details/100514134** 先从总体的情况分析推送消息存在四种情况** ①消息推送到server但是在server里找不到交换机* ②消息推送到server找到交换机了但是没找到队列* ③消息推送到sever交换机和队列啥都没找到* ④消息推送成功* 具体哪些会触发回调分别又会触发哪个函数看下面的测试** author: jd* create: 2024-06-25*/
Configuration
public class RabbitConfig {Beanpublic RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate new RabbitTemplate();rabbitTemplate.setConnectionFactory(connectionFactory);//设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数rabbitTemplate.setMandatory(true);rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback(){Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println(ConfirmCallback: 相关数据correlationData);System.out.println(ConfirmCallback: 确认情况ack);System.out.println(ConfirmCallback: 原因cause);}});rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {System.out.println(ReturnCallback: 消息returnedMessage.getMessage());System.out.println(ReturnCallback: 回应码returnedMessage.getReplyCode());System.out.println(ReturnCallback: 回应信息returnedMessage.getReplyText());System.out.println(ReturnCallback: 交换机returnedMessage.getExchange());System.out.println(ReturnCallback: 路由键returnedMessage.getRoutingKey());}});return rabbitTemplate;}
}
到这里生产者推送消息的消息确认调用回调函数已经完毕。 可以看到上面写了两个回调函数一个叫 ConfirmCallback 一个叫 RetrunCallback 那么以上这两种回调函数都是在什么情况会触发呢
先从总体的情况分析推送消息存在四种情况
①消息推送到server但是在server里找不到交换机 ②消息推送到server找到交换机了但是没找到队列 ③消息推送到sever交换机和队列啥都没找到 ④消息推送成功
那么我先写几个接口来分别测试和认证下以上4种情况消息确认触发回调函数的情况
①消息推送到server但是在server里找不到交换机 是否到达交换机 写个测试接口把消息推送到名为‘non-existent-exchange’的交换机上这个交换机是没有创建没有配置的
/*** ①消息推送到server但是在server里找不到交换机** 写个测试接口把消息推送到名为‘non-existent-exchange’的交换机上这个交换机是没有创建没有配置的* 调用接口查看rabbitmq-provuder项目的控制台输出情况原因里面有说没有找到交换机non-existent-exchange*在控制台中* 调用后返回http://localhost:8021/TestMessageAck*ConfirmCallback: 相关数据null* ConfirmCallback: 确认情况false* ConfirmCallback: 原因channel error; protocol method: #methodchannel.close(reply-code404,* reply-textNOT_FOUND - no exchange non-existent-exchange in vhost /, class-id60, method-id40)** 结论 ①这种情况触发的是 ConfirmCallback 回调函数* return*/GetMapping(/TestMessageAck)public String TestMessageAck() {String messageId String.valueOf(UUID.randomUUID());String messageData message: non-existent-exchange test message ;String createTime LocalDateTime.now().format(DateTimeFormatter.ofPattern(yyyy-MM-dd HH:mm:ss));MapString, Object map new HashMap();map.put(messageId, messageId);map.put(messageData, messageData);map.put(createTime, createTime);rabbitTemplate.convertAndSend(non-existent-exchange, TestDirectRouting, map);return ok;}
调用接口查看rabbitmq-provuder项目的控制台输出情况原因里面有说没有找到交换机’non-existent-exchange’ 结论 ①这种情况触发的是 ConfirmCallback 回调函数。
②消息推送到server找到交换机了但是没找到队列 是否到达队列 这种情况就是需要新增一个交换机但是不给这个交换机绑定队列我来简单地在DirectRabitConfig里面新增一个直连交换机名叫‘lonelyDirectExchange’但没给它做任何绑定配置操作 BeanDirectExchange lonelyDirectExchange() {return new DirectExchange(lonelyDirectExchange);}然后写个测试接口把消息推送到名为‘lonelyDirectExchange’的交换机上这个交换机是没有任何队列配置的
/*** ②消息推送到server找到交换机了但是没找到队列* 这种情况就是需要新增一个交换机但是不给这个交换机绑定队列* 我来简单地在DirectRabitConfig里面新增一个直连交换机名叫‘lonelyDirectExchange’但没给它做任何绑定配置操作** 然后写个测试接口把消息推送到名为‘lonelyDirectExchange’的交换机上这个交换机是没有任何队列配置的**可以看到这种情况在控制台中 两个函数都被调用了* 这种情况下消息是推送成功到服务器了的所以ConfirmCallback对消息确认情况是true* 而在RetrunCallback回调函数的打印参数里面可以看到消息是推送到了交换机成功了但是在路由分发给队列的时候找不到队列所以报了错误 NO_ROUTE 。** 调用后返回http://localhost:8021/TestMessageAck2* ReturnCallback: 回应码312* ReturnCallback: 回应信息NO_ROUTE* ReturnCallback: 交换机lonelyDirectExchange* ReturnCallback: 路由键TestDirectRouting* ConfirmCallback: 相关数据null* ConfirmCallback: 确认情况true* ConfirmCallback: 原因null** 结论②这种情况触发的是 ConfirmCallback和RetrunCallback两个回调函数。* return*/GetMapping(/TestMessageAck2)public String TestMessageAck2() {String messageId String.valueOf(UUID.randomUUID());String messageData message: lonelyDirectExchange test message ;String createTime LocalDateTime.now().format(DateTimeFormatter.ofPattern(yyyy-MM-dd HH:mm:ss));MapString, Object map new HashMap();map.put(messageId, messageId);map.put(messageData, messageData);map.put(createTime, createTime);rabbitTemplate.convertAndSend(lonelyDirectExchange, TestDirectRouting, map); //lonelyDirectExchange这个交换机没有和任何队列做绑定return ok;}调用接口查看rabbitmq-provuder项目的控制台输出情况
ConfirmCallback: 相关数据null
ConfirmCallback: 确认情况true
ConfirmCallback: 原因null
ReturnCallback: 消息(Body:[serialized object] MessageProperties [headers{}, contentTypeapplication/x-java-serialized-object, contentLength0, receivedDeliveryModePERSISTENT, priority0, deliveryTag0])
ReturnCallback: 回应码312
ReturnCallback: 回应信息NO_ROUTE
ReturnCallback: 交换机lonelyDirectExchange
ReturnCallback: 路由键TestDirectRouting
可以看到这种情况两个函数都被调用了 这种情况下消息是推送成功到服务器了的所以ConfirmCallback对消息确认情况是true 而在RetrunCallback回调函数的打印参数里面可以看到消息是推送到了交换机成功了但是在路由分发给队列的时候找不到队列所以报了错误 NO_ROUTE 。 结论②这种情况触发的是 ConfirmCallback和RetrunCallback两个回调函数。
③消息推送到sever交换机和队列啥都没找到 这种情况其实一看就觉得跟①很像没错 ③和①情况回调是一致的所以不做结果说明了。 结论 ③这种情况触发的是 ConfirmCallback 回调函数。
④消息推送成功 那么测试下按照正常调用之前消息推送的接口就行就调用下 /sendFanoutMessage接口可以看到控制台输出
ConfirmCallback: 相关数据null
ConfirmCallback: 确认情况true
ConfirmCallback: 原因null结论 ④这种情况触发的是 ConfirmCallback 回调函数。
总结 rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback(){}通过设置这个参数其中使用内部类进行实现来记录消息发送到交换器Exchange后触发回调。 使用该功能需要开启确认 publisher-confirm-type: correlated #确认消息已发送到交换机(Exchange) 这个在生产者模块配置
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback(){}通过设置这个参数如果消息从交换器发送到对应队列失败时触发比如根据发送消息时指定的routingKey找不到队列时会触发 publisher-returns: true #确认消息已发送到队列(Queue) 这个在生产者模块配置
以上是生产者推送消息的消息确认 回调函数的使用介绍可以在回调函数根据需求做对应的扩展或者业务数据处理。
B 消费接收确认
接下来我们继续 消费者接收到消息的消息确认机制。
1确认模式
AcknowledgeMode.NONE不确认 AcknowledgeMode.AUTO自动确认 AcknowledgeMode.MANUAL手动确认 spring-boot中配置方法
spring.rabbitmq.listener.simple.acknowledge-mode manual
2手动确认 未确认的消息数
上图为channel中未被消费者确认的消息数。
通过RabbitMQ的host地址加上默认端口号15672访问管理界面。
2.1成功确认
void basicAck(long deliveryTag, boolean multiple) throws IOException;
deliveryTag:该消息的index
multiple是否批量. true将一次性ack所有小于deliveryTag的消息。
消费者成功处理后调用channel.basicAck(message.getMessageProperties().getDeliveryTag(), false)方法对消息进行确认。
2.2失败确认
void basicNack(long deliveryTag, boolean multiple, boolean requeue)
throws IOException;
deliveryTag:该消息的index。
multiple是否批量. true将一次性拒绝所有小于deliveryTag的消息。
requeue被拒绝的是否重新入队列。
void basicReject(long deliveryTag, boolean requeue) throws IOException;
deliveryTag:该消息的index。
requeue被拒绝的是否重新入队列。
channel.basicNack 与 channel.basicReject 的区别在于basicNack可以批量拒绝多条消息而basicReject一次只能拒绝一条消息。
①自动确认 这也是默认的消息确认情况。 AcknowledgeMode.NONE
RabbitMQ成功将消息发出即将消息成功写入TCP Socket中立即认为本次投递已经被正确处理不管消费者端是否成功处理本次投递。
所以这种情况如果消费端消费逻辑抛出异常也就是消费端没有处理成功这条消息那么就相当于丢失了消息。
一般这种情况我们都是使用try catch捕捉异常后打印日志用于追踪数据这样找出对应数据再做后续处理。② 根据情况确认 这个不做介绍
③ 手动确认 这个比较关键也是我们配置接收消息确认机制时多数选择的模式。
消费者收到消息后手动调用basic.ack/basic.nack/basic.reject后RabbitMQ收到这些消息后才认为本次投递成功。
basic.ack用于肯定确认
basic.nack用于否定确认注意这是AMQP 0-9-1的RabbitMQ扩展
basic.reject用于否定确认但与basic.nack相比有一个限制:一次只能拒绝单条消息 消费者端以上的3个方法都表示消息已经被正确投递但是basic.ack表示消息已经被正确处理。
而basic.nack,basic.reject表示没有被正确处理着重讲下reject因为有时候一些场景是需要重新入列的。channel.basicReject(deliveryTag, true); 拒绝消费当前消息如果第二参数传入true就是将数据重新丢回队列里那么下次还会消费这消息。设置false就是告诉服务器我已经知道这条消息数据了因为一些原因拒绝它而且服务器也把这个消息丢掉就行。 下次不想再消费这条消息了。使用拒绝后重新入列这个确认模式要谨慎因为一般都是出现异常的时候catch异常再拒绝入列选择是否重入列。但是如果使用不当会导致一些每次都被你重入列的消息一直消费-入列-消费-入列这样循环会导致消息积压。顺便也简单讲讲 nack这个也是相当于设置不消费某条消息。channel.basicNack(deliveryTag, false, true);
第一个参数依然是当前消息到的数据的唯一id;
第二个参数是指是否针对多条消息如果是true也就是说一次性针对当前通道的消息的tagID小于当前这条消息的都拒绝确认。
第三个参数是指是否重新入列也就是指不确认的消息是否重新丢回到队列里面去。同样使用不确认后重新入列这个确认模式要谨慎因为这里也可能因为考虑不周出现消息一直被重新丢回去的情况导致积压。看了上面这么多介绍接下来我们一起配置下看看一般的消息接收 手动确认是怎么样的。
方式一通过配置类的方式实现
此时还不需要加下面的配置因为这种方式是通过 配置类注解来配置的手动消费者确认再下面的方式二则是通过yml的配置来设置的消费者手动确认我们先来看方式一是怎么实现的
在消费者项目里 新建MessageListenerConfig.java上添加代码相关的配置代码
package com.atguigu.gulimall.consumertrue.config;import com.atguigu.gulimall.consumertrue.listener.MyAckReceiver;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 一般的消息接收 手动确认是怎么样的消费者的手动消息确认配置类* https://blog.csdn.net/qq_35387940/article/details/100514134* author: jd* create: 2024-06-25*/
//Configuration //注释掉这个注解这样第一种MQ消费者的确认模式就失效了以为你这个里面配置着对某个队列的监控呢。 第二种MQ的配置方式的话和这个的区别不用这种配置类而是在yml中配置东西
public class MessageListenerConfig {Autowiredprivate CachingConnectionFactory connectionFactory;Autowiredprivate MyAckReceiver myAckReceiver;//消息接收处理类Beanpublic SimpleMessageListenerContainer simpleMessageListenerContainer(){SimpleMessageListenerContainer container new SimpleMessageListenerContainer(connectionFactory);container.setConcurrentConsumers(1);container.setMaxConcurrentConsumers(1);container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // RabbitMQ默认是自动确认这里改为手动确认消息//设置一个队列,在这里设置了队列container.setQueueNames(TestDirectQueue);//如果同时设置多个如下 前提是队列都是必须已经创建存在的// container.setQueueNames(TestDirectQueue,TestDirectQueue2,TestDirectQueue3);//另一种设置队列的方法,如果使用这种情况,那么要设置多个,就使用addQueues//container.setQueues(new Queue(TestDirectQueue,true));//container.addQueues(new Queue(TestDirectQueue2,true));//container.addQueues(new Queue(TestDirectQueue3,true));//这里设置了监听器因为上面设置了队列所以在监听器中就不需要用监听器的注解了 。container.setMessageListener(myAckReceiver);return container;}
}
对应的手动确认消息监听类MyAckReceiver.java手动确认模式需要实现 ChannelAwareMessageListener //之前的相关监听器可以先注释掉以免造成多个同类型监听器都监听同一个队列。【比如我之前用的RabbitMQListener 、RabbitMQListener2 为了让其失效直接注释掉其中的//RabbitListener(queues “TestDirectQueue”)//监听的队列名称 TestDirectQueue】 这个注解即可这样这个监听器就无法监听相关队列了。 MyAckReceiver.java
package com.atguigu.gulimall.consumertrue.listener;import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.Map;/*** 对应的手动确认消息监听类MyAckReceiver.java手动确认模式需要实现 ChannelAwareMessageListener* //之前的相关监听器可以先注释掉以免造成多个同类型监听器都监听同一个队列。** 注意因为这里是在MessageListenerConfig 类中指定了是要监听哪个队列以及消息的确认机制所以这里不需要使用* RabbitListener(queues TestDirectQueue) 和 RabbitHandler(isDefault true)注解了* author: jd* create: 2024-06-25*/Component
public class MyAckReceiver implements ChannelAwareMessageListener {Overridepublic void onMessage(Message message, Channel channel) throws Exception {long deliveryTag message.getMessageProperties().getDeliveryTag();try {byte[] body message.getBody();ObjectInputStream objectInputStream new ObjectInputStream(new ByteArrayInputStream(body));MapString,String msgMap (MapString,String)objectInputStream.readObject();String messageId msgMap.get(messageId);String messageData msgMap.get(messageData);String createTime msgMap.get(createTime);objectInputStream.close();System.out.println( MyAckReceiver messageId:messageId messageData:messageData createTime:createTime);System.out.println(消费的主题队列来自message.getMessageProperties().getConsumerQueue());
// 消费者成功处理后调用channel.basicAck(message.getMessageProperties().getDeliveryTag(), false)方法对消息进行确认。channel.basicAck(deliveryTag, true); // deliveryTag:该消息的index multiple是否批量. true将一次性ack所有小于deliveryTag的消息。 第二个参数手动确认可以被批处理 当该参数为 true 时则可以一次性确认 delivery_tag 小于等于传入值的所有消息
// channel.basicReject(deliveryTag, true);//第二个参数true会重新放回队列所以需要自己根据业务逻辑判断什么时候使用拒绝} catch (Exception e) {channel.basicReject(deliveryTag, false);e.printStackTrace();}}
}
这时先调用接口/sendDirectMessage 给直连交换机TestDirectExchange 的队列TestDirectQueue 推送一条消息可以看到监听器正常消费了下来 第一次验证我们发现消费者没有消费掉直流交换机中的消息而且也在直流队列中积压了起来 这是由于我们的配置类忘记加了 Configuration 注解了所以此时这个不是配置类也就是这里对MQ的配置不会生效所以加上之后 我们再去试试 可看到下图 消费成功 配置类中 container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // RabbitMQ默认是自动确认这里改为手动确认消息 是发挥作用的关键
方式二通过yml配置来完成消费者确认
特别注意因为这里我们要使用yml配置来实现所以我们需要关闭配置类的作用使之失效我这里直接把Configuration 给注释掉 了这样配置类不会起作用了_ 第二种方式正式开始啦 (#.#) 首先我们来在yml中开启手动确认的配置
server:port: 8022#数据源配置
spring:datasource:url: jdbc:mysql://192.168.56.10:3306/gulimall_umsusername: rootpassword: rootdriver-class-name: com.mysql.cj.jdbc.Driver#配置nacoscloud:nacos:discovery:server-addr: 127.0.0.1#配置服务名称application:name: rabbitmq-consumer-true# 配置rabbitMq 服务器#spring.application.namerabbitmq-consumer-truerabbitmq:host: 127.0.0.1port: 5672username: guestpassword: guest#虚拟host 可以不设置,使用server默认hostvirtual-host: /listener: #这个在测试消费多个消息的时候不能有下面这些配置否则只能消费一个消息后就不继续消费了simple:acknowledge-mode: manual #指定MQ消费者的确认模式是手动确认模式 这个在消费者者模块配置prefetch: 1 #一次只能消费一条消息 这个在消费者者模块配置#配置日志输出级别
logging:level:com.atguigu.gulimall: debug#配置日志级别其中的 几行是开启的关键 listener: #这个在测试消费多个消息的时候不能有下面这些配置否则只能消费一个消息后就不继续消费了 simple: acknowledge-mode: manual #指定MQ消费者的确认模式是手动确认模式 这个在消费者者模块配置 prefetch: 1 #一次只能消费一条消息 这个在消费者者模块配置
此处直接用接口来当生产者了 然后我们在生产者模块用于放消息的controller中增加一个放消息的请求方法用于往队列里面连续放入5个放消息 SendMessageController.java /*** 原文链接https://blog.csdn.net/weixin_45724872/article/details/119655638* 将信号放入MQ* param message* return*/PostMapping(/msg/muscle)public String receiveMuscleSign(RequestBody String message) {//处理业务for (int i 1; i 5; i) {rabbitTemplate.convertAndSend(muscle_fanout_exchange,,messagei);}return receiveMuscleSign ok;}
开发消费者 此处用一个类下的两个方法来模拟2个消费者
package com.atguigu.gulimall.consumertrue.listener;import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/****此处用一个类下的两个方法来模拟2个消费者*原文链接https://blog.csdn.net/weixin_45724872/article/details/119655638原文链接https://blog.csdn.net/weixin_45724872/article/details/119655638* author: jd* create: 2024-06-25*/
Component
public class MyConsumerListener {RabbitListener(bindings {QueueBinding(value Queue(consumer_queue_1),//绑定交换机exchange Exchange(value muscle_fanout_exchange, type fanout))})public void consumer1(String msg, Message message, Channel channel) throws Exception {long deliveryTag message.getMessageProperties().getDeliveryTag();try {System.out.println(消费者1 msg);//channel.basicAck(deliveryTag, false); // 因为 yml中 prefetch 设置为 1或未设置因为默认可能是 0表示无限制但这不是推荐的做法RabbitMQ 将只发送一个消息给消费者并等待该消息的确认。在这种情况下// 如果你注释掉了 channel.basicAck消费者将只能消费一个消息并且不会收到下一个消息直到你发送确认或关闭连接。 所以对于消息队列中的五个消息只能销费一个除非你手动确认否则不会再消费其他的消息} catch (Exception e) {channel.basicReject(deliveryTag, false);e.printStackTrace();}}RabbitListener(bindings {QueueBinding(value Queue(consumer_queue_2),//绑定交换机exchange Exchange(value muscle_fanout_exchange, type fanout))})public void consumer2(String msg,Message message, Channel channel) throws Exception {long deliveryTag message.getMessageProperties().getDeliveryTag();try {System.out.println(消费者2 msg);channel.basicAck(deliveryTag, false);} catch (Exception e) {channel.basicReject(deliveryTag, false);e.printStackTrace();}}}
注意一点消费者1的手动ACK我们是注释掉了
而消费者2的手动ACK我们是开着的
原因是为了对照试验
我们期望的情况是一共5条消息消费者1和2都一一处理
处理完毕后再取下一条否则不让取
那么按我们代码这样写
消费者1只能取一条 (只是处理一条的原因) 而消费者2则能取满5条因为消费者1的手动ACK被我们注释了此处又不是自动ACK
消费者1只是处理一条的原因下图中的perfetchCount有问题我们实际上配置的是prefetch: 1 我们直接按照这个配置来理解就行 消费者一就是注释了对消息消费之后的确认回馈给RabbitMQ的设置所以消费者对五条消息中消费到第一个之后因为我们在yml中又配置了每次消费一条而且也是手动确认的所以MQ消费到这一条之后就在那等着手动调用ack方法来完成的确认ack的反馈结果我们这里注释了所以就一直等不到第一条消息的回馈所以就会一直等待下面的4条消息也就无法继续消费了
相反消费者二就不一样了他有消费完每一条消息之后都调用了手动ack的回馈所以可以消费5条消息都消息完。
以下是实验截图 MQ 的初始状态
首先用postman发送请求
看下图生产者发送了5条消息并得到了成功推送到了交换机和队列的回馈
接下来我们步入正题看消费者里面消费者1只是消费了一条消费者2消费了全部的5条消息 结果和我们预想的是一致的
我们在看看MQ的管理页面来确认 可以看到消费者2已经搞完了而消费者1那边卡住了消费者一消费了一条但是在等待回馈还剩余4条都没被消费在等待消费
我在实验的过程中因为消费者1中的消息堆积了如果再次发送5条消息到扇形交换机中那队列1中会积累到9条待消费的1条等待反馈的10条总共的我们可以实验一下子 结果和我们预想的一样那我们如何将这些积压的消息给去掉呢 我自己试出了两种方式最初试的直接重启服务这样是无效的因为进入队列的不被消费会一直在队列里面 。 下面是2种处理方法 第一种是最直接的方法直接把确认那行的代码给放开这样这个消费者1 就会把队列1中积压的那些给消费掉了 第二种 我们将yml中的手动确认配置注释掉这样就默认是自动确认了这样我每次从postman中发送5条消息到扇形交换机分发到两个队列之后两个消费者都会一直可以消费因为没消费一个都会自动确认回馈不用等待了这样也是可以的 我们实验如下 实验1 我们先把消费者1中注释的手动回馈给放开 可见console中 对于积压的消息直接给消费掉了。
实验2 我们将消费者1中的手动反馈给继续注释掉发送2次 postman 造成积压 我把yml中的手动消费者确认改成自动的也就是注释掉可以看到重启消费者模块后积压的也被消费了 注释配置 重启后看控制台 很明显启动后积压的消息也被消费了 在MQ控制台中也可以看到积压消息被消费啦
关于手动确认的一些方法 细心的小伙伴可能发现了我们在消费者的catch处写了这样一行代码
channel.basicReject(deliveryTag, false);
以下是解释
一般是有3种确认的其中1种是正确确认另外2种是错误确认
reject:只能否定一条消息 nack:可以否定一条或者多条消息
而错误确认的这两个都有一个属性 boolean requeue
当它是true的时候表示重新入队 当它是false的时候则表示抛弃掉
使用拒绝后重新入列这个确认模式要谨慎因为触发错误确认一般都是出现异常的时候那么就可能导致死循环即不断的入队-消费-报错-重新入队…这将导致消息积压万一就炸了…
实验错误确认 我们将上述的消费者代码加一行代码
此处只改动了消费者1消费者2不变
新增一条抛异常的语句
int num 1/0;
package com.tubai;import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;Component
public class MyConsumer {RabbitListener(bindings {QueueBinding(value Queue(consumer_queue_1),//绑定交换机exchange Exchange(value muscle_fanout_exchange, type fanout))})public void consumer1(String msg,Message message, Channel channel) throws Exception {long deliveryTag message.getMessageProperties().getDeliveryTag();try {System.out.println(消费者1 msg);int num 1/0;channel.basicAck(deliveryTag, false); //第二个参数手动确认可以被批处理当该参数为 true 时} catch (Exception e) {channel.basicReject(deliveryTag, false);e.printStackTrace();}}RabbitListener(bindings {QueueBinding(value Queue(consumer_queue_2),//绑定交换机exchange Exchange(value muscle_fanout_exchange, type fanout))})public void consumer2(String msg,Message message, Channel channel) throws Exception {long deliveryTag message.getMessageProperties().getDeliveryTag();try {System.out.println(消费者2 msg);channel.basicAck(deliveryTag, false);} catch (Exception e) {channel.basicReject(deliveryTag, false);e.printStackTrace();}}
}
运行结果 可以看到我们的消费者1也正常了因为我们是先打印后确认因此1~5也会被打印出来
如果重复入队…那么我们的程序就会死循环了疯狂打印各位可以自己试试但是容易把内存占满O。。
本篇文章书写不易自己打了好久大家认可的话或者开启了新认知请给个点赞。收藏哦 (#.#) 谢谢大家 参考文章也写的超级好大家也可都学习学习一起进步 Springboot 整合RabbitMq 用心看完这一篇就够了 RabbitMQ的消息确认机制 SpringBoot集成RabbitMq 手动ACK RabbitMQ控制界面详解 文章转载自: http://www.morning.jgmdr.cn.gov.cn.jgmdr.cn http://www.morning.nhbhc.cn.gov.cn.nhbhc.cn http://www.morning.lkgqb.cn.gov.cn.lkgqb.cn http://www.morning.fgxr.cn.gov.cn.fgxr.cn http://www.morning.pqnps.cn.gov.cn.pqnps.cn http://www.morning.nndbz.cn.gov.cn.nndbz.cn http://www.morning.mhfbp.cn.gov.cn.mhfbp.cn http://www.morning.mfct.cn.gov.cn.mfct.cn http://www.morning.cmqrg.cn.gov.cn.cmqrg.cn http://www.morning.jfmyt.cn.gov.cn.jfmyt.cn http://www.morning.tqrbl.cn.gov.cn.tqrbl.cn http://www.morning.mlgsc.com.gov.cn.mlgsc.com http://www.morning.bxbnf.cn.gov.cn.bxbnf.cn http://www.morning.drspc.cn.gov.cn.drspc.cn http://www.morning.dqbpf.cn.gov.cn.dqbpf.cn http://www.morning.xtqr.cn.gov.cn.xtqr.cn http://www.morning.wfspn.cn.gov.cn.wfspn.cn http://www.morning.kbfzp.cn.gov.cn.kbfzp.cn http://www.morning.lgxzj.cn.gov.cn.lgxzj.cn http://www.morning.ngznq.cn.gov.cn.ngznq.cn http://www.morning.bpwdc.cn.gov.cn.bpwdc.cn http://www.morning.jprrh.cn.gov.cn.jprrh.cn http://www.morning.yrnll.cn.gov.cn.yrnll.cn http://www.morning.mlpmf.cn.gov.cn.mlpmf.cn http://www.morning.bpmtj.cn.gov.cn.bpmtj.cn http://www.morning.zlwg.cn.gov.cn.zlwg.cn http://www.morning.ypbdr.cn.gov.cn.ypbdr.cn http://www.morning.weiwt.com.gov.cn.weiwt.com http://www.morning.djxnn.cn.gov.cn.djxnn.cn http://www.morning.brfxt.cn.gov.cn.brfxt.cn http://www.morning.cnbdn.cn.gov.cn.cnbdn.cn http://www.morning.3ox8hs.cn.gov.cn.3ox8hs.cn http://www.morning.slmbg.cn.gov.cn.slmbg.cn http://www.morning.dyhlm.cn.gov.cn.dyhlm.cn http://www.morning.27asw.cn.gov.cn.27asw.cn http://www.morning.dnphd.cn.gov.cn.dnphd.cn http://www.morning.bpncd.cn.gov.cn.bpncd.cn http://www.morning.fmdvbsa.cn.gov.cn.fmdvbsa.cn http://www.morning.lsmgl.cn.gov.cn.lsmgl.cn http://www.morning.qqklk.cn.gov.cn.qqklk.cn http://www.morning.pphbn.cn.gov.cn.pphbn.cn http://www.morning.xkyqq.cn.gov.cn.xkyqq.cn http://www.morning.ffptd.cn.gov.cn.ffptd.cn http://www.morning.tbjtp.cn.gov.cn.tbjtp.cn http://www.morning.tndxg.cn.gov.cn.tndxg.cn http://www.morning.wjfzp.cn.gov.cn.wjfzp.cn http://www.morning.hongjp.com.gov.cn.hongjp.com http://www.morning.mhybs.cn.gov.cn.mhybs.cn http://www.morning.xqqcq.cn.gov.cn.xqqcq.cn http://www.morning.ygrkg.cn.gov.cn.ygrkg.cn http://www.morning.gcdzp.cn.gov.cn.gcdzp.cn http://www.morning.xlxmy.cn.gov.cn.xlxmy.cn http://www.morning.fllx.cn.gov.cn.fllx.cn http://www.morning.rjjjk.cn.gov.cn.rjjjk.cn http://www.morning.qrcsb.cn.gov.cn.qrcsb.cn http://www.morning.hxlch.cn.gov.cn.hxlch.cn http://www.morning.xnrgb.cn.gov.cn.xnrgb.cn http://www.morning.mqldj.cn.gov.cn.mqldj.cn http://www.morning.bslkt.cn.gov.cn.bslkt.cn http://www.morning.kdhrf.cn.gov.cn.kdhrf.cn http://www.morning.byrlg.cn.gov.cn.byrlg.cn http://www.morning.wmfmj.cn.gov.cn.wmfmj.cn http://www.morning.saletj.com.gov.cn.saletj.com http://www.morning.kggxj.cn.gov.cn.kggxj.cn http://www.morning.kcnjz.cn.gov.cn.kcnjz.cn http://www.morning.hdwjb.cn.gov.cn.hdwjb.cn http://www.morning.wfjyn.cn.gov.cn.wfjyn.cn http://www.morning.hjjfp.cn.gov.cn.hjjfp.cn http://www.morning.mmtbn.cn.gov.cn.mmtbn.cn http://www.morning.tmpsc.cn.gov.cn.tmpsc.cn http://www.morning.tmbfz.cn.gov.cn.tmbfz.cn http://www.morning.lbcbq.cn.gov.cn.lbcbq.cn http://www.morning.lkhgq.cn.gov.cn.lkhgq.cn http://www.morning.bssjp.cn.gov.cn.bssjp.cn http://www.morning.yrmpr.cn.gov.cn.yrmpr.cn http://www.morning.qrqg.cn.gov.cn.qrqg.cn http://www.morning.czcbl.cn.gov.cn.czcbl.cn http://www.morning.wtcd.cn.gov.cn.wtcd.cn http://www.morning.tfpmf.cn.gov.cn.tfpmf.cn http://www.morning.yjmns.cn.gov.cn.yjmns.cn