网站推广怎么做比较好,企业宣传手册封面模板,做网站域名后缀选择,格力网站建设首页SpringBoot教程#xff08;十五#xff09; | SpringBoot集成RabbitMq#xff08;死信队列、延迟队列#xff09; #xff08;一#xff09;死信队列使用场景具体用法前提示例: #xff08;二#xff09;延迟队列使用场景方法一#xff1a;通过死亡队列实现方法二十五 | SpringBoot集成RabbitMq死信队列、延迟队列 一死信队列使用场景具体用法前提示例: 二延迟队列使用场景方法一通过死亡队列实现方法二通过延迟消息插件rabbitmq_delayed_message_exchange实现 一死信队列
死信队列是一个重要的概念用于处理那些因各种原因无法被正常消费的消息。 它不是RabbitMQ直接提供的一个现成的方法或工具而是通过特定的配置和机制来实现的。
使用场景
死信队列在多种场景下都非常有用包括但不限于 消息重试机制当消息处理失败时可以将其发送到死信队列进行重试。异常消息处理对于无法被正常处理的异常消息可以将其存储在死信队列中以便后续分析处理。延迟消息处理通过结合消息的TTLTime-To-Live生存时间和死信队列可以实现消息的延迟处理。确保消息不丢失在消息处理过程中如果发生消费者崩溃或网络故障等情况消息可能会丢失。通过死信队列可以确保这些消息得到保留并在系统恢复后重新处理。 具体用法
要在RabbitMQ中设置和使用死信队列通常需要按照以下步骤进行 定义死信交换机DLX首先需要定义一个交换机作为死信交换机它可以是任何类型的交换机如direct、fanout、topic等。配置原队列在声明原队列时需要指定两个参数x-dead-letter-exchange和x-dead-letter-routing-key。前者指定了当消息变成死信时应该发送到的交换机即死信交换机后者指定了发送到该交换机的路由键。声明死信队列接着需要声明一个或多个死信队列并将它们绑定到死信交换机上。这样当死信消息被发送到死信交换机时就可以根据路由键将其路由到相应的死信队列中。处理死信消息最后需要编写消费者代码来监听死信队列中的消息并对这些消息进行相应的处理。 前提 要想进入死信队列得出现异常出现异常后会根据你的配置帮你放到死信队列中 所以异常不要被捕获。 如果实在要捕获的话就得你在消费者这边去做“发送消息的”操作自己把发送过来消息塞到死信队列中 示例:
消费者 mq的yml配置重试机制
spring:rabbitmq:host: 127.0.0.1port: 5672username: guestpassword: guestlistener:simple:# 重试机制retry:enabled: true #是否开启消费者重试配置类
package com.example.reactboot.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;Configuration
public class DirectExchangeConfig {//普通//定义队列的名称常量public static final String DIRECT_QUEUE directQueue;public static final String DIRECT_QUEUE2 directQueue2;//定义直接交换机的名称常量public static final String DIRECT_EXCHANGE directExchange;//定义路由键常量用于交换机和队列之间的绑定public static final String DIRECT_ROUTING_KEY direct;//定义路由键常量用于交换机和队列之间的绑定public static final String DIRECT_ROUTING_KEY_2 direct2;//定义队列名称为DIRECT_QUEUE//为普通队列 绑设置 死信参数Beanpublic Queue directQueue() {//return new Queue(DIRECT_QUEUE, true);MapString, Object args new HashMap();// 设置死信交换机args.put(x-dead-letter-exchange, DLX_EXCHANGE);// 设置发送到死信交换机的路由键args.put(x-dead-letter-routing-key, DLX_ROUTING_KEY);// 创建队列设置为持久化、非排他、非自动删除并附带死信参数return new Queue(DIRECT_QUEUE, true, false, false, args);}//定义直接交换机Beanpublic DirectExchange directExchange() {return new DirectExchange(DIRECT_EXCHANGE, true, false);}//定义队列名称为DIRECT_QUEUE2Beanpublic Queue directQueue2() {return new Queue(DIRECT_QUEUE2, true);}//定义一个绑定将directQueue队列绑定到directExchange交换机上//使用direct作为路由键Beanpublic Binding bindingDirectExchange(Queue directQueue, DirectExchange directExchange) {return BindingBuilder.bind(directQueue).to(directExchange).with(DIRECT_ROUTING_KEY);}// 定义一个绑定Bean将directQueue2队列也绑定到directExchange交换机上Beanpublic Binding bindingDirectExchange2(Queue directQueue2, DirectExchange directExchange) {return BindingBuilder.bind(directQueue2).to(directExchange).with(DIRECT_ROUTING_KEY_2);}//死信// 定义死信交换机的名称public static final String DLX_EXCHANGE dlx_exchange;// 定义发送到死信交换机的路由键public static final String DLX_ROUTING_KEY dlx.routing.key;// 定义死信队列的名称public static final String DLX_QUEUE dlx_queue;/*** 声明死信交换机这里使用Direct类型。* return 返回一个配置好的DirectExchange对象。*/BeanDirectExchange dlxExchange() {// 创建并返回Direct类型的交换机return new DirectExchange(DLX_EXCHANGE,true, false);}/*** 声明死信队列。* return 返回一个配置好的Queue对象用作死信队列。*/BeanQueue dlxQueue() {// 创建并返回死信队列设置为持久化return new Queue(DLX_QUEUE, true);}/*** 绑定死信队列到死信交换机使用指定的路由键。*/BeanBinding binding(Queue dlxQueue,DirectExchange dlxExchange) {return BindingBuilder.bind(dlxQueue).to(dlxExchange).with(DLX_ROUTING_KEY);}}
生产者发送消息
package com.example.reactboot.controller;import com.example.reactboot.config.DirectExchangeConfig;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.nio.charset.StandardCharsets;RestController
public class RabbitMqTest {AutowiredRabbitTemplate rabbitTemplate;RequestMapping(/sendMQ)public String sendMessage() {rabbitTemplate.convertAndSend(DirectExchangeConfig.DIRECT_EXCHANGE,DirectExchangeConfig.DIRECT_ROUTING_KEY, 发送一条测试消息direct);return direct消息发送成功;}}消费者消费消息
package com.example.reactboot.queueListener;import com.example.reactboot.config.DirectExchangeConfig;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;/*** className: DirectQueueListener* description: 直连交换机的监听器* author: sh.Liu* date: 2021-08-23 16:03*/
Slf4j
Component
public class DirectQueueListener {//监听普通队列RabbitHandlerRabbitListener(queues DirectExchangeConfig.DIRECT_QUEUE)public void process(String xx){SimpleDateFormat sdf new SimpleDateFormat(yyyy-MM-dd HH:mm:ss);System.out.println(DirectReceiver消费者收到消息1 : xx 接收时间 sdf.format(new Date()) \n);//先执行业务代码int i 1 / 0;}//监听死信队列RabbitHandlerRabbitListener(queues DirectExchangeConfig.DLX_QUEUE)public void process3(String testMessage) {System.out.println(死信得列里面的 : testMessage \n);}}
二延迟队列 延迟队列是一种特殊的消息队列其内部消息是有序的并且具有延时属性。 在RabbitMQ中虽然AMQP协议本身没有直接支持延迟队列但可以通过一些变通的方法如使用死信队列配合消息的TTL属性或者使用RabbitMQ的延迟消息插件来实现延迟队列的功能。 使用场景
延迟队列在多种业务场景中都有广泛的应用包括但不限于 订单超时未支付自动取消用户下单后如果在规定时间内未完成支付系统可以自动取消订单。退款超时通知用户申请退款后如果长时间未得到处理系统可以自动通知相关运营人员介入。新用户注册后的引导邮件用户注册账号后系统可以在一段时间后发送欢迎邮件或引导邮件。会议提醒在预定的会议开始前一段时间系统自动发送提醒给参会人员。任务调度在指定时间后执行某项任务如定时清理日志、执行批处理任务等。 方法一通过死亡队列实现
以下是使用死信队列配合TTL属性实现延迟队列的基本步骤 定义死信交换机DLX, Dead-Letter Exchange和死信队列DLQ, Dead-Letter Queue设置普通队列的TTL和死信交换机在创建普通队列时可以为其设置TTL属性指定消息在该队列中的最大存活时间。同时需要将该队列的死信交换机设置为前面定义的DLX以便消息在过期后能够被发送到DLQ。生产者发送消息生产者将消息发送到普通队列并指定消息的TTL。消息在队列中等待直到TTL过期。消息过期并发送到死信队列当消息的TTL过期后RabbitMQ会自动将该消息发送到其配置的死信交换机再由死信交换机根据路由键将其发送到DLQ。消费者从死信队列消费消息消费者监听DLQ当有新消息到达时进行消费处理。 就是把普通队列的消息设置存活时间目前有两者方式 1.在队列上面设置消息的过期时间 2.直接在消息上面设置过期时间。
方式一队列上面设置消息过期时间
上面的关于 死信示例 完全可以复用进行测试
在以下的方法里面多加一行 args.put(“x-message-ttl”, 10000); //定义队列名称为DIRECT_QUEUE//为普通队列 绑设置 死信参数Beanpublic Queue directQueue() {//return new Queue(DIRECT_QUEUE, true);MapString, Object args new HashMap();// 设置消息TTL为10秒args.put(x-message-ttl, 10000);// 设置死信交换机args.put(x-dead-letter-exchange, DLX_EXCHANGE);// 设置发送到死信交换机的路由键args.put(x-dead-letter-routing-key, DLX_ROUTING_KEY);// 创建队列设置为持久化、非排他、非自动删除并附带死信参数return new Queue(DIRECT_QUEUE, true, false, false, args);} 你可以把 DirectQueueListener 里面的 process 方法注释掉以免被消费掉。 再执行生产者的 sendMessage 方法。 这个时候你就可以看到下面关于 监听死信队列 的方法 等10秒后就会打印你发的消息了
方式二消息上面设置过期时间
上面的关于 死信示例 完全可以复用进行测试
改一下 这个 生产者发送消息
package com.example.reactboot.controller;import com.example.reactboot.config.DirectExchangeConfig;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.nio.charset.StandardCharsets;RestController
public class RabbitMqTest {AutowiredRabbitTemplate rabbitTemplate;RequestMapping(/sendMQ)public String sendMessage() {SimpleDateFormat sdf new SimpleDateFormat(yyyy-MM-dd HH:mm:ss);rabbitTemplate.convertAndSend(DirectExchangeConfig.DIRECT_EXCHANGE, DirectExchangeConfig.DIRECT_ROUTING_KEY, 发送一条测试消息direct! sdf.format(new Date()), new MessagePostProcessor() {Overridepublic Message postProcessMessage(Message message) throws AmqpException {//设置过期时间超过5秒消息就会消失message.getMessageProperties().setExpiration(5000);//设置编码格式message.getMessageProperties().setContentEncoding(UTF-8);return message;}}); return direct消息发送成功;}}你可以把 DirectQueueListener 里面的 process 方法注释掉以免被消费掉。 再执行生产者的 sendMessage 方法。 这个时候你就可以看到下面关于 监听死信队列 的方法 等5秒后就会打印你发的消息了
到这里其实就结束了剩下的就是监听到死信队列里面的消息后的业务操作了
方法二通过延迟消息插件rabbitmq_delayed_message_exchange实现
后续在说 文章转载自: http://www.morning.ydnx.cn.gov.cn.ydnx.cn http://www.morning.tlfyb.cn.gov.cn.tlfyb.cn http://www.morning.qfgxk.cn.gov.cn.qfgxk.cn http://www.morning.mhmsn.cn.gov.cn.mhmsn.cn http://www.morning.litao7.cn.gov.cn.litao7.cn http://www.morning.jrpmf.cn.gov.cn.jrpmf.cn http://www.morning.fwnqq.cn.gov.cn.fwnqq.cn http://www.morning.knngw.cn.gov.cn.knngw.cn http://www.morning.tlbhq.cn.gov.cn.tlbhq.cn http://www.morning.txrq.cn.gov.cn.txrq.cn http://www.morning.nsfxt.cn.gov.cn.nsfxt.cn http://www.morning.hytqt.cn.gov.cn.hytqt.cn http://www.morning.rlhjg.cn.gov.cn.rlhjg.cn http://www.morning.fyskq.cn.gov.cn.fyskq.cn http://www.morning.fsfz.cn.gov.cn.fsfz.cn http://www.morning.kgqww.cn.gov.cn.kgqww.cn http://www.morning.mcpby.cn.gov.cn.mcpby.cn http://www.morning.vnuwdy.cn.gov.cn.vnuwdy.cn http://www.morning.pflry.cn.gov.cn.pflry.cn http://www.morning.jzgxp.cn.gov.cn.jzgxp.cn http://www.morning.lqynj.cn.gov.cn.lqynj.cn http://www.morning.nqyfm.cn.gov.cn.nqyfm.cn http://www.morning.mgmqf.cn.gov.cn.mgmqf.cn http://www.morning.xbhpm.cn.gov.cn.xbhpm.cn http://www.morning.lbpqk.cn.gov.cn.lbpqk.cn http://www.morning.qymqh.cn.gov.cn.qymqh.cn http://www.morning.tjndb.cn.gov.cn.tjndb.cn http://www.morning.hpjpy.cn.gov.cn.hpjpy.cn http://www.morning.xlwpz.cn.gov.cn.xlwpz.cn http://www.morning.fhjnh.cn.gov.cn.fhjnh.cn http://www.morning.jjnry.cn.gov.cn.jjnry.cn http://www.morning.prqdr.cn.gov.cn.prqdr.cn http://www.morning.ykrkq.cn.gov.cn.ykrkq.cn http://www.morning.qxlgt.cn.gov.cn.qxlgt.cn http://www.morning.dmxzd.cn.gov.cn.dmxzd.cn http://www.morning.zhmgcreativeeducation.cn.gov.cn.zhmgcreativeeducation.cn http://www.morning.gsrh.cn.gov.cn.gsrh.cn http://www.morning.qprtm.cn.gov.cn.qprtm.cn http://www.morning.lfcfn.cn.gov.cn.lfcfn.cn http://www.morning.kscwt.cn.gov.cn.kscwt.cn http://www.morning.zhmgcreativeeducation.cn.gov.cn.zhmgcreativeeducation.cn http://www.morning.hrnrx.cn.gov.cn.hrnrx.cn http://www.morning.fhqsm.cn.gov.cn.fhqsm.cn http://www.morning.iknty.cn.gov.cn.iknty.cn http://www.morning.wnjsp.cn.gov.cn.wnjsp.cn http://www.morning.rkkpr.cn.gov.cn.rkkpr.cn http://www.morning.qsmdd.cn.gov.cn.qsmdd.cn http://www.morning.ryrpq.cn.gov.cn.ryrpq.cn http://www.morning.jikuxy.com.gov.cn.jikuxy.com http://www.morning.rqgjr.cn.gov.cn.rqgjr.cn http://www.morning.ckrnq.cn.gov.cn.ckrnq.cn http://www.morning.wlxfj.cn.gov.cn.wlxfj.cn http://www.morning.yxwcj.cn.gov.cn.yxwcj.cn http://www.morning.bpxmw.cn.gov.cn.bpxmw.cn http://www.morning.srsln.cn.gov.cn.srsln.cn http://www.morning.zcxjg.cn.gov.cn.zcxjg.cn http://www.morning.sqqdy.cn.gov.cn.sqqdy.cn http://www.morning.lxhgj.cn.gov.cn.lxhgj.cn http://www.morning.kmqlf.cn.gov.cn.kmqlf.cn http://www.morning.gxtbn.cn.gov.cn.gxtbn.cn http://www.morning.gtbjc.cn.gov.cn.gtbjc.cn http://www.morning.ljdd.cn.gov.cn.ljdd.cn http://www.morning.yqqgp.cn.gov.cn.yqqgp.cn http://www.morning.pjrql.cn.gov.cn.pjrql.cn http://www.morning.yxnkr.cn.gov.cn.yxnkr.cn http://www.morning.dwwlg.cn.gov.cn.dwwlg.cn http://www.morning.wbysj.cn.gov.cn.wbysj.cn http://www.morning.wlxfj.cn.gov.cn.wlxfj.cn http://www.morning.ywpwq.cn.gov.cn.ywpwq.cn http://www.morning.qkrz.cn.gov.cn.qkrz.cn http://www.morning.yktwr.cn.gov.cn.yktwr.cn http://www.morning.qsdnt.cn.gov.cn.qsdnt.cn http://www.morning.dpruuode.cn.gov.cn.dpruuode.cn http://www.morning.ffwrq.cn.gov.cn.ffwrq.cn http://www.morning.xq3nk42mvv.cn.gov.cn.xq3nk42mvv.cn http://www.morning.lgnz.cn.gov.cn.lgnz.cn http://www.morning.hnrls.cn.gov.cn.hnrls.cn http://www.morning.jmmz.cn.gov.cn.jmmz.cn http://www.morning.xq3nk42mvv.cn.gov.cn.xq3nk42mvv.cn http://www.morning.ldqzz.cn.gov.cn.ldqzz.cn