当前位置: 首页 > news >正文

河南省建设行业证书查询网站上海网站推广公司

河南省建设行业证书查询网站,上海网站推广公司,ubuntu系统做网站,免费ppt成品在接触RammitMQ时,好多文章都说在配置中设置属性 # rabbitmq 配置 rabbitmq:host: xxx.xxx.xxx.xxxport: xxxxusername: xxxpassword: xxxxxx## 生产端配置# 开启发布确认,就是confirm模式. 消费端ack应答后,才将消息从队列中删除#确认消息已发送到队列(Queue)pub…

在接触RammitMQ时,好多文章都说在配置中设置属性 

# rabbitmq 配置
rabbitmq:host: xxx.xxx.xxx.xxxport: xxxxusername: xxxpassword: xxxxxx## 生产端配置# 开启发布确认,就是confirm模式. 消费端ack应答后,才将消息从队列中删除#确认消息已发送到队列(Queue)publisher-returns: true#确认消息已发送到交换机(Exchange)publisher-confirm-type: correlatedlistener: #消费者 端配置retry:enabled: true # 是否支持重试default-requeue-rejected: falsemax-attempts: 5 #最大重试次数initial-interval: 3000 # 重试时间间隔direct:acknowledge-mode: manualsimple:acknowledge-mode: manual

消息接收消息失败时,可以重复调用5次;按照此操作,发现没有重复调用。

----------------------------------正确思路---------------------------------------------------------------------------------

设置完配置文件属性后,在代码中利用redis与channel.basicNack联合使用,将错误记录保存至数据库,方便查找原因;

---------------------------------------代码

package com.charg.listener;import com.charg.common.constant.CacheConstants;
import com.charg.common.constant.Constants;
import com.charg.common.utils.JsonUtils;
import com.charg.common.utils.redis.RedisUtils;
import com.charg.constant.RabbitConstants;
import com.charg.product.domain.bo.ProductDeviceBo;
import com.charg.product.domain.bo.RabMsgLogBo;
import com.charg.product.service.IProductDeviceService;
import com.charg.product.service.IRabMsgLogService;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;import java.io.IOException;
import java.time.Duration;/*** rabbitmq 监听*/
@Slf4j
@Component
public class RabbitQueueListener {/*** 最大重试次数*/private static int maxReconsumeCount = 3;@Autowiredprivate StringRedisTemplate redisTemplate;/*** 监听  队列的处理器** @param message*/@RabbitListener(queues = "队列名称")@RabbitHandlerpublic void onMessage(Message message, Channel channel) {//唯一标识String messageId = message.getMessageProperties().getMessageId();try {//判断messageId在redis中是否存在if (verificationMessageId(messageId)) {log.error("消息已重复处理,拒绝再次接收...");// 拒绝消息channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);} else {//不存在 则处理消息// 接收消息if (StringUtils.isNotBlank(new String(message.getBody()))) {//修改业务逻辑if (!false) {log.error("消息即将再次返回队列处理...逻辑错误");// 处理最大回调次数getMaximumNumber(message, channel);} else {channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);//加入缓存addMessageId(message);}} else {log.info("消息为空拒绝接收...");// 拒绝消息channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);}}} catch (Exception e) {e.printStackTrace();try {if (message.getMessageProperties().getRedelivered()) {log.error("消息已重复处理,拒绝再次接收----...");// 拒绝消息channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);} else {log.error("消息即将再次返回队列处理...");// 处理最大回调次数getMaximumNumber(message, channel);}} catch (Exception exception) {exception.printStackTrace();}}}/*** 记录消息最大次数** @param message* @param channel* @throws IOException*/private void getMaximumNumber(Message message, Channel channel) {try {int recounsumeCounts = RedisUtils.getCacheObject("messageMaxKey"+message.getMessageProperties().getMessageId()) == null ? 0 : RedisUtils.getCacheObject("messageMaxKey"+message.getMessageProperties().getMessageId());if (maxReconsumeCount > recounsumeCounts) {log.info("maxMessageId(message.getMessageProperties().getMessageId())=" + recounsumeCounts);channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);// 记录重试次数maxMessageId(message.getMessageProperties().getMessageId());} else {log.info("次数达到三次了呢---------" + RedisUtils.getCacheObject(CacheConstants.MESSAGE_MAX_KEY + message.getMessageProperties().getMessageId()));// 将消息重新放回队列channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);// 清除缓存RedisUtils.deleteObject("messageMaxKey" + message.getMessageProperties().getMessageId());//重试三次后,还是失败 需记录到数据库addRabMsgLog(message);}} catch (Exception e) {e.printStackTrace();}}/*** 设置消息的最大重试次数*/public void maxMessageId(String messageId) {String messageMax ="messageMaxKey"+ messageId;// 存入缓存,用来记录该消息重试了几次if (RedisUtils.hasKey(messageMax)) {RedisUtils.incrAtomicValue(messageMax);} else {//错误的消息-插入数据库RedisUtils.setCacheObject(messageMax, 1, Duration.ofHours(Constants.MESSAGE_TIME));}}/*** 校验消息是否消费过该消息** @param messageId 消息id* @return*/public boolean verificationMessageId(String messageId) {// 消息是否存在keyString verifyIsExistKey ="messageExistKey" + messageId;if ((RedisUtils.hasKey(verifyIsExistKey))) {return true;}return false;}/*** 保存消费过消息** @param message 消息* @return*/public void addMessageId(Message message) {// 存入缓存RedisUtils.setCacheObject("messageExistKey" + message.getMessageProperties().getMessageId(), message.getMessageProperties().getMessageId(), 1);}/*** 消息队列 失败日志 操作* 自己存数据库逻辑*/public void addRabMsgLog(Message message) {log.info("====操作日志===");//将内容记录到数据库}}
--------------------------------数据库表

 

http://www.tj-hxxt.cn/news/86024.html

相关文章:

  • wordpress禁用媒体库seo企业站收录
  • 政府网站建设和数据开放共享现在最火的推广平台
  • 做电子商务网站需要学什么百度广告优化师
  • 绵阳网站建设软件有哪些百度seo关键词排名 s
  • 网站备份数据库综合搜索引擎
  • 做图骂人的图片网站网站推广营销运营方式
  • 视频直播间宁波seo外包公司
  • 房山新农村建设网站怎么找专业的营销团队
  • 动态网站开发服务器端脚本语言超级优化
  • 网站开发报告参考文献网站在线优化工具
  • 政府网站建设推进app下载免费安装
  • 公司做的网站费用计入什么科目深圳网络营销的公司哪家好
  • 高级网站开发工程师证网络销售挣钱吗
  • 用pageadmin做的网站用什么虚拟主机号网站查询网
  • 做网站人员工资建立网站平台需要多少钱
  • 学校网站建设调研报告收录提交入口
  • flash网站代做中国优秀网页设计案例
  • 网站找图片做海报侵权网站流量分析工具
  • 做电商怎么建网站怎么设计一个网页
  • 网站建设可以一次性进损益吗深圳百度关键
  • 网站建设论文选题背景steam交易链接怎么用
  • web前端开发中的web指的是企业网站优化推广
  • 3 建设营销型网站流程品牌seo推广
  • 网站主题类型廊坊seo排名收费
  • wordpress慢死了搜索引擎优化的方法和技巧
  • wordpress ckplayer独立站优化
  • 欧美在线网站设计教程爱用建站官网
  • 怎么更新网站备案资料石家庄seo报价
  • bl做视频网站seo招聘
  • 如何利用影视网站做cpa福州seo推广公司