无锡品牌网站建设,培训网站系统建设,网站分页怎么做,郑州市网站制作公司Lison dreamlison163.com, v1.0.0, 2023.06.22
七种模式介绍与代码演示 文章目录 七种模式介绍与代码演示四大交换机四种交换机介绍 工作模式简单模式#xff08;Hello World#xff09;工作队列模式#xff08;Work queues#xff09;订阅模式#xff08;Publis…Lison dreamlison163.com, v1.0.0, 2023.06.22
七种模式介绍与代码演示 文章目录 七种模式介绍与代码演示四大交换机四种交换机介绍 工作模式简单模式Hello World工作队列模式Work queues订阅模式Publish/Subscribe路由模式Routing主题模式Topics远程过程调用RPC发布者确认Publisher Confirms 代码演示简单模式工作队列模式发布订阅模式路由模式主题模式 SpringBoot整合RabbitMQ引入依赖基础配置生产者消费者 四大交换机
交换机概念 交换机可以理解成具有路由表的路由程序仅此而已。每个消息都有一个称为路由键routing key的属性就是一个简单的字符串。最新版本的RabbitMQ有四种交换机类型分别是Direct exchange、Fanout exchange、Topic exchange、Headers exchange。交换机的作用 生产者向brokerrabbitmq服务器发送消息交换机通过生产者绑定的路由键将消息推送到不同的消息队列中。而消费者只绑定队列从队列中获取消息。以上三种类型的发送精准顺序为点对点类型 通配符类型 广播类型 四种交换机介绍
直连交换机Direct exchange 具有路由功能的交换机绑定到此交换机的时候需要指定一个routing_key交换机发送消息的时候需要routing_key会将消息发送道对应的队列扇形交换机Fanout exchange 广播消息到所有队列没有任何处理速度最快主题交换机Topic exchange 在直连交换机基础上增加模式匹配也就是对routing_key进行模式匹配*代表一个单词#代表多个单词首部交换机Headers exchange 忽略routing_key使用Headers信息一个Hash的数据结构进行匹配优势在于可以有更多更灵活的匹配规则
交换机参数 name: 交换机名称type: 交换机类型 directtopicfanoutheadersdurability: 是否需要持久化true 为持久化auto delete: 当最后一个绑定到 Exchange 上的队列被删除后Exchange 就没有绑定的队列了自动删除该 Exchangeinternal: 当前 Exchange 是否为内部交换机默认为 false客户端无法直接发送消息到这个交换机中只能通过交换机路由到交换机这种方式arguments: 扩展参数用于扩展 AMQP 协议自制定化使用 工作模式
简单模式Hello World 做最简单的事情一个生产者对应一个消费者RabbitMQ相当于一个消息代理负责将A的消息转发给B
应用场景 将发送的电子邮件放到消息队列然后邮件服务在队列中获取邮件并发送给收件人
工作队列模式Work queues 在多个消费者之间分配任务竞争的消费者模式一个生产者对应多个消费者一般适用于执行资源密集型任务单个消费者处理不过来需要多个消费者进行处理
应用场景 一个订单的处理需要10s有多个订单可以同时放到消息队列然后让多个消费者同时处理这样就是并行了而不是单个消费者的串行情况
订阅模式Publish/Subscribe 一次向许多消费者发送消息一个生产者发送的消息会被多个消费者获取也就是将消息将广播到所有的消费者中。
应用场景 更新商品库存后需要通知多个缓存和多个数据库这里的结构应该是
一个fanout类型交换机扇出两个个消息队列分别为缓存消息队列、数据库消息队列一个缓存消息队列对应着多个缓存消费者一个数据库消息队列对应着多个数据库消费者
路由模式Routing 有选择地Routing key接收消息发送消息到交换机并且要指定路由key 消费者将队列绑定到交换机时需要指定路由key仅消费指定路由key的消息
应用场景 如在商品库存中增加了1台iphone12iphone12促销活动消费者指定routing key为iphone12只有此促销活动会接收到消息其它促销活动不关心也不会消费此routing key的消息
主题模式Topics 根据主题Topics来接收消息将路由key和某模式进行匹配此时队列需要绑定在一个模式上#匹配一个词或多个词*只匹配一个词。
应用场景 同上iphone促销活动可以接收主题为iphone的消息如iphone12、iphone13等
远程过程调用RPC 如果我们需要在远程计算机上运行功能并等待结果就可以使用RPC具体流程可以看图。应用场景需要等待接口返回数据如订单支付
发布者确认Publisher Confirms
与发布者进行可靠的发布确认发布者确认是RabbitMQ扩展可以实现可靠的发布。在通道上启用发布者确认后RabbitMQ将异步确认发送者发布的消息这意味着它们已在服务器端处理。
应用场景 对于消息可靠性要求较高比如钱包扣款
代码演示
代码中没有对后面两种模式演示有兴趣可以自己研究
简单模式
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class SimpleQueueSender {private final static String QUEUE_NAME simple_queue;public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory new ConnectionFactory();factory.setHost(127.0.0.1);factory.setPort(5672);factory.setUsername(admin);factory.setPassword(123456);Connection connection factory.newConnection();Channel channel connection.createChannel();// 声明队列// queue队列名// durable是否持久化// exclusive是否排外 即只允许该channel访问该队列 一般等于true的话用于一个队列只能有一个消费者来消费的场景// autoDelete是否自动删除 消费完删除// arguments其他属性channel.queueDeclare(QUEUE_NAME, false, false, false, null);//消息内容String message simplest mode message;channel.basicPublish(, QUEUE_NAME, null, message.getBytes());System.out.println([x]Sent message );//最后关闭通关和连接channel.close();connection.close();}
}import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class SimpleQueueReceiver {private final static String QUEUE_NAME simple_queue;public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {// 获取连接ConnectionFactory factory new ConnectionFactory();factory.setHost(localhost);factory.setPort(5672);factory.setUsername(admin);factory.setPassword(123456);Connection connection factory.newConnection();Channel channel connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);DeliverCallback deliverCallback (consumerTag, delivery) - {String message new String(delivery.getBody(), UTF-8);System.out.println( [x] Received delivery.getEnvelope().getRoutingKey() : message );};channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag - {});System.out.println(----);}
}工作队列模式
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class QueueWorkReceiver1 {private final static String QUEUE_NAME queue_work;public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {// 获取连接ConnectionFactory factory new ConnectionFactory();factory.setHost(localhost);factory.setPort(5672);factory.setUsername(admin);factory.setPassword(123456);Connection connection factory.newConnection();Channel channel connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 同一时刻服务器只会发送一条消息给消费者channel.basicQos(1);DeliverCallback deliverCallback (consumerTag, delivery) - {String message new String(delivery.getBody(), UTF-8);System.out.println( [x] Received delivery.getEnvelope().getRoutingKey() : message );};channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag - {});}
}import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class QueueWorkReceiver2 {private final static String QUEUE_NAME queue_work;public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {// 获取连接ConnectionFactory factory new ConnectionFactory();factory.setHost(localhost);factory.setPort(5672);factory.setUsername(admin);factory.setPassword(123456);Connection connection factory.newConnection();Channel channel connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 同一时刻服务器只会发送一条消息给消费者channel.basicQos(1);DeliverCallback deliverCallback (consumerTag, delivery) - {String message new String(delivery.getBody(), UTF-8);System.out.println( [x] Received delivery.getEnvelope().getRoutingKey() : message );};channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag - {});}
}import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class QueueWorkSender {private final static String QUEUE_NAME queue_work;public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {ConnectionFactory factory new ConnectionFactory();factory.setHost(localhost);factory.setPort(5672);factory.setUsername(admin);factory.setPassword(123456);Connection connection factory.newConnection();Channel channel connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);for (int i 0; i 100; i) {String message work mode message i;channel.basicPublish(, QUEUE_NAME, null, message.getBytes());System.out.println([x] Sent message );Thread.sleep(i * 10);}channel.close();connection.close();}
}发布订阅模式
package com.lison.upgrade.middleware.rabbitmq;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;public class PublishSubscribeReceive1 {private static final String EXCHANGE_NAME logs;public static void main(String[] argv) throws Exception {ConnectionFactory factory new ConnectionFactory();factory.setHost(localhost);factory.setPort(5672);factory.setUsername(admin);factory.setPassword(123456);Connection connection factory.newConnection();Channel channel connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, fanout);String queueName channel.queueDeclare().getQueue();channel.queueBind(queueName, EXCHANGE_NAME, );System.out.println( [*] Waiting for messages. To exit press CTRLC);// 订阅消息的回调函数DeliverCallback deliverCallback (consumerTag, delivery) - {String message new String(delivery.getBody(), UTF-8);System.out.println( [x] Received message );};// 消费者有消息时出发订阅回调函数channel.basicConsume(queueName, true, deliverCallback, consumerTag - {});}
}package com.lison.upgrade.middleware.rabbitmq;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;public class PublishSubscribeReceive2 {private static final String EXCHANGE_NAME logs;public static void main(String[] argv) throws Exception {ConnectionFactory factory new ConnectionFactory();factory.setHost(localhost);factory.setPort(5672);factory.setUsername(admin);factory.setPassword(123456);Connection connection factory.newConnection();Channel channel connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, fanout);String queueName channel.queueDeclare().getQueue();channel.queueBind(queueName, EXCHANGE_NAME, );System.out.println( [*] Waiting for messages. To exit press CTRLC);// 订阅消息的回调函数DeliverCallback deliverCallback (consumerTag, delivery) - {String message new String(delivery.getBody(), UTF-8);System.out.println( [x] Received2 message );};// 消费者有消息时出发订阅回调函数channel.basicConsume(queueName, true, deliverCallback, consumerTag - {});}
}import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class PublishSubscribeSender {private static final String EXCHANGE_NAME logs;public static void main(String[] argv) throws Exception {ConnectionFactory factory new ConnectionFactory();factory.setHost(localhost);factory.setPort(5672);factory.setUsername(admin);factory.setPassword(123456);Connection connection factory.newConnection();Channel channel connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, fanout);String message publish subscribe message;channel.basicPublish(EXCHANGE_NAME, , null, message.getBytes(UTF-8));System.out.println( [x] Sent message );channel.close();connection.close();}
}
路由模式
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class QueueRoutingReceiver1 {private final static String QUEUE_NAME queue_routing;private final static String EXCHANGE_NAME exchange_direct;public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory new ConnectionFactory();factory.setHost(localhost);factory.setPort(5672);factory.setUsername(admin);factory.setPassword(123456);Connection connection factory.newConnection();Channel channel connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 指定路由的key接收key和key2channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, key);channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, key2);channel.basicQos(1);DeliverCallback deliverCallback (consumerTag, delivery) - {String message new String(delivery.getBody(), UTF-8);System.out.println( [x] Received delivery.getEnvelope().getRoutingKey() : message );};channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag - {});}}import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class QueueRoutingReceiver2 {private final static String QUEUE_NAME queue_routing2;private final static String EXCHANGE_NAME exchange_direct;public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory new ConnectionFactory();factory.setHost(localhost);factory.setPort(5672);factory.setUsername(admin);factory.setPassword(123456);Connection connection factory.newConnection();Channel channel connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 仅接收key2channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, key2);channel.basicQos(1);DeliverCallback deliverCallback (consumerTag, delivery) - {String message new String(delivery.getBody(), UTF-8);System.out.println( [x] Received delivery.getEnvelope().getRoutingKey() : message );};channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag - {});}
}import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class QueueRoutingSender {private final static String EXCHANGE_NAME exchange_direct;private final static String EXCHANGE_TYPE direct;public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory new ConnectionFactory();factory.setHost(localhost);factory.setPort(5672);factory.setUsername(admin);factory.setPassword(123456);Connection connection factory.newConnection();Channel channel connection.createChannel();// 交换机声明channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE);// 只有routingKey相同的才会消费String message routing mode message;channel.basicPublish(EXCHANGE_NAME, key2, null, message.getBytes());System.out.println([x] Sent message );
// channel.basicPublish(EXCHANGE_NAME, key, null, message.getBytes());
// System.out.println([x] Sent message );channel.close();connection.close();}
}主题模式
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Receiver1 {private final static String QUEUE_NAME queue_topic;private final static String EXCHANGE_NAME exchange_topic;public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {ConnectionFactory factory new ConnectionFactory();factory.setHost(localhost);factory.setPort(5672);Connection connection factory.newConnection();Channel channel connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 可以接收key.1channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, key.*);channel.basicQos(1);DeliverCallback deliverCallback (consumerTag, delivery) - {String message new String(delivery.getBody(), UTF-8);System.out.println( [x] Received delivery.getEnvelope().getRoutingKey() : message );};channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag - {});}
}import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Receiver2 {private final static String QUEUE_NAME queue_topic2;private final static String EXCHANGE_NAME exchange_topic;private final static String EXCHANGE_TYPE topic;public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {ConnectionFactory factory new ConnectionFactory();factory.setHost(localhost);factory.setPort(5672);Connection connection factory.newConnection();Channel channel connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);// *号代表单个单词可以接收key.1channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, *.*);// #号代表多个单词可以接收key.1.2channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, *.#);channel.basicQos(1);DeliverCallback deliverCallback (consumerTag, delivery) - {String message new String(delivery.getBody(), UTF-8);System.out.println( [x] Received delivery.getEnvelope().getRoutingKey() : message );};channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag - {});}
}import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Sender {private final static String EXCHANGE_NAME exchange_topic;private final static String EXCHANGE_TYPE topic;public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory new ConnectionFactory();factory.setHost(localhost);factory.setPort(5672);Connection connection factory.newConnection();Channel channel connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE);String message topics model message with key.1;channel.basicPublish(EXCHANGE_NAME, key.1, null, message.getBytes());System.out.println([x] Sent message );String message2 topics model message with key.1.2;channel.basicPublish(EXCHANGE_NAME, key.1.2, null, message2.getBytes());System.out.println([x] Sent message2 );channel.close();connection.close();}
}SpringBoot整合RabbitMQ
引入依赖基础配置
1、创建SpringBoot项目引入RabbitMQ起步依赖
!-- RabbitMQ起步依赖 --
dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId
/dependency2、写配置文件
spring:rabbitmq:host: 127.0.0.1port: 5672username: adminpassword: 123456virtual-host: /
#日志格式
logging:pattern:console: %d{HH:mm:ss.SSS} %clr(%-5level) --- [%-15thread] %cyan(%-50logger{50}):%msg%nSpringBoot整合RabbitMQ时需要在配置类创建队列和交换机写法如下
Configuration
public class RabbitConfig {private final String EXCHANGE_NAME boot_topic_exchange;private final String QUEUE_NAME boot_queue;// 创建交换机Bean(bootExchange)public Exchange getExchange() {return ExchangeBuilder.topicExchange(EXCHANGE_NAME) // 交换机类型.durable(true) // 是否持久化.build();}// 创建队列Bean(bootQueue)public Queue getMessageQueue() {return new Queue(QUEUE_NAME); // 队列名}// 交换机绑定队列Beanpublic Binding bindMessageQueue(Qualifier(bootExchange) Exchange exchange, Qualifier(bootQueue) Queue queue) {return BindingBuilder.bind(queue).to(exchange).with(#.message.#).noargs();}
}
生产者
SpringBoot整合RabbitMQ时提供了工具类RabbitTemplate发送消息编写生产者时只需要注入RabbitTemplate即可发送消息。
SpringBootTest
public class TestProducer {// 注入RabbitTemplate工具类Autowiredprivate RabbitTemplate rabbitTemplate;Testpublic void testSendMessage(){/*** 发送消息* 参数1交换机* 参数2路由key* 参数3要发送的消息*/rabbitTemplate.convertAndSend(boot_topic_exchange,message,这个是生产者发送消息);}
}消费者
Component
public class Consumer {// 监听队列RabbitListener(queues boot_queue)public void listen_message(String message){System.out.println(发送短信message);}
}启动项目可以看到消费者会消费队列中的消息