晋城市公用事业建设局网站,网络开发公司,网站开发实例解析,帝国 织梦 wordpress延迟队列与SpringBoot实战
概念
延时队列,队列内部是有序的#xff0c;最重要的特性就体现在它的延时属性上#xff0c;延时队列中的元素是希望在指定时间到了以后或之前取出和处理#xff0c;简单来说#xff0c;延时队列就是用来存放需要在指定时间被处理的元素的队列 …延迟队列与SpringBoot实战
概念
延时队列,队列内部是有序的最重要的特性就体现在它的延时属性上延时队列中的元素是希望在指定时间到了以后或之前取出和处理简单来说延时队列就是用来存放需要在指定时间被处理的元素的队列
TTL介绍
TTL 是什么呢TTL 是 RabbitMQ 中一个消息或者队列的属性表明一条消息或者该队列中的所有消息的最大存活时间单位是毫秒。换句话说如果一条消息设置了 TTL 属性或者进入了设置TTL 属性的队列那么这条消息如果在TTL 设置的时间内没有被消费则会成为死信。如果同时配置了队列的TTL 和消息的TTL那么较小的那个值将会被使用有两种方式设置 TTL。
设置TTL 消息设置TTL rabbitTemplate.convertAndSend(X, XC, message ttl: ttl, msg - {msg.getMessageProperties().setExpiration(ttl);return msg;});队列设置TTL args.put(x-message-ttl,15000);
QueueBuilder.durable(QUEUE_B).withArguments(args).build();如果不设置 TTL表示消息永远不会过期如果将 TTL 设置为 0则表示除非此时可以直接投递该消息到消费者否则该消息将会被丢弃
代码实战
配置POM
dependencies!--RabbitMQ 依赖--dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-test/artifactIdscopetest/scope/dependencydependencygroupIdcom.alibaba/groupIdartifactIdfastjson/artifactIdversion1.2.47/version/dependencydependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactId/dependency!--swagger--dependencygroupIdio.springfox/groupIdartifactIdspringfox-swagger2/artifactIdversion2.9.2/version/dependencydependencygroupIdio.springfox/groupIdartifactIdspringfox-swagger-ui/artifactIdversion2.9.2/version/dependency!--RabbitMQ 测试依赖--dependencygroupIdorg.springframework.amqp/groupIdartifactIdspring-rabbit-test/artifactIdscopetest/scope/dependency/dependencies配置application
spring.rabbitmq.host192.168.31.232
spring.rabbitmq.port5672
spring.rabbitmq.usernameadmin
spring.rabbitmq.passwordadmin配置Swagger
package com.vmware.config;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.service.Contact;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;Configuration
EnableSwagger2
public class SwaggerConfig {Beanpublic Docket webApiConfig() {return new Docket(DocumentationType.SWAGGER_2).groupName(webApi).apiInfo(webApiInfo()).select().build();}private ApiInfo webApiInfo() {return new ApiInfoBuilder().title(rabbitmq 接口文档).description(本文档描述了 rabbitmq 微服务接口定义).version(1.0).contact(new Contact(name, url,email)).build();}
}代码架构图
创建两个队列 QA 和 QB两者队列 TTL 分别设置为 10S 和 40S然后在创建一个交换机 X 和死信交换机 Y它们的类型都是direct创建一个死信队列 QD它们的绑定关系如下 RabbitMQ配置类
package com.vmware.config;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;Configuration
public class RabbitConfig {//普通交换机private static final String X_EXCHANGE X;//死信交换机private static final String Y_EXCHANGE Y;//普通队列Aprivate static final String QUEUE_A QA;//普通队列Bprivate static final String QUEUE_B QB;//普通队列Cprivate static final String QUEUE_C QC;//死信队列Dprivate static final String QUEUE_D QD;Bean(xExchange)public DirectExchange xExchange() {return new DirectExchange(X_EXCHANGE);}Bean(yExchange)public DirectExchange yExchange() {return new DirectExchange(Y_EXCHANGE);}Bean(queueA)public Queue queueA() {MapString, Object args new HashMap();//设置死信交换机args.put(x-dead-letter-exchange, Y_EXCHANGE);//设置死信Routing Keyargs.put(x-dead-letter-routing-key, YD);//设置超时args.put(x-message-ttl, 10000);//构建队列return QueueBuilder.durable(QUEUE_A).withArguments(args).build();}Bean(queueB)public Queue queueB() {MapString, Object args new HashMap();//设置死信交换机args.put(x-dead-letter-exchange, Y_EXCHANGE);//设置死心Routing Keyargs.put(x-dead-letter-routing-key, YD);//设置超时ttlargs.put(x-message-ttl,15000);//构建队列return QueueBuilder.durable(QUEUE_B).withArguments(args).build();}Bean(queueC)public Queue queueC(){MapString,Object argsnew HashMap();//设置死信交换机args.put(x-dead-letter-exchange, Y_EXCHANGE);//设置死信Routing Keyargs.put(x-dead-letter-routing-key, YD);//构建队列return QueueBuilder.durable(QUEUE_C).withArguments(args).build();}Bean(queueD)public Queue queueD(){//构建死信队列Dreturn QueueBuilder.durable(QUEUE_D).build();}//绑定普通交换机和队列ABeanpublic Binding queueABindingX(){return BindingBuilder.bind(queueA()).to(xExchange()).with(XA);}//绑定普通交换机与队列BBeanpublic Binding queueBBindingX(){return BindingBuilder.bind(queueB()).to(xExchange()).with(XB);}//绑定普通交换机与队列CBeanpublic Binding queueCBindingX(){return BindingBuilder.bind(queueC()).to(xExchange()).with(XC);}//绑定死信交换机与死信队列Beanpublic Binding queueDBindingY(){return BindingBuilder.bind(queueD()).to(yExchange()).with(YD);}
}生产者
package com.vmware.controller;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;import java.util.Date;RestController
RequestMapping(/ttl)
Slf4j
public class SendMsgController {Autowiredprivate RabbitTemplate rabbitTemplate;/*** param message 消息* apiNote 生产者代码*/GetMapping(/sendMsg/{message})public void sendMsg(PathVariable String message) {log.info(当前时间:{},发送消息给两个队列:{}, new Date(), message);rabbitTemplate.convertAndSend(X, XA, 消息来自ttl为10秒的队列 message);rabbitTemplate.convertAndSend(X, XB, 消息来自ttl为15秒的队列 message);}GetMapping(/sendMsg/{message}/{ttl})public void sendMsg(PathVariable String message, PathVariable String ttl) {rabbitTemplate.convertAndSend(X, XC, message ttl: ttl, msg - {msg.getMessageProperties().setExpiration(ttl);return msg;});log.info(当前时间:{},发送消息:{}给队列:XCttl:{}, new Date(), message, ttl);}
}
消费者
package com.vmware.consumer;import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Date;Slf4j
Component
public class DeadLetterQueueConsumer {RabbitListener(queues {QD})public void receiveD(Message message, Channel channel) {log.info(当前时间:{} 死信队列收到消息:{}, new Date(), message);}
}存在的问题
当生产者发布消息到延迟队列后消息只能按顺序被消费者消费当某一消息阻塞时间很长时则会导致其他消息一同阻塞不能达到ttl到期优先被延时队列的消费者所消费的效果
优化
下载插件rabbitmq_delayed_message_exchange到rabbit的plugin目录下 官网:https://www.rabbitmq.com/community-plugins.html ubuntu下载方式 cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.2/plugins
sudo wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.8.0/rabbitmq_delayed_message_exchange-3.8.0.ez启用插件 rabbitmq-plugins enable rabbitmq_delayed_message_exchange重启服务 systemctl restart rabbitmq-server安装完成后可以在rabbit交换机页面看到x-delayed-message
基于插件的延时队列代码实战
在我们自定义的交换机中这是一种新的交换类型该类型消息支持延迟投递机制 消息传递后并不会立即投递到目标队列中而是存储在 mnesia(一个分布式数据系统)表中当达到投递时间时才投递到目标队列中
配置延时队列与交换机
package com.vmware.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;Configuration
public class DelayQueueConfig {private static final String DELAY_QUEUE_NAME delayed.queue;private static final String DELAY_EXCHANGE_NAME delayed.exchange;private static final String DELAY_ROUTING_KEY delayed.routingkey;Beanpublic Queue delayQueue(){return new Queue(DELAY_QUEUE_NAME);}Beanpublic CustomExchange delayExchange(){MapString,Object args new HashMap();args.put(x-delayed-type, direct);/*** 1.交换机名称* 2.交换机类型插件类型* 3.是否持久化* 4.是否自动删除*/return new CustomExchange(DELAY_EXCHANGE_NAME,x-delayed-message,true,false,args);}Beanpublic Binding delayQueueBindExchange(){return BindingBuilder.bind(delayQueue()).to(delayExchange()).with(DELAY_ROUTING_KEY).noargs();}
}生产者
package com.vmware.controller;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;import java.util.Date;RestController
RequestMapping(/ttl)
Slf4j
public class SendMsgController {Autowiredprivate RabbitTemplate rabbitTemplate;GetMapping(/sendDelayMsg/{message}/{delayTime})public void sendMsg(PathVariable String message, PathVariable Integer delayTime) {rabbitTemplate.convertAndSend(delayed.exchange, delayed.routingkey, message, msg - {msg.getMessageProperties().setDelay(delayTime);return msg;});log.info(当前时间:{},发送一条延迟{}毫秒的信息给队列 delayed.queue:{}, new Date(), delayTime, message);}
}消费者
package com.vmware.consumer;import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Date;Slf4j
Component
public class DeadLetterQueueConsumer {RabbitListener(queues {delayed.queue})public void receiveDelayedQueue(Message message) {String msg new String(message.getBody());log.info(当前时间{},收到延时队列的消息{}, new Date(), msg);}
}实际效果
2022-07-19 23:33:18.021 INFO 23040 --- [nio-8080-exec-4] com.vmware.controller.SendMsgController : 当前时间:Tue Jul 19 23:33:18 CST 2022,发送一条延迟20000毫秒的信息给队列 delayed.queue:哈哈哈
2022-07-19 23:33:23.349 INFO 23040 --- [nio-8080-exec-5] com.vmware.controller.SendMsgController : 当前时间:Tue Jul 19 23:33:23 CST 2022,发送一条延迟2000毫秒的信息给队列 delayed.queue:哈
2022-07-19 23:33:25.332 INFO 23040 --- [ntContainer#0-1] c.v.consumer.DeadLetterQueueConsumer : 当前时间Tue Jul 19 23:33:25 CST 2022,收到延时队列的消息哈
2022-07-19 23:33:37.830 INFO 23040 --- [ntContainer#0-1] c.v.consumer.DeadLetterQueueConsumer : 当前时间Tue Jul 19 23:33:37 CST 2022,收到延时队列的消息哈哈哈可以看到前一条延时消息并没有阻塞到后面的消息 文章转载自: http://www.morning.tdscl.cn.gov.cn.tdscl.cn http://www.morning.tlzbt.cn.gov.cn.tlzbt.cn http://www.morning.fzlk.cn.gov.cn.fzlk.cn http://www.morning.gryzk.cn.gov.cn.gryzk.cn http://www.morning.fktlr.cn.gov.cn.fktlr.cn http://www.morning.nqypf.cn.gov.cn.nqypf.cn http://www.morning.yhywx.cn.gov.cn.yhywx.cn http://www.morning.aowuu.com.gov.cn.aowuu.com http://www.morning.hpxxq.cn.gov.cn.hpxxq.cn http://www.morning.rqhbt.cn.gov.cn.rqhbt.cn http://www.morning.bpmdg.cn.gov.cn.bpmdg.cn http://www.morning.llsrg.cn.gov.cn.llsrg.cn http://www.morning.klcdt.cn.gov.cn.klcdt.cn http://www.morning.jkzjs.cn.gov.cn.jkzjs.cn http://www.morning.jxwhr.cn.gov.cn.jxwhr.cn http://www.morning.cjsrg.cn.gov.cn.cjsrg.cn http://www.morning.rpfpx.cn.gov.cn.rpfpx.cn http://www.morning.synkr.cn.gov.cn.synkr.cn http://www.morning.mydgr.cn.gov.cn.mydgr.cn http://www.morning.smkxm.cn.gov.cn.smkxm.cn http://www.morning.cryb.cn.gov.cn.cryb.cn http://www.morning.xctdn.cn.gov.cn.xctdn.cn http://www.morning.ktrdc.cn.gov.cn.ktrdc.cn http://www.morning.pqchr.cn.gov.cn.pqchr.cn http://www.morning.gwjqq.cn.gov.cn.gwjqq.cn http://www.morning.fdmtr.cn.gov.cn.fdmtr.cn http://www.morning.dljujia.com.gov.cn.dljujia.com http://www.morning.hylbz.cn.gov.cn.hylbz.cn http://www.morning.bwmm.cn.gov.cn.bwmm.cn http://www.morning.mtrz.cn.gov.cn.mtrz.cn http://www.morning.hwnnh.cn.gov.cn.hwnnh.cn http://www.morning.lrmts.cn.gov.cn.lrmts.cn http://www.morning.mpscg.cn.gov.cn.mpscg.cn http://www.morning.mftzm.cn.gov.cn.mftzm.cn http://www.morning.c7630.cn.gov.cn.c7630.cn http://www.morning.qxlhj.cn.gov.cn.qxlhj.cn http://www.morning.fpkpz.cn.gov.cn.fpkpz.cn http://www.morning.yhwmg.cn.gov.cn.yhwmg.cn http://www.morning.tqpnf.cn.gov.cn.tqpnf.cn http://www.morning.lfqtp.cn.gov.cn.lfqtp.cn http://www.morning.spwm.cn.gov.cn.spwm.cn http://www.morning.ftgwj.cn.gov.cn.ftgwj.cn http://www.morning.trrhj.cn.gov.cn.trrhj.cn http://www.morning.fnlnp.cn.gov.cn.fnlnp.cn http://www.morning.ebpz.cn.gov.cn.ebpz.cn http://www.morning.sbdqy.cn.gov.cn.sbdqy.cn http://www.morning.xcfmh.cn.gov.cn.xcfmh.cn http://www.morning.xcnwf.cn.gov.cn.xcnwf.cn http://www.morning.sqtsl.cn.gov.cn.sqtsl.cn http://www.morning.hjjkz.cn.gov.cn.hjjkz.cn http://www.morning.ksggr.cn.gov.cn.ksggr.cn http://www.morning.dtnzk.cn.gov.cn.dtnzk.cn http://www.morning.wnhgb.cn.gov.cn.wnhgb.cn http://www.morning.ztqyj.cn.gov.cn.ztqyj.cn http://www.morning.hwnnm.cn.gov.cn.hwnnm.cn http://www.morning.dthyq.cn.gov.cn.dthyq.cn http://www.morning.kklwz.cn.gov.cn.kklwz.cn http://www.morning.rnfn.cn.gov.cn.rnfn.cn http://www.morning.ryspp.cn.gov.cn.ryspp.cn http://www.morning.qbwyd.cn.gov.cn.qbwyd.cn http://www.morning.nkjnr.cn.gov.cn.nkjnr.cn http://www.morning.zbnts.cn.gov.cn.zbnts.cn http://www.morning.ttvtv.cn.gov.cn.ttvtv.cn http://www.morning.gktds.cn.gov.cn.gktds.cn http://www.morning.nggbf.cn.gov.cn.nggbf.cn http://www.morning.cszbj.cn.gov.cn.cszbj.cn http://www.morning.jxwhr.cn.gov.cn.jxwhr.cn http://www.morning.lkthj.cn.gov.cn.lkthj.cn http://www.morning.kflbf.cn.gov.cn.kflbf.cn http://www.morning.simpliq.cn.gov.cn.simpliq.cn http://www.morning.kwrzg.cn.gov.cn.kwrzg.cn http://www.morning.ggtgl.cn.gov.cn.ggtgl.cn http://www.morning.wdxr.cn.gov.cn.wdxr.cn http://www.morning.xbckm.cn.gov.cn.xbckm.cn http://www.morning.khdw.cn.gov.cn.khdw.cn http://www.morning.btpll.cn.gov.cn.btpll.cn http://www.morning.kqpq.cn.gov.cn.kqpq.cn http://www.morning.rywn.cn.gov.cn.rywn.cn http://www.morning.sgnxl.cn.gov.cn.sgnxl.cn http://www.morning.splcc.cn.gov.cn.splcc.cn