网站优化培训班,东莞如何制作免费的网页,广州品牌网络营销方式,吴中区网站设计公司1. Return机制 Confirm只能保证消息到达exchange#xff0c;无法保证消息可以被exchange分发到指定queue。 而且exchange是不能持久化消息的#xff0c;queue是可以持久化消息。 采用Return机制来监听消息是否从exchange送到了指定的queue中 2.Java的实现方式
1.导入依赖 无法保证消息可以被exchange分发到指定queue。 而且exchange是不能持久化消息的queue是可以持久化消息。 采用Return机制来监听消息是否从exchange送到了指定的queue中 2.Java的实现方式
1.导入依赖 dependencygroupIdcom.rabbitmq/groupIdartifactIdamqp-client/artifactIdversion5.6.0/version/dependency
2.生产者的实现方式 采用Return机制来监听消息是否从exchange送到了指定的queue中
package com.qf.mq2302.hello;import com.qf.mq2302.utils.MQUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ReturnListener;import java.io.IOException;public class SendRetrun {public static final String QUEUE_NAMEhello-queue;public static void main(String[] args) throws Exception {//1.获取连接对象Connection conn MQUtils.getConnection();//2. 创建一个channel对象,对于MQ的大部分操作都定义在了channel对象上Channel channel conn.createChannel();//3.声明了一个队列/*** queue – the name of the queue* durable – true代表创建的队列是持久化的当mq重启后该队列依然存在* exclusive – 该队列是不是排他的 (该对立是否只能由当前创建该队列的连接使用)* autoDelete – 该队列是否可以被mq服务器自动删除* arguments – 队列的其他参数可以为null*/channel.queueDeclare(QUEUE_NAME, false, false, false, null);//开启 return 机制//编写回调方法channel.addReturnListener(new ReturnListener() {//如果消息没有成功发送到队列这个方法会被调用Overridepublic void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(ReturnListener);System.out.println(replyCode:replyCode);System.out.println(replyText:replyText);System.out.println(exchange:exchange);System.out.println(routingKey:routingKey);System.out.println(properties:properties);System.out.println(body:new String(body,utf-8));System.out.println(ReturnListener);}});String message Hello doubleasdasda!;//生产者如何发送消息使用下面的方法即可/*** exchange – 交换机的名字 ,如果是空串说明是把消息发给了默认交换机* routingKey – 路由的key,当发送消息给默认交换机时routingkey代表队列的名字* other properties - 消息的其他属性可以为null* body – 消息的内容注意要是有 字节数组*///注意如果要使用生产者的return机制需要在发送消息时指定mandatory(强制性)为truechannel.basicPublish(, sadnaas, true,null, message.getBytes());System.out.println( [x] Sent message );Thread.sleep(1000);// 关闭资源channel.close();conn.close();}
}这个必须要加上才能让rutern返回机制生效 3.消费者的实现方式
package com.qf.mq2302.hello;import com.qf.mq2302.utils.MQUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;import java.io.IOException;public class Recv {private final static String QUEUE_NAMEhello-queue;public static void main(String[] args) throws Exception {//1.获取连接对象Connection conn MQUtils.getConnection();//2. 创建一个channel对象,对于MQ的大部分操作都定义在了channel对象上Channel channel conn.createChannel();/*** 第一个参数队列名称* 第二个参数耐用性* 第三个参数排外性* 第四个参数是否自动删除* 第五个参数可以定义什么类型的队列*/channel.queueDeclare(QUEUE_NAME, false, false, false, null);//3.该消费者收到消息之后的处理逻辑写在DeliverCallback对象中DeliverCallback deliverCallback new DeliverCallback() {Overridepublic void handle(String consumerTag, Delivery message) throws IOException {System.out.println(consumerTag);//从Delivery对象中可以获取到生产者发送的消息的字节数组byte[] body message.getBody();String msg new String(body, utf-8);//在这里写消费者的业务逻辑,例如,发送邮件System.out.println(msg);}};//4.让当前消费者开始消费QUEUE_NAME队列中的消息/*** queue – the name of the queue* autoAck – true 代表当前消费者是不是自动确认模式。true代表自动确认。* deliverCallback – 当有消息发送给该消费者时消费者如何处理消息的逻辑* cancelCallback – 当消费者被取消掉时如果要执行代码写到这里*/channel.basicConsume(QUEUE_NAME,true,deliverCallback,consumerTag - {});}}3.整合springboot实现
1.导入依赖
dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId/dependency2.yml配置文件
spring:rabbitmq:host: 8.140.244.227port: 6786username: testpassword: testvirtual-host: /testpublisher-returns: true #开启return机制
3.RabbitMQ配置文件
package com.qf.bootmq2302.config;import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;Configuration
public class RabbitConfig {Beanpublic RabbitTemplate rabbitTemplate(CachingConnectionFactory cachingConnectionFactory){RabbitTemplate rabbitTemplate new RabbitTemplate();//设置连接工厂对象rabbitTemplate.setConnectionFactory(cachingConnectionFactory);// 开启return机制rabbitTemplate.setMandatory(true);rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {System.out.println(message:new String(message.getBody()));System.out.println(replyCode:replyCode);System.out.println(replyText:replyText);System.out.println(exchange:exchange);System.out.println(routingKey:routingKey);}});return rabbitTemplate;}}4.生产者的controller AutowiredRabbitTemplate rabbitTemplate;GetMapping(/test1)public String test1(String msg,String routkey){System.out.println(msg);String exchangeName ;//默认交换机String routingkey routkey;//队列名字//生产者发送消息rabbitTemplate.convertAndSend(exchangeName,routingkey,msg);return ok;}
5.消费者写一个队列 RabbitListener(queues queueA)public void getMsg1(MapString,Object data, Channel channel,Message message) throws IOException {System.out.println(data);//手动ack//若开启手动ack不给手动ack就按照 prefetch: 1 #等价于basicQos(1)的量就这么多不会多给你了因为你没有确认。确认一条就给你一条channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}
6.消费者的配置文件
spring:rabbitmq:host: 8.140.244.227port: 6786username: testpassword: testvirtual-host: /test#手动ACKlistener:simple:acknowledge-mode: manual # 手动ackprefetch: 1 #等价于basicQos(1)