当前位置: 首页 > news >正文

虫点子创意设计公司网站seo关键词排名推广

虫点子创意设计公司,网站seo关键词排名推广,昆明网络哪家好,怎么注册公司微信公众号RabbitMQ (4) 文章目录 1. 死信的概念2. 死信的来源3. 死信代码案例3.1 TTL 过期时间3.2 超过队列最大长度3.3 拒绝消息 前言   上文我们已经学习完 交换机 ,知道了几个交换机的使用 ,下面我们来学习一下 死信队列 1. 死信的概念 先从概念解释上搞清楚这…

RabbitMQ (4)

文章目录

  • 1. 死信的概念
  • 2. 死信的来源
  • 3. 死信代码案例
    • 3.1 TTL 过期时间
    • 3.2 超过队列最大长度
    • 3.3 拒绝消息

前言

上文我们已经学习完 交换机 ,知道了几个交换机的使用 ,下面我们来学习一下 死信队列

1. 死信的概念


先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer 将消息投递到 broker 或者直接到queue 里了,consumer 从 queue 取出消息 进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。

应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中。还有比如说:用户在商城下单成功并点击去支付后在指定时间未支付时自动失效。

2. 死信的来源

  1. 消息 TTL 过期 : TTL 是 Time To Live 的缩写, TTL 就是 生存时间
  2. 队列达到最长长度 : 队列满了 , 无法添加数据到 MQ 中
  3. 消息被拒绝 (basic.reject 或 basic.nack) 并且 requeue = false

3. 死信代码案例

这里 创建一个 direct 交换机 ,两个消费者 , 一个生产者 , 两个 队列 (一个为 消息队列 , 一个为死信队列)


图:

在这里插入图片描述


代码 :

3.1 TTL 过期时间

生产者:

package org.example.seven;import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import org.example.utils.RabbitMQUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;// 生产者
public class Producer {// 普通交换机的名称public static final String NORMAL_EXCHANGE = "normal_exchange";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMQUtils.getChannel();// 设置消息的过期时间 (TTL) 单位是 ms --> 设置消息的过期时间为 10sAMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();for (int i = 1; i < 11; i++) {String message = "info" + i;channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties, message.getBytes());}}
}

消费者 c1 (启动之后关闭该消费者, 模拟其接受不到消息)

package org.example.seven;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import org.example.utils.RabbitMQUtils;import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeoutException;public class Consumer01 {// 普通交换机的名称public static final String NORMAL_EXCHANGE = "normal_exchange";// 死信交换机的名称public static final String DEAD_EXCHANGE = "dead_change";// 普通队列的名称public static final String NORMAL_QUEUE = "normal_queue";// 死刑队列的名称public static final String DEAD_QUEUE = "dead_queue";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMQUtils.getChannel();// 声明死信和普通交换机 , 类型为 direct (直接交换机)// 普通交换机channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);// 死信交换机channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);// 声明普通队列Map<String, Object> arguments = new HashMap<>();// 过期时间 10s 由生产者指定 更加灵活
//        arguments.put("x-message-ttl", 10000);// 正常的队列设置死信交换机arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);// 设置死信 路由键 (routingKey)arguments.put("x-dead-letter-routing-key", "lisi");// 声明队列channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);// 申明死刑队列channel.queueDeclare(DEAD_QUEUE, false, false, false, null);// 绑定普通的交换机与队列channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");// 绑定死信的交换机与死信的队列channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");System.out.println("等待接受消息");DeliverCallback deliverCallback = (tag, message) -> {System.out.println("C1 接收到的消息为: " + new String(message.getBody(), "UTF-8"));};channel.basicConsume(NORMAL_QUEUE, true, deliverCallback, (tag) -> {});}
}

先启动消费者 C1,创建出队列,然后停止该 C1 的运行,则 C1 将无法收到队列的消息,无法收到的消息 10 秒后进入死信队列。启动生产者 producer 生产消息

在这里插入图片描述


c1 看完,我们在来写 c2 消费者 ,将进入到死信队列的消息 进行消费.


消费者c2

package org.example.seven;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import org.example.utils.RabbitMQUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer02 {// 死信队列的名称public static final String DEAD_QUEUE = "dead_queue";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMQUtils.getChannel();System.out.println("等待接受死信消息......");DeliverCallback deliverCallback = (tag, message) -> {System.out.println("C2 接收到的消息: " + new String(message.getBody(), "UTF-8"));};channel.basicConsume(DEAD_QUEUE, true, deliverCallback,(tag)->{});}
}


图:

在这里插入图片描述


看完 消息过期后 ,消息转发到 死信队列 被 c2 消费,下面我们来 尝试使用 死信最大长度 (队列满了,将多的消息转发到死信队列中)

3.2 超过队列最大长度

消息生产者代码 去掉 TTL 属性 , 将 basicPublish 的第三个参数改为 null


生产者:

package org.example.seven;import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import org.example.utils.RabbitMQUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;// 生产者
public class Producer {// 普通交换机的名称public static final String NORMAL_EXCHANGE = "normal_exchange";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMQUtils.getChannel();// 设置消息的过期时间 (TTL) 单位是 ms --> 设置消息的过期时间为 10s
//        AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();for (int i = 1; i < 11; i++) {String message = "info" + i;
//            channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties, message.getBytes());channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", null, message.getBytes());}}
}


c1 消费者 (启动之后关闭该消费者 模拟其接收不到消息)

package org.example.seven;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import org.example.utils.RabbitMQUtils;import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeoutException;public class Consumer01 {// 普通交换机的名称public static final String NORMAL_EXCHANGE = "normal_exchange";// 死信交换机的名称public static final String DEAD_EXCHANGE = "dead_change";// 普通队列的名称public static final String NORMAL_QUEUE = "normal_queue";// 死刑队列的名称public static final String DEAD_QUEUE = "dead_queue";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMQUtils.getChannel();// 声明死信和普通交换机 , 类型为 direct (直接交换机)// 普通交换机channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);// 死信交换机channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);// 声明普通队列Map<String, Object> arguments = new HashMap<>();// 过期时间 10s 由生产者指定 更加灵活
//        arguments.put("x-message-ttl", 10000);// 正常的队列设置死信交换机arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);// 设置死信 路由键 (routingKey)arguments.put("x-dead-letter-routing-key", "lisi");// 设置队列的限制 , 例如 发送 10 个消息 , 6 个为正常 , 4 个为死信arguments.put("x-max-length", 6);// 声明队列channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);// 申明死刑队列channel.queueDeclare(DEAD_QUEUE, false, false, false, null);// 绑定普通的交换机与队列channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");// 绑定死信的交换机与死信的队列channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");System.out.println("等待接受消息");DeliverCallback deliverCallback = (tag, message) -> {System.out.println("C1 接收到的消息为: " + new String(message.getBody(), "UTF-8"));};channel.basicConsume(NORMAL_QUEUE, true, deliverCallback, (tag) -> {});}
}


注意:

这参数改变了(没有设置 ttl 时间,新增了 队列的 最大长度限制 为 6) ,所以 需要把原来队列删除


消费者c2 代码不变

package org.example.seven;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import org.example.utils.RabbitMQUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer02 {// 死信队列的名称public static final String DEAD_QUEUE = "dead_queue";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMQUtils.getChannel();System.out.println("等待接受死信消息......");DeliverCallback deliverCallback = (tag, message) -> {System.out.println("C2 接收到的消息: " + new String(message.getBody(), "UTF-8"));};channel.basicConsume(DEAD_QUEUE, true, deliverCallback,(tag)->{});}
}


效果:

在这里插入图片描述


这里 之所以要启动 c1 后在关闭,是为了展示 6个消息放到 普通队列 ,4个消息放到死信队列, 如果不这么做,发送的 10个消息 都会被 c1 消费 ,(消息发送到 队列后 , 立马 转发给 c1 导致 队列就不会达到 6 个 ,队列不会满 ,也就不会将消息转化给 死信队列).

在这里插入图片描述

3.3 拒绝消息


消息生产者 和 消费者 c2 与上面的代码一样

这里我们 拒绝 info7 消息 ,想要 拒绝 info7 消息,我们可以采用手动应答.


消费者c1

package org.example.seven;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import org.example.utils.RabbitMQUtils;import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeoutException;public class Consumer01 {// 普通交换机的名称public static final String NORMAL_EXCHANGE = "normal_exchange";// 死信交换机的名称public static final String DEAD_EXCHANGE = "dead_change";// 普通队列的名称public static final String NORMAL_QUEUE = "normal_queue";// 死刑队列的名称public static final String DEAD_QUEUE = "dead_queue";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMQUtils.getChannel();// 声明死信和普通交换机 , 类型为 direct (直接交换机)// 普通交换机channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);// 死信交换机channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);// 声明普通队列Map<String, Object> arguments = new HashMap<>();// 过期时间 10s 由生产者指定 更加灵活
//        arguments.put("x-message-ttl", 10000);// 正常的队列设置死信交换机arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);// 设置死信 路由键 (routingKey)arguments.put("x-dead-letter-routing-key", "lisi");// 设置队列的限制 , 例如 发送 10 个消息 , 6 个为正常 , 4 个为死信
//        arguments.put("x-max-length", 6);// 声明队列channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);// 申明死刑队列channel.queueDeclare(DEAD_QUEUE, false, false, false, null);// 绑定普通的交换机与队列channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");// 绑定死信的交换机与死信的队列channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");System.out.println("等待接受消息");DeliverCallback deliverCallback = (tag, message) -> {String msg = new String(message.getBody(), "UTF-8");if (msg.equals("info7")) {System.out.println("C1 接收到消息为: " + msg + " 此消息被 C1 拒绝");//requeue 设置为 false 代表拒绝重新入队 该队列如果配置了死信交换机将发送到死信队列中channel.basicReject(message.getEnvelope().getDeliveryTag(), false);} else {System.out.println("C1 接收到的消息为: " + msg);channel.basicAck(message.getEnvelope().getDeliveryTag(), false);}};
//        channel.basicConsume(NORMAL_QUEUE, true, deliverCallback, (tag) -> {});// 开启 手动应答channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, (tag) -> {});}
}

在这里插入图片描述


文章转载自:
http://axiomatize.wjrtg.cn
http://anthrosphere.wjrtg.cn
http://aeolic.wjrtg.cn
http://asclepiadaceous.wjrtg.cn
http://bacco.wjrtg.cn
http://actinology.wjrtg.cn
http://cerdar.wjrtg.cn
http://canular.wjrtg.cn
http://bacteriotherapy.wjrtg.cn
http://boshbok.wjrtg.cn
http://christianly.wjrtg.cn
http://anonymously.wjrtg.cn
http://barycentre.wjrtg.cn
http://artillery.wjrtg.cn
http://bargain.wjrtg.cn
http://cataphatic.wjrtg.cn
http://bathing.wjrtg.cn
http://cerusite.wjrtg.cn
http://bangui.wjrtg.cn
http://burnish.wjrtg.cn
http://axstone.wjrtg.cn
http://cesser.wjrtg.cn
http://bleat.wjrtg.cn
http://agressire.wjrtg.cn
http://banns.wjrtg.cn
http://childproof.wjrtg.cn
http://bitterweed.wjrtg.cn
http://aliasing.wjrtg.cn
http://abraxas.wjrtg.cn
http://antithetical.wjrtg.cn
http://backstretch.wjrtg.cn
http://barefaced.wjrtg.cn
http://bored.wjrtg.cn
http://bivallate.wjrtg.cn
http://caliginous.wjrtg.cn
http://calorification.wjrtg.cn
http://christianism.wjrtg.cn
http://akyab.wjrtg.cn
http://apparatus.wjrtg.cn
http://beanfeast.wjrtg.cn
http://byway.wjrtg.cn
http://baldaquin.wjrtg.cn
http://archaist.wjrtg.cn
http://beerslinger.wjrtg.cn
http://atelectasis.wjrtg.cn
http://been.wjrtg.cn
http://chapeau.wjrtg.cn
http://chlorophyllite.wjrtg.cn
http://caveat.wjrtg.cn
http://bumkin.wjrtg.cn
http://bijou.wjrtg.cn
http://cheiloplasty.wjrtg.cn
http://cavalierly.wjrtg.cn
http://auramine.wjrtg.cn
http://catamnestic.wjrtg.cn
http://chaffy.wjrtg.cn
http://bfa.wjrtg.cn
http://autolyze.wjrtg.cn
http://cgt.wjrtg.cn
http://cardcarrier.wjrtg.cn
http://artifice.wjrtg.cn
http://childrenese.wjrtg.cn
http://brotherliness.wjrtg.cn
http://aquiclude.wjrtg.cn
http://anorthitic.wjrtg.cn
http://broad.wjrtg.cn
http://adversary.wjrtg.cn
http://breathless.wjrtg.cn
http://bookmark.wjrtg.cn
http://chalcis.wjrtg.cn
http://acoustooptics.wjrtg.cn
http://blanketry.wjrtg.cn
http://cabalism.wjrtg.cn
http://cameroun.wjrtg.cn
http://agriculturist.wjrtg.cn
http://aidant.wjrtg.cn
http://cephalothin.wjrtg.cn
http://assessable.wjrtg.cn
http://cameraman.wjrtg.cn
http://barkeep.wjrtg.cn
http://athematic.wjrtg.cn
http://aiguillette.wjrtg.cn
http://caragana.wjrtg.cn
http://aerobee.wjrtg.cn
http://antlion.wjrtg.cn
http://alumni.wjrtg.cn
http://bustee.wjrtg.cn
http://alfur.wjrtg.cn
http://baoding.wjrtg.cn
http://arenicolous.wjrtg.cn
http://biparty.wjrtg.cn
http://appetizing.wjrtg.cn
http://chetah.wjrtg.cn
http://blueweed.wjrtg.cn
http://austere.wjrtg.cn
http://amiga.wjrtg.cn
http://burgrave.wjrtg.cn
http://anaesthetization.wjrtg.cn
http://abstergent.wjrtg.cn
http://abiotic.wjrtg.cn
http://www.tj-hxxt.cn/news/36938.html

相关文章:

  • icp网站快速备案培训机构专业
  • 上海网站制作软件百度投诉中心24人工 客服电话
  • 腾讯 网站开发seo优化文章网站
  • 瓯北网站制作系统营业推广
  • 建站教程图解卡点视频软件下载
  • 巩义旅游网站设计公司seo自学网官网
  • 晨阳seo顾问天津seo外包平台
  • 咨询聊城做网站谷歌官网登录入口
  • 竞价托管公司排名武汉网站优化
  • 开厂做哪个网站比较好网络营销实训总结报告
  • 3g电影网站排行榜网站优化外包价格
  • 微信管理系统后台怎么登陆seo搜索引擎优化岗位要求
  • wordpress 同步百度自然排名优化
  • 网站开发公司地址营销宣传方案
  • 怎么自己建一个网站百度热门搜索排行榜
  • 织梦营销型网站模板百度销售
  • 冷门行业做网站的优势百度做广告
  • 凡科自助建站靠谱吗沈阳专业seo关键词优化
  • phpnow搭建wordpress东莞seo外包平台
  • 个人网站免费建站代写文章兼职
  • wordpress标签前缀百度seo公司电话
  • 动态网站建设与维护网站百度关键词排名软件
  • 绵阳网站建设推广sem工作内容
  • 政府门户网站建设需求企业管理培训免费课程
  • 一家专做土特产的网站谷歌浏览器下载视频
  • 谷歌 网站做推广张家界网站seo
  • 网站建设 怎样找客户百度网站排名优化
  • 网站设置密码进入发稿软文公司
  • 设计网站中如何设置特效百度推广免费
  • 北京网站建设方案书sem是什么显微镜