信宜网站建设,网站被k的迹象,wordpress批量发布工具,网站建设销售招聘目录一、创建消费者二、创建订单链路配置2.1 定义RabbitMQ配置类2.2 创建RabbitmqOrderConfig配置类三、如何实现RabbitMQ重复投递机制3.1 开启发送者消息确认模式3.2 消费发送确认3.2.1 创建ConfirmCallBack确认模式3.2.2 创建ReturnCallBack退回模式3.3 创建生产者3.4 创建消…
目录一、创建消费者二、创建订单链路配置2.1 定义RabbitMQ配置类2.2 创建RabbitmqOrderConfig配置类三、如何实现RabbitMQ重复投递机制3.1 开启发送者消息确认模式3.2 消费发送确认3.2.1 创建ConfirmCallBack确认模式3.2.2 创建ReturnCallBack退回模式3.3 创建生产者3.4 创建消费者手动消费3.5 启动测试3.6 踩坑日记3.6.1 异常点一RabbitListener3.6.2 手动确认消息3.6.3 消息格式3.6.4 消息不确认3.6.5 消息无限投递3.6.6 重复消费四、秒杀业务优化4.1 修改秒杀订单生成方式4.2 消费者监听器完成秒杀订单生成测试一、创建消费者
第1步基于Spring Initialzr方式创建zmall-rabbitmq消费者模块
第2步在公共模块中添加rabbitmq相关依赖
!--rabbitmq--
dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId
/dependency第3步配置子模块zmall-rabbitmq的pom.xml引入公共模块zmall-common
dependenciesdependencygroupIdcom.zking.zmall/groupIdartifactIdzmall-common/artifactIdversion1.0-SNAPSHOT/version/dependency
/dependencies第4步配置父模块的pom.xml添加子模块zmall-rabbitmq
modulesmodulezmall-common/module...modulezmall-rabbitmq/module
/modules第5步配置application.yml
server:port: 8050
spring:application:name: zmall-rabbitmqdatasource:#type连接池类型 DBCP,C3P0,Hikari,Druid,默认为Hikaritype: com.zaxxer.hikari.HikariDataSourcedriver-class-name: com.mysql.jdbc.Driverurl: jdbc:mysql://localhost:3306/zmall?characterEncodingutf8useSSLfalseserverTimezoneAsia/ShanghairewriteBatchedStatementstrueusername: rootpassword: 1234cloud:nacos:config:server-addr: localhost:8848redis:host: 127.0.0.1port: 6379password: 123456jedis:pool:max-active: 100max-wait: 10max-idle: 10min-idle: 10database: 0rabbitmq:host: 43.143.169.245port: 5672username: adminpassword: adminvirtual-host: my_vhost# 发送者开启 confirm 确认机制publisher-confirm-type: correlated# 发送者开启 return 确认机制publisher-returns: true# 设置消费端手动 acklistener:simple:#手动应答acknowledge-mode: manual#消费端最小并发数concurrency: 5#消费端最大并发数max-concurrency: 10#一次请求中预处理的消息数量prefetch: 5# 是否支持重试retry:#启用消费重试enabled: true#重试次数max-attempts: 3#重试间隔时间initial-interval: 3000cache:channel:#缓存的channel数量size: 50
#mybatis-plus配置
mybatis-plus:#所对应的 XML 文件位置mapper-locations: classpath*:/mapper/*Mapper.xml#别名包扫描路径type-aliases-package: com.zking.zmall.modelconfiguration:#驼峰命名规则map-underscore-to-camel-case: true
#日志配置
logging:level:com.zking.zmall.mapper: debug消费者采用的是手动消费模式请注意设置spring.rabbitmq.listener.simple.acknowledge-modemanual 第6步配置启动类
EnableFeignClients
EnableDiscoveryClient
MapperScan({com.zking.zmall.mapper,com.zking.zmallrabbitmq.mapper})
SpringBootApplication
public class ZmallRabbitmqApplication {public static void main(String[] args) {SpringApplication.run(ZmallRabbitmqApplication.class, args);}}二、创建订单链路配置
2.1 定义RabbitMQ配置类
定义RabbitMQ配置类设置生产者发送数据时自动转换成JSON设置消费者获取消息自动转换成JSON。
Configuration
public class RabbitmqConfig {Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate template new RabbitTemplate(connectionFactory);template.setMessageConverter(new Jackson2JsonMessageConverter());return template;}Beanpublic SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setMessageConverter(new Jackson2JsonMessageConverter());return factory;}
}设置RabbitTemplate消息转换模式为Jackson2JsonMessageConverter 设置RabbitMQ消费者监听器的的消息转换模式为Jackson2JsonMessageConverter 2.2 创建RabbitmqOrderConfig配置类
创建RabbitmqOrderConfig配置类增加订单队列、交换机及绑定关系。
Configuration
public class RabbitmqOrderConfig {public static final String ORDER_QUEUEorder-queue;public static final String ORDER_EXCHANGEorder-exchange;public static final String ORDER_ROUTING_KEYorder-routing-key;Beanpublic Queue orderQueue(){return new Queue(ORDER_QUEUE,true);}Beanpublic DirectExchange orderExchange(){return new DirectExchange(ORDER_EXCHANGE,true,false);}Beanpublic Binding orderBinding(){return BindingBuilder.bind(orderQueue()).to(orderExchange()).with(ORDER_ROUTING_KEY);}
}三、如何实现RabbitMQ重复投递机制
3.1 开启发送者消息确认模式
配置application.yml开启发送者confirm确认机制和return确认机制
spring:rabbitmq:# 发送者开启 confirm 确认机制publisher-confirm-type: correlated# 发送者开启 return 确认机制publisher-returns: true3.2 消费发送确认
rabbitmq 的消息确认分为两部分发送消息确认 和 消息接收确认 发送消息确认用来确认生产者 producer 将消息发送到 broker broker 上的交换机 exchange 再投递给队列 queue的过程中消息是否成功投递。
消息从 producer 到 rabbitmq broker有一个 confirmCallback 确认模式。
消息从 exchange 到 queue 投递失败有一个 returnCallback 退回模式。
我们可以利用这两个Callback来确保消息100%送达。 Broker简单来说,就是一个消息队列服务器实体。 3.2.1 创建ConfirmCallBack确认模式
Slf4j
Component
public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {/**** param correlationData 对象内部只有一个 id 属性用来表示当前消息的唯一性* param ack 消息投递到broker 的状态true表示成功* param cause 表示投递失败的原因*/Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (!ack) {log.error(消息发送异常!);} else {log.info(发送者已经收到确认ack{}, cause{},ack, cause);}}
}3.2.2 创建ReturnCallBack退回模式
Slf4j
Component
public class ReturnCallbackService implements RabbitTemplate.ReturnCallback {/**** param message 消息体* param replyCode 响应code* param replyText 响应内容* param exchange 交换机* param routingKey 路由键*/Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {log.info(returnedMessage replyCode{} ,replyText{} ,exchange{} ,routingKey{}, replyCode, replyText, exchange, routingKey);}
}3.3 创建生产者
创建生产者模拟发送消息
package com.zking.zmallrabbitmq.controller;import com.zking.zmall.model.Order;
import com.zking.zmallrabbitmq.component.ConfirmCallbackService;
import com.zking.zmallrabbitmq.component.ReturnCallbackService;
import com.zking.zmallrabbitmq.config.RabbitmqOrderConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.Date;RestController
public class ProducerController {Autowiredprivate RabbitTemplate rabbitTemplate;Autowiredprivate ReturnCallbackService returnCallbackService;Autowiredprivate ConfirmCallbackService confirmCallbackService;RequestMapping(/sendMessage)public void sendMessage(){Order ordernew Order();order.setId(1);order.setUserId(2);order.setLoginName(zhangsan);order.setUserAddress(长沙);order.setCreateTime(new Date());order.setCost(120.0F);order.setSerialNumber(123);order.setState(0);//ConfirmCallback确认模式rabbitTemplate.setConfirmCallback(confirmCallbackService);//ReturnCallback退回模式rabbitTemplate.setReturnCallback(returnCallbackService);rabbitTemplate.convertAndSend(RabbitmqOrderConfig.ORDER_EXCHANGE,RabbitmqOrderConfig.ORDER_ROUTING_KEY,order);}
}3.4 创建消费者手动消费
Slf4j
Component
public class OrderConsumerListener {//最大重试次数private static final Integer MAX_RECONSUME_COUNT3;//用于记录消息重试次数的集合可以采用Redis方式实现private static MapString,Integer retryMapnew HashMap();RabbitHandlerRabbitListener(queues {order-queue},ackMode MANUAL)public void recieveMessage(Message message,Order order,Channel channel) throws IOException {//channel内按顺序自增的消息IDlong deliverTag message.getMessageProperties().getDeliveryTag();try {System.out.println(接收到消息:message,消息内容: JSON.toJSONString(order));//模拟异常开始消息重试int i 1/0;} catch (Exception e) {e.printStackTrace();String msgId (String) message.getMessageProperties().getHeaders().get(spring_returned_message_correlation);Integer retryCount retryMap.get(msgId)null?1:retryMap.get(msgId);log.info(即将开始第{}次消息重试....,retryCount);if(retryCountMAX_RECONSUME_COUNT){log.info(重试次数达到3次消息被拒绝retryCountretryCount);//此处要注意当重试次数到达3次后将拒绝消息且不在重新入队列channel.basicReject(deliverTag,false);}else{//重新发送消息到队尾//再次发送该消息到消息队列异常消息就放在了消息队列尾部这样既保证消息不会丢失又保证了正常业务的进行。channel.basicPublish(message.getMessageProperties().getReceivedExchange(),message.getMessageProperties().getReceivedRoutingKey(), MessageProperties.MINIMAL_PERSISTENT_BASIC,JSON.toJSONBytes(order));}retryMap.put(msgId,retryCount1);}//成功确认消息非批量模式channel.basicAck(deliverTag, false);}
}3.5 启动测试 从测试结果上来看当消费者监听器出现异常后进入消息重试模式并且设置消息重试次数为3次重试次数达到3次消息被拒绝不再重新投递到队列中。这里只是为了演示消息重试机制并未考虑到后续的消息拒绝之后的处理。 3.6 踩坑日记
3.6.1 异常点一RabbitListener 异常原因RabbitListener作用于类上引发异常 解决方案RabbitListener移至消费者监听器的方法上而RabbitListener只适用于方法级别。 3.6.2 手动确认消息 虽然在消费者端的application.yml中配置手动消费模式但是在服务消费时引发了这个异常错误导致重复消费的问题。原因是使用RabbitListener注解会自动ACK如果方法中再手动ACK会造成重复ACK所以报错解决方式就是在RabbitListener中配置手动消费模式ackMode “MANUAL”。 3.6.3 消息格式 在消费者消费消息时引发异常触发消息重新投递但是由于重新投递时导致消息格式问题引发了消息转换异常。 具体原因通过查看日志发现重新投递的消息格式为text/plain而我们在处理消息时采用的是json方式导致消息转换异常。解决方案将重新发送消息的状态由MessageProperties.PERSISTENT_TEXT_PLAIN更改为MessageProperties.MINIMAL_PERSISTENT_BASIC 3.6.4 消息不确认
这是一个非常没技术含量的坑但却是非常容易犯错的地方。开启消息确认机制消费消息别忘了channel.basicAck否则消息会一直存在导致重复消费。 3.6.5 消息无限投递
最开始接触消息确认机制的时候消费端代码就像下边这样写的思路很简单处理完业务逻辑后确认消息 int a 1 / 0 发生异常后将消息重新投入队列
RabbitHandler
public void recieveMessage(Message message,Order order,Channel channel) throws IOException {//channel内按顺序自增的消息IDlong deliverTag message.getMessageProperties().getDeliveryTag();try {System.out.println(接收到消息:message,消息内容: JSON.toJSONString(order));int i 1 / 0;channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);}
}但是有个问题是业务代码一旦出现 bug 99.9%的情况是不会自动修复一条消息会被无限投递进队列消费端无限执行导致了死循环。 经过测试分析发现当消息重新投递到消息队列时这条消息不会回到队列尾部仍是在队列头部。
消费者会立刻消费这条消息业务处理再抛出异常消息再重新入队如此反复进行。导致消息队列处理出现阻塞导致正常消息也无法运行。
而解决方案是先将消息进行应答此时消息队列会删除该条消息同时我们再次发送该消息到消息队列异常消息就放在了消息队列尾部这样既保证消息不会丢失又保证了正常业务的进行。
//重新发送消息到队尾
//再次发送该消息到消息队列异常消息就放在了消息队列尾部这样既保证消息不会丢失又保证了正常业务的进行。
channel.basicPublish(message.getMessageProperties().getReceivedExchange(),message.getMessageProperties().getReceivedRoutingKey(), MessageProperties.MINIMAL_PERSISTENT_BASIC,JSON.toJSONBytes(order));3.6.6 重复消费
如何保证 MQ 的消费是幂等性这个需要根据具体业务而定可以借助MySQL、或者redis 将消息持久化。 上述代码结构图
四、秒杀业务优化
4.1 修改秒杀订单生成方式
第1步修改zmall-order订单模块的application.yml加入rabbitmq相关配置
spring:rabbitmq:host: 192.168.195.143port: 5672username: adminpassword: adminvirtual-host: my_vhost# 设置消费端手动 acklistener:simple:acknowledge-mode: manual# 是否支持重试retry:enabled: truemax-attempts: 3第2步修改秒杀订单生成方式针对抢购成功的秒杀订单直接推送到RabbitMQ中
TransactionalOverridepublic JsonResponseBody? createKillOrder(User user, Integer pid, Float price) {//判断用户是否登录if(nulluser)throw new BusinessException(JsonResponseStatus.TOKEN_ERROR);//根据秒杀商品ID和用户ID判断是否重复抢购Order order redisService.getKillOrderByUidAndPid(user.getId(), pid);if(null!order)return new JsonResponseBody(JsonResponseStatus.ORDER_REPART);//Redis库存预减long stock redisService.decrement(pid);if(stock0){redisService.increment(pid);return new JsonResponseBody(JsonResponseStatus.STOCK_EMPTY);}//创建订单ordernew Order();order.setUserId(user.getId());order.setLoginName(user.getLoginName());order.setPid(pid);order.setCost(price);//将生成的秒杀订单保存到Redis中redisService.setKillOrderToRedis(pid,order);//将生成的秒杀订单推送到RabbitMQ中的订单队列中rabbitTemplate.convertAndSend(RabbitmqOrderConfig.ORDER_EXCHANGE,RabbitmqOrderConfig.ORDER_ROUTING_KEY,order);return new JsonResponseBody();}4.2 消费者监听器完成秒杀订单生成
第1步将zmall-order订单模块中的service业务处理接口及实现类移至消息者监听器模块。 第2步zmall-rabbitmq模块中在IOrderService及OrderServiceImpl中重新定义生成秒杀订单方法
public interface IOrderService extends IServiceOrder {void saveOrder(Order order);
}package com.zking.zmallrabbitmq.service.impl;import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.zking.zmall.mapper.OrderMapper;
import com.zking.zmall.model.Kill;
import com.zking.zmall.model.Order;
import com.zking.zmall.model.OrderDetail;
import com.zking.zmall.service.IRedisService;
import com.zking.zmall.service.impl.RedisServiceImpl;
import com.zking.zmall.util.SnowFlake;
import com.zking.zmallrabbitmq.service.IOrderService;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;/*** p* 服务实现类* /p** author zking* since 2023-02-05*/
Service
public class OrderServiceImpl extends ServiceImplOrderMapper, Order implements IOrderService {Autowiredprivate KillServiceImpl killService;Autowiredprivate OrderDetailServiceImpl orderDetailService;Autowiredprivate RabbitTemplate rabbitTemplate;TransactionalOverridepublic void saveOrder(Order order) {//2.秒杀商品库存减一boolean flagkillService.update(new UpdateWrapperKill().setSql(totaltotal-1).eq(item_id,order.getPid()).gt(total,0));if(!flag)return;//3.生成秒杀订单及订单项SnowFlake snowFlakenew SnowFlake(2,3);Long orderIdsnowFlake.nextId();int orderIdInt new Long(orderId).intValue();//订单order.setSerialNumber(orderIdInt);this.save(order);//订单项OrderDetail orderDetailnew OrderDetail();orderDetail.setOrderId(orderIdInt);orderDetail.setProductId(order.getPid());orderDetail.setQuantity(1);orderDetail.setCost(order.getCost());orderDetailService.save(orderDetail);}
}第3步修改秒杀订单消费者监听器
Slf4j
Component
public class OrderConsumerListener {//最大重试次数private static final Integer MAX_RECONSUME_COUNT3;//用于记录消息重试次数的集合可以采用Redis方式实现private static MapString,Integer retryMapnew HashMap();Autowiredprivate IOrderService orderService;RabbitHandlerRabbitListener(queues {order-queue},ackMode MANUAL)public void recieveMessage(Message message,Order order,Channel channel) throws IOException {//channel内按顺序自增的消息IDlong deliverTag message.getMessageProperties().getDeliveryTag();try {System.out.println(接收到消息:message,消息内容: JSON.toJSONString(order));//模拟异常开始消息重试//int i 1/0;//保存秒杀订单及订单项orderService.saveOrder(order);} catch (Exception e) {e.printStackTrace();String msgId (String) message.getMessageProperties().getHeaders().get(spring_returned_message_correlation);Integer retryCount retryMap.get(msgId)null?1:retryMap.get(msgId);log.info(即将开始第{}次消息重试....,retryCount);if(retryCountMAX_RECONSUME_COUNT){log.info(重试次数达到3次消息被拒绝retryCountretryCount);//此处要注意当重试次数到达3次后将拒绝消息且不在重新入队列channel.basicReject(deliverTag,false);}else{//重新发送消息到队尾//再次发送该消息到消息队列异常消息就放在了消息队列尾部这样既保证消息不会丢失又保证了正常业务的进行。channel.basicPublish(message.getMessageProperties().getReceivedExchange(),message.getMessageProperties().getReceivedRoutingKey(), MessageProperties.MINIMAL_PERSISTENT_BASIC,JSON.toJSONBytes(order));}retryMap.put(msgId,retryCount1);}//成功确认消息非批量模式channel.basicAck(deliverTag, false);}
}重启jmeter压测并查看测试结果。 注意1目前微服务架构明显拆分存在重复类的现象所以在做微服务架构设计的时候要尽可能避免此问题 如service层中尽量避免调用serviceservice中尽可能只调用Mapper
注意2实际项目开发中对于消息未能正常消费应该设置人工补偿机制
测试