惠州公司做网站,网易企业邮箱登入路口,做简易网站的APP,商业网站有哪些
本文演示如何开启 RabbitMQ 生产者发送消息到 RabbitMQ 服务端的接收确认（ACK），以及消费者如何通过手动确认或者丢弃所消费的消息。通过配置 publisher-confirm-type: correlated 和 publisher-returns: true 开启生产者确认消息。
server:port: 8014spring:rabbitmq:username: …开启RabbitMQ的生产者发送消息到RabbitMQ服务端的接收确认ACK和消费者通过手动确认或者丢弃消费的消息。 通过配置 publisher-confirm-type: correlated 和publisher-returns: true开启生产者确认消息。
server:
  port: 8014
spring:
  rabbitmq:
    username: admin
    password: 123456
    dynamic: true
#    port: 5672
#    host: 192.168.49.9
    addresses: 192.168.49.10:5672,192.168.49.9:5672,192.168.49.11:5672
    publisher-confirm-type: correlated
    publisher-returns: true
  application:
    name: shushan
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://ip/shushan
    username: root
    password:
    hikari:
      minimum-idle: 10
      maximum-pool-size: 20
      idle-timeout: 50000
      max-lifetime: 540000
      connection-test-query: select 1
      connection-timeout: 600000

RabbitConfig
package com.kexuexiong.shushan.common.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;Slf4j
Configuration
public class RabbitConfig {Beanpublic RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate new RabbitTemplate();rabbitTemplate.setConnectionFactory(connectionFactory);rabbitTemplate.setMandatory(true);rabbitTemplate.setConfirmCallback((correlationData, ack, cause) - {log.info(confirmCallback data: correlationData);log.info(confirmCallback ack : ack);log.info(confirmCallback cause : cause);});rabbitTemplate.setReturnsCallback(returned - log.info(returnsCallback msg : returned));return rabbitTemplate;}
}
AckReceiver 手动确认消费者
package com.kexuexiong.shushan.common.mq;import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;
import java.util.Map;
import java.util.Objects;Slf4j
Component
public class AckReceiver implements ChannelAwareMessageListener {Overridepublic void onMessage(Message message, Channel channel) throws Exception {long deliveryTag message.getMessageProperties().getDeliveryTag();byte[] messageBody message.getBody();try (ObjectInputStream inputStream new ObjectInputStream(new ByteArrayInputStream(messageBody));) {MapString, String msg (MapString, String) inputStream.readObject();log.info(message.getMessageProperties().getConsumerQueue()-ack Receiver : msg);log.info(header msg :message.getMessageProperties().getHeaders());if(Objects.equals(message.getMessageProperties().getConsumerQueue(),MqConstant.BUSINESS_QUEUE)){channel.basicNack(deliveryTag,false,false);}else if(Objects.equals(message.getMessageProperties().getConsumerQueue(),MqConstant.DEAD_LETTER_QUEUE)){channel.basicAck(deliveryTag, true);}else {channel.basicAck(deliveryTag, true);}} catch (Exception e) {channel.basicReject(deliveryTag, false);log.error(e.getMessage());}}
}
通过配置 simpleMessageListenerContainer.setQueueNames(MqConstant.DEAD_LETTER_QUEUE)可以监听多个消息队列。
package com.kexuexiong.shushan.common.mq;import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;Configuration
public class MessageListenerConfig {Autowiredprivate CachingConnectionFactory connectionFactory;Autowiredprivate AckReceiver ackReceiver;Beanpublic SimpleMessageListenerContainer simpleMessageListenerContainer() {SimpleMessageListenerContainer simpleMessageListenerContainer new SimpleMessageListenerContainer(connectionFactory);simpleMessageListenerContainer.setConcurrentConsumers(2);simpleMessageListenerContainer.setMaxConcurrentConsumers(2);simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);//,MqConstant.demoDirectQueue, MqConstant.FANOUT_A, MqConstant.BIG_CAR_TOPICsimpleMessageListenerContainer.setQueueNames(MqConstant.DEAD_LETTER_QUEUE);simpleMessageListenerContainer.setMessageListener(ackReceiver);return simpleMessageListenerContainer;}}
package com.kexuexiong.shushan.controller.mq;import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.RandomUtil;
import com.kexuexiong.shushan.common.mq.MqConstant;
import com.kexuexiong.shushan.controller.BaseController;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;Slf4j
RestController
RequestMapping(/mq/)
public class MqController extends BaseController {AutowiredRabbitTemplate rabbitTemplate;GetMapping(/callback/sendDirectMessage)public String sendDirectMessageCallback(){String msgId UUID.randomUUID().toString();String msg demo msg ,kexuexiong;String createTime DateUtil.format(new Date(),YYYY-MM-dd HH:mm:ss);MapString,Object map new HashMap();map.put(msgId,msgId);map.put(msg,msg);map.put(createTime,createTime);rabbitTemplate.convertAndSend(noneDirectExchange,demoDirectRouting,map);return ok;}GetMapping(/callback/lonelyDirectExchange)public String lonelyDirectExchange(){String msgId UUID.randomUUID().toString();String msg demo msg ,kexuexiong;String createTime DateUtil.format(new Date(),YYYY-MM-dd HH:mm:ss);MapString,Object map new HashMap();map.put(msgId,msgId);map.put(msg,msg);map.put(createTime,createTime);rabbitTemplate.convertAndSend(MqConstant.lonelyDirectExchange,demoDirectRouting,map);return ok;}
}
测试
发送 direct 消息：找不到交换机的情况
2023-10-10T17:04:58.492+08:00 ERROR 27232 --- [.168.49.10:5672] o.s.a.r.c.CachingConnectionFactory : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'noneDirectExchange' in vhost '/', class-id=60, method-id=40)
2023-10-10T17:04:58.492+08:00 INFO 27232 --- [nectionFactory6] c.k.shushan.common.config.RabbitConfig : confirmCallback data: null
2023-10-10T17:04:58.492+08:00 INFO 27232 --- [nectionFactory6] c.k.shushan.common.config.RabbitConfig : confirmCallback ack :false
2023-10-10T17:04:58.492+08:00 INFO 27232 --- [nectionFactory6] c.k.shushan.common.config.RabbitConfig : confirmCallback cause :channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'noneDirectExchange' in vhost '/', class-id=60, method-id=40)
ack 为 false：交换机不存在，Broker 没有接收该消息，cause 中给出了 404 NOT_FOUND 的原因。
发送 direct 消息：找不到队列的情况
2023-10-10T17:05:55.851+08:00 INFO 27232 --- [nectionFactory5] c.k.shushan.common.config.RabbitConfig : confirmCallback data: null
2023-10-10T17:05:55.852+08:00 INFO 27232 --- [nectionFactory5] c.k.shushan.common.config.RabbitConfig : confirmCallback ack :true
2023-10-10T17:05:55.852+08:00 INFO 27232 --- [nectionFactory5] c.k.shushan.common.config.RabbitConfig : confirmCallback cause :null
2023-10-10T17:05:55.865+08:00 INFO 27232 --- [nectionFactory6] c.k.shushan.common.config.RabbitConfig : returnsCallback msg : ReturnedMessage [message=(Body:'[serialized object]' MessageProperties [headers={}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]), replyCode=312, replyText=NO_ROUTE, exchange=lonelyDirectExchange, routingKey=demoDirectRouting]
ACK 为 true，但 replyText=NO_ROUTE：消息已被交换机接收，却没有可路由到的队列，因此通过 returnsCallback 被退回。 文章转载自: http://www.morning.prgyd.cn.gov.cn.prgyd.cn http://www.morning.bqwrn.cn.gov.cn.bqwrn.cn http://www.morning.jhkzl.cn.gov.cn.jhkzl.cn http://www.morning.tfrmx.cn.gov.cn.tfrmx.cn http://www.morning.pdghl.cn.gov.cn.pdghl.cn http://www.morning.bnlsd.cn.gov.cn.bnlsd.cn http://www.morning.gbljq.cn.gov.cn.gbljq.cn http://www.morning.kqqk.cn.gov.cn.kqqk.cn http://www.morning.rnqbn.cn.gov.cn.rnqbn.cn http://www.morning.ksbmx.cn.gov.cn.ksbmx.cn http://www.morning.rkdw.cn.gov.cn.rkdw.cn http://www.morning.rqckh.cn.gov.cn.rqckh.cn http://www.morning.dygsz.cn.gov.cn.dygsz.cn http://www.morning.lnsnyc.com.gov.cn.lnsnyc.com http://www.morning.gftnx.cn.gov.cn.gftnx.cn http://www.morning.snmsq.cn.gov.cn.snmsq.cn http://www.morning.tbjb.cn.gov.cn.tbjb.cn http://www.morning.bcjbm.cn.gov.cn.bcjbm.cn http://www.morning.ykrg.cn.gov.cn.ykrg.cn http://www.morning.nbiotank.com.gov.cn.nbiotank.com http://www.morning.bfsqz.cn.gov.cn.bfsqz.cn http://www.morning.stmkm.cn.gov.cn.stmkm.cn http://www.morning.ngcsh.cn.gov.cn.ngcsh.cn http://www.morning.wjtwn.cn.gov.cn.wjtwn.cn http://www.morning.bbxbh.cn.gov.cn.bbxbh.cn http://www.morning.lkfhk.cn.gov.cn.lkfhk.cn http://www.morning.xpfwr.cn.gov.cn.xpfwr.cn http://www.morning.ydzly.cn.gov.cn.ydzly.cn http://www.morning.bmnm.cn.gov.cn.bmnm.cn http://www.morning.wylpy.cn.gov.cn.wylpy.cn http://www.morning.glxmf.cn.gov.cn.glxmf.cn http://www.morning.ztcxx.com.gov.cn.ztcxx.com http://www.morning.rshkh.cn.gov.cn.rshkh.cn http://www.morning.ygth.cn.gov.cn.ygth.cn http://www.morning.lrzst.cn.gov.cn.lrzst.cn http://www.morning.mpgfk.cn.gov.cn.mpgfk.cn http://www.morning.sxlrg.cn.gov.cn.sxlrg.cn http://www.morning.xkwrb.cn.gov.cn.xkwrb.cn http://www.morning.hrpbq.cn.gov.cn.hrpbq.cn http://www.morning.wypyl.cn.gov.cn.wypyl.cn http://www.morning.lzjxn.cn.gov.cn.lzjxn.cn http://www.morning.sfnr.cn.gov.cn.sfnr.cn http://www.morning.nzcgj.cn.gov.cn.nzcgj.cn http://www.morning.rbnnq.cn.gov.cn.rbnnq.cn 
http://www.morning.hotlads.com.gov.cn.hotlads.com http://www.morning.glncb.cn.gov.cn.glncb.cn http://www.morning.pfnwt.cn.gov.cn.pfnwt.cn http://www.morning.pmdlk.cn.gov.cn.pmdlk.cn http://www.morning.xnltz.cn.gov.cn.xnltz.cn http://www.morning.fkwp.cn.gov.cn.fkwp.cn http://www.morning.gfmpk.cn.gov.cn.gfmpk.cn http://www.morning.kfwrq.cn.gov.cn.kfwrq.cn http://www.morning.mrcpy.cn.gov.cn.mrcpy.cn http://www.morning.mqxrx.cn.gov.cn.mqxrx.cn http://www.morning.kntsd.cn.gov.cn.kntsd.cn http://www.morning.lcxzg.cn.gov.cn.lcxzg.cn http://www.morning.hmpxn.cn.gov.cn.hmpxn.cn http://www.morning.fdwlg.cn.gov.cn.fdwlg.cn http://www.morning.bqdgr.cn.gov.cn.bqdgr.cn http://www.morning.grwgw.cn.gov.cn.grwgw.cn http://www.morning.qbnfc.cn.gov.cn.qbnfc.cn http://www.morning.nzms.cn.gov.cn.nzms.cn http://www.morning.brps.cn.gov.cn.brps.cn http://www.morning.cltrx.cn.gov.cn.cltrx.cn http://www.morning.rrbhy.cn.gov.cn.rrbhy.cn http://www.morning.gzzncl.cn.gov.cn.gzzncl.cn http://www.morning.cfnsn.cn.gov.cn.cfnsn.cn http://www.morning.tlfyb.cn.gov.cn.tlfyb.cn http://www.morning.wzdjl.cn.gov.cn.wzdjl.cn http://www.morning.rtsx.cn.gov.cn.rtsx.cn http://www.morning.nkcfh.cn.gov.cn.nkcfh.cn http://www.morning.jfnlj.cn.gov.cn.jfnlj.cn http://www.morning.pzbqm.cn.gov.cn.pzbqm.cn http://www.morning.knnc.cn.gov.cn.knnc.cn http://www.morning.lqchz.cn.gov.cn.lqchz.cn http://www.morning.mbnhr.cn.gov.cn.mbnhr.cn http://www.morning.zzfjh.cn.gov.cn.zzfjh.cn http://www.morning.lpqgq.cn.gov.cn.lpqgq.cn http://www.morning.lltdf.cn.gov.cn.lltdf.cn http://www.morning.tgyzk.cn.gov.cn.tgyzk.cn