做二手房网站有哪些资料,微信商城网站怎么做,后台做网站的题,网站建设那个比较好1. 前言
上一篇文章我们介绍了简单消息的实现#xff0c;本文将主要来介绍顺序消息的实现#xff0c;顺序消息分为局部顺序消息和全局顺序消息。
顺序消息指的是消费者在消费消息时#xff0c;按照生产者发送消息的顺序进行消费。即先发送的先消费【FIFO】。
顺序消息分为…1. 前言
上一篇文章我们介绍了简单消息的实现本文将主要来介绍顺序消息的实现顺序消息分为局部顺序消息和全局顺序消息。
顺序消息指的是消费者在消费消息时按照生产者发送消息的顺序进行消费。即先发送的先消费【FIFO】。
顺序消息分为 全局顺序消息和局部顺序消息。
全局顺序消息就是全局使用一个queue。
局部顺序消息就是 有顺序依赖的消息放在同一个queue中多个queue并行消费。
2. 局部顺序消息
默认情况下RocketMQ会根据轮询的方式将消息发送到某个broker中的某个队列中这样的话就不能保证消息是有序的。
比如在购物网站下单场景下有 1. 创建订单----2. 订单支付----3. 订单发货----4. 订单完成 四条消息。这四条消息逻辑上肯定是有序的。但是如果采用RocketMQ默认的消息投递方式那么同一个订单有可能创建订单被投递到了 MessageQueue1,订单支付的话被投递到了MessageQueue2。 由于消息在不同的MessageQueue中消费者在消费的时候就可能会出现订单支付的消息先于创建订单的消息。
局部顺序消息就是要保证同一笔订单4条消息都放在同一个queue中这样的话就不会出现订单支付的消息先于创建订单的消息被消费。就像下图所示 局部顺序消息消费者在消费某个topic的某个队列中的消息的时候是顺序的。消费者使用MessageListenerOrderly类来进行消息监听。
2.1. 定义生产者 这里定义了名为part_order_topic_test的topic。运行程序之后该topic可以路由到broker-a 以及broker-b 两个broker。
public class OrderProducer {// 局部顺序消费,核心就是自己选择Queue保证需要顺序保障的消息落到同一个队列中public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {DefaultMQProducer defaultMQProducer new DefaultMQProducer(order_producer_group);defaultMQProducer.setNamesrvAddr(172.31.184.89:9876);defaultMQProducer.start();for (int i 0; i 10; i) {int orderId i;for (int j 0; j 5; j) {// 构建消息体,tags和key 只是做一个简单区分Message partOrderMsg new Message(part_order_topic_test, order_ orderId, KEY_ orderId, (局部顺序消息处理_ orderId ;step_ j).getBytes());SendResult send defaultMQProducer.send(partOrderMsg, new MessageQueueSelector() {Override//这里的arg参数就是外面的orderIdpublic MessageQueue select(ListMessageQueue mqs, Message msg, Object arg) {Integer orderId (Integer) arg;int index orderId % mqs.size();return mqs.get(index);}}, orderId);System.out.printf(%s%n, send);}}defaultMQProducer.shutdown();}
}在发送消息的时候实现MessageQueueSelector接口用于在发送消息的时候指定队列。其中 public MessageQueue select(ListMessageQueue mqs, Message msg, Object arg) 方法有三个参数其中mqs表示当前topic所路由的全部队列数这里就是8个队列broker-a有4个队列broker-b有4个队列。msg就是传入的消息体arg 就是传入的orderId。 这里根据orderId与队列数求模取余来获取消息应该发送到哪个队列中这样就保证了相同的orderId的消息会落到同一个队列中 Integer orderId (Integer) arg;
int index orderId % mqs.size();
return mqs.get(index);生产者运行结果部分截图 从运行结果可以看出相同orderId的消息被投递到了同一个MessageQueue中而相同MessageQueue队列天然是有顺序的。
2.2.定义消费者
说完了生产者接着来说说消费者。消费者的逻辑主要是在消费的时候需要实现 MessageListenerOrderly 类来进行消息监听。核心代码是 // 2.订阅消费消息defaultMQPushConsumer.registerMessageListener(new MessageListenerOrderly() {Overridepublic ConsumeOrderlyStatus consumeMessage(ListMessageExt msgs, ConsumeOrderlyContext context) {for (MessageExt msg : msgs) {System.out.println(消费得到的消息是{} msg);System.out.println(消息体内容是{} new String(msg.getBody()));}return ConsumeOrderlyStatus.SUCCESS;}});这里启动了三个消费者不管消费者消费的顺序如何相同的orderId下的5条消息都是被顺序消费的。 3. 碰到的问题
在首次调试的时候出现了一个 broker is full 的错误。这是由于磁盘空间不足导致的可以通过 df -h 命令查看当前磁盘空间的占用情况当磁盘空间使用率超过90%的话则会报此错。 4. 全局顺序消息
全局顺序消息是指消费者消费全部消息都是顺序的只能让所有的消息都发送到同一个MessageQueue中来实现在高并发场景下会非常影响效率。
5. 广播消息
广播消息是向主题topic的所有订阅者发送消息订阅同一个topic的多个消费者都能全量收到生产者发送的所有消息。
广播消息的生产者与普通同步消息的生产者实现是一致的不同的是消费者的消息模式不同。这里给出消费者实现的不同之处。 DefaultMQPushConsumer defaultMQPushConsumer new DefaultMQPushConsumer(broadCastGroup);defaultMQPushConsumer.setNamesrvAddr(172.31.184.89:9876);// 设置消费者的模式是广播模式defaultMQPushConsumer.setMessageModel(MessageModel.BROADCASTING);//从第一位开始消费defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);6. 延迟消息
延迟消息与普通消息的不同之处在于它们要在指定的时间之后才会被传递。生产者并不会延迟发送消息而是发送到topic里面消费者延迟指定的时间进行消费。
6.1. 延迟消息生产者
DefaultMQProducer defaultMQProducer new DefaultMQProducer(scheduled_group);defaultMQProducer.setNamesrvAddr(172.31.186.180:9876);defaultMQProducer.start();for (int i 0; i 100; i) {Message message new Message(Schedule_topic, (延迟消息测试 i).getBytes());//设置延迟级别默认有18个延迟级别,这个消息将延迟10秒消费message.setDelayTimeLevel(3);defaultMQProducer.send(message);}System.out.println(所有延迟消息发送完成);defaultMQProducer.shutdown();延迟消息生产者与普通消息生产者主要的区别是延迟消息需要调用 setDelayTimeLevel 方法设置延迟级别这里设置级别是3则是延迟10秒。RocketMQ提供了18种延迟级别。可以在 RocketMQ的仪表板中的集群中的broker配置中找到。 延迟消息的消费者与普通消息的消费者相同的。RocketMQ内部通过名为SCHEDULE_TOPIC_XXXX 的topic来存放延迟消息。 7.批量消息
批量发送消息提高了传递消息的性能。官方建议批量消息的总大小不应超过1M实际不应超过4M。如果超过4M的批量消息需要进行分批处理。同时设置broker的配置参数为4M在broker的配置文件中修改maxMessageSize4194304。核心代码如下 //4.创建消息ListMessage messageList new ArrayList();for (int i 0; i 100*100; i) {// 创建消息指定topic,以及消息体messageList.add(new Message(batch_topic, (飞哥测试批量消息 i).getBytes()));}//批量消息消息小于4M的处理SendResult send defaultMQProducer.send(messageList);System.out.println(send);8.过滤消息
使用tag过滤
在大多数情况下标签是一种简单而有用的设计可以用来选择你想要的消息。
首先是根据tag来过滤消息生产者在发送消息的时候指定该消息的tag标签消费者则可以根据tag来过滤消息。
8.1. 过滤消息生产者
这里定义了三个tag分别是tagAtagB以及tagC生产者在生产消息的时候给每个消息指定不同的tag。
DefaultMQProducer defaultMQProducer new DefaultMQProducer(TagProducer_group);defaultMQProducer.setNamesrvAddr(172.31.184.89:9876);defaultMQProducer.start();String[] tags new String[]{tagA, tagB, tagC};for (int i 0; i 15; i) {Message message new Message(TagFilterTest, tags[i % tags.length], (飞哥tag消息过滤 tags[i % tags.length]).getBytes());SendResult send defaultMQProducer.send(message);System.out.printf(%s%n, send);}defaultMQProducer.shutdown();8.2. 过滤消息的消费者
消费者过滤出了标签带有tagA以及tagC的消息进行消费。这里其实是broker将consumer需要的消息推给消费者。 DefaultMQPushConsumer defaultMQPushConsumer new DefaultMQPushConsumer(tagConsumer);defaultMQPushConsumer.setNamesrvAddr(172.31.184.89:9876);defaultMQPushConsumer.subscribe(TagFilterTest, tagA||tagC);defaultMQPushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) - {for (MessageExt msg : msgs) {System.out.println(接收到的消息 msg);System.out.println(接收到的消息体 new String(msg.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});defaultMQPushConsumer.start();System.out.println(消费者已经启动);使用SQL过滤
SQL 功能可以通过发送消息时输入的属性进行一些计算在RocketMQ定义的语法下可以实现一些有趣的逻辑。
语法
RocketMQ只定义了一些基本的语法类支持这个特性。
1. 数值比较如 ,,,BETWEEN,;
2. 字符比较如 ,,IN;
3. IS NULL 或 IS NOT NULL ;
4. 逻辑AND,OR,NOT;常量类型有
1. 数字如 123
2. 字符如 abc,必须用单引号
3. NULL,特殊常数
4. 布尔值TRUE 或 FALSE;SQL过滤生产者
生产者主要设置属性过滤 message.putUserProperty(a, String.valueOf(i)); 表示第一条消息键值对是 a0第二条消息键值对是a1。 DefaultMQProducer defaultMQProducer new DefaultMQProducer(TagProducer_group);defaultMQProducer.setNamesrvAddr(172.31.184.89:9876);defaultMQProducer.start();String[] tags new String[]{tagA, tagB, tagC};for (int i 0; i 15; i) {Message message new Message(SQLFilterTest, tags[i % tags.length], (飞哥sql消息过滤 tags[i % tags.length]).getBytes());message.putUserProperty(a, String.valueOf(i));SendResult send defaultMQProducer.send(message);System.out.printf(%s%n, send);}defaultMQProducer.shutdown();SQL过滤消费者 DefaultMQPushConsumer defaultMQPushConsumer new DefaultMQPushConsumer(tagConsumer);defaultMQPushConsumer.setNamesrvAddr(172.31.184.89:9876);defaultMQPushConsumer.subscribe(SQLFilterTest, MessageSelector.bySql((TAGS is not null and TAGS in (tagA,tagC)) and (a is null and a between 0 and 3)));defaultMQPushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) - {for (MessageExt msg : msgs) {System.out.println(接收到的消息 msg);System.out.println(接收到的消息体 new String(msg.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});defaultMQPushConsumer.start();System.out.println(消费者已经启动);如果运行报 The broker does not support consumer to filter message by SQL92 则需要修改 broker.conf 文件增加如下配置
# 开启对 propertyfilter的支持
enablePropertyFilter true
filterSupportRetry true然后重启broker。
总结
本文介绍了局部顺序消息全局顺序消息广播消息延迟消息以及如何批量发送消息和过滤消息。 文章转载自: http://www.morning.bhpjc.cn.gov.cn.bhpjc.cn http://www.morning.ymwrs.cn.gov.cn.ymwrs.cn http://www.morning.gbcnz.cn.gov.cn.gbcnz.cn http://www.morning.gyfhk.cn.gov.cn.gyfhk.cn http://www.morning.lqypx.cn.gov.cn.lqypx.cn http://www.morning.lbpqk.cn.gov.cn.lbpqk.cn http://www.morning.wcghr.cn.gov.cn.wcghr.cn http://www.morning.xinxianzhi005.com.gov.cn.xinxianzhi005.com http://www.morning.fypgl.cn.gov.cn.fypgl.cn http://www.morning.tfsyk.cn.gov.cn.tfsyk.cn http://www.morning.fksxs.cn.gov.cn.fksxs.cn http://www.morning.dlwzm.cn.gov.cn.dlwzm.cn http://www.morning.kwqcy.cn.gov.cn.kwqcy.cn http://www.morning.hxgly.cn.gov.cn.hxgly.cn http://www.morning.wdlyt.cn.gov.cn.wdlyt.cn http://www.morning.hcwjls.com.gov.cn.hcwjls.com http://www.morning.clybn.cn.gov.cn.clybn.cn http://www.morning.wgzzj.cn.gov.cn.wgzzj.cn http://www.morning.rjcqb.cn.gov.cn.rjcqb.cn http://www.morning.grlth.cn.gov.cn.grlth.cn http://www.morning.gqmhq.cn.gov.cn.gqmhq.cn http://www.morning.byzpl.cn.gov.cn.byzpl.cn http://www.morning.cgntj.cn.gov.cn.cgntj.cn http://www.morning.lmbm.cn.gov.cn.lmbm.cn http://www.morning.yqkxr.cn.gov.cn.yqkxr.cn http://www.morning.spfh.cn.gov.cn.spfh.cn http://www.morning.znnsk.cn.gov.cn.znnsk.cn http://www.morning.rfyk.cn.gov.cn.rfyk.cn http://www.morning.ymjgx.cn.gov.cn.ymjgx.cn http://www.morning.yrnll.cn.gov.cn.yrnll.cn http://www.morning.xgbq.cn.gov.cn.xgbq.cn http://www.morning.gqdsm.cn.gov.cn.gqdsm.cn http://www.morning.xnqwk.cn.gov.cn.xnqwk.cn http://www.morning.fmswb.cn.gov.cn.fmswb.cn http://www.morning.drfrm.cn.gov.cn.drfrm.cn http://www.morning.kntbk.cn.gov.cn.kntbk.cn http://www.morning.rgnq.cn.gov.cn.rgnq.cn http://www.morning.wphfl.cn.gov.cn.wphfl.cn http://www.morning.trmpj.cn.gov.cn.trmpj.cn http://www.morning.wnqbf.cn.gov.cn.wnqbf.cn http://www.morning.pnfwd.cn.gov.cn.pnfwd.cn http://www.morning.tsgxz.cn.gov.cn.tsgxz.cn http://www.morning.yfmwg.cn.gov.cn.yfmwg.cn http://www.morning.bsqkt.cn.gov.cn.bsqkt.cn http://www.morning.hcqpc.cn.gov.cn.hcqpc.cn http://www.morning.grynb.cn.gov.cn.grynb.cn http://www.morning.yxzfl.cn.gov.cn.yxzfl.cn http://www.morning.ayftwl.cn.gov.cn.ayftwl.cn http://www.morning.qjfkz.cn.gov.cn.qjfkz.cn http://www.morning.mrckk.cn.gov.cn.mrckk.cn http://www.morning.brwwr.cn.gov.cn.brwwr.cn http://www.morning.hqjtp.cn.gov.cn.hqjtp.cn http://www.morning.bktly.cn.gov.cn.bktly.cn http://www.morning.gktds.cn.gov.cn.gktds.cn http://www.morning.wpqcj.cn.gov.cn.wpqcj.cn http://www.morning.yfddl.cn.gov.cn.yfddl.cn http://www.morning.slpcl.cn.gov.cn.slpcl.cn http://www.morning.ltbwq.cn.gov.cn.ltbwq.cn http://www.morning.ydtdn.cn.gov.cn.ydtdn.cn http://www.morning.hrtct.cn.gov.cn.hrtct.cn http://www.morning.hqxyt.cn.gov.cn.hqxyt.cn http://www.morning.gbhsz.cn.gov.cn.gbhsz.cn http://www.morning.sdhmn.cn.gov.cn.sdhmn.cn http://www.morning.xknsn.cn.gov.cn.xknsn.cn http://www.morning.fmznd.cn.gov.cn.fmznd.cn http://www.morning.cwtrl.cn.gov.cn.cwtrl.cn http://www.morning.rqgq.cn.gov.cn.rqgq.cn http://www.morning.wjyyg.cn.gov.cn.wjyyg.cn http://www.morning.vuref.cn.gov.cn.vuref.cn http://www.morning.zdnrb.cn.gov.cn.zdnrb.cn http://www.morning.ymhzd.cn.gov.cn.ymhzd.cn http://www.morning.jzmqk.cn.gov.cn.jzmqk.cn http://www.morning.rbsmm.cn.gov.cn.rbsmm.cn http://www.morning.cbnlg.cn.gov.cn.cbnlg.cn http://www.morning.rzysq.cn.gov.cn.rzysq.cn http://www.morning.nzklw.cn.gov.cn.nzklw.cn http://www.morning.pwzzk.cn.gov.cn.pwzzk.cn http://www.morning.fbzyc.cn.gov.cn.fbzyc.cn http://www.morning.xxfxxf.cn.gov.cn.xxfxxf.cn http://www.morning.jwbfj.cn.gov.cn.jwbfj.cn