当前位置: 首页 > news >正文

杭州公司建设网站wordpress 查询页面

杭州公司建设网站,wordpress 查询页面,推广普通话手抄报简单又好看,html5网站建设平台❤ 作者主页#xff1a;欢迎来到我的技术博客#x1f60e; ❀ 个人介绍#xff1a;大家好#xff0c;本人热衷于Java后端开发#xff0c;欢迎来交流学习哦#xff01;(#xffe3;▽#xffe3;)~* #x1f34a; 如果文章对您有帮助#xff0c;记得关注、点赞、收藏、… ❤ 作者主页欢迎来到我的技术博客 ❀ 个人介绍大家好本人热衷于Java后端开发欢迎来交流学习哦(▽)~* 如果文章对您有帮助记得关注、点赞、收藏、评论⭐️⭐️⭐️ 您的支持将是我创作的动力让我们一起加油进步吧 文章目录 1、 MQ概念1.1 MQ 介绍1.2 MQ应用场景1.2.1异步解耦1.2.2 削峰填谷1.2.3 消息分发 1.3 常见的消息中间件1.4 RabbitMQ简介1.5 RabbitMQ中的核心概念 2、RabbitMQ的安装与配置3、RabbitMQ的快速入门4、RabbitMQ的常见工作模式4.1 Work queues工作队列模式4.1.1 模式说明4.1.2 实现步骤4.1.3 简单问题说明4.1.4 工作模式队列-消息公平分发(fair dispatch) 4.2 Pub/Sub 订阅模式4.2.1 模式说明4.2.2 实现步骤 4.3 Routing 路由模式4.3.1 模式说明4.3.2 实现步骤 4.4 Topic 模式4.4.1 模式介绍4.2.2 实现步骤 4.5 工作模式总结 5、Springboot环境快速集成RabbitMQ5.1 Hello World 简单模式5.2 Work 模式5.3 Pub/Sub模式5.4 Routing模式5.5 Topic模式 6、RabbitMQ实战案例6.1 案例一 退款加积分6.2 案例二 秒杀下单操作 7、 RabbitMQ高频面试题7.1 RabbitMQ如果出现消息重复消费怎么解决7.2 RabbitMQ中的死信队列7.3 RabbitMQ 怎么实现消息可靠性7.3.1 生产者投递可靠性7.3.2 消费者投递可靠性 7.4 RabbitMQ 如何实现延迟队列 1、 MQ概念 1.1 MQ 介绍 MQMessage Queue消息队列是基础数据结构中“先进先出”的一种数据结构。一般用来解决应用解耦异步消息流量削峰等问题实现高性能高可用可伸缩和最终一致性架构。 1.2 MQ应用场景 MQ的优势 1.2.1异步解耦 以电商应用为例应用中有订单系统、库存系统、物流系统、支付系统。用户创建订单后如果耦合 调用库存系统、物流系统、支付系统任何一个子系统出了故障都会造成下单操作异常。当转变成基于消息队列的方式后系统间调用的问题会减少很多比如物流系统因为发生故障需要几分钟来修复。在 这几分钟的时间里物流系统要处理的内存被缓存在消息队列中用户的下单操作可以正常完成。当物流 系统恢复后继续处理订单信息即可中单用户感受不到物流系统的故障提升系统的可用性。 1.2.2 削峰填谷 举个例子如果订单系统最多能处理一万次订单这个处理能力应付正常时段的下单时绰绰有余正 常时段我们下单一秒后就能返回结果。但是在高峰期如果有两万次下单操作系统是处理不了的只能限 制订单超过一万后不允许用户下单。使用消息队列做缓冲我们可以取消这个限制把一秒内下的订单分 散成一段时间来处理这时有些用户可能在下单十几秒后才能收到下单成功的操作但是比不能下单的体 验要好。 1.2.3 消息分发 在实际开发中一个系统的数据有的时候需要分发个不同的系统中 拿电商举例在双11的时候有很多会场每一个会场可能都需要用到一个商品的数据那么我们需要把数据分发到不同的会场中假设有加了一个会场我们还需要把数据分发给新的会场。 MQ的劣势 系统可用性降低 系统引入的外部依赖越多系统稳定性越差。一旦 MQ 宕机就会对业务造成影响。如何保证MQ的高可用 系统复杂度提高 MQ 的加入大大增加了系统的复杂度以前系统间是同步的远程调用现在是通过 MQ 进行异步调用。如何保证消息没有被重复消费怎么处理消息丢失情况那么保证消息传递的顺序性 一致性问题 A 系统处理完业务通过 MQ 给B、C、D三个系统发消息数据如果 B 系统、C 系统处理成功D 系统处理失败。如何保证消息数据处理的一致性 1.3 常见的消息中间件 目前市场上主流的消息中间件主要有ActivitiMQ、RabbitMQ、RocketMQ、kafka ActivitiMQ ActiveMQ是Apache出品,比较老的一个开源的消息中间件, 是一个完全支持JMS规范的消息中间件.API丰富,以前在中小企业应用广泛。 MQ衡量的指标:服务性能,数据存储,集群架构。 RabbitMQ: RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现。 AMQP Advanced Message Queue高级消息队列协议。它是应用层协议的一个开放标准为面向消息的中间件设计基于此协议的客户端与消息中间件可传递消息并不受产品、开发语言等条件的限制。 RabbitMQ 最初起源于金融系统用于在分布式系统中存储转发消息在易用性、扩展性、高可用性等方面表现不俗。 对数据的一致性,稳定性和可靠性要求比较高的场景。 RocketMQ: RocketMQ 是阿里巴巴在 2012 年开源的分布式消息中间件目前已经捐赠给 Apache 软件基金会并于 2017 年 9 月 25 日成为 Apache 的顶级项目。作为经历过多次阿里巴巴双十一这种“超级工程”的洗礼并有稳定出色表现的国产中间件以其高性能、低延时和高可靠等特性近年来已经也被越来越多的国内企业使用。 Kafka: Kafka是由Apache软件基金会开发的一个开源流处理平台由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统它可以处理消费者规模的网站中的所有动作流数据。 这种动作网页浏览搜索和其他用户的行动是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop一样的日志数据和离线分析系统但又要求实时处理的限制这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理也是为了通过集群来提供实时的消息。 常见消息中间件对比图 1.4 RabbitMQ简介 AMQP即 Advanced Message Queuing Protocol高级消息队列协议是一个网络协议是应用层协议 的一个开放标准为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息并不受客户端/中 间件不同产品不同的开发语言等条件的限制。2006年AMQP 规范发布。类比HTTP。 2007年Rabbit 技术公司基于 AMQP 标准开发的 RabbitMQ 1.0 发布。RabbitMQ 采用 Erlang 语言开发。 Erlang 语言由 Ericson 设计专门为开发高并发和分布式系统的一种语言在电信领域使用广泛。 1.5 RabbitMQ中的核心概念 Broker接收和分发消息的应用RabbitMQ Server就是 Message Broker 。 Virtual host出于多租户和安全因素设计的把 AMQP 的基本组件划分到一个虚拟的分组中类似于网 络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时可以划分出多个vhost每个用户在自己的 vhost 创建 exchangequeue 等。 Connectionpublisherconsumer 和 broker 之间的 TCP 连接 。 Channel如果每一次访问 RabbitMQ 都建立一个 Connection在消息量大的时候建立 TCP Connection 的开销将是巨大的效率也较低。Channel 是在 connection 内部建立的逻辑连接如果应用程序支持多线 程通常每个thread创建单独的 channel 进行通讯AMQP method 包含了channel id 帮助客户端和 message broker 识别 channel所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销。 Exchangemessage 到达 broker 的第一站根据分发规则匹配查询表中的 routing key分发消息到 queue 中去。常用的类型有direct (point-to-point), topic (publish-subscribe) and fanout (multicast) 。 Queue消息最终被送到这里等待 consumer 取走 。 Bindingexchange 和 queue 之间的虚拟连接binding 中可以包含 routing key。Binding 信息被保存 到 exchange 中的查询表中用于 message 的分发依据。 RabbitMQ架构图 2、RabbitMQ的安装与配置 步骤1 下载和运行 https://www.rabbitmq.com/ 安装文档参考:https://www.rabbitmq.com/download.html 对于rabbitmq的安装方式有很多种, 我们这里使用通过yum命令的方式进行安装, 因为rabbitmq是使用Erlang语言进行编写的, 所以我们在安装的时候需要先搭建Erlang的一个运行环境。 步骤2 安装Erlang 我们安装的rabbitmq的版本是3.7的,要求使用的Erlang版本必须是20.3.x的版本,我们可以去github下载对应的Erlang的对应版本https://github.com/rabbitmq/erlang-rpmvi /etc/yum.repos.d/rabbitmq-erlang.repoErlang 20.3.x# In /etc/yum.repos.d/rabbitmq-erlang.repo[rabbitmq-erlang] namerabbitmq-erlang baseurlhttps://dl.bintray.com/rabbitmq-erlang/rpm/erlang/20/el/7 gpgcheck1 gpgkeyhttps://dl.bintray.com/rabbitmq/Keys/rabbitmq-release-signing-key.asc repo_gpgcheck0 enabled1安装命令yum install erlang -y步骤3 安装rabbitmq-server 导入对应的验证签名:rpm --import https://github.com/rabbitmq/signing-keys/releases/download/2.0/rabbitmq-release-signing-key.asc 创建一个rabbitmq的仓库文件:vi /etc/yum.repo.d/rabbitmq.repo [bintray-rabbitmq-server] namebintray-rabbitmq-rpm baseurlhttps://dl.bintray.com/rabbitmq/rpm/rabbitmq-server/v3.7.x/el/7/ gpgcheck0 repo_gpgcheck0 enabled1 安装命令yum install rabbitmq-server -y步骤四开启服务 添加rabbitmq-server开机启动 chkconfig rabbitmq-server on 启动服务: service rabbitmq-server start 停止服务 service rabbitmq-server stop 开启管理页面的插件 rabbitmq-plugins enable rabbitmq_management rabbitmq-plugins 查看页面信息 rabbitmq-plugins list步骤五开启guest用户的远程访问 vi /etc/rabbitmq/rabbitmq.config[{rabbit[{loopback_users[]}]}]。3、RabbitMQ的快速入门 RabbitMQ 提供了 6 种工作模式简单模式、work queues、Publish/Subscribe 发布与订阅模式、Routing 路由模式、Topics 主题模式、RPC 远程调用模式远程调用不太算 MQ暂不作介绍。 官网 https://www.rabbitmq.com/getstarted.html 入门需求 利用生产者发送消息到MQ ,消费者消费消息 操作步骤 创建生产者工程和消费者工程 添加依赖 dependenciesdependencygroupIdcom.rabbitmq/groupIdartifactIdamqp-client/artifactIdversion5.6.0/version/dependency /dependencies编写生产者 public class Producer {public static void main(String[] args) throws IOException, TimeoutException {//1 创建一个连接工厂ConnectionFactory connectionFactory new ConnectionFactory();//2 设置 rabbitmq ip 地址connectionFactory.setHost(localhost);//3 创建 Connection 对象Connection connection connectionFactory.newConnection();//4 创建 ChannelChannel channel connection.createChannel();//5 设置队列属性/*** 第一个参数队列名称* 第二个参数队列是否要持久化* 第三个参数是否排他性* 第四个参数是否自动删除* 第五个参数是否要设置一些额外参数*/channel.queueDeclare(01-hello,true,false,false,null);//6 发送消息/*** 第一个参数 交换机名称* 第二个参数 路由 key* 第三个参数 消息属性* 第四个参数 消息内容*/channel.basicPublish(,01-hello, MessageProperties.PERSISTENT_TEXT_PLAIN, hello rabbitmq.getBytes());//7 关闭资源channel.close();connection.close();} }编写消费者 public class Consumer {public static void main(String[] args) throws Exception {//1 创建一个连接工厂ConnectionFactory connectionFactory new ConnectionFactory();//2 设置 rabbitmq ip 地址connectionFactory.setHost(localhost);//3 创建 Connection 对象Connection connection connectionFactory.newConnection();//4 创建 ChannelChannel channel connection.createChannel();//5 设置队列属性/*** 第一个参数队列名称* 第二个参数队列是否要持久化* 第三个参数是否排他性* 第四个参数是否自动删除,如果没有消费者连接就自动删除* 第五个参数是否要设置一些额外参数*/channel.queueDeclare(01-hello,true,false,false,null);//6 使用 channel 去 rabbitmq 中去取消息进行消费/*** 第一个参数队列名称* 第二个参数是否自动签收*/channel.basicConsume(01-hello, true, new DeliverCallback() {/*** 当消息从 mq 中取出来了会回调这个方法* 消费者消费消息就在这个 handle中去进行处理*/public void handle(String consumerTag, Delivery message) throws IOException {System.out.println(消息内容为 new String(message.getBody()));}}, new CancelCallback() {/*** 当消息取消了会回调这个方法* param consumerTag* throws IOException*/public void handle(String consumerTag) throws IOException {System.out.println(1111);}});} }4、RabbitMQ的常见工作模式 4.1 Work queues工作队列模式 4.1.1 模式说明 通过Helloworld工程我们已经能够构建一个简单的消息队列的基本项目项目中存在几个角色:生产 者、消费者、队列而对于我们真实的开发中 对于消息的消费者通过是有多个的。 比如在实现用户注册功能时用户注册成功会给响对应用户发送邮件同时给用户发送手机短信告诉用户已成功注册网站或者app 应用这种功能在大部分项目开发中都比较常见 而对于helloworld 的应用中虽然能够对 消息进行消费但是有很大问题: 消息消费者只有一个当消息量非常大时单个消费者处理消息就会变得很慢同时给节点页带来很大压力导致消息堆积越来越多。对于这种情况RabbitMQ 提供了工作 队列模式通过工作队列提供做个消费者对MQ产生的消息进行消费提高MQ消息的吞吐率降低消息的处理时间。处理模型图如下: 4.1.2 实现步骤 生产者 public class Producer {public static void main(String[] args) throws Exception{//1 创建一个连接工厂ConnectionFactory connectionFactory new ConnectionFactory();//2 设置 rabbititmq ip 地址connectionFactory.setHost(localhost);//3 创建 Conection 对象Connection connection connectionFactory.newConnection();//4 创建 ChanelChannel channel connection.createChannel();//5 设置队列属性/*** 第一个参数队列名称* 第二个参数队列是否要持久化* 第三个参数是否排他性* 第四个参数是否自动删除* 第五个参数是否要设置一些额外参数*/channel.queueDeclare(02-work1,true,false,false,null);//6 发送消息/*** 第一个参数 交换机名称* 第二个参数 路由 key* 第三个参数 消息属性* 第四个参数 消息内容*/for (int i 0; i 20; i) {String message hello rabbitmq : i;channel.basicPublish(,02-work1, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());}//7 关闭资源channel.close();connection.close();} }消费者1 public class Consumer {public static void main(String[] args) throws Exception {//1 创建一个连接工厂ConnectionFactory connectionFactory new ConnectionFactory();//2 设置 rabbititmq ip 地址connectionFactory.setHost(localhost);//3 创建 Conection 对象Connection connection connectionFactory.newConnection();//4 创建 Chanelfinal Channel channel connection.createChannel();//5 设置队列属性/*** 第一个参数队列名称* 第二个参数队列是否要持久化* 第三个参数是否排他性* 第四个参数是否自动删除,如果没有消费者连接就自动删除* 第五个参数是否要设置一些额外参数*/channel.queueDeclare(02-work1,true,false,false,null);channel.basicQos(1);//6使用 chanel 去 rabbitmq 中去取消息进行消费/*** 第一个参数队列名称* 第二个参数是否自动签收*/channel.basicConsume(02-work1, false, new DeliverCallback() {/*** 当消息从 mq 中取出来了会回调这个方法* 消费者消费消息就在这个 handle中去进行处理*/public void handle(String consumerTag, Delivery message) throws IOException {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(消费者 1消息内容为为 new String(message.getBody()));channel.basicAck(message.getEnvelope().getDeliveryTag(), false);}}, new CancelCallback() {/*** 当消息取消了会回调这个方法* param consumerTag* throws IOException*/public void handle(String consumerTag) throws IOException {System.out.println(1111);}});} }消费者2 public class Consumer1 {public static void main(String[] args) throws Exception {//1 创建一个连接工厂ConnectionFactory connectionFactory new ConnectionFactory();//2 设置 rabbititmq ip 地址connectionFactory.setHost(localhost);//3 创建 Conection 对象Connection connection connectionFactory.newConnection();//4 创建 Chanelfinal Channel channel connection.createChannel();//5 设置队列属性/*** 第一个参数队列名称* 第二个参数队列是否要持久化* 第三个参数是否排他性* 第四个参数是否自动删除,如果没有消费者连接就自动删除* 第五个参数是否要设置一些额外参数*/channel.queueDeclare(02-work1,true,false,false,null);channel.basicQos(1);//6使用 chanel 去 rabbitmq 中去取消息进行消费/*** 第一个参数队列名称* 第二个参数是否自动签收*/channel.basicConsume(02-work1, false, new DeliverCallback() {/*** 当消息从 mq 中取出来了会回调这个方法* 消费者消费消息就在这个 handle中去进行处理*/public void handle(String consumerTag, Delivery message) throws IOException {try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(消费者 2消息内容为为 new String(message.getBody()));channel.basicAck(message.getEnvelope().getDeliveryTag(), false);}}, new CancelCallback() {/*** 当消息取消了会回调这个方法* param consumerTag* throws IOException*/public void handle(String consumerTag) throws IOException {System.out.println(1111);}});} }4.1.3 简单问题说明 从结果可以看出消息被平均分配到两个消费方来对消息进行处理提高了消息处理效率创建多个消费者来对消息进行处理。这里RabbitMQ采用轮询来对消息进行分发时保证了消息被平均分配到每个消费方 。 但是引入新的问题:真正的生产环境下对于消息的处理基本不会像我们现在看到的这样每个消 费方处理的消息数量是平均分配的比如因为网络原因机器cpu 内存等硬问题消费方处理消息时 同类消息不同机器进行处理时消耗时间也是不一样的比如1号消费者消费1条消息时1秒2号消费者消费1条消息是5秒对于1号消费者比2号消费者处理消息快那么在分配消息时就应该让1号消费者多收 到消息进行处理也即是我们通常所说的”能者多劳”,同样Rabbitmq对于这种消息分配模式提供了支持。 问题 任务量很大消息虽然得到了及时的消费单位时间内消息处理速度加快提高了吞吐量可 是不同消费者处理消息的时间不同导致部分消费者的资源被浪费。 解决采用消息公平分发 总结工作队列消息轮询分发消费者收到的消息数量平均分配单位时间内消息处理速度加快提高了吞吐量。 4.1.4 工作模式队列-消息公平分发(fair dispatch) 在案例01中对于 消息分发采用的是默认轮询分发消息应答采用的自动应答模式这是因为当消息进 入队列RabbitMQ就会分派消息。它不看消费者为应答的数目只是盲目的将第n条消息发给第n个消费者。 为了解决这个问题我们使用 basicQos(prefetchCount 1) 方法来限RabbitMQ只发不超过1条的消息给同一个消费者。当消息处理完毕后有了反馈才会进行第二次发送。执行模型图如下: 4.2 Pub/Sub 订阅模式 4.2.1 模式说明 在订阅模型中多了一个 Exchange 角色而且过程略有变化 P生产者也就是要发送消息的程序但是不再发送到队列中而是发给X交换机 C消费者消息的接收者会一直等待消息到来 Queue消息队列接收消息、缓存消息 Exchange交换机X。一方面接收生产者发送的消息。另一方面知道如何处理消息例如递交给某个特别队列、 递交给所有队列、或是将消息丢弃。到底如何操作取决于Exchange的类型。Exchange有常见以下3种类型 Fanout广播将消息交给所有绑定到交换机的队列 Direct定向把消息交给符合指定routing key 的队列 Topic通配符把消息交给符合routing pattern路由模式 的队列 Exchange交换机只负责转发消息不具备存储消息的能力因此如果没有任何队列与 Exchange 绑定或者没有符合路由规则的队列那么消息会丢失 4.2.2 实现步骤 生产者 public class Producer {public static void main(String[] args) throws Exception {//1 创建一个连接工厂ConnectionFactory connectionFactory new ConnectionFactory();//2 设置 rabbititmq ip 地址connectionFactory.setHost(localhost);//3 创建 Conection 对象Connection connection connectionFactory.newConnection();//4 创建 ChanelChannel channel connection.createChannel();//5 设置队列属性 // channel.queueDeclare(01-hello1,true,false,true,null);/*** 第一个参数交换机名字* 第二个参数交换机类型*/channel.exchangeDeclare(03-pubsub1,fanout);//6 发送消息/*** 第一个参数 交换机名称* 第二个参数 路由 key* 第三个参数 消息属性* 第四个参数 消息内容*/channel.basicPublish(03-pubsub1,, MessageProperties.PERSISTENT_TEXT_PLAIN,hello rabbitmq.getBytes());//7 关闭资源channel.close();connection.close();} }消费者1 public class Consumer {public static void main(String[] args) throws Exception {//1 创建一个连接工厂ConnectionFactory connectionFactory new ConnectionFactory();//2 设置 rabbititmq ip 地址connectionFactory.setHost(localhost);//3 创建 Conection 对象Connection connection connectionFactory.newConnection();//4 创建 ChanelChannel channel connection.createChannel();//5 设置队列属性channel.exchangeDeclare(03-pubsub1,fanout);String queue channel.queueDeclare().getQueue();channel.queueBind(queue,03-pubsub1,);//6使用 chanel 去 rabbitmq 中去取消息进行消费/*** 第一个参数队列名称* 第二个参数是否自动签收*/channel.basicConsume(queue, true, new DeliverCallback() {/*** 当消息从 mq 中取出来了会回调这个方法* 消费者消费消息就在这个 handle中去进行处理*/public void handle(String consumerTag, Delivery message) throws IOException {System.out.println(消费者 1 消息内容为 new String(message.getBody()));}}, new CancelCallback() {/*** 当消息取消了会回调这个方法* param consumerTag* throws IOException*/public void handle(String consumerTag) throws IOException {System.out.println(1111);}});} }消费者2 public class Consumer1 {public static void main(String[] args) throws Exception {//1 创建一个连接工厂ConnectionFactory connectionFactory new ConnectionFactory();//2 设置 rabbititmq ip 地址connectionFactory.setHost(localhost);//3 创建 Conection 对象Connection connection connectionFactory.newConnection();//4 创建 ChanelChannel channel connection.createChannel();//5 设置队列属性channel.exchangeDeclare(03-pubsub1,fanout);String queue channel.queueDeclare().getQueue();channel.queueBind(queue,03-pubsub1,);//6使用 chanel 去 rabbitmq 中去取消息进行消费/*** 第一个参数队列名称* 第二个参数是否自动签收*/channel.basicConsume(queue, true, new DeliverCallback() {/*** 当消息从 mq 中取出来了会回调这个方法* 消费者消费消息就在这个 handle中去进行处理*/public void handle(String consumerTag, Delivery message) throws IOException {System.out.println(消费者 2 消息内容为 new String(message.getBody()));}}, new CancelCallback() {/*** 当消息取消了会回调这个方法* param consumerTag* throws IOException*/public void handle(String consumerTag) throws IOException {System.out.println(1111);}});} }4.3 Routing 路由模式 4.3.1 模式说明 队列与交换机的绑定不能是任意绑定了而是要指定一个 RoutingKey路由key 。 消息的发送方在向 Exchange 发送消息时也必须指定消息的 RoutingKey 。 Exchange 不再把消息交给每一个绑定的队列而是根据消息的 Routing Key 进行判断只有队列的 Routingkey 与消息的 Routing key 完全一致才会接收到消息。 图解 P生产者向 Exchange 发送消息发送消息时会指定一个routing key XExchange交换机接收生产者的消息然后把消息递交给与 routing key 完全匹配的队列 C1消费者其所在队列指定了需要 routing key 为 error 的消息 C2消费者其所在队列指定了需要 routing key 为 info、error、warning 的消息 4.3.2 实现步骤 生产者 public class Producer {public static void main(String[] args) throws IOException, TimeoutException {//1 创建一个连接工厂ConnectionFactory connectionFactory new ConnectionFactory();//2 设置 rabbititmq ip 地址connectionFactory.setHost(localhost);//3 创建 Conection 对象Connection connection connectionFactory.newConnection();//4 创建 ChanelChannel channel connection.createChannel();//5 设置队列属性 // channel.queueDeclare(01-hello1,true,false,true,null);/*** 第一个参数交换机名字* 第二个参数交换机类型*/channel.exchangeDeclare(04-routing1,direct);//6 发送消息/*** 第一个参数 交换机名称* 第二个参数 路由 key* 第三个参数 消息属性* 第四个参数 消息内容*/channel.basicPublish(04-routing1,error, MessageProperties.PERSISTENT_TEXT_PLAIN,hello rabbitmq.getBytes());//7 关闭资源channel.close();connection.close();} }消费者1 public class Consumer {public static void main(String[] args) throws Exception{//1 创建一个连接工厂ConnectionFactory connectionFactory new ConnectionFactory();//2 设置 rabbititmq ip 地址connectionFactory.setHost(localhost);//3 创建 Conection 对象Connection connection connectionFactory.newConnection();//4 创建 ChanelChannel channel connection.createChannel();//5 设置队列属性channel.exchangeDeclare(04-routing1,direct);String queue channel.queueDeclare().getQueue();channel.queueBind(queue,04-routing1,info);channel.queueBind(queue,04-routing1,error);channel.queueBind(queue,04-routing1,waring);//6使用 chanel 去 rabbitmq 中去取消息进行消费/*** 第一个参数队列名称* 第二个参数是否自动签收*/channel.basicConsume(queue, true, new DeliverCallback() {/*** 当消息从 mq 中取出来了会回调这个方法* 消费者消费消息就在这个 handle中去进行处理*/public void handle(String consumerTag, Delivery message) throws IOException {System.out.println(消费者 1 消息内容为 new String(message.getBody()));}}, new CancelCallback() {/*** 当消息取消了会回调这个方法* param consumerTag* throws IOException*/public void handle(String consumerTag) throws IOException {System.out.println(1111);}});} }消费者2 public class Consumer1 {public static void main(String[] args) throws Exception {//1 创建一个连接工厂ConnectionFactory connectionFactory new ConnectionFactory();//2 设置 rabbititmq ip 地址connectionFactory.setHost(localhost);//3 创建 Conection 对象Connection connection connectionFactory.newConnection();//4 创建 ChanelChannel channel connection.createChannel();//5 设置队列属性channel.exchangeDeclare(04-routing1,direct);String queue channel.queueDeclare().getQueue();channel.queueBind(queue,04-routing1,trace);channel.queueBind(queue,04-routing1,error);//6使用 chanel 去 rabbitmq 中去取消息进行消费/*** 第一个参数队列名称* 第二个参数是否自动签收*/channel.basicConsume(queue, true, new DeliverCallback() {/*** 当消息从 mq 中取出来了会回调这个方法* 消费者消费消息就在这个 handle中去进行处理*/public void handle(String consumerTag, Delivery message) throws IOException {System.out.println(消费者 2 消息内容为 new String(message.getBody()));}}, new CancelCallback() {/*** 当消息取消了会回调这个方法* param consumerTag* throws IOException*/public void handle(String consumerTag) throws IOException {System.out.println(1111);}});} }4.4 Topic 模式 4.4.1 模式介绍 Topic 类型与 Direct 相比都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 Topic 类型 Exchange 可以让队列在绑定 Routing key 的时候使用通配符 Routingkey 一般都是有一个或多个单词组成多个单词之间以”.”分割例如 item.insert 通配符规则# 匹配一个或多个词* 匹配不多不少恰好1个词例如item.# 能够匹配 item.insert.abc 或者 item.insertitem.* 只能匹配 item.insert 图解 红色 Queue绑定的是 usa.# 因此凡是以 usa. 开头的 routing key 都会被匹配到 黄色 Queue绑定的是 #.news 因此凡是以 .news 结尾的 routing key 都会被匹配 4.2.2 实现步骤 生产者 public class Producer {public static void main(String[] args) throws Exception{//1 创建一个连接工厂ConnectionFactory connectionFactory new ConnectionFactory();//2 设置 rabbititmq ip 地址connectionFactory.setHost(localhost);//3 创建 Conection 对象Connection connection connectionFactory.newConnection();//4 创建 ChanelChannel channel connection.createChannel();//5 设置队列属性 // channel.queueDeclare(01-hello1,true,false,true,null);/*** 第一个参数交换机名字* 第二个参数交换机类型*/channel.exchangeDeclare(05-topic1,topic);//6 发送消息/*** 第一个参数 交换机名称* 第二个参数 路由 key* 第三个参数 消息属性* 第四个参数 消息内容*/channel.basicPublish(05-topic1,user.save, MessageProperties.PERSISTENT_TEXT_PLAIN,hello rabbitmq.getBytes());//7 关闭资源channel.close();connection.close();} }消费者1 public class Consumer {public static void main(String[] args) throws Exception {//1 创建一个连接工厂ConnectionFactory connectionFactory new ConnectionFactory();//2 设置 rabbititmq ip 地址connectionFactory.setHost(localhost);//3 创建 Conection 对象Connection connection connectionFactory.newConnection();//4 创建 ChanelChannel channel connection.createChannel();//5 设置队列属性channel.exchangeDeclare(05-topic1,topic);String queue channel.queueDeclare().getQueue();channel.queueBind(queue,05-topic1,employee.*);//6使用 chanel 去 rabbitmq 中去取消息进行消费/*** 第一个参数队列名称* 第二个参数是否自动签收*/channel.basicConsume(queue, true, new DeliverCallback() {/*** 当消息从 mq 中取出来了会回调这个方法* 消费者消费消息就在这个 handle中去进行处理*/public void handle(String consumerTag, Delivery message) throws IOException {System.out.println(消费者 1 employee消息内容为 new String(message.getBody()));}}, new CancelCallback() {/*** 当消息取消了会回调这个方法* param consumerTag* throws IOException*/public void handle(String consumerTag) throws IOException {System.out.println(1111);}});} }消费者2 public class Consumer1 {public static void main(String[] args) throws Exception{//1 创建一个连接工厂ConnectionFactory connectionFactory new ConnectionFactory();//2 设置 rabbititmq ip 地址connectionFactory.setHost(localhost);//3 创建 Conection 对象Connection connection connectionFactory.newConnection();//4 创建 ChanelChannel channel connection.createChannel();//5 设置队列属性channel.exchangeDeclare(05-topic1,topic);String queue channel.queueDeclare().getQueue();channel.queueBind(queue,05-topic1,user.*);//6使用 chanel 去 rabbitmq 中去取消息进行消费/*** 第一个参数队列名称* 第二个参数是否自动签收*/channel.basicConsume(queue, true, new DeliverCallback() {/*** 当消息从 mq 中取出来了会回调这个方法* 消费者消费消息就在这个 handle中去进行处理*/public void handle(String consumerTag, Delivery message) throws IOException {System.out.println(消费者 2 user消息内容为 new String(message.getBody()));}}, new CancelCallback() {/*** 当消息取消了会回调这个方法* param consumerTag* throws IOException*/public void handle(String consumerTag) throws IOException {System.out.println(1111);}});} }4.5 工作模式总结 1、简单模式 HelloWorld 一个生产者、一个消费者不需要设置交换机使用默认的交换机。 2、工作队列模式 Work Queue 一个生产者、多个消费者竞争关系不需要设置交换机使用默认的交换机。 3、发布订阅模式 Publish/subscribe 需要设置类型为 fanout 的交换机 并且交换机和队列进行绑定 当发送消息到交换机后交换机会将消息发送到绑定的队列。 4、路由模式 Routing 需要设置类型为 direct 的交换机 交换机和队列进行绑定 并且指定 routing key当发送消息到交换机 后 交换机会根据 routing key 将消息发送到对应的队列。 5、通配符模式 Topic 需要设置类型为 topic 的交换机 交换机和队列进行绑定 并且指定通配符方式的 routing key 当发送消息到交换机后 交换机会根据 routing key 将消息发送到对应的队列。 5、Springboot环境快速集成RabbitMQ 5.1 Hello World 简单模式 生产端操作步骤 创建生产者SpringBoot工程 引入start依赖坐标 parentgroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-parent/artifactIdversion2.3.2.RELEASE/version/parentdependenciesdependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId/dependencydependencygroupIdcom.rabbitmq/groupIdartifactIdamqp-client/artifactIdversion5.6.0/version/dependency/dependencies编写properties配置基本信息配置 定义交换机队列以及绑定关系的配置类 注入RabbitTemplate调用方法完成消息发送 生产者 RestController public class SendController {Autowiredprivate RabbitTemplate rabbitTemplate;RequestMapping(/sendMsgHello)public String sendMsg(String msg) {rabbitTemplate.convertAndSend(, boot-queue, msg);return 发送成功;} }消费端操作步骤 消费端 创建消费者SpringBoot工程 引入start依赖坐标 dependency groupIdorg.springframework.boot/groupId artifactIdspring-boot-starter-amqp/artifactId /dependency 编写yml配置基本信息配置 定义监听类使用RabbitListener注解完成队列监听。 消费者 Component public class Consumer {RabbitListener(queuesToDeclare Queue(boot-queue))public void consumer(String msg) {System.out.println(消息内容为 msg);} }5.2 Work 模式 配置项 spring:rabbitmq:listener:direct:acknowledge-mode: manualprefetch: 1生产者 RequestMapping(/sendWorkMsg)public String sendWorkMsg(String msg){for (int i 0; i 20; i ) {rabbitTemplate.convertAndSend(,springboot_work,msg);}return 发送成功;}消费者1 Component public class Consumer1 {RabbitListener(queuesToDeclare Queue(springboot_work))public void consumer(String msg, Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel) throws IOException {try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(消费者 1-消息内容为 msg);channel.basicAck(deliveryTag,true);} }消费者2 Component public class Consumer2 {RabbitListener(queuesToDeclare Queue(springboot_work))public void consumer(String msg, Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel) throws IOException {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(消费者 2-消息内容为 msg);channel.basicAck(deliveryTag,true);}}5.3 Pub/Sub模式 生产者 RequestMapping(/sendPubSubMsg)public String sendPubSubMsg(String msg) {rabbitTemplate.convertAndSend(springboot-pubsub, , msg);return 发送成功;}消费者1 Component public class Consumer1 {RabbitListener(bindings QueueBinding(value Queue,exchange Exchange(value springboot-pubsub,type fanout)))public void consumer(String msg, Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel) throws IOException {System.out.println(消费者 1-消息内容为 msg);channel.basicAck(deliveryTag,true);} }消费者2 Component public class Consumer2 {RabbitListener(bindings QueueBinding(value Queue,exchange Exchange(value springboot-pubsub,type fanout)))public void consumer(String msg, Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel) throws IOException {System.out.println(消费者 2-消息内容为 msg);channel.basicAck(deliveryTag,true);}} 5.4 Routing模式 生产者 RequestMapping(/sendRoutingMsg)public String sendRoutingMSg(String msg, String key) {rabbitTemplate.convertAndSend(springboot-routing, key, msg);return 发送成功;}消费者1 Component public class Consumer1 {RabbitListener(bindings QueueBinding(value Queue,exchange Exchange(value springboot-routing,type direct),key {info,error}))public void consumer(String msg, Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel) throws IOException {System.out.println(消费者 1-消息内容为 msg);channel.basicAck(deliveryTag,true);}}消费者2 public class Consumer2 {RabbitListener(bindings QueueBinding(value Queue,exchange Exchange(value springboot-routing,type direct),key {trace}))public void consumer(String msg, Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel) throws IOException {System.out.println(消费者 2-消息内容为 msg);channel.basicAck(deliveryTag,true);} }5.5 Topic模式 生产者 RequestMapping(/sendTopicMsg)public String sendTopicMSg(String msg, String key) {rabbitTemplate.convertAndSend(springboot-topic, key, msg);return 发送成功;}消费者1 Component public class Consumer1 {RabbitListener(bindings QueueBinding(value Queue,exchange Exchange(value springboot-topic,type topic),key {employee.*}))public void consumer(String msg, Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel) throws IOException {System.out.println(消费者 1-消息内容为employee msg);channel.basicAck(deliveryTag,true);} }消费者2 Component public class Consumer2 {RabbitListener(bindings QueueBinding(value Queue,exchange Exchange(value springboot-topic,type topic),key {user.*}))public void consumer(String msg, Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel) throws IOException {System.out.println(消费者 2-消息内容为user msg);channel.basicAck(deliveryTag,true);}}6、RabbitMQ实战案例 6.1 案例一 退款加积分 需求 用户进行下单操作 下完单以后要进行加积分操作 要求下完单以后加积分采用RabbitMQ 进行加积分。 案例分析 1 、下完单以后把数据封装成消息利用生产者发送消息到RabbitMQ中。 2、在积分服务编写消费者实时监听RabbitMQ队列中消息监听到取出消息消费加积分。 没有RabbitMQ之前 有了RabbitMQ以后 代码实现 6.2 案例二 秒杀下单操作 需求 电商平台进行秒杀活动用户点击下单秒杀商品进行下单要求用RabbitMQ进行削峰填谷。 没有RabbitMQ之前 有了RabbitMQ以后 7、 RabbitMQ高频面试题 7.1 RabbitMQ如果出现消息重复消费怎么解决 采用幂等性解决幂等性指一次和多次请求某一个资源对于资源本身应该具有同样的结果。也就是说其任意多次执行对资源本身所产生的影响均与一次执行的影响相同。 在MQ中指消费多条相同的消息得到与消费该消息一次相同的结果。 7.2 RabbitMQ中的死信队列 死信队列英文缩写DLX 。Dead Letter Exchange死信交换机当消息成为Dead message后可以被重新发送到另一个交换机这个交换机就是DLX。 消息成为死信的三种情况 队列消息长度到达限制 消费者拒接消费消息basicNack/basicReject,并且不把消息重新放入原目标队列,requeuefalse 原队列存在消息过期设置消息到达超时时间未被消费 7.3 RabbitMQ 怎么实现消息可靠性 7.3.1 生产者投递可靠性 在使用 RabbitMQ 的时候作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式。 confirm 确认模式 return 退回模式 rabbitmq 整个消息投递的路径为 producer—rabbitmq broker—exchange—queue—consumer 消息从 producer 到 exchange 则会返回一个 confirmCallback 。 消息从 exchange–queue 投递失败则会返回一个 returnCallback 。 我们将利用这两个 callback 控制消息的可靠性投递 7.3.2 消费者投递可靠性 ack指Acknowledge确认。 表示消费端收到消息后的确认方式。 有三种确认方式 自动确认acknowledge“none” 手动确认acknowledge“manual” 根据异常情况确认acknowledge“auto”这种方式使用麻烦不作讲解 其中自动确认是指当消息一旦被Consumer接收到则自动确认收到并将相应 message 从 RabbitMQ 的消息缓存中移除。但是在实际业务处理中很可能消息接收到业务处理出现异常那么该消息就会丢失。如果设置了手动确认方式则需要在业务处理成功后调用channel.basicAck()手动签收如果出现异常则调用channel.basicNack()方法让其自动重新发送消息。 7.4 RabbitMQ 如何实现延迟队列 延迟队列即消息进入队列后不会立即被消费只有到达指定时间后才会被消费。 需求 下单后30分钟未支付取消订单回滚库存。 新用户注册成功7天后发送短信问候。 实现方式 定时器 延迟队列 使用延时队列实现 延迟队列 指消息进入队列后可以被延迟一定时间再进行消费。 RabbitMQ没有提供延迟队列功能但是可以使用 TTL DLX 来实现延迟队列效果。 非常感谢您阅读到这里如果这篇文章对您有帮助希望能留下您的点赞 关注 分享 留言thanks
http://www.tj-hxxt.cn/news/224351.html

相关文章:

  • 儿童网站模板免费下载厦门营销型网站
  • 南通企业网站seo建站目的
  • 沈阳做网站在哪wordpress自动视频播放
  • 网站访问大小小程序搭建需要多久
  • 哪个网站可以做室内设计seo的公司排名
  • 北京 网站建设公司个人网站有哪些举例
  • 成都网站建设哪家强个人备案域名可以做哪些网站吗
  • 重庆网站建设怎么样深圳智加设计公司
  • 免费房屋建设图纸网站有哪些wordpress浏览器不兼容
  • 做信息流推广需要建立网站么app运营成本估算
  • 包头 网站建设北京营销型网站定制
  • 网站建设文化代理商用.net做网站好 还是用php
  • 机械厂网站建设方案活动网页怎么做
  • 大型网络建站公司公司网站制作需要找广告公司么
  • 怎么做网站访问统计深圳品牌网站制作推荐
  • 网站繁体jswordpress数据库文件导入
  • 东莞麻涌网站建设网站建设工作的函
  • 网站seo优化技巧宁夏建设工程造价网
  • 如何做网站管理维护wordpress主题付费
  • 网站下方一般放什么原因固始做网站的公司
  • 个旧做网站哪家公司好专业的标志设计公司
  • wordpress付费剧集网站怎么做网站的关键词库
  • 服装网站建设的技术可行性网络策划是什么意思
  • 上海网站制作网站建设vps如何放置网站
  • 做推广网站公司wordpress怎么获取数据库名
  • 请人做软件开发的网站h5移动网站开发
  • 建网站有多少种方式江苏工程信息网
  • 高端网站开发怎么选傻瓜式网页制作网站
  • 沈阳网站制作系统网站开发找公司好还是个人
  • 网站建设php实验报告山东做网站建设公司