室内设计网站模板,app搭建要多少钱,wordpress公式 插件,织梦帝国wordpress#x1f9d1;#x1f4bb;作者名称#xff1a;DaenCode #x1f3a4;作者简介#xff1a;CSDN实力新星#xff0c;后端开发两年经验#xff0c;曾担任甲方技术代表#xff0c;业余独自创办智源恩创网络科技工作室。会点点Java相关技术栈、帆软报表、低代码平台快速开… 作者名称DaenCode 作者简介CSDN实力新星后端开发两年经验曾担任甲方技术代表业余独自创办智源恩创网络科技工作室。会点点Java相关技术栈、帆软报表、低代码平台快速开发。技术尚浅闭关学习中······ 人生感悟尝尽人生百味方知世间冷暖。 所属专栏图解RabbitMQ 专栏推荐
专门为Redis入门打造的专栏包含Redis基础知识、基础命令、五大数据类型实战场景、key删除策略、内存淘汰机制、持久化机制、哨兵模式、主从复制、分布式锁等等内容。链接《Redis从头学》SpringBoot实战相关专栏包含SpringBoot过滤器、拦截器、AOP实现日志、整合Freemaker、整合Redis等等实战相关内容多篇文章登入全站热榜、领域热榜、被技术社区收录。链接《SpringBoot实战》 文章目录 专栏推荐前言连接工具类简单工作模型介绍代码实现 工作队列模型介绍代码实现 发布订阅模型介绍代码实现 路由模型介绍代码实现 主题模型介绍代码实现 总结写在最后 参考网站https://www.rabbitmq.com/getstarted.html
前言
在上一节学习了RabbitMQ中交换机的相关基础知识本文来学习一下RabbitMQ中的五种队列模型的对其有一个基本的认识。
连接工具类
public class MQConnectionUtil {public static Connection createConnection() throws IOException, TimeoutException {//创建连接工厂ConnectionFactory factory new ConnectionFactory();factory.setHost(192.168.124.23);factory.setUsername(admin);factory.setPassword(password);factory.setVirtualHost(/dev);factory.setPort(5672);return factory.newConnection();}
}简单工作模型
介绍
模型图 流程
生产者发送消息到队列。如果队列存在则直接存入消息若不存在先进行队列的创建。消费者监听队列。处理完消息通过ACK机制确认消息已经消费。
特点
只有一个消费者并且其中没有交换机参与。 代码实现
生产者
public class Send {private final static String QUEUE_NAMEhello;public static void main(String[] args) throws IOException, TimeoutException {try ( //JDK7语法 或自动关闭 connnection和channel//创建连接Connection connectionMQConnectionUtil.createConnection();//创建信道Channel channel connection.createChannel()) {/*** 队列名称* 持久化配置mq重启后还在* 是否独占只能有一个消费者监听队列当connection关闭是否删除队列一般是false发布订阅是独占* 自动删除: 当没有消费者的时候自动删除掉一般是false* 其他参数** 队列不存在则会自动创建如果存在则不会覆盖所以此时的时候需要注意属性*/channel.queueDeclare(QUEUE_NAME, false, false, false, null);String message Hello World!;/*** 参数说明* 交换机名称不写则是默认的交换机那路由健需要和队列名称一样才可以被路由* 路由健名称* 配置信息* 发送的消息数据字节数组*///发布消息channel.basicPublish(, QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));System.out.println( [x] Sent message );}}
}消费者
public class Recv {private final static String QUEUE_NAME hello;public static void main(String[] argv) throws Exception {//消费者一般不增加自动关闭Connection connectionMQConnectionUtil.createConnection();Channel channel connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);System.out.println( [*] Waiting for messages. To exit press CTRLC);//回调方法下面两种都行Consumer consumer new DefaultConsumer(channel){Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// consumerTag 是固定的 可以做此会话的名字 deliveryTag 每次接收消息1System.out.println(consumerTag消息标识consumerTag);//可以获取交换机路由健等System.out.println(envelope元数据envelope);System.out.println(properties配置信息properties);System.out.println(bodynew String(body,utf-8));}};channel.basicConsume(QUEUE_NAME,true,consumer);// DeliverCallback deliverCallback (consumerTag,delivery) - {
// String message new String(delivery.getBody(), UTF-8);
// System.out.println( [x] Received message );
// };//自动确认消息
// channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag - { });}
}工作队列模型
介绍
模型图 特点
生产者将消息发送到队列并由多个消费者进行消费。
两种消费策略 1 . 轮训策略将消息平均分配给多个消费者进行消费不考虑消费者的处理能力采用自动ACK消息机制。 2. 公平策略消费者每次只能处理一个消息。一定时间内能力强者消费的多否则少采用手动ACK消息机制。 代码实现
轮询策略
//生产者
public class Send {private final static String QUEUE_NAMEwork_rr;public static void main(String[] args) throws IOException, TimeoutException {try ( //JDK7语法 或自动关闭 connnection和channel//创建连接Connection connection MQConnectionUtil.createConnection();//创建信道Channel channel connection.createChannel()) {/*** 队列名称* 持久化配置mq重启后还在* 是否独占只能有一个消费者监听队列当connection关闭是否删除队列一般是false发布订阅是独占* 自动删除: 当没有消费者的时候自动删除掉一般是false* 其他参数** 队列不存在则会自动创建如果存在则不会覆盖所以此时的时候需要注意属性*/channel.queueDeclare(QUEUE_NAME, false, false, false, null);for (int i 0; i 10; i) {String message Hello World!;/*** 参数说明* 交换机名称不写则是默认的交换机那路由健需要和队列名称一样才可以被路由* 路由健名称* 配置信息* 发送的消息数据字节数组*///发布消息channel.basicPublish(, QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));System.out.println( [x] Sent message );}}}
}
//消费者
public class Recv {private final static String QUEUE_NAME work_rr;public static void main(String[] argv) throws Exception {//消费者一般不增加自动关闭Connection connection MQConnectionUtil.createConnection();Channel channel connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);System.out.println( [*] Waiting for messages. To exit press CTRLC);//回调方法下面两种都行Consumer consumer new DefaultConsumer(channel){Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {throw new RuntimeException(e);}// consumerTag 是固定的 可以做此会话的名字 deliveryTag 每次接收消息1System.out.println(consumerTag消息标识consumerTag);//可以获取交换机路由健等System.out.println(envelope元数据envelope);System.out.println(properties配置信息properties);System.out.println(bodynew String(body,utf-8));channel.basicAck(envelope.getDeliveryTag(), false);}};channel.basicConsume(QUEUE_NAME,false,consumer);// DeliverCallback deliverCallback (consumerTag,delivery) - {
// String message new String(delivery.getBody(), UTF-8);
// System.out.println( [x] Received message );
// };//自动确认消息
// channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag - { });}
}
//消费者2
public class Recv2 {private final static String QUEUE_NAME work_rr;public static void main(String[] argv) throws Exception {Connection connection MQConnectionUtil.createConnection();Channel channel connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);System.out.println( [*] Waiting for messages. To exit press CTRLC);//回调方法下面两种都行Consumer consumer new DefaultConsumer(channel){Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// consumerTag 是固定的 可以做此会话的名字 deliveryTag 每次接收消息1System.out.println(consumerTag消息标识consumerTag);//可以获取交换机路由健等System.out.println(envelope元数据envelope);System.out.println(properties配置信息properties);System.out.println(bodynew String(body,utf-8));channel.basicAck(envelope.getDeliveryTag(), false);}};channel.basicConsume(QUEUE_NAME,false,consumer);// DeliverCallback deliverCallback (consumerTag,delivery) - {
// String message new String(delivery.getBody(), UTF-8);
// System.out.println( [x] Received message );
// };//自动确认消息
// channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag - { });}
}公平策略 通过channel.basicQos(1);确保每个消费者每次只能处理一个未确认的消息。
public class Send {private final static String QUEUE_NAMEwork_fair;public static void main(String[] args) throws IOException, TimeoutException {try ( //JDK7语法 或自动关闭 connnection和channel//创建连接Connection connection MQConnectionUtil.createConnection();//创建信道Channel channel connection.createChannel()) {/*** 队列名称* 持久化配置mq重启后还在* 是否独占只能有一个消费者监听队列当connection关闭是否删除队列一般是false发布订阅是独占* 自动删除: 当没有消费者的时候自动删除掉一般是false* 其他参数** 队列不存在则会自动创建如果存在则不会覆盖所以此时的时候需要注意属性*/channel.queueDeclare(QUEUE_NAME, false, false, false, null);for (int i 0; i 10; i) {String message Hello World!;/*** 参数说明* 交换机名称不写则是默认的交换机那路由健需要和队列名称一样才可以被路由* 路由健名称* 配置信息* 发送的消息数据字节数组*///发布消息channel.basicPublish(, QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));System.out.println( [x] Sent message );}}}
}
//消费者1
public class Recv {private final static String QUEUE_NAME work_fair;public static void main(String[] argv) throws Exception {Connection connection MQConnectionUtil.createConnection();Channel channel connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);System.out.println( [*] Waiting for messages. To exit press CTRLC);channel.basicQos(1);//回调方法下面两种都行Consumer consumer new DefaultConsumer(channel){Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {throw new RuntimeException(e);}// consumerTag 是固定的 可以做此会话的名字 deliveryTag 每次接收消息1System.out.println(consumerTag消息标识consumerTag);//可以获取交换机路由健等System.out.println(envelope元数据envelope);System.out.println(properties配置信息properties);System.out.println(bodynew String(body,utf-8));channel.basicAck(envelope.getDeliveryTag(), false);}};channel.basicConsume(QUEUE_NAME,false,consumer);// DeliverCallback deliverCallback (consumerTag,delivery) - {
// String message new String(delivery.getBody(), UTF-8);
// System.out.println( [x] Received message );
// };//自动确认消息
// channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag - { });}
}
//消费者2
public class Recv2 {private final static String QUEUE_NAME work_fair;public static void main(String[] argv) throws Exception {Connection connectionMQConnectionUtil.createConnection();Channel channel connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);System.out.println( [*] Waiting for messages. To exit press CTRLC);channel.basicQos(1);//回调方法下面两种都行Consumer consumer new DefaultConsumer(channel){Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// consumerTag 是固定的 可以做此会话的名字 deliveryTag 每次接收消息1System.out.println(consumerTag消息标识consumerTag);//可以获取交换机路由健等System.out.println(envelope元数据envelope);System.out.println(properties配置信息properties);System.out.println(bodynew String(body,utf-8));channel.basicAck(envelope.getDeliveryTag(), false);}};channel.basicConsume(QUEUE_NAME,false,consumer);// DeliverCallback deliverCallback (consumerTag,delivery) - {
// String message new String(delivery.getBody(), UTF-8);
// System.out.println( [x] Received message );
// };//自动确认消息
// channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag - { });}
}发布订阅模型
介绍
模型图
特点
一条消息可以被多个消费者同时接收。采用扇形Fanout交换机。无需路由Key类似于公众号的订阅。 代码实现
生产者
public class Send {private final static String EXCHANGE_NAMEexchange_fanout;public static void main(String[] args) throws IOException, TimeoutException {try ( //JDK7语法 或自动关闭 connnection和channel//创建连接Connection connection MQConnectionUtil.createConnection();//创建信道Channel channel connection.createChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);String messagedaencode rabbitmq pub;channel.basicPublish(EXCHANGE_NAME, ,null,message.getBytes(StandardCharsets.UTF_8));System.out.println(广播消息已经发送);}}
}消费者两个消费者都是一样的代码都需要绑定相同的扇形交换机。
public class Recv {private final static String EXCHANGE_NAMEexchange_fanout;public static void main(String[] argv) throws Exception {Connection connection MQConnectionUtil.createConnection();Channel channel connection.createChannel();//绑定交换机,fanout扇形即广播类型channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.FANOUT);//获取队列排它队列String queueName channel.queueDeclare().getQueue();//绑定队列和交换机,fanout交换机不用指定routingkeychannel.queueBind(queueName,EXCHANGE_NAME,);DeliverCallback deliverCallback (consumerTag, delivery) - {String message new String(delivery.getBody(), UTF-8);System.out.println( [x] Received message );};//自动确认消息channel.basicConsume(queueName, true, deliverCallback, consumerTag - { });}
}路由模型
介绍
模型图 特点
交换机类型采用直连交换机特定的路由key由特定的消费者进行消费。交换机根据特定的路由key与队列进行绑定。 代码实现 以记录不同日志级别为例不同的消费者进行不同日志级别的记录。 生产者
public class Send {private final static String EXCHANGE_NAMEexchange_direct;public static void main(String[] args) throws IOException, TimeoutException {try ( //JDK7语法 或自动关闭 connnection和channel//创建连接Connection connection MQConnectionUtil.createConnection();//创建信道Channel channel connection.createChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);String debugLog[debug]daencode rabbitmq direct;String errorLog[error]出现error错误;channel.basicPublish(EXCHANGE_NAME,errorRoutingKey,null,errorLog.getBytes(StandardCharsets.UTF_8));channel.basicPublish(EXCHANGE_NAME,debugRoutingKey,null,debugLog.getBytes(StandardCharsets.UTF_8));System.out.println(消息已经发送);}}
}消费者1只记录ERROR级别日志。
public class Recv1 {private final static String EXCHANGE_NAMEexchange_direct;public static void main(String[] argv) throws Exception {Connection connection MQConnectionUtil.createConnection();Channel channel connection.createChannel();//绑定交换机,fanout扇形即广播类型channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.DIRECT);//获取队列排它队列String queueName channel.queueDeclare().getQueue();//绑定队列和交换机,fanout交换机不用指定routingkeychannel.queueBind(queueName,EXCHANGE_NAME,errorRoutingKey);DeliverCallback deliverCallback (consumerTag, delivery) - {String message new String(delivery.getBody(), UTF-8);System.out.println( [x] Received message );};//自动确认消息channel.basicConsume(queueName, true, deliverCallback, consumerTag - { });}
}消费者2只记录Debug级别日志。
public class Recv2 {private final static String EXCHANGE_NAMEexchange_direct;public static void main(String[] argv) throws Exception {Connection connection MQConnectionUtil.createConnection();Channel channel connection.createChannel();//绑定交换机,fanout扇形即广播类型channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.DIRECT);//获取队列排它队列String queueName channel.queueDeclare().getQueue();//绑定队列和交换机,fanout交换机不用指定routingkeychannel.queueBind(queueName,EXCHANGE_NAME,debugRoutingKey);DeliverCallback deliverCallback (consumerTag, delivery) - {String message new String(delivery.getBody(), UTF-8);System.out.println( [x] Received message );};//自动确认消息channel.basicConsume(queueName, true, deliverCallback, consumerTag - { });}
}主题模型
介绍
模型图 特点
交换机类型采用主题交换机。路由key根据通配符规则限定消息消费规则。*匹配一个词#匹配一个或者多个词。交换机通过通配符路由KEY将消息绑定到不同的队列以此实现不同的消费者进行消息消费。同时满足路由模型和发布订阅模型。 代码实现
生产者生产者通过路由KEY向交换机发送消息。
public class Send {private final static String EXCHANGE_NAMEexchange_topic;public static void main(String[] args) throws IOException, TimeoutException {try ( //JDK7语法 或自动关闭 connnection和channel//创建连接Connection connection MQConnectionUtil.createConnection();//创建信道Channel channel connection.createChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);String debugLog[debug]daencode rabbitmq direct;String errorLog[error]出现error错误;channel.basicPublish(EXCHANGE_NAME,log.error,null,errorLog.getBytes(StandardCharsets.UTF_8));channel.basicPublish(EXCHANGE_NAME,log.debug,null,debugLog.getBytes(StandardCharsets.UTF_8));System.out.println(广播消息已经发送);}}
}消费者
public class Recv1 {private final static String EXCHANGE_NAMEexchange_topic;public static void main(String[] argv) throws Exception {Connection connection MQConnectionUtil.createConnection();Channel channel connection.createChannel();//绑定交换机,fanout扇形即广播类型channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.TOPIC);//获取队列排它队列String queueName channel.queueDeclare().getQueue();//绑定队列和交换机,fanout交换机不用指定routingkeychannel.queueBind(queueName,EXCHANGE_NAME,*.debug);DeliverCallback deliverCallback (consumerTag, delivery) - {String message new String(delivery.getBody(), UTF-8);System.out.println( [x] Received message );};//自动确认消息channel.basicConsume(queueName, true, deliverCallback, consumerTag - { });}
}消费者2 public class Recv2 {private final static String EXCHANGE_NAMEexchange_topic;public static void main(String[] argv) throws Exception {Connection connection MQConnectionUtil.createConnection();Channel channel connection.createChannel();//绑定交换机,fanout扇形即广播类型channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.TOPIC);//获取队列排它队列String queueName channel.queueDeclare().getQueue();//绑定队列和交换机,fanout交换机不用指定routingkeychannel.queueBind(queueName,EXCHANGE_NAME,*.error);DeliverCallback deliverCallback (consumerTag, delivery) - {String message new String(delivery.getBody(), UTF-8);System.out.println( [x] Received message );};//自动确认消息channel.basicConsume(queueName, true, deliverCallback, consumerTag - { });}
}总结
模型是否交换机参与交换机类型需要路由键描述简单模型否无否消息直接发送到队列最基本的消息传递模型。工作模型否无否多个消费者共同处理一个队列中的消息。发布订阅模型是fanout否将消息广播给所有绑定到交换机的队列多个消费者同时订阅。路由模型是direct是根据消息的路由键将消息发送到与之匹配的队列。主题模型是topic是使用通配符进行灵活的路由根据主题和通配符规则进行匹配。 写在最后
有关于图解RabbitMQ五种队列模型介绍及代码实现到此就结束了。感谢大家的阅读希望大家在评论区对此部分内容散发讨论便于学到更多的知识。