网站开发 自定义首页显示,南京的电商网站设计,网站建设中药尽量使用图片,wordpress 怎么通过数据库该域名解决RabbitMQ设置x-max-length队列最大长度后不进入死信队列 问题发现问题解决方法一#xff1a;只监听死信队列#xff0c;在死信队列里面处理业务逻辑方法二#xff1a;修改预取值 问题发现
最近再学习RabbitMQ过程中#xff0c;看到关于死信队列内容#xff1a; 来自队… 解决RabbitMQ设置x-max-length队列最大长度后不进入死信队列 问题发现问题解决方法一只监听死信队列在死信队列里面处理业务逻辑方法二修改预取值 问题发现
最近再学习RabbitMQ过程中看到关于死信队列内容 来自队列的消息可以是 “死信”这意味着当以下四个事件中的任何一个发生时这些消息将被重新发布到 Exchange。 使用 basic.reject 或 basic.nack 且 requeue 参数设置为 false 的使用者否定该消息消息由于每条消息的 TTL 而过期队列超出了长度限制 之前的文章里面有讲解过TTL过期后不进入死信队列的疑惑和解决办法然后上手去实践另一个死信队列的方法结果又是一道坑等着我示例代码如下
默认自动应答模式
Configuration
public class MQConfig {/*** 死信队列* return*/Beanpublic Queue deadQueue(){return new Queue(dead_queue);}/*** 死信队列交换机* return*/Beanpublic DirectExchange deadExchange(){return new DirectExchange(dead.exchange);}/*** 死信队列和死信交换机绑定* return*/Beanpublic Binding deadBinding(){return BindingBuilder.bind(deadQueue()).to(deadExchange()).with(dead);}/*** 普通队列* return*/Beanpublic Queue queue(){
// 方法一
// Queue normalQueue new Queue(normal_queue);
// normalQueue.addArgument(x-dead-letter-exchange, dead.exchange); // 死信队列
// normalQueue.addArgument(x-dead-letter-routing-key, dead); // 死信队列routingKey
// normalQueue.addArgument(x-max-length, 5);//设置队列最大长度
// normalQueue.addArgument(x-overflow,reject-publish);//最近发布的消息将被丢弃
// 方法二return QueueBuilder.durable(normal_queue).deadLetterExchange(dead.exchange).deadLetterRoutingKey(dead).maxLength(5) // 设置队列最大长度为5.build();}/*** 普通交换机* return*/Beanpublic DirectExchange normalExchange(){return new DirectExchange(normal.exchange);}/*** 普通队列和普通交换机绑定* return*/Beanpublic Binding binding(){return BindingBuilder.bind(queue()).to(normalExchange()).with(normal);}
}监听普通队列消费方示例代码如下
Component
RabbitListener(queues normal_queue)
public class MQReceiver {private static final Logger log LoggerFactory.getLogger(MQReceiver.class);RabbitHandlerpublic void receive(String msg) {log.info(收到消息msg);}
}监听死信队列消费方示例代码如下
Component
RabbitListener(queues dead_queue)
public class MQReceiver2 {private static final Logger log LoggerFactory.getLogger(MQReceiver2.class);RabbitHandlerpublic void receive(String msg) {log.info(死信队列收到消息{},msg);}
}发送方发送10条消息示例代码如下
Component
public class MQSender {Autowiredprivate RabbitTemplate template;public void send() {for (int i 0; i 10; i) {String msg hello world_i;template.convertAndSend(normal.exchange, normal, msg);}}
}然后调用send()方法执行结果如图 按道理会将队列前面的5条消息进入死信队列然后剩下的五条消息正常消费才对我们检查一下队列是否设置成功如图所示 设置没有问题那就很懵逼了。。。
问题解决
我们先在页面上向普通队交换机发送10条消息然后查看它的状态如图所示 超过5条消息就会放入死信队列中如图所示 然后再看一下默认行为是从队列前面删除或死信消息如图所示 我们可以看到普通队列存放的是最后5条消息前面的5条消息进入死信队列。也就是说再没有进入普通消费者之前会将队列前面删除或死信消息进入消费者之前将消息进行分配。
方法一只监听死信队列在死信队列里面处理业务逻辑
这种做法也是网上大多数文章的一种处理方法另外一种情况就是进入普通消费者还没被消费完的情况下消费者挂了然后队列就会重新分配将从队列前面删除或者进入死信队列如图所示 但是这些做法都是基于没有普通消费者监听的情况下进行的感觉和我理解的略有偏差应该是再有普通消费者监听和死信队列监听的情况下发送消息时会对消息进行分配处理。
发送方代码和配置的代码就不重复展示了参考之前示例监听死信队列自动确认模式和手动确认模式都一样示例代码如下
Component
RabbitListener(queues dead_queue)
public class MQReceiver2 {private static final Logger log LoggerFactory.getLogger(MQReceiver2.class);RabbitHandlerpublic void receive(String msg,Message message,Channel channel) throws IOException {log.info(死信队列收到消息{},msg);channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}
}调用send()方法执行结果如图 然后再死信队列里面处理业务逻辑即可。
方法二修改预取值
再监听普通队列时抛异常或者手动拒绝重新进入队列这两种方式都不能达到我想要的效果我发现只要有普通消费方监听就会进入消费然后我就在想是不是因为预期值的问题导致可能我消息数量太少了然后默认预期值太高导致消息直接进入消费然后将预取值改为1自动确认模式示例代码如下
spring.rabbitmq.listener.simple.prefetch1发送方代码和配置的代码就不重复展示了参考之前示例监听普通队列示例代码如下
Component
RabbitListener(queues normal_queue)
public class MQReceiver {private static final Logger log LoggerFactory.getLogger(MQReceiver.class);RabbitHandlerpublic void receive(String msg) throws InterruptedException {log.info(收到消息msg);// 模拟业务处理队列等待场景Thread.sleep(10000);}
}监听死信队列示例代码如下
Component
RabbitListener(queues dead_queue)
public class MQReceiver2 {private static final Logger log LoggerFactory.getLogger(MQReceiver2.class);RabbitHandlerpublic void receive(String msg) {log.info(死信队列收到消息{},msg);}
}调用send()方法执行结果如图 当然有的时候也不一定完全按照你设置的最大长度进入死信队列有的时候消费速度太快队列的第一个已经被消费了的情况得看实际情况至少可以确保再设置了大于队列最大长度时是可以正常进入死信队列的。归根结底消息数量太少了。
另外我们再来介绍一下溢出方式一般默认情况下溢出方式为drop-head从队列前面删除或者进入死信队列除此之外还有两种
reject-publish和reject-publish-dlx最近发布的消息将被丢弃。reject-publish 和 reject-publish-dlx 之间的区别在于 reject-publish-dlx 也是死信拒绝消息。
将配置的溢出模式改为reject-publish示例代码如下
Configuration
public class MQConfig {/*** 死信队列* return*/Beanpublic Queue deadQueue(){return new Queue(dead_queue);}/*** 死信队列交换机* return*/Beanpublic DirectExchange deadExchange(){return new DirectExchange(dead.exchange);}/*** 死信队列和死信交换机绑定* return*/Beanpublic Binding deadBinding(){return BindingBuilder.bind(deadQueue()).to(deadExchange()).with(dead);}/*** 普通队列* return*/Beanpublic Queue queue() {
// 方法一
// Queue normalQueue new Queue(normal_queue);
// normalQueue.addArgument(x-dead-letter-exchange, dead.exchange); // 死信队列
// normalQueue.addArgument(x-dead-letter-routing-key, dead); // 死信队列routingKey
// normalQueue.addArgument(x-max-length, 5);//设置队列最大长度
// normalQueue.addArgument(x-overflow,reject-publish);//最近发布的消息将被丢弃
// 方法二return QueueBuilder.durable(normal_queue).deadLetterExchange(dead.exchange).deadLetterRoutingKey(dead).maxLength(5) // 设置队列最大长度为5.overflow(QueueBuilder.Overflow.dropHead).build();}/*** 普通交换机* return*/Beanpublic DirectExchange normalExchange(){return new DirectExchange(normal.exchange);}/*** 普通队列和普通交换机绑定* return*/Beanpublic Binding binding(){return BindingBuilder.bind(queue()).to(normalExchange()).with(normal);}
}重新创建队列调用send()方法如图所示 由图可以死信队列并不会受到消息。
然后我们再来看看将溢出模式设置为reject-publish-dlxQueueBuilder.Overflow没有该参数手动定义示例代码如下
Configuration
public class MQConfig {//忽略死信队列创建和绑定过程参考前面示例.../*** 普通队列* return*/Beanpublic Queue queue() {Queue normalQueue new Queue(normal_queue);normalQueue.addArgument(x-dead-letter-exchange, dead.exchange); // 死信队列normalQueue.addArgument(x-dead-letter-routing-key, dead); // 死信队列routingKeynormalQueue.addArgument(x-max-length, 5);//设置队列最大长度normalQueue.addArgument(x-overflow,reject-publish-dlx); //最近发布的消息进入死信队列return normalQueue;}//忽略普通队列创建过程参考前面示例...
}重新创建队列调用send()方法如图所示 如果你有更好的方法或者不同的理解欢迎评论区交流。