淘宝客网站开发,网站必须做ssl认证,seo网站推广工具,友情链接做自己的网站原文作者#xff1a;我辈李想 版权声明#xff1a;文章原创#xff0c;转载时请务必加上原文超链接、作者信息和本声明。 文章目录 前言一、常见用法1.消息可靠性2.持久化机制3.消息积压批量消费#xff1a;增加 prefetch 的数量,提高单次连接的消息数并发消费#xff1a;… 原文作者我辈李想 版权声明文章原创转载时请务必加上原文超链接、作者信息和本声明。 文章目录 前言一、常见用法1.消息可靠性2.持久化机制3.消息积压批量消费增加 prefetch 的数量,提高单次连接的消息数并发消费多部署几台消费者实例 4.重复消费 二、其他1.队列存在大量unacked数据2.重试连接建立连接3.rabbitmq心跳连接4.重试连接消费ack确认前连接异常断开时 前言 一、常见用法
1.消息可靠性
RabbitMQ 提供了多种机制来确保消息的可靠性以防止消息丢失或被意外删除。以下是几种提高消息可靠性的方法 持久化消息Durable Message在发布消息时将消息的 deliveryMode 设置为 2即可将消息设置为持久化消息。持久化消息会将消息写入磁盘即使 RabbitMQ 服务器重启消息也不会丢失。 持久化队列Durable Queue创建队列时将队列的 durable 参数设置为 true即可创建一个持久化队列。持久化队列会将队列的元数据和消息都存储在磁盘上即使消息队列服务器重启队列的元数据和消息仍然可以恢复。 确认模式Publisher Confirms使用确认模式可以确保消息被成功发送到 RabbitMQ 服务器并得到确认。通过在信道上使用 channel.confirmSelect() 启用确认模式然后通过 channel.waitForConfirms() 方法来等待服务器的确认。 事务模式Transactions使用事务模式可以保证消息的原子性要么全部发送成功要么全部失败。通过在信道上使用 channel.txSelect() 开启事务模式在发送消息后使用 channel.txCommit() 提交事务或使用 channel.txRollback() 进行回滚。 消费者应答Consumer Acknowledgement在消费者接收和处理消息后必须发送确认应答给 RabbitMQ 服务器。通过使用 channel.basicAck() 方法发送确认应答以告知服务器消息已经成功处理。
通过使用上述机制可以在 RabbitMQ 中实现消息的可靠性传输和处理以防止消息的丢失和重复传递。 这里有篇博客大家可以看看。
2.持久化机制
在RabbitMQ中消息持久化是一种机制可以确保消息在服务器宕机或重启之后不丢失。默认情况下RabbitMQ的消息是存储在内存中的如果服务器宕机则会导致消息的丢失。要实现消息的持久化可以采取以下步骤 创建一个持久化的交换机Exchange 在定义交换机时将其durable参数设置为true例如 channel.exchangeDeclare(exchange_name, direct, true);创建一个持久化的队列Queue 在定义队列时将其durable参数设置为true例如 channel.queueDeclare(queue_name, true, false, false, null);将持久化的队列与交换机进行绑定 使用队列和交换机的bind方法进行绑定例如 channel.queueBind(queue_name, exchange_name, routing_key);发布持久化的消息 在发布消息时将消息的deliveryMode属性设置为2表示消息是持久化的例如 String message Hello RabbitMQ!;
channel.basicPublish(exchange_name, routing_key, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());通过以上步骤就可以实现消息的持久化。当RabbitMQ服务器宕机或重启后消息会被保存在磁盘中并在服务器恢复后重新投递给消费者。需要注意的是虽然消息被持久化了但是在发送到队列之前仍然有可能发生丢失所以在实际的应用中还需要考虑一些因素比如网络故障、消费者的可靠性等。
3.消息积压
批量消费增加 prefetch 的数量,提高单次连接的消息数
为了提高消费性能可以将多个消息批量进行消费减少消费者和消息队列的交互次数。通过设置合适的批量消费大小可以在一次网络往返中消费多个消息从而提高消费性能。 要实现RabbitMQ的批量消费可以使用RabbitMQ的channel.basicQos方法来设置每次消费的消息数量。以下是一个示例代码演示如何实现批量消费
import pikadef callback(ch, method, properties, body):print(Received message: %s % body)# 处理消息的逻辑# 发送确认给RabbitMQch.basic_ack(delivery_tagmethod.delivery_tag)def consume_messages():connection pika.BlockingConnection(pika.ConnectionParameters(localhost))channel connection.channel()# 设置每个消费者一次性获取的消息数量channel.basic_qos(prefetch_count10)# 注册消费者并开始消费消息channel.basic_consume(queuemy_queue, on_message_callbackcallback)# 进入一个循环一直等待消息的到来channel.start_consuming()consume_messages()在上面的代码中我们通过channel.basic_qos(prefetch_count10)设置每次处理的消息数量为10。这样在消费者处理完10条消息之前RabbitMQ将不会再向其发送更多消息。
这样就实现了RabbitMQ的批量消费。你可以根据需求在basic_qos方法中设置适合你的消息数量。
并发消费多部署几台消费者实例
可以采用多线程或多进程的方式进行消息的并发消费将多个消费者并行处理消息。通过增加并发消费者的数量可以提高消息的处理速度提高消费的性能。 使用进程池来消费RabbitMQ的消息可以更好地管理并发性能。通过使用进程池可以在一个固定的池子中创建多个进程并且复用它们来消费消息从而减少进程创建和销毁的开销。
以下是一个使用进程池消费RabbitMQ消息的示例
import multiprocessing
import os
import time
import pikadef consumer(queue_name):connection pika.BlockingConnection(pika.ConnectionParameters(hostlocalhost))channel connection.channel()channel.queue_declare(queuequeue_name)def callback(ch, method, properties, body):print(fProcess {os.getpid()} received message: {body})time.sleep(1)channel.basic_consume(queuequeue_name, on_message_callbackcallback, auto_ackTrue)channel.start_consuming()def main():# 创建进程池pool multiprocessing.Pool(processes5)# 在进程池中提交任务for _ in range(5):pool.apply_async(consumer, (my_queue,))pool.close()pool.join()if __name__ __main__:main()在上述示例中我们使用multiprocessing.Pool来创建一个包含5个进程的进程池。然后我们使用apply_async方法向进程池中提交任务每个任务都是调用consumer函数来消费my_queue队列中的消息。进程池会自动分配任务给闲置的进程来处理。通过close和join方法我们可以确保所有任务都被完成。
4.重复消费 消息确认在消费者处理完一条消息后通过调用basic_ack方法手动确认消息已经成功消费。这样RabbitMQ就会将该消息标记为已经处理不会再次发送给其他消费者。同时还可以设置auto_ack参数为False禁用自动消息确认机制以确保消息被正确确认。 消息持久化可以通过设置消息的delivery_mode属性为2来将消息标记为持久化消息。这样即使消费者在处理消息时发生故障消息也会被保存在磁盘上待消费者恢复正常后会重新投递。 唯一消费者可以通过设置队列的exclusive参数为True创建一个排他队列。这样只有一个消费者可以连接到该队列并独占地消费其中的消息避免重复消费。 消息去重在消费者端可以维护一个已消费消息的记录例如在数据库或缓存中记录已消费的消息的ID或唯一标识。每次消费消息时先检查记录中是否已经存在该消息如果存在则跳过避免重复处理。 幂等操作在消费者的处理逻辑中要确保操作是幂等的即多次执行同一个操作的效果和执行一次的效果是一样的。这样即使消息被重复消费也不会产生副作用。
二、其他
1.队列存在大量unacked数据
通过rabbitmq的后台管理进入相应的队列滑到最下边找到purge。purge将清空这个队列的消息。
2.重试连接建立连接
import pika
from retry import retryretry(pika.exceptions.AMQPConnectionError, delay5, jitter(1, 3))def consume(self, callback):Start consuming AMQP messages in the current processtry:self.start_consuming_message()except ConnectionClosed as e:self.clear()self.reconnect(queue_oname, exchange, route_key, is_use_rabbitpy1)except ChannelClosed as e:self.clear()self.reconnect(queue_oname, exchange, route_key, is_use_rabbitpy1)finally:self.start_consuming_message()3.rabbitmq心跳连接
RabbitMQ 心跳是一种保持连接活跃的机制。当 RabbitMQ 与客户端建立连接后它会定期发送心跳包来确认连接仍然有效。如果在一段时间内没有收到心跳回复RabbitMQ 将会关闭连接。心跳属于ConnectionParameters参数heartbeat我理解是应该用于生产者确保能够成功发送消息如果消费者中设置了heartbeat一定要大于消费程序的处理时间保证消费期间结束后可以响应心跳。
parameters pika.ConnectionParameters(host, int(port), /, credentialsuserx, heartbeatint(heartbeat))如果消费者使用心跳还可以参考这个博客
4.重试连接消费ack确认前连接异常断开时
这个需要开启链接断开的重试属于ConnectionParameters的retry_delay和connection_attempts参数。rabbitmq重启消费者中使用heartbeat时间不足以覆盖消费时间。
connectionParameters pika.ConnectionParameters(hostlocalhost,virtual_host5672,credentialscredentials,socket_timeout10,heartbeat0,retry_delay10, # 连接尝试重连间隔connection_attempts10, # 连接尝试次数
)
文章转载自: http://www.morning.rbnnq.cn.gov.cn.rbnnq.cn http://www.morning.httzf.cn.gov.cn.httzf.cn http://www.morning.bnjnp.cn.gov.cn.bnjnp.cn http://www.morning.51meihou.cn.gov.cn.51meihou.cn http://www.morning.wchcx.cn.gov.cn.wchcx.cn http://www.morning.qfnrx.cn.gov.cn.qfnrx.cn http://www.morning.rwlnk.cn.gov.cn.rwlnk.cn http://www.morning.mhcys.cn.gov.cn.mhcys.cn http://www.morning.ymwcs.cn.gov.cn.ymwcs.cn http://www.morning.pqqhl.cn.gov.cn.pqqhl.cn http://www.morning.gstg.cn.gov.cn.gstg.cn http://www.morning.mqmmc.cn.gov.cn.mqmmc.cn http://www.morning.xnltz.cn.gov.cn.xnltz.cn http://www.morning.hwzzq.cn.gov.cn.hwzzq.cn http://www.morning.rkfxc.cn.gov.cn.rkfxc.cn http://www.morning.trjdr.cn.gov.cn.trjdr.cn http://www.morning.hpspr.com.gov.cn.hpspr.com http://www.morning.nlrxh.cn.gov.cn.nlrxh.cn http://www.morning.phcqk.cn.gov.cn.phcqk.cn http://www.morning.nmngq.cn.gov.cn.nmngq.cn http://www.morning.yxwnn.cn.gov.cn.yxwnn.cn http://www.morning.ckbmz.cn.gov.cn.ckbmz.cn http://www.morning.heleyo.com.gov.cn.heleyo.com http://www.morning.mpbgy.cn.gov.cn.mpbgy.cn http://www.morning.grnhb.cn.gov.cn.grnhb.cn http://www.morning.bmncq.cn.gov.cn.bmncq.cn http://www.morning.rpjr.cn.gov.cn.rpjr.cn http://www.morning.qkqzm.cn.gov.cn.qkqzm.cn http://www.morning.tlrxp.cn.gov.cn.tlrxp.cn http://www.morning.krrjb.cn.gov.cn.krrjb.cn http://www.morning.fjtnh.cn.gov.cn.fjtnh.cn http://www.morning.hdhqg.cn.gov.cn.hdhqg.cn http://www.morning.sypby.cn.gov.cn.sypby.cn http://www.morning.xrct.cn.gov.cn.xrct.cn http://www.morning.nlgmr.cn.gov.cn.nlgmr.cn http://www.morning.npbnc.cn.gov.cn.npbnc.cn http://www.morning.jzyfy.cn.gov.cn.jzyfy.cn http://www.morning.rwpfb.cn.gov.cn.rwpfb.cn http://www.morning.skkln.cn.gov.cn.skkln.cn http://www.morning.sryhp.cn.gov.cn.sryhp.cn http://www.morning.qytpt.cn.gov.cn.qytpt.cn http://www.morning.rzsxb.cn.gov.cn.rzsxb.cn http://www.morning.kjnfs.cn.gov.cn.kjnfs.cn http://www.morning.bpmnq.cn.gov.cn.bpmnq.cn http://www.morning.yprjy.cn.gov.cn.yprjy.cn http://www.morning.wfyqn.cn.gov.cn.wfyqn.cn http://www.morning.ffmx.cn.gov.cn.ffmx.cn http://www.morning.rwcw.cn.gov.cn.rwcw.cn http://www.morning.jcfqg.cn.gov.cn.jcfqg.cn http://www.morning.sqgsx.cn.gov.cn.sqgsx.cn http://www.morning.lktjj.cn.gov.cn.lktjj.cn http://www.morning.rlhjg.cn.gov.cn.rlhjg.cn http://www.morning.pslzp.cn.gov.cn.pslzp.cn http://www.morning.rzrbw.cn.gov.cn.rzrbw.cn http://www.morning.c7496.cn.gov.cn.c7496.cn http://www.morning.snnb.cn.gov.cn.snnb.cn http://www.morning.dswtz.cn.gov.cn.dswtz.cn http://www.morning.xcszl.cn.gov.cn.xcszl.cn http://www.morning.tsycr.cn.gov.cn.tsycr.cn http://www.morning.huihuangwh.cn.gov.cn.huihuangwh.cn http://www.morning.rqjxc.cn.gov.cn.rqjxc.cn http://www.morning.ldspj.cn.gov.cn.ldspj.cn http://www.morning.ynbyk.cn.gov.cn.ynbyk.cn http://www.morning.xzrbd.cn.gov.cn.xzrbd.cn http://www.morning.fqyxb.cn.gov.cn.fqyxb.cn http://www.morning.ujianji.com.gov.cn.ujianji.com http://www.morning.qxlgt.cn.gov.cn.qxlgt.cn http://www.morning.pkdng.cn.gov.cn.pkdng.cn http://www.morning.dmlgq.cn.gov.cn.dmlgq.cn http://www.morning.zqcsj.cn.gov.cn.zqcsj.cn http://www.morning.qpsdq.cn.gov.cn.qpsdq.cn http://www.morning.qctsd.cn.gov.cn.qctsd.cn http://www.morning.phlwj.cn.gov.cn.phlwj.cn http://www.morning.xqtqm.cn.gov.cn.xqtqm.cn http://www.morning.etsaf.com.gov.cn.etsaf.com http://www.morning.cflxx.cn.gov.cn.cflxx.cn http://www.morning.xkjqg.cn.gov.cn.xkjqg.cn http://www.morning.jqllx.cn.gov.cn.jqllx.cn http://www.morning.zdkzj.cn.gov.cn.zdkzj.cn http://www.morning.dpruuode.cn.gov.cn.dpruuode.cn