北京的重要的网站,外包建站公司,宜昌网页设计,ftp下载wordpressRabbitMQ 即一个消息队列#xff0c;主要是用来实现应用程序的异步和解耦#xff0c;同时也能起到消息缓冲#xff0c;消息分发的作用。
消息中间件在互联网公司的使用中越来越多#xff0c;刚才还看到新闻阿里将 RocketMQ 捐献给了 Apache#xff0c;当然了今天的主角还…RabbitMQ 即一个消息队列主要是用来实现应用程序的异步和解耦同时也能起到消息缓冲消息分发的作用。
消息中间件在互联网公司的使用中越来越多刚才还看到新闻阿里将 RocketMQ 捐献给了 Apache当然了今天的主角还是讲 RabbitMQ。消息中间件最主要的作用是解耦中间件最标准的用法是生产者生产消息传送到队列消费者从队列中拿取消息并处理生产者不用关心是谁来消费消费者不用关心谁在生产消息从而达到解耦的目的。在分布式的系统中消息队列也会被用在很多其它的方面比如分布式事务的支持RPC 的调用等等。
以前一直使用的是 ActiveMQ在实际的生产使用中也出现了一些小问题在网络查阅了很多的资料后决定尝试使用 RabbitMQ 来替换 ActiveMQRabbitMQ 的高可用性、高性能、灵活性等一些特点吸引了我们查阅了一些资料整理出此文。
RabbitMQ 介绍
RabbitMQ 是实现 AMQP高级消息队列协议的消息中间件的一种最初起源于金融系统用于在分布式系统中存储转发消息在易用性、扩展性、高可用性等方面表现不俗。RabbitMQ 主要是为了实现系统之间的双向解耦而实现的。当生产者大量产生数据时消费者无法快速消费那么需要一个中间层。保存这个数据。
AMQP即 Advanced Message Queuing Protocol高级消息队列协议是应用层协议的一个开放标准为面向消息的中间件设计。消息中间件主要用于组件之间的解耦消息的发送者无需知道消息使用者的存在反之亦然。AMQP 的主要特征是面向消息、队列、路由包括点对点和发布/订阅、可靠性、安全。
RabbitMQ 是一个开源的 AMQP 实现服务器端用Erlang语言编写支持多种客户端如Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP 等支持 AJAX。用于在分布式系统中存储转发消息在易用性、扩展性、高可用性等方面表现不俗。
相关概念
通常我们谈到队列服务, 会有三个概念发消息者、队列、收消息者RabbitMQ 在这个基本概念之上, 多做了一层抽象, 在发消息者和 队列之间, 加入了交换器 (Exchange). 这样发消息者和队列就没有直接联系, 转而变成发消息者把消息给交换器, 交换器根据调度策略再把消息再给队列。 图片 左侧 P 代表 生产者也就是往 RabbitMQ 发消息的程序。 中间即是 RabbitMQ其中包括了 交换机 和 队列。 右侧 C 代表 消费者也就是往 RabbitMQ 拿消息的程序。
那么其中比较重要的概念有 4 个分别为虚拟主机交换机队列和绑定。 虚拟主机一个虚拟主机持有一组交换机、队列和绑定。为什么需要多个虚拟主机呢很简单 RabbitMQ 当中用户只能在虚拟主机的粒度进行权限控制。因此如果需要禁止A组访问B组的交换机/队列/绑定必须为A和B分别创建一个虚拟主机。每一个 RabbitMQ 服务器都有一个默认的虚拟主机“/”。 交换机Exchange 用于转发消息但是它不会做存储 如果没有 Queue bind 到 Exchange 的话它会直接丢弃掉 Producer 发送过来的消息。这里有一个比较重要的概念路由键 。消息到交换机的时候交互机会转发到对应的队列中那么究竟转发到哪个队列就要根据该路由键。 绑定也就是交换机需要和队列相绑定这其中如上图所示是多对多的关系。
交换机(Exchange)
交换机的功能主要是接收消息并且转发到绑定的队列交换机不存储消息在启用ack模式后交换机找不到队列会返回错误。交换机有四种类型Direct, topic, Headers and Fanout Directdirect 类型的行为是先匹配, 再投送. 即在绑定时设定一个 routingkey, 消息的routingkey 匹配时, 才会被交换器投送到绑定的队列中去. Topic按规则转发消息最灵活 Headers设置 header attribute 参数类型的交换机 Fanout转发消息到所有绑定队列
Direct Exchange
Direct Exchange 是 RabbitMQ 默认的交换机模式也是最简单的模式根据key全文匹配去寻找队列。 图片
第一个 X - Q1 就有一个 binding key名字为 orangeX - Q2 就有 2 个 binding key名字为 black 和 green。当消息中的 路由键 和 这个 binding key 对应上的时候那么就知道了该消息去到哪一个队列中。
Ps为什么 X 到 Q2 要有 blackgreen2个 binding key呢一个不就行了吗- 这个主要是因为可能又有 Q3而Q3只接受 black 的信息而Q2不仅接受black 的信息还接受 green 的信息。
Topic Exchange
Topic Exchange 转发消息主要是根据通配符。在这种交换机下队列和交换机的绑定会定义一种路由模式那么通配符就要在这种路由模式和路由键之间匹配后交换机才能转发消息。
在这种交换机模式下 路由键必须是一串字符用句号 . 隔开比如说 agreements.us或者 agreements.eu.stockholm 等。 路由模式必须包含一个 星号*主要用于匹配路由键指定位置的一个单词比如说一个路由模式是这样子agreements..b.*那么就只能匹配路由键是这样子的第一个单词是 agreements第四个单词是 b。井号#就表示相当于一个或者多个单词例如一个匹配模式是 agreements.eu.berlin.#那么以agreements.eu.berlin 开头的路由键都是可以的。
具体代码发送的时候还是一样第一个参数表示交换机第二个参数表示 routing key第三个参数即消息。如下
rabbitTemplate.convertAndSend(testTopicExchange,key1.a.c.key2, this is RabbitMQ!);topic 和 direct 类似, 只是匹配上支持了模式, 在点分的 routing_key 形式中, 可以使用两个通配符: *表示一个词. #表示零个或多个词.
Headers Exchange
headers 也是根据规则匹配, 相较于 direct 和 topic 固定地使用 routing_key , headers 则是一个自定义匹配规则的类型. 在队列与交换器绑定时, 会设定一组键值对规则, 消息中也包括一组键值对( headers 属性), 当这些键值对有一对, 或全部匹配时, 消息被投送到对应队列.
Fanout Exchange
Fanout Exchange 消息广播的模式不管路由键或者是路由模式会把消息发给绑定给它的全部队列如果配置了 routing_key 会被忽略。
Spring Boot 集成 RabbitMQ
Spring Boot 集成 RabbitMQ 非常简单如果只是简单的使用配置非常少Spring Boot 提供了 spring-boot-starter-amqp 项目对消息各种支持。
简单使用
1、配置 Pom 包主要是添加 spring-boot-starter-amqp 的支持
dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId
/dependency2、配置文件
配置 RabbitMQ 的安装地址、端口以及账户信息
spring.application.nameSpring-boot-rabbitmqspring.rabbitmq.host192.168.0.86
spring.rabbitmq.port5672
spring.rabbitmq.usernameadmin
spring.rabbitmq.password1234563、队列配置
Configuration
public class RabbitConfig {Beanpublic Queue Queue() {return new Queue(hello);}}4、发送者
rabbitTemplate 是 Spring Boot 提供的默认实现
component
public class HelloSender {Autowiredprivate AmqpTemplate rabbitTemplate;public void send() {String context hello new Date();System.out.println(Sender : context);this.rabbitTemplate.convertAndSend(hello, context);}}4、接收者
Component
RabbitListener(queues hello)
public class HelloReceiver {RabbitHandlerpublic void process(String hello) {System.out.println(Receiver : hello);}}5、测试
RunWith(SpringRunner.class)
SpringBootTest
public class RabbitMqHelloTest {Autowiredprivate HelloSender helloSender;Testpublic void hello() throws Exception {helloSender.send();}}注意发送者和接收者的 queue name 必须一致不然不能接收 多对多使用
一个发送者N 个接收者或者 N 个发送者和 N 个接收者会出现什么情况呢
一对多发送
对上面的代码进行了小改造接收端注册了两个 Receiver,Receiver1 和 Receiver2发送端加入参数计数接收端打印接收到的参数下面是测试代码发送一百条消息来观察两个接收端的执行效果
Test
public void oneToMany() throws Exception {for (int i0;i100;i){neoSender.send(i);}
}结果如下
Receiver 1: Spring boot test queue ****** 11
Receiver 2: Spring boot test queue ****** 12
Receiver 2: Spring boot test queue ****** 14
Receiver 1: Spring boot test queue ****** 13
Receiver 2: Spring boot test queue ****** 15
Receiver 1: Spring boot test queue ****** 16
Receiver 1: Spring boot test queue ****** 18
Receiver 2: Spring boot test queue ****** 17
Receiver 2: Spring boot test queue ****** 19
Receiver 1: Spring boot test queue ****** 20根据返回结果得到以下结论 一个发送者N个接受者,经过测试会均匀的将消息发送到N个接收者中 多对多发送
复制了一份发送者加入标记在一百个循环中相互交替发送
Test
public void manyToMany() throws Exception {for (int i0;i100;i){neoSender.send(i);neoSender2.send(i);}
}结果如下
Receiver 1: Spring boot test queue ****** 20
Receiver 2: Spring boot test queue ****** 20
Receiver 1: Spring boot test queue ****** 21
Receiver 2: Spring boot test queue ****** 21
Receiver 1: Spring boot test queue ****** 22
Receiver 2: Spring boot test queue ****** 22
Receiver 1: Spring boot test queue ****** 23
Receiver 2: Spring boot test queue ****** 23
Receiver 1: Spring boot test queue ****** 24
Receiver 2: Spring boot test queue ****** 24
Receiver 1: Spring boot test queue ****** 25
Receiver 2: Spring boot test queue ****** 25结论和一对多一样接收端仍然会均匀接收到消息 高级使用
对象的支持
Spring Boot 以及完美的支持对象的发送和接收不需要格外的配置。
//发送者
public void send(User user) {System.out.println(Sender object: user.toString());this.rabbitTemplate.convertAndSend(object, user);
}...//接收者
RabbitHandler
public void process(User user) {System.out.println(Receiver object : user);
}结果如下
Sender object: User{nametest, pass123456}
Receiver object : User{nametest, pass123456}
Topic Exchange
topic 是 RabbitMQ 中最灵活的一种方式可以根据 routing_key 自由的绑定不同的队列
首先对 topic 规则配置这里使用两个队列来测试
Configuration
public class TopicRabbitConfig {final static String message topic.message;final static String messages topic.messages;Beanpublic Queue queueMessage() {return new Queue(TopicRabbitConfig.message);}Beanpublic Queue queueMessages() {return new Queue(TopicRabbitConfig.messages);}BeanTopicExchange exchange() {return new TopicExchange(exchange);}BeanBinding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {return BindingBuilder.bind(queueMessage).to(exchange).with(topic.message);}BeanBinding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) {return BindingBuilder.bind(queueMessages).to(exchange).with(topic.#);}
}使用 queueMessages 同时匹配两个队列queueMessage 只匹配 topic.message 队列
public void send1() {String context hi, i am message 1;System.out.println(Sender : context);this.rabbitTemplate.convertAndSend(exchange, topic.message, context);}public void send2() {String context hi, i am messages 2;System.out.println(Sender : context);this.rabbitTemplate.convertAndSend(exchange, topic.messages, context);}发送send1会匹配到topic.#和topic.message 两个Receiver都可以收到消息发送send2只有topic.#可以匹配所有只有Receiver2监听到消息
Fanout Exchange
Fanout 就是我们熟悉的广播模式或者订阅模式给 Fanout 交换机发送消息绑定了这个交换机的所有队列都收到这个消息。
Fanout 相关配置
Configuration
public class FanoutRabbitConfig {Beanpublic Queue AMessage() {return new Queue(fanout.A);}Beanpublic Queue BMessage() {return new Queue(fanout.B);}Beanpublic Queue CMessage() {return new Queue(fanout.C);}BeanFanoutExchange fanoutExchange() {return new FanoutExchange(fanoutExchange);}BeanBinding bindingExchangeA(Queue AMessage,FanoutExchange fanoutExchange) {return BindingBuilder.bind(AMessage).to(fanoutExchange);}BeanBinding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) {return BindingBuilder.bind(BMessage).to(fanoutExchange);}BeanBinding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) {return BindingBuilder.bind(CMessage).to(fanoutExchange);}}这里使用了 A、B、C 三个队列绑定到 Fanout 交换机上面发送端的 routing_key 写任何字符都会被忽略
public void send() {String context hi, fanout msg ;System.out.println(Sender : context);this.rabbitTemplate.convertAndSend(fanoutExchange,, context);
}结果如下
Sender : hi, fanout msg
...
fanout Receiver B: hi, fanout msg
fanout Receiver A : hi, fanout msg
fanout Receiver C: hi, fanout msg 结果说明绑定到 fanout 交换机上面的队列都收到了消息。 文章转载自: http://www.morning.ggjlm.cn.gov.cn.ggjlm.cn http://www.morning.mzcrs.cn.gov.cn.mzcrs.cn http://www.morning.pwppk.cn.gov.cn.pwppk.cn http://www.morning.yntsr.cn.gov.cn.yntsr.cn http://www.morning.tqxtx.cn.gov.cn.tqxtx.cn http://www.morning.rrcrs.cn.gov.cn.rrcrs.cn http://www.morning.kttbx.cn.gov.cn.kttbx.cn http://www.morning.skbhl.cn.gov.cn.skbhl.cn http://www.morning.jwpcj.cn.gov.cn.jwpcj.cn http://www.morning.amlutsp.cn.gov.cn.amlutsp.cn http://www.morning.mggwr.cn.gov.cn.mggwr.cn http://www.morning.gcysq.cn.gov.cn.gcysq.cn http://www.morning.wfbnp.cn.gov.cn.wfbnp.cn http://www.morning.hlppp.cn.gov.cn.hlppp.cn http://www.morning.xltdh.cn.gov.cn.xltdh.cn http://www.morning.pclgj.cn.gov.cn.pclgj.cn http://www.morning.zxfr.cn.gov.cn.zxfr.cn http://www.morning.qlrwf.cn.gov.cn.qlrwf.cn http://www.morning.fqtzn.cn.gov.cn.fqtzn.cn http://www.morning.cjmmt.cn.gov.cn.cjmmt.cn http://www.morning.bhgnj.cn.gov.cn.bhgnj.cn http://www.morning.ttshf.cn.gov.cn.ttshf.cn http://www.morning.rdsst.cn.gov.cn.rdsst.cn http://www.morning.shxrn.cn.gov.cn.shxrn.cn http://www.morning.mjpgl.cn.gov.cn.mjpgl.cn http://www.morning.kcdts.cn.gov.cn.kcdts.cn http://www.morning.pakistantractors.com.gov.cn.pakistantractors.com http://www.morning.cjrmf.cn.gov.cn.cjrmf.cn http://www.morning.kngqd.cn.gov.cn.kngqd.cn http://www.morning.mfxcg.cn.gov.cn.mfxcg.cn http://www.morning.qqpg.cn.gov.cn.qqpg.cn http://www.morning.tmbtm.cn.gov.cn.tmbtm.cn http://www.morning.qmqgx.cn.gov.cn.qmqgx.cn http://www.morning.lzqtn.cn.gov.cn.lzqtn.cn http://www.morning.hclplus.com.gov.cn.hclplus.com http://www.morning.jzlkq.cn.gov.cn.jzlkq.cn http://www.morning.rdkqt.cn.gov.cn.rdkqt.cn http://www.morning.zrdhd.cn.gov.cn.zrdhd.cn http://www.morning.nzxdz.cn.gov.cn.nzxdz.cn http://www.morning.fhyhr.cn.gov.cn.fhyhr.cn http://www.morning.lltdf.cn.gov.cn.lltdf.cn http://www.morning.qbjrf.cn.gov.cn.qbjrf.cn http://www.morning.yrhsg.cn.gov.cn.yrhsg.cn http://www.morning.bfhfb.cn.gov.cn.bfhfb.cn http://www.morning.bxsgl.cn.gov.cn.bxsgl.cn http://www.morning.thwcg.cn.gov.cn.thwcg.cn http://www.morning.lfqnk.cn.gov.cn.lfqnk.cn http://www.morning.bszmy.cn.gov.cn.bszmy.cn http://www.morning.kyjpg.cn.gov.cn.kyjpg.cn http://www.morning.kvzvoew.cn.gov.cn.kvzvoew.cn http://www.morning.txmkx.cn.gov.cn.txmkx.cn http://www.morning.wnjwb.cn.gov.cn.wnjwb.cn http://www.morning.hhxwr.cn.gov.cn.hhxwr.cn http://www.morning.hlrtzcj.cn.gov.cn.hlrtzcj.cn http://www.morning.bkqw.cn.gov.cn.bkqw.cn http://www.morning.tbhlc.cn.gov.cn.tbhlc.cn http://www.morning.sqmlw.cn.gov.cn.sqmlw.cn http://www.morning.hyryq.cn.gov.cn.hyryq.cn http://www.morning.hxpff.cn.gov.cn.hxpff.cn http://www.morning.kaweilu.com.gov.cn.kaweilu.com http://www.morning.zqmdn.cn.gov.cn.zqmdn.cn http://www.morning.wrwcf.cn.gov.cn.wrwcf.cn http://www.morning.ypbdr.cn.gov.cn.ypbdr.cn http://www.morning.flmxl.cn.gov.cn.flmxl.cn http://www.morning.mwmxs.cn.gov.cn.mwmxs.cn http://www.morning.leyuhh.com.gov.cn.leyuhh.com http://www.morning.wyzby.cn.gov.cn.wyzby.cn http://www.morning.fdzzh.cn.gov.cn.fdzzh.cn http://www.morning.mcjyair.com.gov.cn.mcjyair.com http://www.morning.wngpq.cn.gov.cn.wngpq.cn http://www.morning.qhmgq.cn.gov.cn.qhmgq.cn http://www.morning.sgtq.cn.gov.cn.sgtq.cn http://www.morning.trqhd.cn.gov.cn.trqhd.cn http://www.morning.cmzcp.cn.gov.cn.cmzcp.cn http://www.morning.qqhmg.cn.gov.cn.qqhmg.cn http://www.morning.xhsxj.cn.gov.cn.xhsxj.cn http://www.morning.qncqd.cn.gov.cn.qncqd.cn http://www.morning.dxsyp.cn.gov.cn.dxsyp.cn http://www.morning.mhxlb.cn.gov.cn.mhxlb.cn http://www.morning.gllgf.cn.gov.cn.gllgf.cn