西安道桥建设有限公司网站,北京个人制作网站有哪些内容,网站制作q,网站开发毕设的需求分析RabbitMQ的DLX 1、RabbitMQ死信队列2、代码示例2.1、队列过期2.1.1、配置类RabbitConfig#xff08;关键代码#xff09;2.1.2、业务类MessageService2.1.3、配置文件application.yml2.1.4、启动类2.1.5、配置文件2.1.6、测试 2.2、消息过期2.2.1、配置类RabbitConfig2.2.2、… RabbitMQ的DLX 1、RabbitMQ死信队列2、代码示例2.1、队列过期2.1.1、配置类RabbitConfig关键代码2.1.2、业务类MessageService2.1.3、配置文件application.yml2.1.4、启动类2.1.5、配置文件2.1.6、测试 2.2、消息过期2.2.1、配置类RabbitConfig2.2.2、业务类MessageService关键代码2.2.3、配置文件application.yml2.2.4、启动类同上2.2.5、配置文件同上2.2.6、测试 2.3、队列达到最大长度先入队的消息会被发送到DLX2.3.1、配置类RabbitConfig关键代码2.3.2、业务类MessageService关键代码2.3.3、配置文件application.yml2.3.4、启动类同上2.3.5、配置文件pom.xml同上2.3.6、测试 2.4、消费者拒绝消息不进行重新投递2.4.1、生产者2.4.1.1、生产者application.yml2.4.1.2、生产者发送消息2.4.1.3、生产者配置类 2.4.2、消费者2.4.2.1、消费者application.yml 启动手动确认关键配置 2.4.2.2、消费者接收消息关键代码 2.4.3、测试 1、RabbitMQ死信队列
RabbitMQ死信队列也有叫 死信交换机、死信邮箱等说法。 DLX: Dead-Letter-Exchange 死信交换器死信邮箱。 1-2、生产者发送一个消息到正常交换机 2-4、正常交换机接收到消息发送到正常队列 4、正常队列设置了队列过期时间超时消息会自动删除 4-6、原本过期自动删除的消息发送到了死信交换机 6-8、死信交换机将消息发送到了死信队列
如上情况下一个消息会进入DLXDead Letter Exchange死信交换机。
2、代码示例
2.1、队列过期
2.1.1、配置类RabbitConfig关键代码
package com.power.config;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;Configuration
public class RabbitConfig {Value(${my.exchangeNormalName})private String exchangeNormalName;Value(${my.queueNormalName})private String queueNormalName;Value(${my.exchangeDlxName})private String exchangeDlxName;Value(${my.queueDlxName})private String queueDlxName;/*** 正常交换机* return*/Beanpublic DirectExchange normalExchange(){return ExchangeBuilder.directExchange(exchangeNormalName).build();}/*** 正常队列* return*/Beanpublic Queue normalQueue(){MapString, Object arguments new HashMap();arguments.put(x-message-ttl,20000);//设置队列的过期时间为20秒//重点设置这两个参数arguments.put(x-dead-letter-exchange,exchangeDlxName); //设置队列的死信交换机arguments.put(x-dead-letter-routing-key,error);//设置死信路由key要跟死信交换机和死信队列绑定的路由key一致return QueueBuilder.durable(queueNormalName).withArguments(arguments) //设置队列的过期时间.build();}/*** 正常交换机和正常队列绑定* param normalExchange* param normalQueue* return*/Beanpublic Binding bingNormal(DirectExchange normalExchange,Queue normalQueue){return BindingBuilder.bind(normalQueue).to(normalExchange).with(order);}/*** 死信交换机* return*/Beanpublic DirectExchange dlxExchange(){return ExchangeBuilder.directExchange(exchangeDlxName).build();}/*** 死信队列* return*/Beanpublic Queue dlxQueue(){return QueueBuilder.durable(queueDlxName).build();}/*** 死信交换机和死信队列绑定* param dlxExchange* param dlxQueue* return*/Beanpublic Binding bindDlx(DirectExchange dlxExchange,Queue dlxQueue){return BindingBuilder.bind(dlxQueue).to(dlxExchange).with(error);}
}2.1.2、业务类MessageService
package com.power.service;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Service;import javax.annotation.Resource;
import java.util.Date;Service
Slf4j
public class MessageService {Resourceprivate RabbitTemplate rabbitTemplate;Beanpublic void sendMsg(){Message message MessageBuilder.withBody(hello world.getBytes()).build();rabbitTemplate.convertAndSend(exchange.normal.a,order,message);log.info(消息发送完毕发送时间是new Date());}
}2.1.3、配置文件application.yml
server:port: 8080
spring:application:name: dlx-test01rabbitmq:host: 你的服务器IPport: 5672username: 你的账号password: 你的密码virtual-host: powermy:exchangeNormalName: exchange.normal.a #正常交换机queueNormalName: queue.normal.a #正常队列没有消费组设置过期时间exchangeDlxName: exchange.dlx.a #死信交换机queueDlxName: queue.dlx.a #死信队列2.1.4、启动类
package com.power;import com.power.service.MessageService;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;import javax.annotation.Resource;SpringBootApplication
public class Application implements ApplicationRunner {Resourceprivate MessageService messageService;public static void main(String[] args) {SpringApplication.run(Application.class);}Overridepublic void run(ApplicationArguments args) throws Exception {messageService.sendMsg();}
}2.1.5、配置文件
?xml version1.0 encodingUTF-8?
project xmlnshttp://maven.apache.org/POM/4.0.0 xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsdmodelVersion4.0.0/modelVersiongroupIdcom.power/groupIdartifactIdrabbit_06_dlx01/artifactIdversion1.0-SNAPSHOT/versionnamerabbit_06_dlx01/namepropertiesmaven.compiler.source8/maven.compiler.sourcemaven.compiler.target8/maven.compiler.targetproject.build.sourceEncodingUTF-8/project.build.sourceEncoding/propertiesparentgroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-parent/artifactIdversion2.6.13/versionrelativePath//parentdependenciesdependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-test/artifactIdscopetest/scope/dependencydependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactIdversion1.18.24/version/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId/dependency/dependenciesbuildpluginsplugingroupIdorg.springframework.boot/groupIdartifactIdspring-boot-maven-plugin/artifactId/plugin/plugins/build/project2.1.6、测试
启动程序发送消息
消息会先被发送到正常队列queue.normal.a中超时未被消费 则消息会被发送到死信队列queue.dlx.a 中
2.2、消息过期
2.2.1、配置类RabbitConfig
package com.power.config;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;Configuration
public class RabbitConfig {Value(${my.exchangeNormalName})private String exchangeNormalName;Value(${my.queueNormalName})private String queueNormalName;Value(${my.exchangeDlxName})private String exchangeDlxName;Value(${my.queueDlxName})private String queueDlxName;/*** 正常交换机* return*/Beanpublic DirectExchange normalExchange(){return ExchangeBuilder.directExchange(exchangeNormalName).build();}/*** 正常队列* return*/Beanpublic Queue normalQueue(){MapString, Object arguments new HashMap();//重点设置这两个参数//设置队列的死信交换机arguments.put(x-dead-letter-exchange,exchangeDlxName);//设置死信路由key要跟死信交换机和死信队列绑定的路由key一致arguments.put(x-dead-letter-routing-key,error);return QueueBuilder.durable(queueNormalName).withArguments(arguments) //设置队列的过期时间.build();}/*** 正常交换机和正常队列绑定* param normalExchange* param normalQueue* return*/Beanpublic Binding bingNormal(DirectExchange normalExchange,Queue normalQueue){return BindingBuilder.bind(normalQueue).to(normalExchange).with(order);}/*** 死信交换机* return*/Beanpublic DirectExchange dlxExchange(){return ExchangeBuilder.directExchange(exchangeDlxName).build();}/*** 死信队列* return*/Beanpublic Queue dlxQueue(){return QueueBuilder.durable(queueDlxName).build();}/*** 死信交换机和死信队列绑定* param dlxExchange* param dlxQueue* return*/Beanpublic Binding bindDlx(DirectExchange dlxExchange,Queue dlxQueue){return BindingBuilder.bind(dlxQueue).to(dlxExchange).with(error);}
}2.2.2、业务类MessageService关键代码
package com.power.service;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Service;import javax.annotation.Resource;
import java.util.Date;Service
Slf4j
public class MessageService {Resourceprivate RabbitTemplate rabbitTemplate;Beanpublic void sendMsg(){try {MessageProperties messageProperties new MessageProperties();//设置单条消息的过期时间单位为毫秒数据类型为字符串messageProperties.setExpiration(20000);Message message MessageBuilder.withBody(hello world.getBytes()).andProperties(messageProperties).build();rabbitTemplate.convertAndSend(exchange.normal.02,order,message);}catch (Exception e){e.printStackTrace();log.info(消息发送失败new Date());}log.info(消息发送完毕发送时间是new Date());}
}2.2.3、配置文件application.yml
server:port: 8080
spring:application:name: dlx-test01rabbitmq:host: 你的服务器IPport: 5672username: 你的账号password: 你的密码virtual-host: powermy:exchangeNormalName: exchange.normal.02 #正常交换机queueNormalName: queue.normal.02 #正常队列没有消费组设置过期时间exchangeDlxName: exchange.dlx.02 #死信交换机queueDlxName: queue.dlx.02 #死信队列2.2.4、启动类同上
2.2.5、配置文件同上
2.2.6、测试
启动程序发送消息 登录rabbitmq后台 消息先进入正常队列queue.normal.02中超时未消费 消息超过过期时间则进入queue.dlx.02死信队列 2.3、队列达到最大长度先入队的消息会被发送到DLX
2.3.1、配置类RabbitConfig关键代码
package com.power.config;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;Configuration
public class RabbitConfig {Value(${my.exchangeNormalName})private String exchangeNormalName;Value(${my.queueNormalName})private String queueNormalName;Value(${my.exchangeDlxName})private String exchangeDlxName;Value(${my.queueDlxName})private String queueDlxName;/*** 正常交换机* return*/Beanpublic DirectExchange normalExchange(){return ExchangeBuilder.directExchange(exchangeNormalName).build();}/*** 正常队列* return*/Beanpublic Queue normalQueue(){MapString, Object arguments new HashMap();//设置队列的最大长度arguments.put(x-max-length,5);//重点设置这两个参数//设置队列的死信交换机arguments.put(x-dead-letter-exchange,exchangeDlxName);//设置死信路由key要跟死信交换机和死信队列绑定的路由key一致arguments.put(x-dead-letter-routing-key,error);return QueueBuilder.durable(queueNormalName).withArguments(arguments) //设置队列的参数.build();}/*** 正常交换机和正常队列绑定* param normalExchange* param normalQueue* return*/Beanpublic Binding bingNormal(DirectExchange normalExchange,Queue normalQueue){return BindingBuilder.bind(normalQueue).to(normalExchange).with(order);}/*** 死信交换机* return*/Beanpublic DirectExchange dlxExchange(){return ExchangeBuilder.directExchange(exchangeDlxName).build();}/*** 死信队列* return*/Beanpublic Queue dlxQueue(){return QueueBuilder.durable(queueDlxName).build();}/*** 死信交换机和死信队列绑定* param dlxExchange* param dlxQueue* return*/Beanpublic Binding bindDlx(DirectExchange dlxExchange,Queue dlxQueue){return BindingBuilder.bind(dlxQueue).to(dlxExchange).with(error);}
}2.3.2、业务类MessageService关键代码
package com.power.service;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Service;import javax.annotation.Resource;
import java.util.Date;Service
Slf4j
public class MessageService {Resourceprivate RabbitTemplate rabbitTemplate;Beanpublic void sendMsg(){for (int i 1; i 8; i) {String msg hello world i;Message message MessageBuilder.withBody(msg.getBytes()).build();rabbitTemplate.convertAndSend(exchange.normal.03,order,message);log.info(消息发送完毕发送时间是new Date());}}
}2.3.3、配置文件application.yml
server:port: 8080
spring:application:name: dlx-test01rabbitmq:host: 你的服务器IPport: 5672username: 你的账号password: 你的密码virtual-host: powermy:exchangeNormalName: exchange.normal.03 #正常交换机queueNormalName: queue.normal.03 #正常队列没有消费组设置过期时间exchangeDlxName: exchange.dlx.03 #死信交换机queueDlxName: queue.dlx.03 #死信队列2.3.4、启动类同上
2.3.5、配置文件pom.xml同上
2.3.6、测试
启动项目发送消息 登录rabbitmq后台 两条消息进入死信队列 查看消息发现是前两条消息进入了死信队列
2.4、消费者拒绝消息不进行重新投递
消费者从正常的队列接收消息但是消费者对消息不进行确认并且不对消息进行重新投递此时消息就进入死信队列。
2.4.1、生产者 2.4.1.1、生产者application.yml
server:port: 8080
spring:application:name: dlx-test04rabbitmq:host: 你的服务器IPport: 5672username: 你的账号password: 你的密码virtual-host: powermy:exchangeNormalName: exchange.normal.04 #正常交换机queueNormalName: queue.normal.04 #正常队列没有消费组设置过期时间exchangeDlxName: exchange.dlx.04 #死信交换机queueDlxName: queue.dlx.04 #死信队列2.4.1.2、生产者发送消息
package com.power.service;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Service;import javax.annotation.Resource;
import java.util.Date;Service
Slf4j
public class MessageService {Resourceprivate RabbitTemplate rabbitTemplate;Beanpublic void sendMsg(){String msg hello world;Message message MessageBuilder.withBody(msg.getBytes()).build();rabbitTemplate.convertAndSend(exchange.normal.04,order,message);log.info(消息发送完毕发送时间是new Date());}
}2.4.1.3、生产者配置类
package com.power.config;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;Configuration
public class RabbitConfig {Value(${my.exchangeNormalName})private String exchangeNormalName;Value(${my.queueNormalName})private String queueNormalName;Value(${my.exchangeDlxName})private String exchangeDlxName;Value(${my.queueDlxName})private String queueDlxName;/*** 正常交换机* return*/Beanpublic DirectExchange normalExchange(){return ExchangeBuilder.directExchange(exchangeNormalName).build();}/*** 正常队列* return*/Beanpublic Queue normalQueue(){MapString, Object arguments new HashMap();//重点设置这两个参数//设置队列的死信交换机arguments.put(x-dead-letter-exchange,exchangeDlxName);//设置死信路由key要跟死信交换机和死信队列绑定的路由key一致arguments.put(x-dead-letter-routing-key,error);return QueueBuilder.durable(queueNormalName).withArguments(arguments) //设置队列的参数.build();}/*** 正常交换机和正常队列绑定* param normalExchange* param normalQueue* return*/Beanpublic Binding bingNormal(DirectExchange normalExchange,Queue normalQueue){return BindingBuilder.bind(normalQueue).to(normalExchange).with(order);}/*** 死信交换机* return*/Beanpublic DirectExchange dlxExchange(){return ExchangeBuilder.directExchange(exchangeDlxName).build();}/*** 死信队列* return*/Beanpublic Queue dlxQueue(){return QueueBuilder.durable(queueDlxName).build();}/*** 死信交换机和死信队列绑定* param dlxExchange* param dlxQueue* return*/Beanpublic Binding bindDlx(DirectExchange dlxExchange,Queue dlxQueue){return BindingBuilder.bind(dlxQueue).to(dlxExchange).with(error);}
}2.4.2、消费者 2.4.2.1、消费者application.yml 启动手动确认
server:port: 9090
spring:application:name: dlx04-receiverrabbitmq:host: 你的服务器IPport: 5672username: 你的账号password: 你的密码virtual-host: powerlistener:simple:acknowledge-mode: manual关键配置 2.4.2.2、消费者接收消息
package com.power.service;import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.IOException;
import java.util.Date;Component
Slf4j
public class MessageReceive {RabbitListener(queues{queue.normal.04})public void receiveMsg(Message message, Channel channel){//获取消息属性MessageProperties messageProperties message.getMessageProperties();//获取消息的唯一标识类似学号和身份证号long deliveryTag messageProperties.getDeliveryTag();try{byte[] body message.getBody();String msg new String(body);log.info(监听到的消息是msg,接收的时间是new Date());//TODO 业务逻辑处理int a1/0;//消费者的手动确认false只确认当前消息true批量确认channel.basicAck(deliveryTag,false);}catch (Exception e){log.error(接收者出现问题{},e.getMessage());try {//消费者的手动不确认参数3是重新入队//不会进入死信队列
// channel.basicNack(deliveryTag,false,true);//消费者的手动不确认参数3false 不重新入队不重新投递就会变成死信channel.basicNack(deliveryTag,false,false);}catch (IOException ex){throw new RuntimeException(ex);}}}}
关键代码 2.4.3、测试
启动生产者发送消息 启动消费者 因业务代码出错程序处理异常消息进入死信队列