当前位置: 首页 > news >正文

网站qq启动链接怎么做域名注册服务网站哪个好

网站qq启动链接怎么做,域名注册服务网站哪个好,网站运营是什么,wordpress主题出错大纲 Pom.xml监听队列实时返回消息测试完整代码工程代码 在之前的案例中,我们在管理后台收发消息都是通过短连接的形式。本文我们将探索对队列中消息的实时读取,并通过流式数据返回给客户端。 webflux是反应式Web框架,客户端可以通过一个长连…

大纲

  • Pom.xml
  • 监听队列
  • 实时返回消息
  • 测试
  • 完整代码
  • 工程代码

在之前的案例中,我们在管理后台收发消息都是通过短连接的形式。本文我们将探索对队列中消息的实时读取,并通过流式数据返回给客户端。
webflux是反应式Web框架,客户端可以通过一个长连接和服务端相连,后续服务端可以通过该连接持续给客户端发送消息。可以达到:发送一次,多次接收的效果。

Pom.xml

由于我们要使用Rabbitmq,所以要新增如下依赖

		<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-stream</artifactId></dependency>

webflux的依赖如下:

		<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-webflux</artifactId></dependency><dependency><groupId>io.projectreactor</groupId><artifactId>reactor-core</artifactId><version>3.6.7</version></dependency>

监听队列

下面代码会返回一个监听队列的Container

    private SimpleMessageListenerContainer getListener(String queueName, MessageListener messageListener) {lock.lock();try {SimpleMessageListenerContainer listener = listeners.get(queueName);if (listener == null && messageListener != null) {listener = new SimpleMessageListenerContainer();listener.setConnectionFactory(connectionFactory);listener.setQueueNames(queueName);listener.setMessageListener(messageListener);listeners.put(queueName, listener);}return listener;} finally {lock.unlock();}}

实时返回消息

一旦消费者读取到消息,onMessage方法会被调用。然后Flux的消费者会将消息投递到流上。

    public Flux<String> listen(String queueName) {return Flux.create(emitter -> {SimpleMessageListenerContainer container = getListener(queueName, new MessageListener() {@Overridepublic void onMessage(Message message) {String msg = new String(message.getBody());System.out.println("listen function Received message: " + msg);emitter.next(msg);}});container.start();});}

测试

由于OpenApi不能支持实时展现流式数据,所以我们采用Postman来测试。
发送请求后,该页面一直处于滚动状态。
在这里插入图片描述
在管理后台发送一条消息
在这里插入图片描述
可以看到Postman收到了该消息
在这里插入图片描述
然后在发一条,Postman又会收到一条
在这里插入图片描述
这样我们就完成了“请求一次,多次返回”的效果。

完整代码

需要注意的是,返回的格式需要标记为produces = “text/event-stream”。

// controller
package com.rabbitmq.consumer.controller;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;import com.rabbitmq.consumer.service.ConsumerService;import reactor.core.publisher.Flux;@RestController
@RequestMapping("/consumer")
public class ConsumerController {@Autowiredprivate ConsumerService comsumerService;@GetMapping(value = "/listen", produces = "text/event-stream")public Flux<String> listen(@RequestParam String queueName) {return comsumerService.listen(queueName);}
}
// service
package com.rabbitmq.consumer.service;import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import jakarta.annotation.PostConstruct;
import reactor.core.publisher.Flux;@Service
public class ConsumerService {@Autowiredprivate RabbitTemplate rabbitTemplate;private ConnectionFactory connectionFactory;private final ReentrantLock lock = new ReentrantLock();private Map<String, SimpleMessageListenerContainer> listeners = new java.util.HashMap<>();@PostConstructpublic void init() {connectionFactory = rabbitTemplate.getConnectionFactory();}public Flux<String> listen(String queueName) {return Flux.create(emitter -> {SimpleMessageListenerContainer container = getListener(queueName, new MessageListener() {@Overridepublic void onMessage(Message message) {String msg = new String(message.getBody());System.out.println("listen function Received message: " + msg);emitter.next(msg);}});container.start();});}private SimpleMessageListenerContainer getListener(String queueName, MessageListener messageListener) {lock.lock();try {SimpleMessageListenerContainer listener = listeners.get(queueName);if (listener == null && messageListener != null) {listener = new SimpleMessageListenerContainer();listener.setConnectionFactory(connectionFactory);listener.setQueueNames(queueName);listener.setMessageListener(messageListener);listeners.put(queueName, listener);}return listener;} finally {lock.unlock();}}
}

工程代码

https://github.com/f304646673/RabbitMQDemo/tree/main/consumer

http://www.tj-hxxt.cn/news/40825.html

相关文章:

  • html在线记账网站模板网站权重查询工具
  • 网站代理软件优化电脑的软件有哪些
  • 企业网站推广阶段网站推广的作用在哪里
  • 太仓做企业网站it行业培训机构哪个好
  • 深圳福田网站建设公司哪家好十大禁止安装应用入口
  • centos安装wordpress引擎优化
  • 建设营销网站昆明关键词优化
  • ps网站主页按钮怎么做软文代写平台有哪些
  • 企业代办公司西安seo整站优化
  • 做彩票网站违法的吗咖啡的营销推广软文
  • 金溪那里可以做网站百度信息流投放技巧
  • 宝山网站建设服务网站外链有多重要
  • 网站建设的目标与期望最近发生的热点事件
  • wordpress主题转html如何软件网站优化公司
  • 宁津哪个网络公司做网站比较好项目平台
  • asp 建站友情链接免费发布平台
  • 做动漫头像的网站社交媒体营销案例
  • 装配式建筑网站制作一个网站步骤
  • c 网站开发技术基本seo
  • 从做系统后以前的网站打不开了怎么办理网站设计
  • 做门户网站的思路查询域名注册信息
  • 网站悬挂备案号网站建设合同
  • html制作新闻信息展示页面苏州seo优化公司
  • 哪个商城网站建设好关键词整站优化
  • 贵州省建设厅实名认证网站新闻头条最新消息摘抄
  • 做现货黄金看什么网站热搜榜排名前十
  • 网站开发网络公手机百度网盘网页版登录入口
  • 网站建设市场分析免费域名服务器
  • 做网站 成都sem与seo
  • 品牌营销型网站建设公司百度提交入口的网址