共享门店新增跑腿距离计算优化,关于seo网站优化公司,品牌网站建设设计,推广计划怎么修改目录
1 消息队列(Message Queue)简介
1.1 概述
1.2 消息服务中两个重要概念
1.3 消息队列主要有两种形式的目的地
1.4 JMS和AMQP对比
1.5 应用场景
1.6 Spring支持
1.7 SpringBoot自动配置
1.7 市面上的MQ产品
2 RabbitMQ
2.1 RabbitMQ简介
2.1.1 RabbitMQ简介
2…目录
1 消息队列(Message Queue)简介
1.1 概述
1.2 消息服务中两个重要概念
1.3 消息队列主要有两种形式的目的地
1.4 JMS和AMQP对比
1.5 应用场景
1.6 Spring支持
1.7 SpringBoot自动配置
1.7 市面上的MQ产品
2 RabbitMQ
2.1 RabbitMQ简介
2.1.1 RabbitMQ简介
2.1.2 核心概念
2.2 docker安装RabbitMQ
2.3 RabbitMQ管理操作页面介绍
2.3.1 Overview概述
2.3.2 Connections连接信息
2.3.3 Channels信道信息
2.3.4 Exchanges交换机信息
2.3.5 Queues队列信息
2.3.6 Admin用户信息
2.4 RabbitMQ运行机制
2.5 交换机Exchange类型
2.5.1 Direct Exchange直连式
2.5.2 Fanout Exchange扇出/广播式
2.5.3 Topic Exchange主题/发布订阅式
2.6 收发消息测试
2.6.1 exchange.direct发送消息
2.6.1.1 exchange.direct绑定队列
2.6.1.2 exchange.direct发送消息
2.6.2 exchange.fanout发送消息
2.6.2.1 exchange.fanout绑定队列
2.6.2.2 exchange.fanout发送消息
2.6.3 exchange.topic发送消息
2.6.3.1 exchange.topic绑定队列
2.6.3.2 exchange.topic发送消息
3 SpringBoot整合RabbitMQ
3.1 引入依赖
3.2 yml配置
3.3 开启RabbitMQ
3.4 整合测试
3.4.1 AmqpAdmin使用
3.4.1.1 创建交换机
3.4.1.2 创建队列
3.4.1.3 创建绑定关系
3.4.2 RabbitTemplate使用
3.4.2.1 发送消息
3.4.2.1.1 使用json序列化对象
3.4.3 RabbitListener RabbitHandler接收消息
3.4.3.1 RabbitListener监听接收消息
3.4.3.1.1 简单接收消息
3.4.3.1.2 接收消息内容并反序列化
3.4.3.1.3 完整参数写法
3.4.3.1.4 验证多个消费者监听场景
3.4.3.2 RabbitListener和RabbitHandler监听接收消息
3.4.3.2.1 RabbitListener和RabbitHandler的区别与作用
3.4.3.2.2 测试重载处理不同类型的消息
3.5 消息可靠抵达
3.5.1 发送端确认
3.5.1.1 ConfirmCallback确认模式
3.5.1.2 ReturnCallback回退模式
3.5.1.3 消息唯一id
3.5.2 消费端确认Ack确认机制
3.5.2.1 消费端自动确认
3.5.2.2 消费端手动确认
3.5.2.3 消费端退货
4 RabbitMQ延时队列实现定时任务
4.1 为什么不使用定时任务
4.2 使用场景
4.3 消息的存活时间TTL(Time To Live)
4.4 DXL和死信队列
4.5 延时队列实现
4.5.1 设置队列过期时间实现延时队列
4.5.2 设置消息过期时间实现延时队列
4.6 延时队列定时关单模拟
4.6.1 实现方式
4.6.1.1 基础版
4.6.1.2 升级版
4.6.2 实现
4.6.2.1 创建Queue、Exchange、Binding方式
4.6.2.2 发送消息相关代码 4.6.2.3 监听接收消息相关代码
4.6.3 测试 1 消息队列(Message Queue)简介
1.1 概述 大多应用可以通过消息服务中间件来提升系统异步通信和扩展解耦能力。
1.2 消息服务中两个重要概念 消息代理message broker和目的地destination。
1.3 消息队列主要有两种形式的目的地
队列queue点对点消息通信point-to-point;主题topic发布publish/订阅subscribe消息通信。
点对点模式和发布订阅模式区别
1点对点式
消息发送者发送消息消息代理将其放入一个队列中消息接收者从队列中获取消息内容消息读取后被移除队列。消息只有唯一的发送者和接受者只能有一个接收者读取信息 但不是说只有一个接收者。
2发布订阅式
发送者发布者发送消息到主题多个接收者订阅者监听订阅这个主题那么就会在消息到达时同时收到消息。
1.4 JMS和AMQP对比
1JMS
是Java消息服务的规范基于jvm消息代理的规范用JmsListener监听消息JMS是java提供的一套消息服务API标准其目的是为所有的java应用程序提供统一的消息通信的标准类似java的 jdbc只要遵循jms标准的应用程序之间都可以进行消息通信。ActiveMQ、HornetMQ是JMS实现的。
2AMQP(Advacnce Message Queuing Protocol)
高级消息队列协议是一个消息代理的规范兼容JMSRabbitMQ 是AMQP的实现用RabbitListener(AMQP)监听消息它和AMQP有什么 不同jms是java语言专属的消息服务标准它是在api层定义标准并且只能用于java应用而AMQP是在协议层定义的标准是跨语言的 。RabbitMQ是AMQP的实现。 1.5 应用场景 在实际应用中常用的场景异步处理、应用解耦、流量削峰和消息通讯四个场景。 1.6 Spring支持
spring-jms提供了对JMS的支持spring-rabbit提供了对AMQP的支持需要ConnectionFactory的实现来连接消息代理提供JmsTemplate、RabbitTemplate来发送消息JmsListener(JMS).RabbitListener(AMQP注解在方法上监听消息代理发布的消息EnableJms、EnableRabbit开启支持
1.7 SpringBoot自动配置
JmsAutoConfigurationRabbitAutoConfiguration
1.7 市面上的MQ产品 ActiveMQ、RabbitMQ、RocketMQ、Kafka。
2 RabbitMQ
2.1 RabbitMQ简介
RabbitMQ官方文档Networking and RabbitMQ — RabbitMQ
2.1.1 RabbitMQ简介 RabbitMQ是一个由erlang开发的AMQPAdvanced Message Queue Protocol。 2.1.2 核心概念
Message消息消息是不具名的它由消息头和消息体组成。消息体是不透明的而消息头是由一些可选属性组成这些属性包含routing-key路由键、priority相对于其他消息的优先权、delivery-mode指出该消息可能需要持久性存储等。Publisher生产者也是一个向交换机发布消息的客户端应用程序。Exchange交换机用来接收生产者发送的消息并将这些消息路由给服务器中的队列。Exchange有4种类型direct默认、 fanout、topic、headers不同类型的Exchange转发消息的策略有所区别。Queue消息队列用来保存消息直到发送给消费者。它是消息的容器也是消息的终点。一个消息可投入一个或多个队列。消息一直 在队列里面等待消费者连接到这个队列将其取走。Binding绑定用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则所以可以将交 换器理解成一个由绑定构成的路由表。Exchange 和Queue的绑定可以是多对多的关系。Connection网络连接比如一个TCP连接。Channel信道多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内的虚拟连接AMQP 命令都是通过信道 发出去的不管是发布消息、订阅队列还是接收消息这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都 是非常昂贵的开销所以引入了信道的概念以复用一条 TCP 连接。Consumer信息的消费者表示一个从消息队列中取得消息的客户端应用程序。Virtual Host虚拟主机表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加 密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器拥 有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础必须在连接时 指定RabbitMQ 默认的 vhost 是 / 。Broker表示消息队列服务器实体。 拓展 RabbitMQ为什么使用信道而不直接使用TCP连接通信
原因TCP连接的创建和销毁开销特别大。创建需要 3 次握手销毁需要 4 次挥手。高峰时每秒成千上万条TCP连接的创建会照成资源巨大浪费。而且操作系统每秒处理TCP连接数也是有限制的会造成性能瓶颈。而如果一条线程使用一条信道一条TCP连接可以容纳无限的信道即使每秒成千上万的请求也不会照成性能瓶颈。
2.2 docker安装RabbitMQ
# 下载影像
docker pull rabbitmq:3.9.11-management# 安装
docker run -d --namerabbitmq --restartalways -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 15671:15671 -p 15672:15672 -p 25672:25672 rabbitmq:3.9.11-management 4369、25672Erlang发现和集群端口 5671、5672AMQP端口 15672web管理后台端口 61613、61614STOMP协议端口 1883、8883MQTT协议接口
rabbitmq的虚拟主机以路径来区分例如/、/a、/b。虚拟主机之间是相互隔离的。
2.3 RabbitMQ管理操作页面介绍
2.3.1 Overview概述 2.3.2 Connections连接信息 2.3.3 Channels信道信息 2.3.4 Exchanges交换机信息 2.3.5 Queues队列信息 2.3.6 Admin用户信息 2.4 RabbitMQ运行机制
AMQP中的消息路由
AMQP中消息的路由过程和Java开发者熟悉的JMS存在一些差别AMQP中增加了Exchange和Binding的角色。生产者把消息发布到Exchange上消息最终到达队列并被消费者接收Binding决定交换机将消息发送到哪个队列。 2.5 交换机Exchange类型 Exchange分发消息时根据类型的不同分发策略有区别目前共四种类型direct、fanout、topic、headers 。headers 匹配 AMQP 消息的 header 而不是路由键headers 交换器和 direct 交换器完全一致但性能差很多目前几乎用不到了所以直接看另外三种类型
2.5.1 Direct Exchange直连式 消息中的路由键routing key如果和Binding 中的 binding key 一致 交换器就将消息发到对应的队列中。路由键与队列名完全匹配如果一个队列绑定到交换机要求路由键为“dog”则只转发 routingkey 标记为“dog”的消息不会转发“dog.puppy”也不会转发“dog.guard”等等。它是完全匹配、单播的模式。 2.5.2 Fanout Exchange扇出/广播式 每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去。fanout 交换器不处理路由键只是简单的将队列绑定到交换器上每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上。很像子网广播每台子网内的主机都获得了一份复制的消息。fanout 类型转发消息是最快的。 2.5.3 Topic Exchange主题/发布订阅式 topic 交换器通过模式匹配分配消息的路由键属性将路由键和某个模式进行匹配此时队列需要绑定到一个模式上。它将路由键和绑定键的字符串切分成单词这些单词之间用点隔开。它同样也会识别两个通配符符号“#”和符号“*”。#匹配0个或多个单词*匹配一个单词。 2.6 收发消息测试 按照如下图新建交换机、队列进行测试。 1新建交换机如下 2新建队列如下 2.6.1 exchange.direct发送消息
2.6.1.1 exchange.direct绑定队列 2.6.1.2 exchange.direct发送消息 只有atguigu队列拿到消息直连型路由键需要完全匹配。 Ask Mode: 1. Nack message requeue true接收消息但不做确认消息会重新加入队列 2. Automatic ack获取消息应答确认消息不重新入队将会从队列中删除 3. Reject requeue true拒绝消息消息会重新加入队列 4. Reject requeue false拒绝消息消息会被从队列中删除。 2.6.2 exchange.fanout发送消息
2.6.2.1 exchange.fanout绑定队列 2.6.2.2 exchange.fanout发送消息 所有与exchange.fanout交换机绑定的队列都会收到消息这就是扇出型交换机。 2.6.3 exchange.topic发送消息
2.6.3.1 exchange.topic绑定队列 2.6.3.2 exchange.topic发送消息
1 2 atguigu、atguigu.emps、atguigu.news这三个队列收到了消息。“#”匹配0个或多个单词。 3 SpringBoot整合RabbitMQ
3.1 引入依赖 gulimall-order/pom.xml dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId
/dependency
3.2 yml配置 gulimall-order/src/main/resources/application.yml spring:rabbitmq:host: 172.1.11.10port: 5672virtual-host: /
3.3 开启RabbitMQ
相关注解 EnableRabbit监听消息必须使用收发消息可以不使用该注解。 gulimall-order/src/main/java/com/wen/gulimall/order/GulimallOrderApplication.java EnableRabbit
SpringBootApplication
public class GulimallOrderApplication {public static void main(String[] args) {SpringApplication.run(GulimallOrderApplication.class, args);}}
3.4 整合测试
3.4.1 AmqpAdmin使用
3.4.1.1 创建交换机
1交换机类型 2创建交换机
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.boot.test.context.SpringBootTest;import javax.annotation.Resource;Slf4j
SpringBootTest
class GulimallOrderApplicationTests {Resourceprivate AmqpAdmin amqpAdmin;Testvoid createExchange(){// 声明交换机/*** DirectExchange(String name, 【交换机名称】* boolean durable, 【是否持久化】* boolean autoDelete, 【是否自动删除】* MapString, Object arguments) 【自定义参数】*/DirectExchange directExchange new DirectExchange(hello-java-exchange,true,false);amqpAdmin.declareExchange(directExchange);log.info(Exchange[{}]创建成功,hello-java-exchange);}
} 3.4.1.2 创建队列
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.boot.test.context.SpringBootTest;import javax.annotation.Resource;Slf4j
SpringBootTest
class GulimallOrderApplicationTests {Resourceprivate AmqpAdmin amqpAdmin;Testvoid createQueue(){// 创建队列/*** Queue(String name, 【队列名称】* boolean durable, 【是否持久化】* boolean exclusive, 【是否排他只能被一个consumer连接占用】* boolean autoDelete, 【是否自动删除】* Nullable MapString, Object arguments) 【自定义参数】*/Queue queue new Queue(hello-java-queue,true,false,false);amqpAdmin.declareQueue(queue);log.info(Queue[{}]创建成功,hello-java-queue);}
} 3.4.1.3 创建绑定关系
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.boot.test.context.SpringBootTest;import javax.annotation.Resource;Slf4j
SpringBootTest
class GulimallOrderApplicationTests {Resourceprivate AmqpAdmin amqpAdmin;Testpublic void createBinding(){// 创建绑定/*** Binding(String destination, 【目的地队列name或交换机name】* Binding.DestinationType destinationType, 【目的地类型queue还是exchange(路由)】* String exchange, 【交换机】* String routingKey, 【路由键】* Nullable MapString, Object arguments) 【自定义参数】** 将exchange指定交换机和destination目的地进行绑定使用routingKey作为路由键*/Binding binding new Binding(hello-java-queue,Binding.DestinationType.QUEUE,hello-java-exchange,hello-java,null);amqpAdmin.declareBinding(binding);log.info(Binding[{}]创建成功,hello-java);}
} 3.4.2 RabbitTemplate使用
3.4.2.1 发送消息 如果发送的消息是个对象我们会使用序列化机制将对象写出。对象必须实现Serializable接口例如 Slf4j
SpringBootTest
class GulimallOrderApplicationTests {Resourceprivate RabbitTemplate rabbitTemplate;Testvoid sendMessage(){/*** convertAndSend(String exchange, 【交换机】* String routingKey, 【路由键】* Object object) 【发送的信息】*/String msg hjhajkxdhjashj哈哈哈哈哈;OrderReturnReasonEntity entity new OrderReturnReasonEntity();entity.setCreateTime(new Date());entity.setId(2L);entity.setName(多多);//1. 发送消息如果发送的消息是个对象我们会使用序列化机制将对象写出。对象必须实现SerializablerabbitTemplate.convertAndSend(hello-java-exchange,hello-java,entity);log.info(发送消息【{}】成功,entity);}
} 队列收到的消息如下 3.4.2.1.1 使用json序列化对象 通过源码分析如果容器中没有MessageConverter默认使用使用SimpleMessageConverter 这里自定义RabbitMQ配置使用Jackson2JsonMessageConverter 消息转换器 gulimall-order/src/main/java/com/wen/gulimall/order/config/MyRabbitConfig.java Configuration
public class MyRabbitConfig {Beanpublic MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();}
}
重新发送消息测试
Slf4j
SpringBootTest
class GulimallOrderApplicationTests {Resourceprivate RabbitTemplate rabbitTemplate;Testvoid sendMessage(){/*** convertAndSend(String exchange, 【交换机】* String routingKey, 【路由键】* Object object) 【发送的信息】*/String msg hjhajkxdhjashj哈哈哈哈哈;OrderReturnReasonEntity entity new OrderReturnReasonEntity();entity.setCreateTime(new Date());entity.setId(2L);entity.setName(多多);//1. 发送消息如果发送的消息是个对象我们会使用序列化机制将对象写出。对象必须实现SerializablerabbitTemplate.convertAndSend(hello-java-exchange,hello-java,entity);log.info(发送消息【{}】成功,entity);}
}
测试结果如下 3.4.3 RabbitListener RabbitHandler接收消息
3.4.3.1 RabbitListener监听接收消息 监听消息使用注解RabbitListener该注解标注在方法上使用该注解的前提需要主类上使用注解EnableRabbit。 3.4.3.1.1 简单接收消息 使用注解RabbitListener并通过属性queues声明需要监听的所有队列。 gulimall-order/src/main/java/com/wen/gulimall/order/service/impl/OrderServiceImpl.java RabbitListener(queues {hello-java-queue})
public void receiveMessage(Object msg){System.out.println(接收到消息...内容msg类型msg.getClass());
}
监听结果
接收到消息...内容(Body:[B17ee5a8a(byte[77]) MessageProperties [headers{__TypeId__com.wen.gulimall.order.entity.OrderReturnReasonEntity}, contentTypeapplication/json, contentEncodingUTF-8, contentLength0, receivedDeliveryModePERSISTENT, priority0, redeliveredfalse, receivedExchangehello-java-exchange, receivedRoutingKeyhello-java, deliveryTag1, consumerTagamq.ctag-K2kLpChLRK15TemEJ4XRBQ, consumerQueuehello-java-queue])类型class org.springframework.amqp.core.Message
3.4.3.1.2 接收消息内容并反序列化 由上一步接收到的消息内容可知消息的类型是org.springframework.amqp.core.Message将需要反序列化的类放在第二个参数的位置可以将消息内容自动封装成对象。消息接收对象类型要和消息发送对象类型保持一致 第一个参数Message -- 原生消息详细信息消息头消息体。 第二个参数T发送消息的类型 RabbitListener(queues {hello-java-queue})
public void receiveMessage2(Message msg, OrderReturnReasonEntity content){// 消息体byte[] body msg.getBody();// 消息头信息MessageProperties messageProperties msg.getMessageProperties();System.out.println(接收到消息...msg内容content);
}
监听结果
接收到消息...(Body:[B2ec3c5a5(byte[77]) MessageProperties [headers{__TypeId__com.wen.gulimall.order.entity.OrderReturnReasonEntity}, contentTypeapplication/json, contentEncodingUTF-8, contentLength0, receivedDeliveryModePERSISTENT, priority0, redeliveredfalse, receivedExchangehello-java-exchange, receivedRoutingKeyhello-java, deliveryTag1, consumerTagamq.ctag-vuaHxsngkQHP8vWIUy6oGA, consumerQueuehello-java-queue])内容OrderReturnReasonEntity(id2, name多多, sortnull, statusnull, createTimeThu Jan 25 14:32:30 CST 2024) 3.4.3.1.3 完整参数写法 参数类型 1 Message message原生消息详细信息消息头消息体。 2T发送消息的类型 OrderReturnReasonEntity content。 3 Channel channel当前传输数据的通道。一个连接可以容纳多个通道 RabbitListener(queues {hello-java-queue})
public void receiveMessage3(Message msg,OrderReturnReasonEntity content,Channel channel){log.info(信道Channel:{},channel);// 消息体byte[] body msg.getBody();// 消息头信息MessageProperties messageProperties msg.getMessageProperties();System.out.println(接收到消息...msg内容content);
}
监听结果 3.4.3.1.4 验证多个消费者监听场景 Queue可以由多个消费者监听只能有一个消费者接收到消息。只要收到消息队列删除该消息。 场景 1启动多个订单服务同一个消息只有一个客户端接收到 2只有一个消息处理完方法运行结束才可以接收到下一个消息。 模拟发送10条消息。 gulimall-order/src/test/java/com/wen/gulimall/order/GulimallOrderApplicationTests.java Slf4j
SpringBootTest
class GulimallOrderApplicationTests {Resourceprivate AmqpAdmin amqpAdmin;Resourceprivate RabbitTemplate rabbitTemplate;Testvoid sendMessage(){for(int i0;i10;i) {OrderReturnReasonEntity entity new OrderReturnReasonEntity();entity.setCreateTime(new Date());entity.setId(2L);entity.setName(多多i);//1. 发送消息如果发送的消息是个对象我们会使用序列化机制将对象写出。对象必须实现SerializablerabbitTemplate.convertAndSend(hello-java-exchange, hello-java, entity);log.info(发送消息【{}】成功, entity);}}
} 复制一个订单模块并启用。 监听结果 每个客户端都是接收同一个队列一条消息只能由一个客户端接收没有重复消息。 在此期间有两个客户端有两个连接一个客户端只能有一个连接一个连接可以有多个信道(channel)。 3.4.3.2 RabbitListener和RabbitHandler监听接收消息
3.4.3.2.1 RabbitListener和RabbitHandler的区别与作用
RabbitListener标注在类或方法上。【作用监听从哪些队列接收消息】RabbitHandler标注在方法上。【作用重载区分不同类型的消息】
3.4.3.2.2 测试重载处理不同类型的消息 发送消息接口发送两种不同类型的消息 gulimall-order/src/main/java/com/wen/gulimall/order/controller/RabbitController.java RestController
public class RabbitController {Resourceprivate RabbitTemplate rabbitTemplate;GetMapping(/sendMq)public String sendMq(Integer num){for(int i0;inum;i){if(i%20){OrderReturnReasonEntity entity new OrderReturnReasonEntity();entity.setCreateTime(new Date());entity.setId(2L);entity.setName(多多i);//1. 发送消息如果发送的消息是个对象我们会使用序列化机制将对象写出。对象必须实现SerializablerabbitTemplate.convertAndSend(hello-java-exchange, hello-java, entity);}else {OrderEntity orderEntity new OrderEntity();orderEntity.setOrderSn(UUID.randomUUID().toString());rabbitTemplate.convertAndSend(hello-java-exchange, hello-java, orderEntity);}}return ok;}
} 重载方法监听接收不同类型的消息 gulimall-order/src/main/java/com/wen/gulimall/order/service/impl/OrderServiceImpl.java RabbitListener(queues {hello-java-queue})
Slf4j
Service(orderService)
public class OrderServiceImpl extends ServiceImplOrderDao, OrderEntity implements OrderService {//RabbitListener(queues {hello-java-queue})RabbitHandlerpublic void receiveMessage3(Message msg,OrderReturnReasonEntity content,Channel channel) throws InterruptedException {//log.info(信道Channel:{},channel);log.info(接收到消息...content);// 消息体//byte[] body msg.getBody();消息头信息//MessageProperties messageProperties msg.getMessageProperties();Thread.sleep(3000);//System.out.println(消息处理完成content.getName());}RabbitHandlerpublic void receiveMessage3(Message msg,OrderEntity content,Channel channel) {log.info(接收到消息...content);}
} 重启订单服务请求http://localhost:9000/sendMq?num10发送消息测试通过RabbitListener(queues{hello-java-queue})声明队列RabbitHandler重载用于接收同一个队列不同类型的数据。 注意接收同一个队列不同类型的数据并自动封装成相应的对象单使用RabbitListener是无法实现的需要RabbitListener和RabbitHandler搭配使用。如上。 测试结果 3.5 消息可靠抵达
官网参考文档https://www.rabbitmq.com/ Docs Server Documentation Reliable Delivery 1. 为什么不适用事务消息保证消息不丢失可靠抵达因为使用事务消息会使性能下降250倍为此引入确认机制。 2. 发送端确认 publisherconfirmCallback 确认模式publisherreturnCallback 未投递到 queue 退回模式。 3. 消费端确认 consumerack机制。 3.5.1 发送端确认
https://www.rabbitmq.com/ Docs Server Documentation Reliable Delivery
Acknowledgements and Confirms Publisher confirms
3.5.1.1 ConfirmCallback确认模式 1. 开启 p-e 的发送确认配置我这里使用的是SpringBoot2.7.8之前的配置已经废弃这里使用spring.rabbitmq.publisher-confirm-typecorrelated替换spring.rabbitmq.publisher-confirmstrue 2. 发送端确认类型spring.rabbitmq.publisher-confirm-type有3种如下 none值是禁用发布确认模式是默认值。correlated值是发布消息成功到交换器后会触发回调方法。simple值经测试有两种效果其一效果和CORRELATED值一样会触发回调方法其二在发布消息成功后使用rabbitTemplate调用waitForConfirms或waitForConfirmsOrDie方法等待broker节点返回发送结果根据返回结果来判定下一步的逻辑要注意的点是waitForConfirmsOrDie方法如果返回false则会关闭channel则接下来无法发送消息到broker。 开启配置 gulimall-order/src/main/resources/application.yml spring:rabbitmq:host: 172.xx.xx.xxport: 5672virtual-host: /publisher-confirm-type: correlated 自定义RabbitTemplate设置确认回调 gulimall-order/src/main/java/com/wen/gulimall/order/config/MyRabbitConfig.java Configuration
public class MyRabbitConfig {Resourceprivate RabbitTemplate rabbitTemplate;Beanpublic MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();}/*** 定制RabbitTemplate* 1.服务收到消息就回调* 1)spring.rabbitmq.publisher-confirmstrue(这里使用spring.rabbitmq.publisher-confirm-typecorrelated)* 2)设置确认回调* 注意springboot rabbitmq 中 spring.rabbitmq.publisher-confirms 已经失效。需要使用 spring.rabbitmq.publisher-confirm-type 替代* publisher-confirm-type有3种* 1NONE值是禁用发布确认模式是默认值* 2CORRELATED值是发布消息成功到交换器后会触发回调方法* 3SIMPLE值经测试有两种效果其一效果和CORRELATED值一样会触发回调方法其二在发布消息成功后使用rabbitTemplate调用waitForConfirms或waitForConfirmsOrDie方法等待broker节点返回发送结果根据返回结果来判定下一步的逻辑要注意的点是waitForConfirmsOrDie方法如果返回false则会关闭channel则接下来无法发送消息到broker;*/PostConstruct //MyRabbitConfig对象创建完成之后执行这个方法public void initRabbitTemplate(){// 设置确认回调rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {/*** 只要消息抵达Broker就acktrue* param correlationData 当前消息的唯一关联数据这个是消息的唯一id* param b (ack) 消息是否成功收到* param s (cause) 失败的原因*/Overridepublic void confirm(CorrelationData correlationData, boolean b, String s) {System.out.println(confirm...correlationData[correlationData]ack[b]cause[s]);}});}
} 测试 请求http://localhost:9000/sendMq?num10 看测试结果只要消息抵达Broker就acktrue如下 3.5.1.2 ReturnCallback回退模式 开启 e-q 的确认配置消息不能由交换机投递到目标队列将调用returnCallback。 gulimall-order/src/main/resources/application.yml server:port: 9000
spring:datasource:driver-class-name: com.mysql.cj.jdbc.Driverurl: jdbc:mysql://172.xx.xx.xx:9906/gulimall_omsusername: rootpassword: rootrabbitmq:host: 172.xx.xx.xxport: 5672virtual-host: /# 开启发送端确认publisher-confirm-type: correlated# 开启发送端消息抵达队列的确认默认是falsepublisher-returns: true# 只要消息抵达队列以异步发送优先回调我们的returnConfirmtemplate:mandatory: trueredis:host: 172.xx.xx.xxcloud:nacos:discovery:server-addr: 172.xx.xx.xx:8848main:allow-circular-references: true
mybatis-plus:mapper-locations: classpath:/mapper/**/*.xmlglobal-config: # 全局配置db-config:id-type: auto # 主键自增 设置消息抵达队列的确认回调 Configuration
public class MyRabbitConfig {Resourceprivate RabbitTemplate rabbitTemplate;Beanpublic MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();}/*** 定制RabbitTemplate* 1.服务收到消息就回调* 1)spring.rabbitmq.publisher-confirmstrue(这里使用spring.rabbitmq.publisher-confirm-typecorrelated)* 2)设置确认回调 ConfirmCallback* 2. 消息抵达队列就回调* 1)spring.rabbitmq.publisher-returnstrue* spring.rabbitmq.template.mandatorytrue* 2)设置消息抵达队列的确认回调 ReturnCallback* 注意springboot rabbitmq 中 spring.rabbitmq.publisher-confirms 已经失效。需要使用 spring.rabbitmq.publisher-confirm-type 替代* publisher-confirm-type有3种* 1NONE值是禁用发布确认模式是默认值* 2CORRELATED值是发布消息成功到交换器后会触发回调方法* 3SIMPLE值经测试有两种效果其一效果和CORRELATED值一样会触发回调方法其二在发布消息成功后使用rabbitTemplate调用waitForConfirms或waitForConfirmsOrDie方法等待broker节点返回发送结果根据返回结果来判定下一步的逻辑要注意的点是waitForConfirmsOrDie方法如果返回false则会关闭channel则接下来无法发送消息到broker;*/PostConstruct //MyRabbitConfig对象创建完成之后执行这个方法public void initRabbitTemplate(){// 设置确认回调rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {/*** 只要消息抵达Broker就acktrue* param correlationData 当前消息的唯一关联数据这个是消息的唯一id* param b (ack) 消息是否成功收到* param s (cause) 失败的原因*/Overridepublic void confirm(CorrelationData correlationData, boolean b, String s) {System.out.println(confirm...correlationData[correlationData]ack[b]cause[s]);}});// 设置消息抵达队列的确认回调rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {/*** 只要消息没有投递给指定的队列就触发这个失败回调* param message the returned message. 投递失败的消息详细信息* param replyCode the reply code. 回复的状态码* param replyText the reply text. 回复的文本内容* param exchange the exchange. 当时这个消息接收的交换机* param routingKey the routing key. 当时这个消息用的哪个路由键*/Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {System.out.println(Fail Message[message]replyCode[replyCode]replyText[replyText]exchange[exchange]routingKey[routingKey]);}});}
} 测试 该模式需要消息投递到指定队列失败才会触发这里将路由键故意改错便于测试测试结束后将错误改回来。 错误消息 Fail Message[(Body:[Bf04a8cf(byte[855]) MessageProperties [headers{__TypeId__com.wen.gulimall.order.entity.OrderEntity}, contentTypeapplication/json, contentEncodingUTF-8, contentLength0, receivedDeliveryModePERSISTENT, priority0, deliveryTag0])]replyCode[312]replyText[NO_ROUTE]exchange[hello-java-exchange]routingKey[hello-java333] 3.5.1.3 消息唯一id 在发送消息时参数CorrelationData就是消息的唯一id。 这个id在发送端确认时是可以拿到的可以用于排查哪些消息为成功抵达与消费端接收到存放在数据库中的消息唯一id进行对比。 3.5.2 消费端确认Ack确认机制 3.5.2.1 消费端自动确认 1消费端确认默认是自动确认的只要消息收到客户端会自动确认服务端就会移除这个消息。 2消费端自动确认存在问题 在接收消息这里打上断点在处理完第一个消息后关掉服务器模拟突发状况服务器宕机如下 关掉服务器后剩余未处理的4个消息也消失不见未经过消费端处理相当于消息丢失。 3.5.2.2 消费端手动确认 为了解决消息自动确认遇到突发状况造成消息丢失问题可以使用手动确认。 只有手动确认的消息才能被队列移除。 开启消费端手动确认配置 gulimall-order/src/main/resources/application.yml spring:rabbitmq:host: 172.1.11.10port: 5672virtual-host: /# 开启发送端确认publisher-confirm-type: correlated# 开启发送端消息抵达队列的确认默认是falsepublisher-returns: true# 只要消息抵达队列以异步发送优先回调我们的returnConfirmtemplate:mandatory: true# 开启消费端手动确认listener:simple:acknowledge-mode: manual
1. 消息抵达客户端不手动ack测试 发送五个消息进行测试客户端就算拿到消息不做消息抵达确认队列不能移除未确认的消息只是消息的状态有ready变为unacked关闭客户端服务器后消息的状态又由unacked变为ready下次重启客户端服务器又可以接收消息。 关闭客户端服务器观察消息的状态
由unacked变为ready 2. 消息抵达客户端模拟部分消息手动ack测试 客户端确认debug模式下无法模拟真实情况下宕机关闭了也会继续执行这里断点放开根据投递标签deliveryTagchannel内按顺序自增取余模拟客户端突然宕机未接收到消息如下 RabbitListener(queues {hello-java-queue})
Slf4j
Service(orderService)
public class OrderServiceImpl extends ServiceImplOrderDao, OrderEntity implements OrderService {...//RabbitListener(queues {hello-java-queue})RabbitHandlerpublic void receiveMessage3(Message msg,OrderReturnReasonEntity content,Channel channel) throws InterruptedException {//log.info(信道Channel:{},channel);System.out.println(接收到消息...content);// 消息体byte[] body msg.getBody();// 消息头信息MessageProperties messageProperties msg.getMessageProperties();Thread.sleep(3000);System.out.println(消息处理完成content.getName());// channel内按顺序自增的long deliveryTag msg.getMessageProperties().getDeliveryTag();System.out.println(deliveryTagdeliveryTag);// basicAck(long deliveryTag, // channel内按顺序自增// boolean multiple) // 是否批量确认// debug下无法模拟真实情况下的宕机关闭了也会继续执行// 这里根据投递标签deliveryTag模拟客户端突然宕机未接收到消息try {if(deliveryTag%20) {channel.basicAck(deliveryTag,false);// 手动ack确认接收到消息System.out.println(签收了货物...deliveryTag);}} catch (IOException e) {throw new RuntimeException(e);}}} 此时只确认签收了货物-2和货物-4还有3个未确认如下 关闭客户端服务器消息状态由unacked变成ready如下 重启客户端服务器剩余的消息可以继续签收因为deliveryTag是根据信道channel里的消息顺序自增客户端重启后消息又重新排序。之前发送的5个消息(1,2,3,4,5)只确认了偶数2和4客户端重启后channel内的消息顺序变为(1,2,3)消费端可以再次确认接收一个消息。 3.5.2.3 消费端退货
RabbitListener(queues {hello-java-queue})
Slf4j
Service(orderService)
public class OrderServiceImpl extends ServiceImplOrderDao, OrderEntity implements OrderService {...//RabbitListener(queues {hello-java-queue})RabbitHandlerpublic void receiveMessage3(Message msg,OrderReturnReasonEntity content,Channel channel) throws InterruptedException {//log.info(信道Channel:{},channel);System.out.println(接收到消息...content);// 消息体byte[] body msg.getBody();// 消息头信息MessageProperties messageProperties msg.getMessageProperties();Thread.sleep(3000);System.out.println(消息处理完成content.getName());// channel内按顺序自增的long deliveryTag msg.getMessageProperties().getDeliveryTag();System.out.println(deliveryTagdeliveryTag);// basicAck(long deliveryTag, // channel内按顺序自增// boolean multiple) // 是否批量确认// debug下无法模拟真实情况下的宕机关闭了也会继续执行// 这里根据投递标签deliveryTag模拟客户端突然宕机未接收到消息try {// 签收货物非批量模式if(deliveryTag%20) {// 收货channel.basicAck(deliveryTag,false);// 手动ack确认接收到消息System.out.println(签收了货物...deliveryTag);}else {// 退货// requeuefalse 丢弃 requeuetrue 重回队列// basicNack(long deliveryTag, boolean multiple, boolean requeue)channel.basicNack(deliveryTag,false,false);System.out.println(没有签收货物...deliveryTag);}} catch (IOException e) {throw new RuntimeException(e);}}
}channel.basicNack(deliveryTag,false,false);中requeuefalse丢弃掉未被签收的货物清空队列如下 当requeuetrue时deliveryTag为奇数消息被拒收重新入队deliveryTag变成偶数被签收。 4 RabbitMQ延时队列实现定时任务
4.1 为什么不使用定时任务 定时任务的时效性问题 场景订单30min中未支付关闭订单 出现问题定时任务在0min中扫描的时候没有订单未支付1min中后下订单定时任务第二次执行的时候30min这时下订单29min中未支付订单不会被关闭到定时任务第三次执行的时候60min才能关闭未支付的订单订单从创建到关闭花费了302959min不满足订单30min中未支付关闭订单。 4.2 使用场景 4.3 消息的存活时间TTL(Time To Live)
消息的TTL就是消息的存活时间。RabbitMQ可以对队列和消息分别设置TTL。 对队列设置就是队列没有消费者连着的保留时间也可以对每一个单独的消息做单独的设置。超过了这个时间我们认为这个消息就死了称之为死信。如果队列设置了消息也设置了那么会取小的。所以一个消息如果被路由到不同的队列中这个消息死亡的时间有可能不一样不同的队列设置。这里单讲单个消息的TTL因为它才是实现延时任务的关键。可以通过设置消息的expiration字段或者x-message-ttl属性来设置时间两者是一样的效果。
4.4 DXL和死信队列 DXL(Dead Letter Exchanges)即死信交换机它其实就是一个正常的交换机能够与任何队列绑定。死信队列是指队列(正常)上的消息(过期)变成死信后能够发送到另外一个交换机(DLX)然后被路由到一个队列上这个队列就是死信队列。成为死信一般有以下几种情况 1消息被拒收basic.reject/basic.nackrequeuefalse; 2消息的TTL到了消息过期 3队列长度限制被超越队列满了。 4.5 延时队列实现 1. 延时队列实现有两种方式设置队列过期时间、设置消息过期时间实现延时队列。 2. 建议使用设置队列过期时间实现延时队列因为设置消息过期时间MQ是惰性检查。比如第一个进来的消息过期时间是5min第二个进来的消息过期时间是2minMQ会先检查第一个消息等第一个消息进入死信队列再去检查第二个消息这时第二个消息已经死3min了。 4.5.1 设置队列过期时间实现延时队列 4.5.2 设置消息过期时间实现延时队列 4.6 延时队列定时关单模拟
以下单为例设置队列过期时间实现延时队列。
4.6.1 实现方式
4.6.1.1 基础版
交换机与队列一一对应一台路由器路由一个队列。 给队列user.order.delay.queue死信队列设置了三个参数 x-dead-letter-exchangeuser.order.exchange 死信交换机x-dead-letter-rounting-keyorder死信路由键x-message-ttl60000队列里面所有消息存活时间60000ms 4.6.1.2 升级版
一个微服务模块配置一个交换机一个交换机绑定多个队列。 4.6.2 实现
4.6.2.1 创建Queue、Exchange、Binding方式 容器中的 Binding、Queue、Exchange都会自动创建RabbitMQ中没有的情况 第一次发送消息【使用队列】的时候创建交换机、队列、绑定关系RabbitMQ中没有交换机、队列才会创建RabbitMQ中只要有属性发生变化也不会覆盖。 gulimall-order/src/main/java/com/wen/gulimall/order/config/MyMQConfig.java import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;/*** author W* createDate 2024/02/21 14:39* description: 创建 Exchange 、 Queue、 Binding*/
Configuration
public class MyMQConfig {//RabbitListener(queues order.release.order.queue)//public void listen(Message message, Channel channel, OrderEntity orderEntity) throws IOException {// System.out.println(收到过期订单消息准备关闭订单-------orderEntity);// // 确认收到消息// channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);//}/*** 容器中的 Binding、Queue、Exchange都会自动创建RabbitMQ没有的情况* 延时队列* RabbitMQ中只要有。属性发生变化也不会覆盖。* return*/Beanpublic Queue orderDelayQueue(){MapString, Object arguments new HashMap();//x-dead-letter-exchange: order-event-exchange//x-dead-letter-routing-key: order.release.order//x-message-ttl: 60000arguments.put(x-dead-letter-exchange,order-event-exchange);arguments.put(x-dead-letter-routing-key,order.release.order);arguments.put(x-message-ttl,60000);// String name, 【队列名称】// boolean durable, 【是否持久化】// boolean exclusive, 【是否排它】// boolean autoDelete,【是否自动删除】// Nullable MapString, Object arguments 【自定义参数死信路由、死信路由键、消息存活时间】Queue queue new Queue(order.delay.queue,true,false,false,arguments);return queue;}/*** 死信队列* return*/Beanpublic Queue orderReleaseOrderQueue(){Queue queue new Queue(order.release.order.queue,true,false,false);return queue;}/*** 普通路由/死信路由* return*/Beanpublic Exchange orderEventExchange(){// String name, boolean durable, boolean autoDelete, MapString, Object argumentsTopicExchange topicExchange new TopicExchange(order-event-exchange,true,false);return topicExchange;}/*** 交换机与延时队列的绑定* return*/Beanpublic Binding orderCreateOrder(){// String destination, 【目的地】// DestinationType destinationType, 【目的地类型queue、exchange】// String exchange,// String routingKey,// Nullable MapString, Object argumentsreturn new Binding(order.delay.queue,Binding.DestinationType.QUEUE,order-event-exchange,order.create.order,null);}/*** 死信交换机与死信队列的绑定* return*/Beanpublic Binding orderReleaseOrder(){return new Binding(order.release.order.queue,Binding.DestinationType.QUEUE,order-event-exchange,order.release.order,null);}
} 启动订单服务后给延时队列发送消息后相关代码如下创建队列和交换机以及绑定关系如下 4.6.2.2 发送消息相关代码 gulimall-order/src/main/java/com/wen/gulimall/order/web/HelloController.java Controller
public class HelloController {Resourceprivate RabbitTemplate rabbitTemplate;ResponseBodyGetMapping(/test/createOrder)public String createOrderTest(){OrderEntity orderEntity new OrderEntity();orderEntity.setOrderSn(UUID.randomUUID().toString());orderEntity.setCreateTime(new Date());rabbitTemplate.convertAndSend(order-event-exchange,order.create.order,orderEntity);return ok;}
}4.6.2.3 监听接收消息相关代码 gulimall-order/src/main/java/com/wen/gulimall/order/config/MyMQConfig.java Configuration
public class MyMQConfig {RabbitListener(queues order.release.order.queue)public void listen(Message message, Channel channel, OrderEntity orderEntity) throws IOException {System.out.println(当前时间new Date() 收到过期订单消息准备关闭订单-------orderEntity);// 确认收到消息channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}...// 创建交换机 队列 绑定
}
4.6.3 测试
启动订单服务请求 http://order.gulimall.com/test/createOrder 给延时队列(order.delay.queue) 发送消息。
一分钟后死信队列(order.release.order.queue) 接收到延时队列(order.delay.queue)中过期的消息(即死信)消费者监听队列(order.release.order.queue)消费信息控制台打印如下 当前时间Thu Feb 22 15:16:20 CST 2024收到过期订单消息准备关闭订单-------OrderEntity(idnull, memberIdnull, orderSn0d784fd2-6c82-48cc-aa2c-e573bf8de1d1, couponIdnull, createTimeThu Feb 22 15:15:20 CST 2024, memberUsernamenull, totalAmountnull, payAmountnull, freightAmountnull, promotionAmountnull, integrationAmountnull, couponAmountnull, discountAmountnull, payTypenull, sourceTypenull, statusnull, deliveryCompanynull, deliverySnnull, autoConfirmDaynull, integrationnull, growthnull, billTypenull, billHeadernull, billContentnull, billReceiverPhonenull, billReceiverEmailnull, receiverNamenull, receiverPhonenull, receiverPostCodenull, receiverProvincenull, receiverCitynull, receiverRegionnull, receiverDetailAddressnull, notenull, confirmStatusnull, deleteStatusnull, useIntegrationnull, paymentTimenull, deliveryTimenull, receiveTimenull, commentTimenull, modifyTimenull) 一分钟后死信队列(order.release.order.queue) 接收到延时队列(order.delay.queue)中过期的消息(即死信)如果没有消费者监听接收消息消息在死信队列(order.release.order.queue)中如下