建站成功是怎么回事,音乐网站设计总结,企业员工管理培训课程,厦门市建设工程质量安全协会网站文章目录 前言一、MQ是什么#xff1f;优势劣势 二、MQ的用途1、应用解耦2、异步加速3、削峰填谷4、消息分发 三、RabbitMQ是什么1、AMQP 协议2、RabbitMQ 包含的要素3、RabbitMQ 基础架构 四、实战1、Simple模式(即最简单的收发模式)2、Work Queues 模型3、Publish/Subscribe… 文章目录 前言一、MQ是什么优势劣势 二、MQ的用途1、应用解耦2、异步加速3、削峰填谷4、消息分发 三、RabbitMQ是什么1、AMQP 协议2、RabbitMQ 包含的要素3、RabbitMQ 基础架构 四、实战1、Simple模式(即最简单的收发模式)2、Work Queues 模型3、Publish/Subscribe 模型4、Routing 模型5、Topics 模型 前言
最近秋招开始找工作顺便回顾消息队列并且总结。
一、MQ是什么
消息队列Message Queue是一种在应用程序之间传递消息的通信模式。它通过在发送者和接收者之间建立一个消息队列来实现异步通信和解耦。
在消息队列模式中发送者Producer将消息发送到一个中间件Message Broker中的消息队列而接收者Consumer则从该队列中接收和处理消息。这种方式使得发送者和接收者可以独立地进行处理而无需直接交互从而实现解耦。发送者和接收者只需要知道如何与消息队列进行通信而不需要知道彼此的存在。
优势
1. 异步通信发送者将消息放入队列后即可继续进行其他操作无需等待接收者的响应。接收者可以在合适的时候从队列中获取消息进行处理实现了异步通信模式。
2. 解耦发送者和接收者之间通过消息队列进行通信彼此之间不直接耦合。发送者只需将消息发送到队列中而不需要知道消息是如何被处理的。接收者只需从队列中获取消息进行处理而不需要知道消息的来源。
3. 可靠性传输消息队列通常提供持久化机制确保消息在发送和接收过程中不会丢失。即使接收者暂时不可用消息也会在队列中等待直到接收者准备好接收为止。
4. 扩展性消息队列可以支持多个发送者和接收者实现系统的扩展性和高并发处理能力。
5. 缓冲和削峰填谷通过将消息缓存到队列中可以平衡发送者和接收者之间的处理速度差异从而避免系统过载。
消息队列在分布式系统、微服务架构、异步任务处理、事件驱动架构等场景中被广泛应用。一些常见的消息队列系统包括RabbitMQ、Apache Kafka、ActiveMQ、Amazon SQS等。它们提供了丰富的功能和配置选项可以根据应用需求选择合适的消息队列实现。
劣势
系统可用性降低 系统引入的外部依赖越多系统稳定性越差。一旦 MQ 宕机就会对业务造成影响。如何保证MQ的高可用 系统复杂度提高 MQ 的加入大大增加了系统的复杂度以前系统间是同步的远程调用现在是通过 MQ 进行异步调用。如何保证消息没有被重复消费怎么处理消息丢失情况那么保证消息传递的顺序性 一致性问题 A 系统处理完业务通过 MQ 给B、C、D三个系统发消息数据如果 B 系统、C 系统处理成功D 系统处理失败。如何保证消息数据处理的一致性
二、MQ的用途
四个用途 应用解耦提高系统容错性和可维护性 异步提速提升用户体验和系统吞吐量 削峰填谷提高系统稳定性 消息分发提高系统灵活性
1、应用解耦
应用解耦是指通过使用消息队列等中间件来降低应用程序之间的直接依赖性从而实现独立开发、部署和升级的能力。通过解耦每个应用程序可以通过消息队列发送和接收消息而不需要了解其他应用程序的具体实现细节。通过应用解耦可以实现系统的松耦合架构提高系统的可维护性、扩展性和容错性。 没有使用MQ
系统的耦合性越高容错性就越低可维护性就越低。使用 MQ 使得应用间解耦提升容错性和可维护性。
2、异步加速
异步提速是指通过将耗时的操作转化为异步执行从而提高系统的响应速度和吞吐量。通过异步处理应用程序可以在等待某个操作完成的同时继续执行其他任务而不需要阻塞等待结果返回。 例如当一个应用程序需要进行网络请求并等待响应时如果采用同步方式应用程序会被阻塞直到响应返回才能继续执行其他任务。而通过异步方式应用程序可以继续执行其他任务不需要等待网络请求的结果返回。这样可以提高系统的响应速度使用户获得更好的体验。 没有使用MQ 一个下单操作耗时20 300 300 300 920ms 用户点击完下单按钮后需要等待920ms才能得到下单响应太慢 使用MQ
用户点击完下单按钮后只需等待25ms就能得到下单响应 (20 5 25ms)。 提升用户体验和系统吞吐量单位时间内处理请求的数目。不需要的等待完成
3、削峰填谷
削峰填谷是一种通过平衡系统负载减轻峰值压力和填充低谷时的资源利用率的技术。它的目标是在系统负载波动较大的情况下合理利用资源确保系统的稳定性和高效性。 没有使用MQ
使用MQ 使用了 MQ 之后限制消费消息的速度为1000这样一来高峰期产生的数据势必会被积压在 MQ中高峰就被“削”掉了但是因为消息积压在高峰期过后的一段时间内消费消息的速度还是会维持在1000直到消费完积压的消息这就叫做填谷。简单来说就是慢慢分发 使用MQ后可以提高系统稳定性。
4、消息分发
消息分发是一种将消息从发送者传递到接收者的机制它在异步系统和事件驱动架构中起着重要的作用。消息分发可以实现解耦和灵活性允许不同组件或模块之间通过消息进行通信从而实现系统的松耦合和可扩展性。 下面是消息分发的一些关键概念和示例
发布者Publisher发布者是消息分发系统中的发送者它负责生成并发布消息。发布者将消息发送到消息分发系统而不需要知道消息的具体接收者。
订阅者Subscriber订阅者是消息分发系统中的接收者它通过订阅特定的消息或消息类型来表明自己对消息的兴趣。当有匹配的消息到达时消息分发系统会将消息传递给订阅者。
主题Topic主题是消息分发系统中用于分类和组织消息的标识符或名称。发布者可以将消息发布到特定的主题而订阅者可以选择订阅感兴趣的主题。通过主题可以实现消息的细粒度过滤和选择性订阅。
三、RabbitMQ是什么
RabbitMQ是使用Erlang语言开发的开源消息队列系统基于AMQP协议来实现。AMQP的主要特征是面向消息、队列、路由包括点对点和发布/订阅、可靠性、安全。AMQP协议更多用在企业系统内对数据一致性、稳定性和可靠性要求很高的场景对性能和吞吐量的要求可能比较低了。
1、AMQP 协议
AMQP即 Advanced Message Queuing Protocol高级消息队列协议是应用层协议的一个开放标准为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息并不受客户端/中间件不同产品不同的开发语言等条件的限制。2006年AMQP规范发布。类比HTTP。 AMQP三层协议 Module Layer协议最高层主要定义了一些客户端调用的命令客户端可以用这些命令实现自己的业务逻辑。 Session Layer中间层主要负责客户端命会发送给服务器再将服务端应答返回客户端提供可靠性同步机制和错误处理。 TransportLayer最底层主要传输二进制数据流提供帧的处理、信道服用、错误检测和数据表示等。
AMQP组件 交换器(Exchange)消息代理服务器中用于把消息路由到队列的组件。 队列queue用来存储消息的数据结构位于硬盘或内存中。 绑定Binding一套规则告知交换器消息应该将消息投递给哪个队列。
2、RabbitMQ 包含的要素
生产者消息队列创建者发送消息到MQ 消费者连接到RabbitMQ订阅到队列上消费消息持续订阅和单条订阅 消息包含有效载荷和标签有效载荷指要传输的数据标签描述了有效载荷并且RabbitMQ用它来决定谁获得消息消费者只能拿到有效载荷并不知道生产者是谁
3、RabbitMQ 基础架构 Broker接收和分发消息的应用RabbitMQ Server就是 Message Broker Virtual host出于多租户和安全因素设计的把 AMQP 的基本组件划分到一个虚拟的分组中类似于网络中的 namespace概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时可以划分出多个vhost每个用户在自己的 vhost创建 exchangequeue 等 Connectionpublisherconsumer 和 broker 之间的 TCP 连接 ChannelChannel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销。是生产者、消费者与RabbitMQ通信的渠道生产者publish或是消费者subscribe 一个队列都是通过信道来通信的。 信道是建立在TCP连接上的虚拟连接就是说RabbitMQ在一条TCP上建立成百上千个信道来达到多个线程处理这个TCP被多个线程共享每个线程对应一个信道信道在RabbitMQ都有一个唯一的ID保证了信道私有性对应上唯一的线程使用。 Exchange交换机message 到达 broker 的第一站**根据分发规则匹配查询表中的 routing key分发消息到queue中去。生产者将消息发送到交换器有交换器将消息路由到一个或者多个队中。当路由不到时或返回给生产者或直接丟弃。 Queue消息最终被送到这里等待 consumer 取走 Bindingexchange 和 queue 之间的虚拟连接binding 中可以包含 routing key。Binding信息被保存到 exchange 中的查询表中用于 message 的分发依据
四、实战
RabbitMQ 提供了 6 种工作模式简单模式、work queues、Publish/Subscribe 发布与订阅模式、Routing 路由模式、Topics 主题模式、RPC 远程调用模式远程调用不太算 MQ暂不作介绍。
1、Simple模式(即最简单的收发模式)
消息的消费者监听消息队列如果队列中有消息就消费掉消息被拿走后自动从队列中删除(隐患消息可能没有被消费者正确处理已经从队列中消失了造成消息的丢失这里可以设置成手动的ack但如果设置成手动ack处理完后要及时发送ack消息给队列否则会造成内存溢出)。 消费者
package mainimport (loggithub.com/streadway/amqp
)func main() {// 连接到RabbitMQ服务器conn, err : amqp.Dial(amqp://guest:guestlocalhost:5672/)if err ! nil {log.Fatalf(无法连接到RabbitMQ服务器%s, err)}defer conn.Close()// 创建一个通道ch, err : conn.Channel()if err ! nil {log.Fatalf(无法创建通道%s, err)}defer ch.Close()// 声明一个队列queue, err : ch.QueueDeclare(hello, // 队列名false, // 持久性false, // 自动删除false, // 独占false, // 等待服务器确认nil, // 参数)if err ! nil {log.Fatalf(无法声明队列%s, err)}// 消费消息msgs, err : ch.Consume(queue.Name, // 队列名, // 消费者标签true, // 自动确认false, // 独占false, // 不等待服务器确认false, // 参数)if err ! nil {log.Fatalf(无法注册消费者%s, err)}// 处理接收到的消息for msg : range msgs {log.Printf(接收到消息%s, msg.Body)}
}上述代码首先建立了与RabbitMQ服务器的连接然后创建了一个通道和一个名为heo的队列。接下来通过ch.Consume函数注册一个消费者用于从队列中接收消息。在fo循环中我们处理接收到的消息这里只是简单地打印出来。 生产者
package mainimport (loggithub.com/streadway/amqp
)func main() {// 连接到RabbitMQ服务器conn, err : amqp.Dial(amqp://guest:guestlocalhost:5672/)if err ! nil {log.Fatalf(无法连接到RabbitMQ服务器%s, err)}defer conn.Close()// 创建一个通道ch, err : conn.Channel()if err ! nil {log.Fatalf(无法创建通道%s, err)}defer ch.Close()// 声明一个队列queue, err : ch.QueueDeclare(hello, // 队列名false, // 持久性false, // 自动删除false, // 独占false, // 等待服务器确认nil, // 参数)if err ! nil {log.Fatalf(无法声明队列%s, err)}// 发送消息body : Hello, RabbitMQ!err ch.Publish(, // 交换机queue.Name, // 队列名false, // 必须发送到队列false, // 不等待服务器确认amqp.Publishing{ContentType: text/plain,Body: []byte(body),},)if err ! nil {log.Fatalf(无法发送消息%s, err)}log.Printf(消息已发送%s, body)
}上述代码与消费者程序类似首先建立了与RabbitMQ服务器的连接然后创建了一个通道和一个名为hello的队列。接下来通过ch.Publishi函数向队列发送一条消息。
2、Work Queues 模型
消息产生者将消息放入队列消费者可以有多个消费者1消费者2同时监听同一个队列消息被消费。C1 C2共同争抢当前的消息队列内容谁先拿到谁负责消费消息(隐患高并发情况下默认会产生某一个消息被多个消费者共同使用可以设置一个开关[syncronize]保证一条消息只能被一个消费者使用)。 让多个消费者绑定到一个队列共同消费队列中的消息。队列中的消息一旦消费就会消失因此任务是不会被重复执行的。 消费者
package mainimport (fmtlogmath/randtimegithub.com/streadway/amqp
)func main() {// 连接到RabbitMQ服务器conn, err : amqp.Dial(amqp://guest:guestlocalhost:5672/)if err ! nil {log.Fatalf(无法连接到RabbitMQ服务器%s, err)}defer conn.Close()// 创建一个通道ch, err : conn.Channel()if err ! nil {log.Fatalf(无法创建通道%s, err)}defer ch.Close()// 启动多个消费者并行处理任务for i : 1; i 3; i {go startConsumer(i, ch)}// 阻塞主进程select {}
}func generateTask(id int) string {time.Sleep(time.Duration(rand.Intn(3)) * time.Second)return fmt.Sprintf(Task %d, id)
}func startConsumer(id int, ch *amqp.Channel) {// 声明一个队列queue, err : ch.QueueDeclare(tasks_queue, // 队列名true, // 持久性false, // 自动删除false, // 独占false, // 等待服务器确认nil, // 参数)if err ! nil {log.Fatalf(无法声明队列%s, err)}// 消费任务msgs, err : ch.Consume(queue.Name, // 队列名, // 消费者标签false, // 手动确认false, // 不等待服务器确认false, // 不使用内置的参数false, // 参数nil, // 参数)if err ! nil {log.Fatalf(无法注册消费者%s, err)}for msg : range msgs {task : string(msg.Body)log.Printf(消费者 %d 接收到任务%s, id, task)log.Printf(消费者 %d 完成任务%s, id, task)// 手动确认任务已处理msg.Ack(false)}
}
利用协城启动多个消费者进行消费。 结果如下
3、Publish/Subscribe 模型
每个消费者监听自己的队列。 生产者将消息发给broker由交换机将消息转发到绑定此交换机的每个队列每个绑定交换机的队列都将接收到消息。 在RabbitMQ的Publish/Subscribe模型中生产者将消息发送到交换机交换机负责将消息广播给所有绑定到它上面的队列。消费者创建队列并将其绑定到交换机上从而接收交换机发送的消息。这样一个消息可以被多个消费者接收。
在订阅模型中多了一个 Exchange 角色而且过程略有变化
P生产者也就是要发送消息的程序但是不再发送到队列中而是发给X交换机 C消费者消息的接收者会一直等待消息到来 Queue消息队列接收消息、缓存消息 Exchange交换机X。一方面接收生产者发送的消息。另一方面知道如何处理消息例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作取决于Exchange的类型。Exchange有常见以下3种类型 Fanout广播将消息交给所有绑定到交换机的队列 Direct定向把消息交给符合指定routing key 的队列 Topic通配符把消息交给符合routing pattern路由模式 的队列 Exchange交换机只负责转发消息不具备存储消息的能力因此如果没有任何队列与 Exchange 绑定或者没有符合路由规则的队列那么消息会丢失
package mainimport (loggithub.com/streadway/amqp
)func failOnError(err error, msg string) {if err ! nil {log.Fatalf(%s: %s, msg, err)}
}func main() {// 连接到RabbitMQ服务器conn, err : amqp.Dial(amqp://guest:guestlocalhost:5672/)failOnError(err, Failed to connect to RabbitMQ)defer conn.Close()// 创建一个通道ch, err : conn.Channel()failOnError(err, Failed to open a channel)defer ch.Close()// 声明一个交换机err ch.ExchangeDeclare(logs, // 交换机名称fanout, // 交换机类型true, // 是否持久化false, // 是否自动删除false, // 是否内部使用false, // 是否等待服务器响应nil, // 其他属性)failOnError(err, Failed to declare an exchange)// 发布消息到交换机body : Hello, RabbitMQ!err ch.Publish(logs, // 交换机名称, // 路由键留空表示广播给所有队列false, // 是否等待服务器响应false, // 其他属性amqp.Publishing{ContentType: text/plain,Body: []byte(body),},)failOnError(err, Failed to publish a message)log.Printf(Message sent: %s, body)
}连接到RabbitMQ服务器声明了一个名为logs的交换机并通过调用ch.Publish方法将消息发布到交换机上。 在示例代码中通过指定交换机名称为logs路由键为空字符串消息将被广播给所有绑定到该交换机的队列。
package mainimport (fmtloggithub.com/streadway/amqp
)func failOnError(err error, msg string) {if err ! nil {log.Fatalf(%s: %s, msg, err)}
}func main() {// 连接到RabbitMQ服务器conn, err : amqp.Dial(amqp://guest:guestlocalhost:5672/)failOnError(err, Failed to connect to RabbitMQ)defer conn.Close()// 创建一个通道ch, err : conn.Channel()failOnError(err, Failed to open a channel)defer ch.Close()// 声明一个交换机err ch.ExchangeDeclare(logs, // 交换机名称fanout, // 交换机类型true, // 是否持久化false, // 是否自动删除false, // 是否内部使用false, // 是否等待服务器响应nil, // 其他属性)failOnError(err, Failed to declare an exchange)// 声明一个临时队列q, err : ch.QueueDeclare(, // 队列名称留空表示由RabbitMQ自动生成false, // 是否持久化false, // 是否自动删除当没有任何消费者连接时true, // 是否排他队列仅限于当前连接false, // 是否等待服务器响应nil, // 其他属性)failOnError(err, Failed to declare a queue)// 将队列绑定到交换机上err ch.QueueBind(q.Name, // 队列名称, // 路由键留空表示接收交换机的所有消息logs, // 交换机名称false, // 是否等待服务器响应nil, // 其他属性)failOnError(err, Failed to bind a queue)// 订阅消息msgs, err : ch.Consume(q.Name, // 队列名称, // 消费者标识符留空表示由RabbitMQ自动生成true, // 是否自动应答false, // 是否独占模式仅限于当前连接false, // 是否等待服务器响应false, // 其他属性nil, // 其他属性)failOnError(err, Failed to register a consumer)// 接收消息的goroutinego func() {for d : range msgs {log.Printf(Received a message: %s, d.Body)}}()log.Printf(Waiting for messages. To exit press CTRLC)-make(chan struct{}) // 阻塞主goroutine
}它连接到RabbitMQ服务器声明一个fanout类型的交换机Exchange创建一个临时队列将队列绑定到交换机上并订阅消息。
在示例代码中创建的交换机名为logs交换机类型为fanout表示消息将被广播给所有绑定到该交换机的队列。
消费者创建了一个临时队列并将其绑定到交换机上这样交换机就会将消息发送到该队列中。
4、Routing 模型
在fanout模式中一条消息会被所有订阅的队列都消费。但是在某些场景下我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。
在Direct模型下
1、队列与交换机的绑定不能是任意绑定了而是要指定一个RoutingKey路由key 2、消息的发送方在 向 Exchange发送消息时也必须指定消息的 RoutingKey。 3、Exchange不再把消息交给每一个绑定的队列而是根据消息的Routing Key进行判断只有队列的Routingkey与消息的 Routing key完全一致才会接收到消息
消息生产者将消息发送给交换机按照路由判断路由是字符串(info)当前产生的消息携带路由字符(对象的方法)交换机根据路由的key只能匹配上路由key对应的消息队列对应的消费者才能消费消息。 生产者
package mainimport (logosstringsgithub.com/streadway/amqp
)func failOnError(err error, msg string) {if err ! nil {log.Fatalf(%s: %s, msg, err)}
}func main() {// 连接到RabbitMQ服务器conn, err : amqp.Dial(amqp://guest:guestlocalhost:5672/)failOnError(err, Failed to connect to RabbitMQ)defer conn.Close()// 创建一个通道ch, err : conn.Channel()failOnError(err, Failed to open a channel)defer ch.Close()// 声明一个交换机err ch.ExchangeDeclare(logs_direct, // 交换机名称direct, // 交换机类型true, // 是否持久化false, // 是否自动删除false, // 是否内部使用false, // 是否等待服务器响应nil, // 其他属性)failOnError(err, Failed to declare an exchange)// 从命令行参数获取要发送的路由键和消息内容if len(os.Args) 3 {log.Fatalf(Usage: %s [info] [message], os.Args[0])}severity : os.Args[1]message : strings.Join(os.Args[2:], )// 发布消息到交换机并指定路由键err ch.Publish(logs_direct, // 交换机名称severity, // 路由键false, // 是否等待服务器响应false, // 是否立即将消息写入磁盘amqp.Publishing{ContentType: text/plain,Body: []byte(message),},)failOnError(err, Failed to publish a message)log.Printf(Sent message: %s, message)
}它连接到RabbitMQ服务器声明一个direct类型的交换机Exchange并通过指定路由键将消息发布到交换机。
在示例代码中创建的交换机名为logs_direct交换机类型为direct表示消息将根据指定的路由键进行选择性地发送给队列。
生产者从命令行参数获取要发送的路由键和消息内容。路由键可以是任意字符串用于标识消息的类型或者级别。消息内容可以是任意文本。 消费者
package mainimport (fmtlogosgithub.com/streadway/amqp
)func failOnError(err error, msg string) {if err ! nil {log.Fatalf(%s: %s, msg, err)}
}func main() {// 连接到RabbitMQ服务器conn, err : amqp.Dial(amqp://guest:guestlocalhost:5672/)failOnError(err, Failed to connect to RabbitMQ)defer conn.Close()// 创建一个通道ch, err : conn.Channel()failOnError(err, Failed to open a channel)defer ch.Close()// 声明一个交换机err ch.ExchangeDeclare(logs_direct, // 交换机名称direct, // 交换机类型true, // 是否持久化false, // 是否自动删除false, // 是否内部使用false, // 是否等待服务器响应nil, // 其他属性)failOnError(err, Failed to declare an exchange)// 声明一个临时队列q, err : ch.QueueDeclare(, // 队列名称留空表示由RabbitMQ自动生成false, // 是否持久化false, // 是否自动删除当没有任何消费者连接时true, // 是否排他队列仅限于当前连接false, // 是否等待服务器响应nil, // 其他属性)failOnError(err, Failed to declare a queue)// 从命令行参数获取要绑定的路由键if len(os.Args) 2 {log.Fatalf(Usage: %s [info] [warning] [error], os.Args[0])}severities : os.Args[1:]// 将队列绑定到交换机上并指定要接收的路由键for _, severity : range severities {err ch.QueueBind(q.Name, // 队列名称severity, // 路由键logs_direct, // 交换机名称false, // 是否等待服务器响应nil, // 其他属性)failOnError(err, Failed to bind a queue)}// 订阅消息msgs, err : ch.Consume(q.Name, // 队列名称, // 消费者标识符留空表示由RabbitMQ自动生成true, // 是否自动应答false, // 是否独占模式仅限于当前连接false, // 是否等待服务器响应false, // 其他属性nil, // 其他属性)failOnError(err, Failed to register a consumer)// 接收消息的goroutinego func() {for d : range msgs {log.Printf(Received a message: %s, d.Body)}}()log.Printf(Waiting for messages. To exit press CTRLC)-make(chan struct{}) // 阻塞主goroutine
}上述代码实现了一个Routing模型的消费者。它连接到RabbitMQ服务器声明一个direct类型的交换机Exchange创建一个临时队列并将队列绑定到交换机上同时指定要接收的路由键。
在RabbitMQ的Routing模型中生产者将消息发送到交换机并在发送消息时指定一个路由键routing key。交换机根据路由键将消息发送给与之绑定的队列。消费者创建队列并将其绑定到交换机上并通过指定要接收的路由键来选择性地接收消息。
在示例代码中创建的交换机名为logs_direct交换机类型为direct表示消息将根据指定的路由键进行选择性地发送给队列。
消费者创建了一个临时队列并通过循环将该队列绑定到交换机上并指定要接收的路由键。路由键可以是任意字符串用于标识消息的类型或者级别。在示例中我们通过命令行参数传入要绑定的路由键。
最后消费者通过调用ch.Consume方法订阅消息。该方法返回一个消息通道msgs消费者可以从该通道接收到消息。在示例中我们使用一个goroutine来异步接收消息并在收到消息时打印出来。
5、Topics 模型 Topic类型的Exchange与Direct相比都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符
这种模型Routingkey 一般都是由一个或多个单词组成多个单词之间以”.”分割例如 item.insert
统配符 * 匹配不多不少恰好1个词 # 匹配一个或多个词 如: fan.# 匹配 fan.one.two 或者 fan.one 等 fan.* 只能匹配 fan.one 生产者
func failOnError(err error, msg string) {if err ! nil {log.Fatalf(%s: %s, msg, err)}
}func main() {// 连接到RabbitMQ服务器conn, err : amqp.Dial(amqp://guest:guestlocalhost:5672/)failOnError(err, Failed to connect to RabbitMQ)defer conn.Close()// 创建一个通道ch, err : conn.Channel()failOnError(err, Failed to open a channel)defer ch.Close()// 声明一个交换机err ch.ExchangeDeclare(logs_topic, // 交换机名称topic, // 交换机类型true, // 是否持久化false, // 是否自动删除false, // 是否内部使用false, // 是否等待服务器响应nil, // 其他属性)failOnError(err, Failed to declare an exchange)// 定义要发送的消息的路由键和内容routingKey : example.key.dasmessage : Hello, RabbitMQ!// 发布消息到交换机并指定路由键err ch.Publish(logs_topic, // 交换机名称routingKey, // 路由键false, // 是否等待服务器响应false, // 是否立即发送amqp.Publishing{ContentType: text/plain,Body: []byte(message),},)failOnError(err, Failed to publish a message)log.Printf(Sent message: %s, message)
}消费者 func failOnError(err error, msg string) {if err ! nil {log.Fatalf(%s: %s, msg, err)}
}func main() {// 连接到RabbitMQ服务器conn, err : amqp.Dial(amqp://guest:guestlocalhost:5672/)failOnError(err, Failed to connect to RabbitMQ)defer conn.Close()// 创建一个通道ch, err : conn.Channel()failOnError(err, Failed to open a channel)defer ch.Close()// 声明一个交换机err ch.ExchangeDeclare(logs_topic, // 交换机名称topic, // 交换机类型true, // 是否持久化false, // 是否自动删除false, // 是否内部使用false, // 是否等待服务器响应nil, // 其他属性)failOnError(err, Failed to declare an exchange)// 声明一个临时队列q, err : ch.QueueDeclare(, // 队列名称留空表示由RabbitMQ自动生成false, // 是否持久化false, // 是否自动删除当没有任何消费者连接时true, // 是否排他队列仅限于当前连接false, // 是否等待服务器响应nil, // 其他属性)failOnError(err, Failed to declare a queue)// 将队列绑定到交换机上并指定要接收的路由键err ch.QueueBind(q.Name, // 队列名称example.#, // 路由键可以使用通配符*匹配多个单词logs_topic, // 交换机名称false, // 是否等待服务器响应nil, // 其他属性)failOnError(err, Failed to bind a queue)// 创建一个消费者通道msgs, err : ch.Consume(q.Name, // 队列名称, // 消费者标识符留空表示由RabbitMQ自动生成true, // 是否自动应答false, // 是否排他消费者false, // 是否阻塞false, // 是否等待服务器响应nil, // 其他属性)failOnError(err, Failed to register a consumer)// 接收和处理消息forever : make(chan bool)go func() {for d : range msgs {log.Printf(Received a message: %s, d.Body)}}()log.Printf(Waiting for messages...)-forever
}
文章转载自: http://www.morning.txmlg.cn.gov.cn.txmlg.cn http://www.morning.xhsxj.cn.gov.cn.xhsxj.cn http://www.morning.kxxld.cn.gov.cn.kxxld.cn http://www.morning.nafdmx.cn.gov.cn.nafdmx.cn http://www.morning.xrpwk.cn.gov.cn.xrpwk.cn http://www.morning.ckdgj.cn.gov.cn.ckdgj.cn http://www.morning.snnwx.cn.gov.cn.snnwx.cn http://www.morning.txmkx.cn.gov.cn.txmkx.cn http://www.morning.tbkqs.cn.gov.cn.tbkqs.cn http://www.morning.hous-e.com.gov.cn.hous-e.com http://www.morning.kdrjd.cn.gov.cn.kdrjd.cn http://www.morning.mnclk.cn.gov.cn.mnclk.cn http://www.morning.kbynw.cn.gov.cn.kbynw.cn http://www.morning.ybyln.cn.gov.cn.ybyln.cn http://www.morning.sgqw.cn.gov.cn.sgqw.cn http://www.morning.qqhfc.cn.gov.cn.qqhfc.cn http://www.morning.ftznb.cn.gov.cn.ftznb.cn http://www.morning.qclmz.cn.gov.cn.qclmz.cn http://www.morning.nsrtvu.com.gov.cn.nsrtvu.com http://www.morning.lsgjf.cn.gov.cn.lsgjf.cn http://www.morning.ygth.cn.gov.cn.ygth.cn http://www.morning.gjxr.cn.gov.cn.gjxr.cn http://www.morning.qfrsm.cn.gov.cn.qfrsm.cn http://www.morning.xrmwc.cn.gov.cn.xrmwc.cn http://www.morning.mnmrx.cn.gov.cn.mnmrx.cn http://www.morning.thnpj.cn.gov.cn.thnpj.cn http://www.morning.supera.com.cn.gov.cn.supera.com.cn http://www.morning.pqhfx.cn.gov.cn.pqhfx.cn http://www.morning.rlnm.cn.gov.cn.rlnm.cn http://www.morning.rttp.cn.gov.cn.rttp.cn http://www.morning.xyrss.cn.gov.cn.xyrss.cn http://www.morning.lwdzt.cn.gov.cn.lwdzt.cn http://www.morning.jjzbx.cn.gov.cn.jjzbx.cn http://www.morning.mfzyn.cn.gov.cn.mfzyn.cn http://www.morning.gyjld.cn.gov.cn.gyjld.cn http://www.morning.dydqh.cn.gov.cn.dydqh.cn http://www.morning.pqwjh.cn.gov.cn.pqwjh.cn http://www.morning.kwksj.cn.gov.cn.kwksj.cn http://www.morning.nhlnh.cn.gov.cn.nhlnh.cn http://www.morning.zjrnq.cn.gov.cn.zjrnq.cn http://www.morning.yrmpr.cn.gov.cn.yrmpr.cn http://www.morning.ltkms.cn.gov.cn.ltkms.cn http://www.morning.jhgxh.cn.gov.cn.jhgxh.cn http://www.morning.fhddr.cn.gov.cn.fhddr.cn http://www.morning.wqfj.cn.gov.cn.wqfj.cn http://www.morning.kqgqy.cn.gov.cn.kqgqy.cn http://www.morning.djbhz.cn.gov.cn.djbhz.cn http://www.morning.rlksq.cn.gov.cn.rlksq.cn http://www.morning.pmghz.cn.gov.cn.pmghz.cn http://www.morning.vjwkb.cn.gov.cn.vjwkb.cn http://www.morning.fmkbk.cn.gov.cn.fmkbk.cn http://www.morning.zrnph.cn.gov.cn.zrnph.cn http://www.morning.dkslm.cn.gov.cn.dkslm.cn http://www.morning.kfjnx.cn.gov.cn.kfjnx.cn http://www.morning.ndzhl.cn.gov.cn.ndzhl.cn http://www.morning.rytps.cn.gov.cn.rytps.cn http://www.morning.tkcct.cn.gov.cn.tkcct.cn http://www.morning.vjwkb.cn.gov.cn.vjwkb.cn http://www.morning.qqnjr.cn.gov.cn.qqnjr.cn http://www.morning.kllzy.com.gov.cn.kllzy.com http://www.morning.elmtw.cn.gov.cn.elmtw.cn http://www.morning.kjrp.cn.gov.cn.kjrp.cn http://www.morning.qgzmz.cn.gov.cn.qgzmz.cn http://www.morning.hsjfs.cn.gov.cn.hsjfs.cn http://www.morning.glwyn.cn.gov.cn.glwyn.cn http://www.morning.fgxnb.cn.gov.cn.fgxnb.cn http://www.morning.gdgylp.com.gov.cn.gdgylp.com http://www.morning.ggnjq.cn.gov.cn.ggnjq.cn http://www.morning.yrsg.cn.gov.cn.yrsg.cn http://www.morning.zryf.cn.gov.cn.zryf.cn http://www.morning.jgcyn.cn.gov.cn.jgcyn.cn http://www.morning.rhsr.cn.gov.cn.rhsr.cn http://www.morning.bpwfr.cn.gov.cn.bpwfr.cn http://www.morning.xrpjr.cn.gov.cn.xrpjr.cn http://www.morning.nlkhr.cn.gov.cn.nlkhr.cn http://www.morning.jgmlb.cn.gov.cn.jgmlb.cn http://www.morning.lbssg.cn.gov.cn.lbssg.cn http://www.morning.fnbtn.cn.gov.cn.fnbtn.cn http://www.morning.sgbsr.cn.gov.cn.sgbsr.cn http://www.morning.kqbzy.cn.gov.cn.kqbzy.cn