RabbitMQ手动签收消息
这里讲解SpringBoot使用RabbitMQ进行有回调的用法和消费者端手动签收消息的用法。
1、pom依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="h…

RabbitMQ手动签收消息
这里讲解SpringBoot使用RabbitMQ进行有回调的用法和消费者端手动签收消息的用法。
1、pom依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.4</version>
        <relativePath/>
    </parent>
    <groupId>com.example.demo</groupId>
    <artifactId>rabbitmq-demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>rabbitmq-demo</name>
    <description>rabbitmq-demno</description>
    <properties>
        <java.version>8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

2、配置文件
server:
  port: 9090
spring:
  application:
    name: rabbit-confirm
  rabbitmq:
    template:
      # 使用return-callback时必须设置mandatory为true
      mandatory: true
    # 消息发送到交换机确认机制,是否确认回调
    publisher-confirm-type: correlated
    # 消息发送到交换机确认机制是否返回回调
    publisher-returns: true
    listener:
      simple:
        # 并发消费者初始化值
        concurrency: 5
        # 最大值
        max-concurrency: 10
        # 每个消费者每次监听时可拉取处理的消息数量
        prefetch: 20
        # 确认模式设置为手动签收
        acknowledge-mode: manual
    username: zsx242030
    password: zsx242030
    virtual-host: /

3、定义配置类
package com.example.demo.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Declares the queue, topic exchange and binding used by the confirm demo.
 */
@Configuration
public class ConfirmConfiguration {

    /** Queue name; the binding below reuses it as the routing key. */
    private static final String CONFIRM_QUEUE = "confirm.message";

    /** Name of the topic exchange the queue is bound to. */
    private static final String EXCHANGE_NAME = "exchange-2";

    /**
     * Declares the {@code confirm.message} queue.
     *
     * @return the queue definition
     */
    @Bean
    public Queue confirmQueue() {
        return new Queue(CONFIRM_QUEUE);
    }

    /**
     * Declares a topic exchange named {@code exchange-2}.
     *
     * @return the exchange definition
     */
    @Bean
    public TopicExchange exchange2() {
        return new TopicExchange(EXCHANGE_NAME);
    }

    /**
     * Binds the {@code confirm.message} queue to the {@code exchange-2} exchange,
     * using {@code confirm.message} as the routing key.
     *
     * @return the binding definition
     */
    @Bean
    public Binding bindMessage1() {
        return BindingBuilder.bind(confirmQueue()).to(exchange2()).with(CONFIRM_QUEUE);
    }
}

4、定义生产者
package com.example.demo.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.sql.Timestamp;
import java.time.LocalDateTime;Component
Slf4j
public class ConfirmProducer {Resourceprivate RabbitTemplate rabbitTemplate;/*** 如果消息没有到exchange,则confirm回调,ackfalse* 如果消息到达exchange,则confirm回调,acktrue* exchange到queue成功,则不回调return* exchange到queue失败,则回调return(需设置mandatorytrue,否则不回回调,消息就丢了)*/private final RabbitTemplate.ConfirmCallback confirmCallback (correlationData, ack, cause) - {if (!ack) {log.error(消息发送失败correlationData: {},cause: {}, correlationData, cause);} else {log.info(消息发送成功correlationData: {},ack: {}, correlationData, ack);}};private final RabbitTemplate.ReturnCallback returnCallback (message, replyCode, replyText, exchange, routeKey) -log.error(消息丢失: exchange: {},routeKey: {},replyCode: {},replyText: {}, exchange, routeKey, replyCode, replyText);/*** 发送消息** param message 消息内容*/public void send(String message) {// 构建回调返回的数据CorrelationData correlationData new CorrelationData();Timestamp time Timestamp.valueOf(LocalDateTime.now());correlationData.setId(time );Message message1 MessageBuilder.withBody(message.toString().getBytes()).setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)// 将CorrelationData的id 与 Message的correlationId绑定然后关系保存起来,然后人工处理.setCorrelationId(correlationData.getId()).build();rabbitTemplate.setConfirmCallback(confirmCallback);rabbitTemplate.setReturnCallback(returnCallback);rabbitTemplate.convertAndSend(exchange-2, confirm.message, message1, correlationData);}
}5、定义消费者
package com.example.demo.config;import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;

/**
 * Consumes messages from the {@code confirm.message} queue and acknowledges
 * each one manually.
 */
@Component
@Slf4j
public class ConfirmConsumer {

    /**
     * Logs the received payload and then acknowledges the delivery.
     *
     * @param message  the message payload as text
     * @param message1 the raw AMQP message, used to read the delivery tag
     * @param channel  the channel the message arrived on, used to send the ack
     * @throws IOException if the acknowledgement fails
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "confirm.message", durable = "true"),
            exchange = @Exchange(value = "exchange-2", type = "topic"),
            key = "confirm.message"))
    public void receive(String message, Message message1, Channel channel) throws IOException {
        log.info("消费者收到消息{}", message);
        long deliverTag = message1.getMessageProperties().getDeliveryTag();
        // First argument: the delivery tag carried by each message.
        // Second argument (multiple): true acknowledges every message whose tag is
        // less than or equal to this one; false acknowledges only this tag.
        // Choose according to the actual use case.
        channel.basicAck(deliverTag, false);
    }
}

6、创建controller调用
package com.example.demo.controller;import com.example.demo.config.ConfirmProducer;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;

/**
 * HTTP entry point that triggers sending one demo message.
 */
@RestController
public class ConfirmController {

    @Resource
    private ConfirmProducer confirmProducer;

    /**
     * Handles {@code GET /confirm-message} by sending a fixed test message
     * through {@link ConfirmProducer}.
     */
    @GetMapping("/confirm-message")
    public void confirmMessage() {
        confirmProducer.send("hello confirm message");
    }
}

7、启动类
package com.example.demo;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Spring Boot entry point for the RabbitMQ demo application.
 */
@SpringBootApplication
public class RabbitDemoApplication {

    /**
     * Starts the Spring application.
     *
     * @param args command-line arguments passed through to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(RabbitDemoApplication.class, args);
    }
}

8、测试
http://localhost:9090/confirm-message

2022-07-05 18:20:43.043 INFO 4492 --- [nectionFactory1] com.example.demo.config.ConfirmProducer : 消息发送成功correlationData: CorrelationData [id=2022-07-05 18:20:43.025],ack: true
2022-07-05 18:20:43.046 INFO 4492 --- [ntContainer#0-5] com.example.demo.config.ConfirmConsumer : 消费者收到消息hello confirm message