当前位置: 首页 > news >正文

西安哪家网站做的好郑州网站排名推广

西安哪家网站做的好,郑州网站排名推广,怎么建立网站链接,域名的正确书写格式目录 1.SpringAMQP 1.1.导入demo工程 1.2.快速入门 1.2.1.消息发送 1.2.2.消息接收 1.2.3.测试 1.3.WorkQueues模型 1.4.交换机类型 1.4.1.Fanout交换机 1.4.2.Direct交换机 1.4.3.Topic交换机 1.5.声明队列和交换机 1.5.1.基于注解声明 1.6.消息转换器 1.6.1.测…

目录

1.SpringAMQP

1.1.导入demo工程

1.2.快速入门

1.2.1.消息发送

1.2.2.消息接收

1.2.3.测试

1.3.WorkQueues模型

1.4.交换机类型

1.4.1.Fanout交换机

1.4.2.Direct交换机

1.4.3.Topic交换机

1.5.声明队列和交换机

1.5.1.基于注解声明

1.6.消息转换器

1.6.1.测试默认转换器

1.6.2.配置JSON转换器

1.6.3.消费者接收Object


1.SpringAMQP

将来我们开发业务功能的时候,肯定不会在控制台收发消息,而是应该基于编程的方式。由于RabbitMQ采用了AMQP协议,因此它具备跨语言的特性。任何语言只要遵循AMQP协议收发消息,都可以与RabbitMQ交互。并且RabbitMQ官方也提供了各种不同语言的客户端。

但是,RabbitMQ官方提供的Java客户端编码相对复杂,一般生产环境下我们更多会结合Spring来使用。而Spring的官方刚好基于RabbitMQ提供了这样一套消息收发的模板工具:SpringAMQP。并且还基于SpringBoot对其实现了自动装配,使用起来非常方便。

SpringAMQP官方网址:Spring AMQP

SpringAMQP提供了三个功能:
- 自动声明队列、交换机及其绑定关系
- 基于注解的监听器模式,异步接收消息
- 封装了RabbitTemplate工具,用于发送消息

1.1.导入demo工程

为方便我们学习SpringAMQP的使用,导入demo工程:

然后用Idea打开,项目结构如图:

包括三部分:
- mq-demo:父工程,管理项目依赖
- publisher:消息的发送者
- consumer:消息的消费者

在该工程中已经配置好了SpringAMQP相关的依赖:
主要是AMQP的依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>cn.itcast.demo</groupId><artifactId>mq-demo</artifactId><version>1.0-SNAPSHOT</version><modules><module>publisher</module><module>consumer</module></modules><packaging>pom</packaging><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.12</version><relativePath/></parent><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target></properties><dependencies><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><!--AMQP依赖,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!--单元测试--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency></dependencies>
</project>

现在可以直接使用SpringAMQP了。

1.2.快速入门

在之前的案例中,我们都是经过交换机发送消息到队列,不过有时候为了测试方便,我们也可以直接向队列发送消息,跳过交换机。

在入门案例中,我们就演示这样的简单模型,如图:

也就是:
- publisher直接发送消息到队列
- 消费者监听并处理队列中的消息

注意:这种模式一般测试使用,很少在生产中使用。

为了方便测试,我们现在控制台新建一个队列:simple.queue

添加成功:

接下来,我们就可以利用Java代码收发消息了。
 

1.2.1.消息发送

首先配置MQ地址,在publisher服务的application.yml中添加配置:

spring:rabbitmq:host: 192.168.52.135 # 你的虚拟机IPport: 5672 # 端口virtual-host: /hmall # 虚拟主机  之前已经创建的username: hmall # 用户名password: 123 # 密码

然后在publisher服务中编写测试类SpringAmqpTest,并利用RabbitTemplate实现消息发送:
核心代码:

打开控制台,可以看到消息已经发送到队列中:

接下来,我们再来实现消息接收。
 

1.2.2.消息接收

首先配置MQ地址,在consumer服务的application.yml中添加配置:

spring:rabbitmq:host: 192.168.52.135 # 你的虚拟机IPport: 5672 # 端口virtual-host: /hmall # 虚拟主机username: hmall # 用户名password: 123 # 密码

然后在consumer服务的com.itheima.consumer.listener包中新建一个类SpringRabbitListener核心代码如下:

@Component
public class SpringRabbitListener {// 利用RabbitListener来声明要监听的队列信息// 将来一旦监听的队列中有了消息,就会推送给当前服务,调用当前方法,处理消息。// 可以看到方法体中接收的就是消息体的内容@RabbitListener(queues = "simple.queue")public void listenSimpleQueueMessage(String msg) throws InterruptedException {System.out.println("spring 消费者接收到消息:【" + msg + "】");}
}

1.2.3.测试

启动consumer服务,然后在publisher服务中运行测试代码,发送MQ消息。最终consumer收到消息:

1.3.WorkQueues模型

Work queues,任务模型。简单来说就是多个消费者绑定到一个队列,共同消费队列中的消息

当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。
此时就可以使用work 模型,多个消费者共同处理消息处理,消息处理的速度就能大大提高了。

接下来,我们就来模拟这样的场景。

首先,我们在控制台创建一个新的队列,命名为work.queue

(1)消息发送

这次我们循环发送,模拟大量消息堆积现象。

在publisher服务中的SpringAmqpTest类中添加一个测试方法:

(2)消息接收

要模拟多个消费者绑定同一个队列,我们在consumer服务的SpringRabbitListener中添加2个新的方法:

注意到这两消费者,都设置了Thead.sleep,模拟任务耗时:
- 消费者1 sleep了20毫秒,相当于每秒钟处理50个消息
- 消费者2 sleep了200毫秒,相当于每秒处理5个消息

(3)测试

启动ConsumerApplication后,在执行publisher服务中刚刚编写的发送测试方法testWorkQueue。最终结果如下:

可以看到消费者1和消费者2每人都消费了25条消息
- 消费者1很快完成了自己的25条消息
- 消费者2却在缓慢的处理自己的25条消息。

也就是说消息是平均分配给每个消费者(轮询,并没有考虑到消费者的处理能力。导致1个消费者空闲,另一个消费者忙的不可开交。没有充分利用每一个消费者的能力,最终消息处理的耗时远远超过了1秒。这样显然是有问题的。

为了解决这样的问题,有以下配置:

(4)能者多劳

在spring中有一个简单的配置,可以解决这个问题。我们修改consumer服务的application.yml文件,添加配置:

spring:rabbitmq:listener:simple:prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

再次测试可以发现:由于消费者1处理速度较快,所以处理了更多的消息;消费者2处理速度较慢,只处理了6条消息。而最终总的执行耗时也在1秒左右,大大提升。

正所谓能者多劳,这样充分利用了每一个消费者的处理能力,可以有效避免消息积压问题。

(5)总结
Work模型的使用:

  • 多个消费者绑定到一个队列,加快处理速度,同一条消息只会被一个消费者处理

  • 通过设置prefetch来控制消费者预取的消息数量,处理完一条再拿下一条,能者多劳。

1.4.交换机类型

在之前的两个测试中,都没有交换机,生产者直接发送消息到队列。而一旦引入交换机,消息发送的模式会有很大变化:

可以看到,在订阅模型中,多了一个exchange角色,而且过程略有变化:
- Publisher:生产者,不再发送消息到队列中,而是发给交换机
- Exchange:交换机,一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。
- Queue:消息队列也与以前一样,接收消息、缓存消息。不过队列一定要与交换机绑定。
- Consumer:消费者,与以前一样,订阅队列,没有变化

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

交换机的类型有四种:

  • Fanout广播,将消息交给所有绑定到交换机的队列。我们最早在控制台使用的正是Fanout交换机

  • Direct订阅,基于RoutingKey(路由key)发送给订阅了消息的队列

  • Topic通配符订阅,与Direct类似,只不过RoutingKey可以使用通配符

  • Headers头匹配,基于MQ的消息头匹配,用的较少。

1.4.1.Fanout交换机

广播模式下,消息发送流程是这样的:

- 1)  可以有多个队列
- 2)  每个队列都要绑定到Exchange(交换机)
- 3)  生产者发送的消息,只能发送到交换机
- 4)  交换机把消息发送给绑定过的所有队列
- 5)  订阅队列的消费者都能拿到消息

接下来通过一个案例入门:

(1)声明队列和交换机

在控制台创建队列fanout.queue1,再以同样的方法创建队列fanoutqueue2:

然后再创建一个交换机:

然后绑定队列fanout.queue1到交换机:再以同样的方式绑定fanout.queue2到交换机

(2)消息发送

在publisher服务的SpringAmqpTest类中添加测试方法:

@Test
public void testFanoutExchange() {// 交换机名称String exchangeName = "hmall.fanout";// 消息String message = "hello, everyone!";rabbitTemplate.convertAndSend(exchangeName, "", message);//与之前的测试不同的是,这个方法需要有三个参数。//三个参数,中间的参数可以为空。
}

(3)消息接收

在consumer服务的SpringRabbitListener中添加两个方法,作为消费者:

@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
}@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
}

(4)总结

交换机的作用是什么?

(1)接收publisher发送的消息
(2)将消息按照规则路由到与之绑定的队列
(3)不能缓存消息,路由失败,消息丢失
(4)FanoutExchange的会将消息路由到每个绑定的队列

1.4.2.Direct交换机

在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。
基本流程如下:

在Direct模型下:
- 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
- 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey。
- Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息

接下来通过一个案例入门:

(1)声明队列和交换机

首先在控制台声明两个队列direct.queue1direct.queue2,这里不再展示过程:

然后声明一个direct类型的交换机,命名为hmall.direct:

然后使用redblue作为key,绑定direct.queue1hmall.direct

同理,使用redyellow作为key,绑定direct.queue2hmall.direct,步骤略,最终结果:

(2)消息接收

在consumer服务的SpringRabbitListener中添加方法:

@RabbitListener(queues = "direct.queue1")
public void listenDirectQueue1(String msg) {System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
}@RabbitListener(queues = "direct.queue2")
public void listenDirectQueue2(String msg) {System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
}

(3)消息发送

在publisher服务的SpringAmqpTest类中添加测试方法:

@Test
public void testSendDirectExchange() {// 交换机名称String exchangeName = "hmall.direct";// 消息String message = "红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!";// 发送消息rabbitTemplate.convertAndSend(exchangeName, "red", message);
}

由于使用的red这个key,所以两个消费者都收到了消息:

我们再切换为blue这个key:

@Test
public void testSendDirectExchange() {// 交换机名称String exchangeName = "hmall.direct";// 消息String message = "最新报道,哥斯拉是居民自治巨型气球,虚惊一场!";// 发送消息rabbitTemplate.convertAndSend(exchangeName, "blue", message);
}

你会发现,只有消费者1收到了消息:

(4)总结
描述下Direct交换机与Fanout交换机的差异?

- Fanout交换机将消息路由给每一个与之绑定的队列
- Direct交换机根据RoutingKey判断路由给哪个队列
- 如果多个队列具有相同的RoutingKey,则与Fanout功能类似

1.4.3.Topic交换机

Topic类型的ExchangeDirect相比,都是可以根据RoutingKey把消息路由到不同的队列。
只不过Topic类型Exchange可以让队列在绑定BindingKey 的时候使用通配符!

BindingKey 一般都是有一个或多个单词组成,多个单词之间以.分割,例如: item.insert

通配符规则:
- #:匹配一个或多个词
- *:匹配不多不少恰好1个词

举例:
- item.#:能够匹配item.spu.insert 或者 item.spu
- item.*:只能匹配item.spu

图示:

假如此时publisher发送的消息使用的RoutingKey共有四种:
- china.news 代表有中国的新闻消息;
- china.weather 代表中国的天气消息;
- japan.news 则代表日本新闻
- japan.weather 代表日本的天气消息;

解释:
- topic.queue1:绑定的是china.# ,凡是以 china.开头的routing key 都会被匹配到,包括:
  - china.news
  - china.weather
- topic.queue2:绑定的是#.news ,凡是以 .news结尾的 routing key 都会被匹配。包括:
  - china.news
  - japan.news

接下来,我们就按照上图所示,来演示一下Topic交换机的用法。

首先,在控制台按照图示例子创建队列、交换机,并利用通配符绑定队列和交换机。此处步骤略。最终结果如下:

(1)消息发送

在publisher服务的SpringAmqpTest类中添加测试方法:

/*** topicExchange*/
@Test
public void testSendTopicExchange() {// 交换机名称String exchangeName = "hmall.topic";// 消息String message = "喜报!孙悟空大战哥斯拉,胜!";// 发送消息rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
}

(2)消息接收

在consumer服务的SpringRabbitListener中添加方法:

@RabbitListener(queues = "topic.queue1")
public void listenTopicQueue1(String msg){System.out.println("消费者1接收到topic.queue1的消息:【" + msg + "】");
}@RabbitListener(queues = "topic.queue2")
public void listenTopicQueue2(String msg){System.out.println("消费者2接收到topic.queue2的消息:【" + msg + "】");
}

(3)总结
描述下Direct交换机与Topic交换机的差异?

(1)Topic交换机接收的消息RoutingKey必须是多个单词,以 . 分割
(2)Topic交换机与队列绑定时的bindingKey可以指定通配符
(3)#:代表0个或多个词
(4)*:代表1个词

1.5.声明队列和交换机

在之前我们都是基于RabbitMQ控制台来创建队列、交换机。但是在实际开发时,队列和交换机是程序员定义的,将来项目上线,又要交给运维去创建。那么程序员就需要把程序中运行的所有队列和交换机都写下来,交给运维。在这个过程中是很容易出现错误的。

因此推荐的做法是由程序启动时检查队列和交换机是否存在,如果不存在自动创建。

1.5.1.基于注解声明

基于@Bean的方式声明队列和交换机比较麻烦,这里我们只讲解基于注解方式来声明。

注意声明的文件是Listener下的,在监听者位置声明

我们同样声明Direct模式的交换机和队列:

@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"),exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"),exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
}

再试试Topic模式:

@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue1"),exchange = @Exchange(name = "hmall.topic", type = ExchangeTypes.TOPIC),key = "china.#"
))
public void listenTopicQueue1(String msg){System.out.println("消费者1接收到topic.queue1的消息:【" + msg + "】");
}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue2"),exchange = @Exchange(name = "hmall.topic", type = ExchangeTypes.TOPIC),key = "#.news"
))
public void listenTopicQueue2(String msg){System.out.println("消费者2接收到topic.queue2的消息:【" + msg + "】");
}

非常简便

1.6.消息转换器

Spring的消息发送代码接收的消息体是一个Object:

而在数据传输时,它会把你发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。
只不过,默认情况下Spring采用的序列化方式是JDK序列化。众所周知,JDK序列化存在下列问题:
- 数据体积过大
- 有安全漏洞
- 可读性差

我们来测试一下。

1.6.1.测试默认转换器

1)创建测试队列
首先,我们在consumer服务中声明一个新的配置类:

利用@Bean的方式创建一个队列,
具体代码:

注意,这里我们先不要给这个队列添加消费者,我们要查看消息体的格式。
重启consumer服务以后,该队列就会被自动创建出来了:

2)发送消息
我们在publisher模块的SpringAmqpTest中新增一个消息发送的代码,发送一个Map对象:

发送消息后查看控制台:

可以看到消息格式非常不友好。

1.6.2.配置JSON转换器

显然,JDK序列化方式并不合适。我们希望消息体的体积更小、可读性更高,因此可以使用JSON方式来做序列化和反序列化。

publisherconsumer两个服务中都引入依赖:

<dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId><version>2.9.10</version>
</dependency>

注意!如果项目中引入了spring-boot-starter-web依赖,则无需再次引入Jackson依赖。

配置消息转换器,在publisherconsumer两个服务的启动类中添加一个Bean即可:

@Bean
public MessageConverter messageConverter(){// 1.定义消息转换器Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息jackson2JsonMessageConverter.setCreateMessageIds(true);return jackson2JsonMessageConverter;
}

消息转换器中添加的messageId可以便于我们将来做幂等性判断。

此时,我们到MQ控制台删除object.queue中的旧的消息。然后再次执行刚才的消息发送的代码,到MQ的控制台查看消息结构:

1.6.3.消费者接收Object

我们在consumer服务中定义一个新的消费者,publisher是用Map发送,那么消费者也一定要用Map接收,格式如下:

@RabbitListener(queues = "object.queue")
public void listenSimpleQueueMessage(Map<String, Object> msg) throws InterruptedException {System.out.println("消费者接收到object.queue消息:【" + msg + "】");
}

 

http://www.tj-hxxt.cn/news/172.html

相关文章:

  • 大连网站制作怎么做seo建站是什么
  • 做地方网站收益怎么样百度关键词seo
  • 齐河网站建设公司百度指数的数值代表什么
  • 网站说服力 营销...seo流量工具
  • 网页上的视频如何下载厦门seo排名公司
  • 网站用什么技术实现win7优化工具哪个好用
  • 做蔬菜配送有什么网站可下载了解百度服务热线电话
  • 重庆永川网站建设价格电脑培训速成班多少钱
  • 山东省住房和城乡建设厅网站主页电商网站策划
  • 网站建设公司软件开发容易被百度收录的网站
  • 建设企业网站包含什么做外贸怎么推广
  • 做门窗做什么网站好怎么做百度关键词排名
  • 手机网站域名开头qq群引流推广平台免费
  • 怎么通过做网站来赚钱吗上google必须翻墙吗
  • 做网站用的pm是啥seo排名优化app
  • 注册商标官网入口zac seo博客
  • 有哪些可以做调查的网站西安seo托管
  • 网站结构图怎么做seo的主要工作内容
  • 网站上地图怎么做的市场推广计划怎么写
  • 坂田做网站建设好的网络公司湖南网站制作公司
  • 大型网站团队人数今日nba比赛直播
  • 男女做恩爱视频网站整站优化工具
  • 武汉做网站公司有哪些网页设计与制作模板
  • 世界500强企业排名2020网站seo置顶 乐云践新专家
  • 泰安市网站建设如何做好产品网络推广
  • 唐山市城市建设规划局网站seo人人网
  • 怎么做网约车短视频seo
  • 阿里小说网站模板seo网站诊断方案
  • 福建省品牌建设促进会网站seo如何提高排名
  • 专业做网站厂家网络营销常见术语