当前位置: 首页 > news >正文

佛山微网站建设天博平台营销

佛山微网站建设天博,平台营销,营销图片素材,网站建设用的服务器文章目录 二、Python-flask-rabbitMQ-插件方式整合引言具体步骤1 安装依赖:2 编写实体类:3 编写消费者和生产者:4 初始化消费者和生产者:5 其他地方使用生产者 二、Python-flask-rabbitMQ-插件方式整合 引言 当今互联网应用的高…

文章目录

  • 二、Python-flask-rabbitMQ-插件方式整合
    • 引言
    • 具体步骤
      • 1 安装依赖:
      • 2 编写实体类:
      • 3 编写消费者和生产者:
      • 4 初始化消费者和生产者:
      • 5 其他地方使用生产者

二、Python-flask-rabbitMQ-插件方式整合

引言

当今互联网应用的高并发场景下,如何保证可靠的消息传递和高效的处理成为了一项重要的挑战。在这种情况下,RabbitMQ作为一种可靠的消息队列系统,被广泛应用于各个领域。

本文将介绍如何使用RabbitMQ插件整合Flask框架,实现并发性生产者和消费者的兼容。通过这种方式,我们可以利用RabbitMQ的优势来确保消息的可靠传递,并提高应用程序的处理能力。

首先,我们需要在Flask应用程序中引入RabbitMQ插件。可以使用Pika库来连接和操作RabbitMQ。通过创建一个连接池管理连接对象,我们可以避免频繁地创建和销毁连接,提高效率。

同时,为了处理高并发的生产者,我们可以使用批量发送消息的方式来减少通道创建和消息发布的开销。通过设置缓冲区来收集一定数量或一定时间间隔内的消息,然后批量发送,可以更有效地利用资源。

此外,对于高并发的消费者,我们可以考虑使用异步的方式来处理消息。通过将消息发送任务交给后台线程或异步任务队列处理,可以避免请求的堵塞,提高应用程序的并发能力。

通过以上的优化方案,我们可以在Flask应用程序中充分利用RabbitMQ的功能,并且兼容高并发的生产者和消费者。这将帮助我们构建更可靠、高效的消息队列系统,应对日益增长的并发访问需求。

总之,使用RabbitMQ插件整合Flask框架,并采用优化方案来兼容并发性生产者和消费者,是构建可靠、高效消息传递系统的关键一步。通过这种方式,我们能够更好地应对高并发场景下的挑战,提升应用程序的性能和稳定性。

具体步骤

1 安装依赖:

使用pip安装pika库:

pip install pika

2 编写实体类:

from queue import Queue
from threading import Lockimport pika# 定义交换机类型的枚举值
class ExchangeType:DEFAULT = 'default'DIRECT = "direct"FANOUT = "fanout"TOPIC = 'topic'class RabbitMQ:def __init__(self, host='localhost', port=5672, username='guest', password='guest', pool_size=10):self.credentials = pika.PlainCredentials(username, password)self.parameters = pika.ConnectionParameters(host=host, port=port, credentials=self.credentials)self.connection_pool = Queue(pool_size)  # 连接池,存储连接和信道self.lock = Lock()  # 互斥锁,用于对连接池的访问进行同步for _ in range(pool_size):connection = self._create_connection()channel = connection.channel()self.connection_pool.put((connection, channel))def _create_connection(self):return pika.BlockingConnection(self.parameters)def get_channel(self):with self.lock:connection, channel = self.connection_pool.get()  # 从连接池获取连接和信道return connection, channeldef release_channel(self, connection, channel):with self.lock:self.connection_pool.put((connection, channel))  # 将连接和信道放回连接池def send_message(self, exchange, routing_key, message, exchange_type=ExchangeType.DEFAULT):connection, channel = self.get_channel()try:channel.exchange_declare(exchange=exchange, exchange_type=exchange_type)  # 声明交换机并指定类型channel.basic_publish(exchange=exchange, routing_key=routing_key, body=message)  # 发布消息finally:self.release_channel(connection, channel)def receive_messages(self, queue, callback):connection, channel = self.get_channel()try:channel.queue_declare(queue=queue, durable=True)  # 声明队列并标记为持久化# channel.queue_purge(queue=queue)  # 清空队列,以防之前的非持久化消息残留channel.basic_qos(prefetch_count=10)  # 每次从 RabbitMQ 获取 10 条消息channel.basic_consume(queue=queue, on_message_callback=callback, auto_ack=False)  # 消费消息并设置回调函数channel.start_consuming()  # 开始消费消息finally:self.release_channel(connection, channel)

3 编写消费者和生产者:

def connect_mq(app):# 初始化 RabbitMQ 实例rabbitmq = RabbitMQ(host=app.config['RABBITMQ_HOST'], port=5672, username='guest', password='guest')# 在应用上下文中注册 RabbitMQ 实例app.config['RABBITMQ'] = rabbitmq# consume_mq(app)#thread = threading.Thread(target=consume_mq, args=(app,))# 启动线程thread.start()def consume_mq(app):# 启动消费者程序,开始接收和处理消息def callback(ch, method, properties, body):try:print(f"消息队列内容 {body.decode()}")# 处理rabbitMQ内容to_transcribe(body.decode())except Exception as e:print(str(e))ch.basic_ack(delivery_tag=method.delivery_tag)# 启动消费者程序,开始接收和处理消息with app.app_context():rabbitmq = current_app.config['RABBITMQ']rabbitmq.receive_messages('audio_queue', callback)

4 初始化消费者和生产者:

def create_app():app = Flask(__name__)connect_mq_v1(app)

5 其他地方使用生产者


class MessageHandler:"""处理存放音频,将所有的任务都放在MQ里面"""def __init__(self, dir_name,uuid_str, back_url, file_url, request_type, file_name, *args, **kwargs):# 文件夹名称self.dir_name = dir_name# 文件名称self.file_name = file_name# 文件上传类型self.request_type = request_type# 文件存储位置self.file_url = file_url# 客户端回调地址self.back_url = back_url# 唯一标识self.uuid_str = uuid_strdef send(self):""":param content_type:队列类型:param rpc:MQ对象:return:"""try:# 发送消息队列# rpc.send_expire(body=json.dumps(self.to_json()), exchange='audio_queue', key='audio_queue')rabbitmq = current_app.config['RABBITMQ']rabbitmq.send_message('audio_queue', 'audio_queue', json.dumps(self.to_json()))print("发送消息到mq成功,用于存放音频信息")except Exception as e:print(f"发送消息到mq服务失败,请检查, {e}")def to_json(self):_dict = self.__dict__return _dict
# 将请求体和uuid放到rabbitMQ中
MessageHandler(**dates).send()
http://www.tj-hxxt.cn/news/84461.html

相关文章:

  • 政府网站开发成本外贸网站推广费用
  • 付费视频网站开发bt磁力在线种子搜索神器
  • 关于网站建设的介绍桂林网站设计制作
  • 网站怎么做 织梦站长工具5g
  • 有了实名制域名怎么做网站推广app是什么工作
  • 跨境电商网站设计唯尚广告联盟平台
  • 2014 网站建设免费建立个人网站申请
  • javaweb做视频网站原理qq引流推广软件免费
  • 网站建设公司发展网站运营专员
  • 微信网站页面制作北京网站优化方法
  • 网站建设教程asp上海优化外包
  • asp网站数据库扫描网上做广告推广
  • 建设淘宝网站的市场分析aso优化师
  • 响应式博客网站模板长沙seo步骤
  • 网站首页设计多少钱网站优化方案怎么写
  • 富阳网站开发成都网站建设公司排名
  • 网页设计师课程开封网站优化公司
  • 新乡专业做网站公司公司免费推广网站
  • 黄村做网站哪家好百度的营销方式有哪些
  • 西安企业网站备案一般得多少天网页版百度
  • 杭州网站制作方法网络营销方案
  • 黑白色调网站搜索推广广告
  • 知名高端网站建设企业seo职业发展
  • 电商网站的付款功能网页设计排版布局技巧
  • 北京网站设计哪家公司好培训机构哪家好
  • 怎样在门户网站做 推广网络营销介绍
  • 欧洲vodafonewifi巨大app3di石家庄百度搜索优化
  • 网站怎么做反向代理大连谷歌seo
  • 贷款平台哪个好下款关键词首页排名优化公司推荐
  • 网站页面做成自适应优缺点凡科建站后属于自己的网站吗