wordpress影视主题模板免费下载,seo外包顾问,线上推广方式和介绍,怎么用网网站模板做网站目录
一、引言
二、优化方案
三、核心代码实现
3.1. 生产者代码
3.2. 消息处理器
3.3. 自定义多延迟消息封装类
3.4. 订单实体类
3.5. 消费者代码
四、运行效果 一、引言
上一章节我们提到#xff0c;直接使用延迟插件#xff0c;创建一个延迟指定时间的消息直接使用延迟插件创建一个延迟指定时间的消息如10分钟并不是最好的解决方案因为假如我们的订单是在5分钟支付的那么剩余的5分钟时间RabbitMQ中延迟消息时钟还是一直占用着资源。如果有大量的延迟消息那么对于服务来说压力是很大的同时会耗费庞大昂贵的资源。因此本章节我们就来近一步对延迟插件的消息进行优化。
我们通过下面的流程图来做近一步分析 1. 用户下单完成后发送15分钟延迟消息在15分钟后接收消息检查支付状态 2. 已支付更新订单状态为已支付 3. 未支付更新订单状态为关闭订单恢复商品库存 常规延迟插件消息使用的弊端总结 1. 设置30分钟后检测订单支付状态实现起来非常简单但是存在两个问题 2. 如果并发较高30分钟可能堆积消息过多对MQ压力很大 3. 大多数订单在下单后1分钟内就会支付但是却需要在MQ内等待30分钟浪费资源 二、优化方案
如下图所示我们可以将10分钟甚至30分钟拆分成多份零散的较短的时间。 消息初次发送的延迟时间设定为10s10s过后如果订单还是未支付状态我们判断延迟时间数组里还有没有剩余延迟时间如果有则继续发送延迟消息时间设定为数组中的第二个时间10s直到订单支付成功终止循环或是最后一份时间消耗完依然未支付我们取消订单。 三、核心代码实现
3.1. 生产者代码
package com.example.publisher;import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;/*** 生产者*/
Slf4j
SpringBootTest
class PublisherApplicationTests {Resourceprivate RabbitTemplate rabbitTemplate;Testvoid test() {Order order Order.builder().orderId(1L).content(生活不易所以保持足够的努力对自己要有信心积极地去面对工作生活的挑战).build();MultiDelayMessageOrder msg MultiDelayMessage.of(order, 1000L, 5000L, 2000L, 10000L);rabbitTemplate.convertAndSend(delay.direct, delay, msg, new MessagePostProcessor() {Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setDelayLong(msg.removeNextDelay());return message;}});}
}3.2. 消息处理器
package com.example.publisher;import lombok.AllArgsConstructor;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;/*** 消息请求处理器*/
AllArgsConstructor
public class DelayMessageProcessor implements MessagePostProcessor {private final Long delay;Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setDelayLong(delay);return message;}
}3.3. 自定义多延迟消息封装类
package com.example.publisher;import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.util.CollectionUtils;import java.io.Serializable;
import java.util.List;/*** 自定义的多延时消息封装类* param T*/
Data
NoArgsConstructor
public class MultiDelayMessageT implements Serializable {/*** 消息体*/private T data;/*** 记录延迟时间的集合*/private ListLong delayMillis;public MultiDelayMessage(T data, ListLong delayMillis) {this.data data;this.delayMillis delayMillis;}public static T MultiDelayMessageT of(T data, Long...delayMillis) {return new MultiDelayMessage(data, (ListLong) CollectionUtils.arrayToList(delayMillis));}/*** 获取并移除下一个延迟时间* return 队列中的第一个延迟时间*/public Long removeNextDelay() {return delayMillis.remove(0);}/*** 是否还有下一个延迟时间* return*/public boolean hasNextDelay() {return !delayMillis.isEmpty();}
}3.4. 订单实体类
package com.example.publisher;import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;/*** 订单类 * 此处为了演示将真实业务中的订单类做了简化* 只包含一个订单ID和自定义消息内容*/
Data
AllArgsConstructor
NoArgsConstructor
Builder
public class Order {private Long orderId;private String content;
}3.5. 消费者代码
package com.example.consumer;import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;/*** 消费者* 因为作为演示所以商城支付、订单、及扣减库存的业务代码已注释* 注释中保留了整个商城下单支付扣减库存的流程步骤*/
Slf4j
Component
public class SimpleListener {Resourceprivate RabbitTemplate rabbitTemplate;RabbitListener(bindings QueueBinding(value Queue(name delay.queue, durable true),exchange Exchange(name delay.direct, delayed true),key delay))public void listener(MultiDelayMessageOrder msg) throws Exception {System.out.println(((Order)msg.getData()).getContent());// 1. 查询订单状态// Order order orderService.getById(msg.getData())// 2. 判断是否已支付
// if (Order null || order.status 2) {
// 订单不存在或者已处理则直接返回
// return;
// }// 主动去支付服务查询真正的支付状态
// PayOrder payOrder payService.getById(order.getId());// 2.1. 已支付则标记订单为已支付
// if (payOrder.isPay()) {
// orderService.markOrderPaySuccess(order.getId());
// return;
// }// 2.2. 未支付获取下次订单延迟时间// 3. 判断是否存在延迟时间if (msg.hasNextDelay()) {// 3.1 存在重发延迟消息Long nextDelay msg.removeNextDelay();rabbitTemplate.convertAndSend(delay.direct, delay, msg, new MessagePostProcessor() {Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setDelayLong(nextDelay);return message;}});return;}// 3.2 不存在取消订单
// orderService.lambdaUpdate()
// .set(Order::getStatus, 5);
// .set(Order::getCloseTime, LocalDateTime.now());
// .eq(Order::getId, order.getId())
// .update();// 4. 恢复库存}
}四、运行效果
最终我们会看到每间隔一段时间消费者就会消费一条消息这个间隔时间就是我们设定的分段时间数组这么做就能极大地减少资源消耗和服务的压力