dede 网站目录,谷歌浏览器下载安装2023最新版,网站可以称为系统吗,建站服务网络公司文章目录
1.场景描述 1.1 场景11.2 场景2 2.原理3.实战开发 3.1 建表3.2 集成mybatis-plus3.3 集成RabbitMq 3.3.1 安装mq3.3.2 springBoot集成mq 3.4 具体实现 3.4.1 mq配置类3.4.2 生产者3.4.3 消费者
1.场景描述
消息中间件是分布式系统常用的组件#xff0c;无论是异…文章目录
1.场景描述 1.1 场景11.2 场景2 2.原理3.实战开发 3.1 建表3.2 集成mybatis-plus3.3 集成RabbitMq 3.3.1 安装mq3.3.2 springBoot集成mq 3.4 具体实现 3.4.1 mq配置类3.4.2 生产者3.4.3 消费者
1.场景描述
消息中间件是分布式系统常用的组件无论是异步化、解耦、削峰等都有广泛的应用价值。我们通常会认为消息中间件是一个可靠的组件——这里所谓的可靠是指只要我把消息成功投递到了消息中间件消息就不会丢失即消息肯定会至少保证消息能被消费者成功消费一次这是消息中间件最基本的特性之一也就是我们常说的“AT LEAST ONCE”即消息至少会被“成功消费一遍”。
1.1 场景1
什么意思呢举个例子一个消息M发送到了消息中间件消息投递到了消费程序AA接受到了消息然后进行消费但在消费到一半的时候程序重启了这时候这个消息并没有标记为消费成功这个消息还会继续投递给这个消费者直到其消费成功了消息中间件才会停止投递。 这种情景就会出现消息可能被多次地投递。
1.2 场景2
还有一种场景是程序A接受到这个消息M并完成消费逻辑之后正想通知消息中间件“我已经消费成功了”的时候程序就重启了那么对于消息中间件来说这个消息并没有成功消费过所以他还会继续投递。这时候对于应用程序A来说看起来就是这个消息明明消费成功了但是消息中间件还在重复投递。
以上两个场景对于消息队列来说就是同一个messageId的消息重复投递下来了。
我们利用消息id来判断消息是否已经消费过如果该信息被消费过那么消息表中已经 会有一条数据由于消费时会先执行插入操作此时会因为主键冲突无法重复插入我们就利用这个原理来进行幂等的控制,消息内容可以用json格式来进行传输的。
3.实战开发
3.1 建表
DROP TABLE IF EXISTS message_idempotent;
CREATE TABLE message_idempotent (message_id varchar(50) NOT NULL COMMENT 消息ID,message_content varchar(2000) DEFAULT NULL COMMENT 消息内容,status int DEFAULT 0 COMMENT 消费状态0-未消费成功;1-消费成功,retry_times int DEFAULT 0 COMMENT 重试次数,type int DEFAULT 0 COMMENT 消费类型,PRIMARY KEY (message_id)
) ENGINEInnoDB DEFAULT CHARSETutf8;3.2 集成mybatis-plus
《springBoot集成mybatisPlus》
3.3 集成RabbitMq
3.3.1 安装mq
推荐使用docker安装rabbitmq还未安装的可以参考以下信息
docker安装
3.3.2 springBoot集成mq
1.添加依赖 !-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-amqp --dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId/dependency3.4 生产者具体实现
3.4.1 mq配置类
DirectRabbitConfig 具体如何开启可以参考《rabbitMq实现死信队列》
import org.springframework.amqp.core.\*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;Configuration
public class RabbitmqConfig {//正常交换机的名字public final static String EXCHANGE\_NAME exchange\_name;//正常队列的名字public final static String QUEUE\_NAMEqueue\_name;//死信交换机的名字public final static String EXCHANGE\_DEAD exchange\_dead;//死信队列的名字public final static String QUEUE\_DEADqueue\_dead;//死信路由keypublic final static String DEAD\_KEYdead.key;//创建正常交换机Bean(EXCHANGE\_NAME)public Exchange exchange(){return ExchangeBuilder.topicExchange(EXCHANGE\_NAME)//持久化 mq重启后数据还在.durable(true).build();}//创建正常队列Bean(QUEUE\_NAME)public Queue queue(){//正常队列和死信进行绑定 转发到 死信队列配置参数MapString,ObjectmapgetMap();return new Queue(QUEUE\_NAME,true,false,false,map);}//正常队列绑定正常交换机 设置规则 执行绑定 定义路由规则 requestmaping映射Beanpublic Binding binding(Qualifier(QUEUE\_NAME) Queue queue,Qualifier(EXCHANGE\_NAME) Exchange exchange){return BindingBuilder.bind(queue).to(exchange)//路由规则.with(app.#).noargs();}//创建死信队列Bean(QUEUE\_DEAD)public Queue queueDead(){return new Queue(QUEUE\_DEAD,true,false,false);}//创建死信交换机Bean(EXCHANGE\_DEAD)public Exchange exchangeDead(){return ExchangeBuilder.topicExchange(EXCHANGE\_DEAD).durable(true) //持久化 mq重启后数据还在.build();}//绑定死信队列和死信交换机Beanpublic Binding deadBinding(){return BindingBuilder.bind(queueDead()).to(exchangeDead())//路由规则 正常路由key.with(DEAD\_KEY).noargs();}/\*\*获取死信的配置信息\*\*\*/public MapString,ObjectgetMap(){//3种方式 任选其一,选择其他方式之前,先把交换机和队列删除了,在启动项目,否则报错。//方式一MapString,Object mapnew HashMap(16);//死信交换器名称过期或被删除因队列长度超长或因空间超出阈值的消息可指定发送到该交换器中map.put(x-dead-letter-exchange, EXCHANGE\_DEAD);//死信消息路由键在消息发送到死信交换器时会使用该路由键如果不设置则使用消息的原来的路由键值map.put(x-dead-letter-routing-key, DEAD\_KEY);//方式二//消息的过期时间单位毫秒达到时间 放入死信队列// map.put(x-message-ttl,5000);//方式三//队列最大长度超过该最大值则将从队列头部开始删除消息放入死信队列一条数据// map.put(x-max-length,3);return map;}}延迟队列配置 具体如何开启可以参考《rabbitMq实现死信队列》
由于rabbitMq中不直接支持死信队列需要我们利用插件rabbitmq_delayed_messgae_exchage进行开启
/*** 定义延迟交换机*/
Configuration
public class RabbitMQDelayedConfig {//队列private static final String DELAYQUEUE delayedqueue;//交换机private static final String DELAYEXCHANGE delayedExchange;Beanpublic Queue delayqueue(){return new Queue(DELAYQUEUE);}//自定义延迟交换机Beanpublic CustomExchange delayedExchange(){MapString, Object arguments new HashMap();arguments.put(x-delayed-type,direct);/*** 1、交换机名称* 2、交换机类型* 3、是否需要持久化* 4、是否需要自动删除* 5、其他参数*/return new CustomExchange(DELAYEXCHANGE,x-delayed-message,true,false,arguments);}//绑定队列和延迟交换机Beanpublic Binding delaybinding(){return BindingBuilder.bind(delayqueue()).to(delayedExchange()).with(sectest).noargs();}
}3.4.2 生产者
1.消费队列的生产者
import com.example.shop.config.RabbitmqConfig;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.util.UUID;Component
public class Sender_Direct {Autowiredprivate AmqpTemplate rabbitTemplate;/*** 用于消费订单** param orderId*/public void send2Direct(String orderId) {//创建消费对象并指定全局唯一ID(这里使用UUID也可以根据业务规则生成只要保证全局唯一即可)MessageProperties messageProperties new MessageProperties();rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_NAME, RabbitmqConfig.ROUTING_KEY, 内容设置, message - {//设置消息的id为唯一messageProperties.setMessageId(UUID.randomUUID().toString());messageProperties.setContentType(text/plain);messageProperties.setContentEncoding(utf-8);message.getMessageProperties().setMessageId(orderId);return message;});}}
3.4.3 消费者
1.开启手动ack配置
spring:application:name: shoprabbitmq:host: 192.168.1.102port: 5673virtual-host: /username: guestpassword: guestlistener:simple:# 表示消费者消费成功消息以后需要手工的进行签收ack确认默认为 autoacknowledge-mode: manual消费者要配置ack重试机制具体参考前几篇文章使用的是mysql消息ID的唯一性有时候可能生成一样的订单具体的没有进行实验内容是json生成的可以执行业务
import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
import com.example.des.Bean.MessageIdempotent;
import com.example.des.Bean.Shop;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.io.IOException;Component
public class Receiver_Direct {private static final Integer delayTimes 30;//延时消费时间,单位秒Autowiredprivate RabbitTemplate rabbitTemplate;RabbitListener(queues {smsQueue})public void receiveD(Message message, Channel channel) throws IOException {try {// 获取消息IdString messageId message.getMessageProperties().getMessageId();String msg new String(message.getBody());//获取消息//向数据库插入数据MessageIdempotent messageIdempotent new MessageIdempotent();messageIdempotent.setMessageId(messageId);messageIdempotent.setMessageContent(msg);messageIdempotent.setRetryTimes(0);System.out.println(messageIdempotent.toString());Boolean save true; //设置保存成功,消息投递失败是在确认模式那里if (!save) {//说明属于重重复请求//1、处理消息内容的业务,解析json数据//2、创建订单,并保存Boolean flag consumeOrder(new Shop());if (flag){//投入延迟队列如果30分钟订单还没有消费就删除订单rabbitTemplate.convertAndSend(delayedExchange,sectest,message,message1-{//设置发送消息的延长时间 单位ms,表示30分钟message1.getMessageProperties().setDelay(1000*60*30);return message1;});//更新消息状态消费成功,channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}else {//延迟投入死信进行重试channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);}} else {//1、处理消息内容的业务,解析json数据//2、创建订单,并保存//投入死信队列channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);}}catch (Exception e){System.out.println(错误信息);}}private boolean consumeOrder(Shop shop) {return true;}RabbitListener(queues { delay.queue.demo.delay.queue})public void dead(String payload, Message message, Channel channel) throws IOException {System.out.println(死信队列:payload);//删除消息 将数据库状态更新为失败更新邮件或者消息通知,有时候可以人工消费long deliveryTagmessage.getMessageProperties().getDeliveryTag();channel.basicAck(deliveryTag,true);}RabbitListener(queues delayedqueue)public void receivemsg(Message messages){//查询有没有被消费也就是更新成功有时候需要乐观锁}
}至此mq的消息重复以及幂等的信息处理就很完美的解决了当然本文以数据库为例进行实现感兴趣的可以尝试使用redis来进行实现