当前位置: 首页 > news >正文

贺州市住房和城乡建设局网站网上智慧团建官网

贺州市住房和城乡建设局网站,网上智慧团建官网,免费域名注册网站,个人社保缴费基数查询分布式消息系统 RocketMQ概念#xff0c;用途#xff0c;特性安装RocketMQ掌握RocketMQ的api使用对producer、consumer进行详解了解RocketMQ的存储特点 简介及相关概念JavaAPISpringBoot整合RocketMQ消息的顺序收发消息系统的事务、存储、重试策略消息系统的集群 RocketMQ R… 分布式消息系统 RocketMQ概念用途特性安装RocketMQ掌握RocketMQ的api使用对producer、consumer进行详解了解RocketMQ的存储特点 简介及相关概念JavaAPISpringBoot整合RocketMQ消息的顺序收发消息系统的事务、存储、重试策略消息系统的集群 RocketMQ RocketMQ简介 采用java开发的分布式消息系统由阿里开发 地址http://rocketmq.apache.org/ 历史发展 阿里中间件Notify用于交易核心信息的流转2010年B2B开始大规模使用ActiveMQ作为消息内核急需支持顺序消息、拥有海量消息堆积能力的消息中间件——MetaQ 1.0 2011诞生2012 年MetaQ发展到了3.0版本抽象除了通用的消息引擎RorcketMQ2015年RocketMQ进过双十一在可用性可靠性和稳定性等方面都有出色表现。阿里消息中间件基于RocketMQ退出Aliware MQ1.0开始为阿里云上的企业提供消息服务2016年RocketMQ进入Apache孵化 概念 Producer 消息生产者生产消息一般由业务系统负责产生消息Producer Group一类Producer的集合名称这类Producer通常发送同一类消息且发送逻辑一致 Consumer 消费者负责消费消息一般由后台系统负责异步消费 分类 Push Consumer消费端被动接收由服务端Push的消息Pull Consumer消费端主动向服务端定时拉取消息 Consmer Group一类Consumer的集合名称这类Producer通常发送同一类消息且发送逻辑一致 Broker RocketMQ的核心消息的发送、接收、高可用等需要定时发送自身情况到NameServer,默认10s发送一次超过2分钟会认为该broker失效 NameServer 集群中的组织协调员收集broker的工作情况不负责消息的处理 Topic【逻辑概念】 不同类型的消息以不同的Topic名称进行区分如User、Order等Message Queue 消息队列用于存储消息 下载部署 非docker 下载地址https://archive.apache.org/dist/rocketmq/4.3.2/rocketmq-all-4.3.2-bin-release.zip cd /opt unzip rocketmq-all-4.3.2-bin-release.zip cd rocketmq-all-4.3.2-bin-release/# 启动nameserver bin/mqnamesrv #The Name Server boot success. serializeTypeJSON # 看到这个说明nameserver启动成功#启动broker bin/mqbroker -n 8.140.130.91:9876 #-n指定nameserver地址和端口 Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x00000005c0000000, 8589934592, 0) failed; errorCannot allocate memory (errno12)启动错误因为RocketMQ的配置默认是生产环境的配置设置jvm的内存值比较大需要调整默认值 #调整默认的内存大小参数 cd bin/ JAVA_OPT${JAVA_OPT} -server -Xms128m -Xmx128m -Xmn128m -XX:MetaspaceSize128m -XX:MaxMetaspaceSize128mvim runbroker.sh JAVA_OPT${JAVA_OPT} -server -Xms128m -Xmx128m -Xmn128m#重新启动测试 bin/mqbroker -n 8.140.130.91:9876 The broker[iZ2zeg4pktzjhp9h7wt6doZ, 172.17.0.1:10911] boot success. serializeTypeJSON and name server is 8.140.130.91:9876#启动成功发送消息测试 export NAMESRV_ADDR127.0.0.1:9876 cd /opt/rocketmq-all-4.3.2-bin-release/bin sh tools.sh org.apache.rocketmq.example.quickstart.Producer接收消息测试 sh tools.sh org.apache.rocketmq.example.quickstart.Consumerjava api测试 依赖 dependenciesdependencygroupIdorg.apache.rocketmq/groupIdartifactIdrocketmq-client/artifactIdversion4.3.2/version/dependency /dependenciesbuildpluginsplugingroupIdorg.apache.maven.plugins/groupIdartifactIdmaven-compiler-plugin/artifactIdversion3.2/versionconfigurationsource1.8/sourcetarget1.8/targetencodingUTF-8/encoding/configuration/plugin/plugins /build测试代码 package com.rocketmq;import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingException;import java.io.UnsupportedEncodingException;public class SyncProducer {public static void main(String[] args) throws Exception {//Instantiate with a producer group name.DefaultMQProducer producer new DefaultMQProducer(test-group);//specify name server addressproducer.setNamesrvAddr(8.140.130.91:9876);//Lanuch the instanceproducer.start();for (int i 0; i 100; i) {//create message instance ,specify topic,tag and message bodyMessage msg new Message(TopicTest1,/*topic*/TAGA,/*tag*/(Hello RocketMQ i).getBytes(RemotingHelper.DEFAULT_CHARSET)/*message body*/);//Call send message to deliver message to one of brokers.SendResult sendResult producer.send(msg);System.out.printf(%s%n, sendResult);}//Shut down once the producer instance is not longer in use.producer.shutdown();} }发现报错 原因 broker的ip地址是172.17.0.1,为私有ip所以不可访问 解决修改broker配置文件,指定broker 的ip地址 cd /opt/rocketmq-all-4.3.2-bin-release/conf vim broker.confbrokerIP18.140.130.91 namesrvAddr8.140.130.91:9876 brokerNamebroker_haoke_im#启动broker通过 -c 指定配置文件 cd /opt/rocketmq-all-4.3.2-bin-release/ bin/mqbroker -c /opt/rocketmq-all-4.3.2-bin-release/conf/broker.confAPI测试成功 通过docker部署 #拉取镜像 docker pull foxiswho/rocketmq:server-4.3.2 docker pull foxiswho/rocketmq:broker-4.3.2#创建nameserver容器 docker create -p 9876:9876 --name rmqserver \ -e JAVA_OPT_EXT-server -Xms128m -Xmx128m -Xmn128m \ -e JAVA_OPTS-Duser.home/opt \ -v /data/rmq-data/rmqserver/logs:/opt/logs \ -v /data/rmq-data/rmqserver/store:/opt/store \ foxiswho/rocketmq:server-4.3.2#创建broker容器 #10911 生产者消费者端口 #10909 搭建集群主从端口 docker create -p 10911:10911 -p 10909:10909 --name rmqbroker \ -e JAVA_OPTS-Duser.home/opt \ -e JAVA_OPT_EXT-server -Xms128m -Xmx128m -Xmn128m \ -v /data/rmq-data/rmqbroker/conf/broker.conf:/etc/rocketmq/broker.conf \ -v /data/rmq-data/rmqbroker/logs:/opt/logs \ -v /data/rmq-data/rmqbroker/store:/opt/store \ foxiswho/rocketmq:broker-4.3.2#启动容器 docker start rmqserver rmqbroker#停止删除容器 docker stop rmqbroker rmqserver docker rm rmqbroker rmqserverbroker配置文件 #broker名 brokerNamebroker_haoke_im #broker IP brokerIP18.140.130.91 #当前broker托管的NameServer地址 namesrvAddr8.140.130.91:9876 #开启自定义属性支持 enablePropertyFiltertrue部署RocketMQ的管理工具 UI管理工具rocketmq-console,项目地址https://github.com/apache/rocketmq-externals/tree/master/rocketmq-console #拉取镜像 docker pull apacherocketmq/rocketmq-console:2.0.0#创建并启动容器 docker run -e JAVA_OPTS-Drocketmq.config.namesrvAddr8.140.130.91:9876 -Drocketmq.config.isVIPChannelfalse -p 8082:8080 -t apacherocketmq/rocketmq-console:2.0.0访问http://8.140.130.91:8082/ Java API基本使用 创建topic package com.rocketmq;import org.apache.rocketmq.client.producer.DefaultMQProducer;public class TopicDemo {public static void main(String[] args) throws Exception{//设置NameServer地址DefaultMQProducer producer new DefaultMQProducer(test-group);//设置producer 的NameServerAddressproducer.setNamesrvAddr(8.140.130.91:9876);//启动NameServerproducer.start();/** 创建topic* param key broker name* param newTopic topic name* param queueNum topics queue number* */producer.createTopic(broker_haoke_im,test_topic,8);System.out.println(topic创建成功);producer.shutdown();} }发送消息 消息的属性 字段名默认 值说明Topicnull必填线下环境不需要申请线上环境需要申请后才能使用Bodynull必填二进制形式序列化由应用决定Producer 与 Consumer 要协商好 序列化形式。Tagsnull选填类似于 Gmail 为每封邮件设置的标签方便服务器过滤使用。目前只 支持每个消息设置一个 tag所以也可以类比为 Notify 的 MessageType 概 念Keysnull选填代表这条消息的业务关键词服务器会根据 keys 创建哈希索引设置 后可以在 Console 系统根据 Topic、Keys 来查询消息由于是哈希索引 请尽可能保证 key 唯一例如订单号商品 Id 等。Flag0选填完全由应用来设置RocketMQ 不做干预DelayTimeLevel0选填消息延时级别0 表示不延时大于 0 会延时特定的时间才会被消费WaitStoreMsgOKTRUE选填表示消息是否在服务器落盘后才返回应答。 同步 package com.rocketmq.message;import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message;public class SyncMessage {public static void main(String[] args) throws Exception{DefaultMQProducer producer new DefaultMQProducer(test-group);producer.setNamesrvAddr(8.140.130.91:9876);producer.start();String msgStr 测试消息1;/** String topic, String tags, byte[] body* */Message message new Message(test_topic,test,msgStr.getBytes(UTF-8));SendResult result producer.send(message);System.out.println(result);System.out.println(消息状态 result.getSendStatus());System.out.println(消息id result.getMsgId());System.out.println(消息queue result.getMessageQueue());System.out.println(消息offset result.getQueueOffset());producer.shutdown();} }异步 与同步区别在于回调函数的执行是滞后的主程序是顺序执行的 package com.rocketmq.message;import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message;public class AsyncMessage {public static void main(String[] args) throws Exception{DefaultMQProducer producer new DefaultMQProducer(test-group);producer.setNamesrvAddr(8.140.130.91:9876);producer.start();String msgStr 异步消息发送测试;/** String topic, String tags, byte[] body* */Message message new Message(test_topic,test,msgStr.getBytes(UTF-8));producer.send(message, new SendCallback() {Overridepublic void onSuccess(SendResult result) {System.out.println(result);System.out.println(消息状态 result.getSendStatus());System.out.println(消息id result.getMsgId());System.out.println(消息queue result.getMessageQueue());System.out.println(消息offset result.getQueueOffset());}Overridepublic void onException(Throwable e) {System.out.println(消息发送失败);}});// producer.shutdown()要注释掉否则发送失败。原因是异步发送还未来得及发送就被关闭了//producer.shutdown();} }消费信息 package com.rocketmq.consumer;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.message.MessageExt;import java.io.UnsupportedEncodingException; import java.util.List;public class ConsumerDemo {public static void main(String[] args) throws Exception{/** push类型的消费者被动接收从broker推送的消息* */DefaultMQPushConsumer consumer new DefaultMQPushConsumer(test-group);consumer.setNamesrvAddr(8.140.130.91:9876);//订阅topic接收此topic下的所有消息consumer.subscribe(test_topic,*);consumer.registerMessageListener(new MessageListenerConcurrently() {//并发读取消息Overridepublic ConsumeConcurrentlyStatus consumeMessage(ListMessageExt msgs, ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {try {System.out.println(new String(msg.getBody(),UTF-8));} catch (UnsupportedEncodingException e) {e.printStackTrace();}}System.out.println(收到消息-msgs);/** 返回给broker消费者的接收情况* CONSUME_SUCCESS 接收成功* RECONSUME_LATER 延时重发* */return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();} }测试接收历史消息 测试接收实时消息 消息的订阅方式 可以通过tag区分不同类型 #生产者 Message message new Message(test_topic,add,msgStr.getBytes(UTF-8));#消费者 //完整匹配 consumer.subscribe(test_topic,add); //或匹配 consumer.subscribe(test_topic,add || delete);消息过滤器 RocketMQ支持根据用户自定义属性进行过滤 类似与SQL MessageSelector.bySql(“age20 AND sex‘女’”)); 消息发送方 package com.rocketmq.filter;import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message;/*** author Auspice Tian* time 2021-04-04 15:10* current example-roketmq-com.rocketmq.filter*/ public class SyncProducer {public static void main(String[] args) throws Exception{DefaultMQProducer producer new DefaultMQProducer(test-group);producer.setNamesrvAddr(8.140.130.91:9876);producer.start();String msgStr 发送测试;Message msg new Message(test_topic,test,msgStr.getBytes(UTF-8));msg.putUserProperty(age,18);msg.putUserProperty(sex,女);SendResult result producer.send(msg);System.out.println(消息状态result.getSendStatus());System.out.println(消息id result.getMsgId());System.out.println(消息queueresult.getMessageQueue());System.out.println(消息offsetresult.getQueueOffset());producer.shutdown();} }消息接收方 package com.rocketmq.filter;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.MessageSelector; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt;import java.io.UnsupportedEncodingException; import java.util.List;public class ConsumerFilter {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer new DefaultMQPushConsumer(test-group);consumer.setNamesrvAddr(8.140.130.91:9876);consumer.subscribe(test_topic, MessageSelector.bySql(age20 AND sex女));consumer.registerMessageListener(new MessageListenerConcurrently() {//并发读取消息Overridepublic ConsumeConcurrentlyStatus consumeMessage(ListMessageExt msgs, ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {try {System.out.println(new String(msg.getBody(),UTF-8));} catch (UnsupportedEncodingException e) {e.printStackTrace();}}System.out.println(收到消息-msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();} }测试 消息发送成功但是由于不满足条件被过滤器过滤消费者未接收到 修改生产者自定义属性 Message msg new Message(test_topic,test,msgStr.getBytes(UTF-8)); msg.putUserProperty(age,21); msg.putUserProperty(sex,女);可以接收到消息 消息的顺序发送与接收 原理 消息的顺序收发需要消费者与生产者二者配合 生产者发送的顺序消息都要放在同一消息队列中才能保证被顺序取出消费者接收的顺序消息需要从同一队列中获取 生产者 package com.rocketmq.order;import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message;public class OrderProducer {public static void main(String[] args) throws Exception{DefaultMQProducer producer new DefaultMQProducer(test-group);producer.setNamesrvAddr(8.140.130.91:9876);producer.start();for (int i 0; i 100; i) {int orderId i % 10;//生产10个订单的消息,每个订单10条消息String msgStr order--i orderId-- orderId;Message message new Message(test_topic,ORDER_MSG,msgStr.getBytes(UTF-8));/** public SendResult send(Message msg, MessageQueueSelector selector, Object arg)* MessageQueue select(final ListMessageQueue mqs, final Message msg, final Object arg);* */SendResult sendResult producer.send(message,(mqs,msg,arg)-{//匿名函数的作用为选择消息队列的idInteger id (Integer) arg;int index id % mqs.size();return mqs.get(index);},//arg与orderId对应orderId);System.out.println(sendResult);}producer.shutdown();} }消费者 public class OrderConsumer {public static void main(String[] args) throws Exception{DefaultMQPushConsumer consumer new DefaultMQPushConsumer(test-order-group);consumer.setNamesrvAddr(8.140.130.91:9876);consumer.subscribe(test_order_topic,*);consumer.registerMessageListener(new MessageListenerOrderly() {//顺序读取消息Overridepublic ConsumeOrderlyStatus consumeMessage(ListMessageExt msgs, ConsumeOrderlyContext context) {for (MessageExt msg : msgs) {try {System.out.println(Thread.currentThread().getName() msg.getQueueId() new String(msg.getBody(),UTF-8));} catch (UnsupportedEncodingException e) {e.printStackTrace();}}return ConsumeOrderlyStatus.SUCCESS;}});consumer.start();} }可见订单id为3的消息会存入同一消息队列故在同一消息队列的消息可被同一消费线程监听 消息系统的事务 分布式事务分类 基于单个JVM数据库分库分表基于多个JVM服务拆分基于多JVM服务拆分且数据库分库分表 原理 Half(Prepare) Message 消息系统暂时不能投递的消息发送方将消息发送到了MQ服务端。MQ服务端未收到生产者对消息的二次确认此时该消息被标记为 暂不能投递状态 处于该状态的消息称为 半消息 Message Status Check 由于网络闪断、生产者应用重启等原因导致某条事务消息的二次确认丢失MQ服务端发现某条消息长期处于 半消息需要主动向消息生产者询问该消息的状态 发送方向MQ服务端发送消息 MQ Server将消息持久化成功后向发送方ACK确认消息已经发送成功此时消息为 半消息 发送方开始执行本地事务逻辑 发送方根据本地事务执行结果向MQ Server提交二次确认Commit或RollbackMQ Server 收到 Commit 则将半消息标记为 可投递订阅方最终收到该消息MQ Server收到 Rollback 则删除该半消息订阅方不会收到该消息 在断网或应用重启情况下上述4提交的二次确认最终未到达MQ Server经过固定时间后MQ Server将对该消息发起消息回查 发送方收到消息回查需要检查对应消息的本地事务执行的最终结果 发送方根据检查得到的本地事务的最终状态再次提交二次确认MQ Server仍按4对半消息进行确认 生产者 package com.rocketmq.trancation;public class TrancationProducer {public static void main(String[] args) throws Exception{TransactionMQProducer producer new TransactionMQProducer(test_transaction_producer);producer.setNamesrvAddr(8.140.130.91:9876);//设置事务监听器producer.setTransactionListener(new TransactionImpl());producer.start();//发送消息Message message new Message(pay_topic,用户A给用户B转钱.getBytes(UTF-8));producer.sendMessageInTransaction(message,null);Thread.sleep(99999);producer.shutdown();} }本地事务处理 package com.rocketmq.trancation;public class TransactionImpl implements TransactionListener {private static MapString, LocalTransactionState STATE_MAP new HashMap();/*** 本地执行业务具体的逻辑* param msg* param arg* return*/Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {try {Thread.sleep(500);System.out.println(用户A账户减500);// System.out.println(1/0);System.out.println(用户B账户加500元.);Thread.sleep(800);//二次提交确认STATE_MAP.put(msg.getTransactionId(),LocalTransactionState.COMMIT_MESSAGE);return LocalTransactionState.COMMIT_MESSAGE;} catch (InterruptedException e) {e.printStackTrace();}//回滚STATE_MAP.put(msg.getTransactionId(), LocalTransactionState.ROLLBACK_MESSAGE);return LocalTransactionState.ROLLBACK_MESSAGE;}/*** 消息回查* param msg* return*/Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {return STATE_MAP.get(msg.getTransactionId());} }消费者 package com.rocketmq.trancation;public class TransactionConsumer {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer new DefaultMQPushConsumer(test_transaction_consumer);consumer.setNamesrvAddr(8.140.130.91:9876);//订阅topic接收消息consumer.subscribe(pay_topic,*);consumer.registerMessageListener(new MessageListenerConcurrently() {Overridepublic ConsumeConcurrentlyStatus consumeMessage(ListMessageExt msgs, ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {try {System.out.println(new String(msg.getBody(),UTF-8));} catch (UnsupportedEncodingException e) {e.printStackTrace();}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();} }测试 返回 commit 状态时消费者能够接收消息 返回 rollback 状态时消费者接收不到消息 消息回查测试 public class TransactionImpl implements TransactionListener {private static MapString, LocalTransactionState STATE_MAP new HashMap();/*** 本地执行业务具体的逻辑* param msg* param arg* return*/Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {try {System.out.println(用户A账户减500);Thread.sleep(500);// System.out.println(1/0);System.out.println(用户B账户加500元.);Thread.sleep(800);//二次提交确认STATE_MAP.put(msg.getTransactionId(),LocalTransactionState.COMMIT_MESSAGE);return LocalTransactionState.UNKNOW; // return LocalTransactionState.COMMIT_MESSAGE;} catch (Exception e) {e.printStackTrace();}//回滚STATE_MAP.put(msg.getTransactionId(), LocalTransactionState.ROLLBACK_MESSAGE);return LocalTransactionState.ROLLBACK_MESSAGE;}/*** 消息回查* param msg* return*/Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {System.out.println(状态回查-- msg.getTransactionId() STATE_MAP.get(msg.getTransactionId()));return STATE_MAP.get(msg.getTransactionId());} }Consumer Push和Pull模式 push模式客户端与服务端建立连接后当服务端有消息将消息推送到客户端pull模式客户端不断的轮询请求服务端来获取新的而消息 push模式需要消息系统与消费端之间建立长连接对消息系统是很大的负担所以在具体实现时都采用消费端主动拉取的方式即consumer轮询从broker拉取消息 在RocketMQ中push与pull的区别 PushDefaultPushConsumer 将轮询过程都封装了并注册MessageListener监听器取到消息后唤醒MessageListener监听器的consumeMessage()来消费对用户而言感觉消息是被推送来的。 Pull取消息过程需要自己写首先从目标topic中拿到MessageQueue集合并遍历然后针对每个MessageQueue批量取消息。一次Pull都要记录该队列的offset,知道去完MessageQueue再换另一个 长轮询保证Pull的实时性 长轮询长连接轮询客户端像传统轮询一样从服务端请求数据服务端会阻塞请求不会立刻返回直到有数据或超时才返回给客户端然后关闭连接客户端处理完响应信息后再向服务器发送新的请求 消息模式 DefaultMQPushConsumer实现了自动保存offset值及多个consumer的负载均衡 //设置组名 DefaultMQPushConsumer consumer new DefaultMQPushConsumer(HAOKE_IM);通过 groupname 将多个consumer组合在一起会存在消息的分配问题消息是发送到组还是每个消费者 集群模式默认 同一个ConsumerGroup里的每个Consumer只消费所订阅消息的一部分内容同一个ConsumerGroup里所有消费的内容合起来才是所订阅Topic内容的整体从而达到负载均衡的目的 广播模式 同一个ConsumerGroup里的每个Consumer都能消费到所订阅Topic的全部消息一个消息会被分发多次被多个Consumer消费 // 集群模式 consumer.setMessageModel(MessageModel.CLUSTERING); // 广播模式 consumer.setMessageModel(MessageModel.BROADCASTING);重复消息的解决方案 重复消息的产生情况 生产者不断发送重复消息到消息系统 网络不可达 只要通过网络交换数据就无法避免这个问题 由于接收到重复消息不可避免问题变为 消费端收到重复消息怎么处理 消费端处理消息的业务逻辑保持幂等性 幂等性无论执行多少次结果都一样 egwhile s!1在执行sql语句 保证每条消息都有唯一编号且保证消息处理成功与去重的日志同时出现 利用一张日志表来记录已经处理成功的消息的ID如果新到的消息ID已经在日志表中那么就不再处理这条消息 如果由消息系统来实现的话肯定会对消息系统的吞吐量和高可用有影响所以最好还是由业务端自己处理消息重复的问题这也是 RocketMQ不解决消息重复的问题 的原因 RocketMQ存储 RocketMQ中的消息数据存储采用了零拷贝技术mmap write方式文件系统采用 Linux Ext4文件系统进行存储。 消息数据的存储 在RocketMQ中消息数据是保存在磁盘文件中的使用RocketMQ尽可能保证顺序写入比随机写入效率高很多 ConsumeQueue索引文件存储数据指向物理文件的位置 CommitLog是真正存储数据的文件 消息主体及元数据都存储在CommitLog中 Consume Queue 是一个逻辑队列存储了这个Queue在CommitLog中的其实offset、log大小和MessageTag的hashcode 每次读取消息队列先读取ConsumerQueue然后再通过consumerQueue中拿到消息主体 同步刷盘和异步刷盘 RocketMQ为提高性能会尽可能保证磁盘的顺序读写。消息通过Producer写入RocketMQ的时候有两种写磁盘方式分别是同步刷盘与异步刷盘 同步刷盘——安全性 在返回写成功状态时消息已经写入磁盘执行流程消息写入内存的PAGECACHE后立刻通知刷盘线程刷盘等待刷盘完成刷盘线程执行完成后唤醒等待的线程返回消息写成功的状态 异步刷盘——效率 在返回写成功状态时消息可能只是被写入内存的PAGECACHE写操作的返回快吞吐量大当内存里的消息积累到一定程度统一触发写磁盘动作快速写入 修改刷盘方式 broker.conf flushDiskTypeASYNC_FLUSH——异步 flushDiskTypeSYNC_FLUSH——同步 重试策略 重试情况分析 在消息的发送和消费过程中都有可能出现错误如网络异常等出现了错误就需要进行错误重试这种消息的重试分为 producer端的重试 和 consumer端重试 producer端重试 指定重试次数指定超时时间 //消息发送失败时重试3次 producer.setRetryTimesWhenSendFailed(3);// 发送消息,并且指定超时时间 SendResult sendResult producer.send(msg, 1000);只有同步生产者才会进行错误重试。只有特定异常才会重试设置的超时时间小于实际执行时间则不会进行重试 #DefaultMQProducerImpl //设置发送总次数 int timesTotal communicationMode CommunicationMode.SYNC ? 1 this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;for (; times timesTotal; times) {try{if (timeout costTime) {callTimeout true;break;}}catch (RemotingException e) {...continue;}catch (MQClientException e) {...continue;}catch (MQBrokerException e){switch (e.getResponseCode()) {case ResponseCode.TOPIC_NOT_EXIST:case ResponseCode.SERVICE_NOT_AVAILABLE:case ResponseCode.SYSTEM_ERROR:case ResponseCode.NO_PERMISSION:case ResponseCode.NO_BUYER_ID:case ResponseCode.NOT_IN_CURRENT_UNIT:continue;}} }consumer端重试 消息处理的异常失败消息超时接收的超时失败 异常重试 消息正常到了消费者端处理失败发生异常。eg反序列化失败消息数据本身无法处理 消息状态 package org.apache.rocketmq.client.consumer.listener;public enum ConsumeConcurrentlyStatus {/*** Success consumption*/CONSUME_SUCCESS,/*** Failure consumption,later try to consume*/RECONSUME_LATER; }broker的启动日志 INFO main - messageDelayLevel1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h如果消息消费失败即broker收到 RECONSUME_LATER 则broker会对消息进行重试发送直至2h 演示 public class ConsumerDemo {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer new DefaultMQPushConsumer(test_consumer_group);consumer.setNamesrvAddr(8.140.130.91:9876);// 订阅topic接收此Topic下的所有消息consumer.subscribe(test_error_topic, *);consumer.registerMessageListener(new MessageListenerConcurrently() {Overridepublic ConsumeConcurrentlyStatus consumeMessage(ListMessageExt msgs,ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {try {System.out.println(new String(msg.getBody(), UTF-8));} catch (UnsupportedEncodingException e) {e.printStackTrace();}}System.out.println(收到消息- msgs);if(msgs.get(0).getReconsumeTimes() 3){// 重试3次后不再进行重试return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}return ConsumeConcurrentlyStatus.RECONSUME_LATER;}});consumer.start();} }重试消息和原始发送消息不是同一条 timeout 由于消息没有从MQ发送到消费者上那么在MQ Server内部会不断的尝试发送这条消息直至发送成功位置 也就是服务端没有接收到消费端发来的消息的反馈定义为超时 RocketMQ的集群 集群模式 单个Master 风险较大一旦Broker重启或者宕机会导致整个服务不可用只做开发环境 多Master 一个集群无Slave全是Master例如2个Master或者3个Master单台机器宕机这台机器上未被消费的消息在机器恢复之前不可订阅消息的实时性受到影响 多Master多Slave异步复制 每个Master配置一个Slave有多个Master-Slave对HA双机集群系统采用异步复制方式主备有短暂消息延迟毫秒级优点即使磁盘损坏丢失的消息非常少实时性不会收到影响消费者仍可从Slave消费此过程对应用透明不需人工干预性能同多Master模式一样缺点Master宕机或磁盘损坏会丢失少量消息 多Master多Slave,同步双写 每个Master配置一个Slave有多个Master-Slave对HA双机集群系统采用同步双写方式主备都写成功向应用返回成功优点数据与服务无单点Master宕机情况下消息无延迟服务可用性和数据可用性非常高缺点性能比异步复制模式低 搭建2m2s集群 创建2个NameServer(master) #nameserver1 docker create -p 9876:9876 --name rmqserver01 \ -e JAVA_OPT_EXT-server -Xms128m -Xmx128m -Xmn128m \ -e JAVA_OPTS-Duser.home/opt \ -v /data/rmq-data/rmqserver01/logs:/opt/logs \ -v /data/rmq-data/rmqserver01/store:/opt/store \ foxiswho/rocketmq:server-4.3.2#nameserver2 docker create -p 9877:9876 --name rmqserver02 \ -e JAVA_OPT_EXT-server -Xms128m -Xmx128m -Xmn128m \ -e JAVA_OPTS-Duser.home/opt \ -v /data/rmq-data/rmqserver02/logs:/opt/logs \ -v /data/rmq-data/rmqserver02/store:/opt/store \ foxiswho/rocketmq:server-4.3.2搭建broker(2master) #broker01配置文件 namesrvAddr8.140.130.91:9876;8.140.130.91:9877 brokerClusterNameHaokeCluster brokerNamebroker01 brokerId0 deleteWhen04 fileReservedTime48 brokerRoleSYNC_MASTER flushDiskTypeASYNC_FLUSH brokerIP18.140.130.91 brokerIp28.140.130.91 listenPort11911#master broker01 docker create --net host --name rmqbroker01 \ -e JAVA_OPTS-Duser.home/opt \ -e JAVA_OPT_EXT-server -Xms128m -Xmx128m -Xmn128m \ -v /data/rmq-data/rmqbroker01/conf/broker.conf:/etc/rocketmq/broker.conf \ -v /data/rmq-data/rmqbroker01/logs:/opt/logs \ -v /data/rmq-data/rmqbroker01/store:/opt/store \ foxiswho/rocketmq:broker-4.3.2brokerId0表示主0表示Slave fileReservedTime消息保存时间 单位——h deleteWhen什么是时候对过期消息清理 24小时制 brokerRole[同步双写|异步双写]_[主] | [从] [SYNC|ASYNC_MASTER] | [SLAVE] flushDiskType刷盘方式 [同步|异步]_FLUSH [SYNC|ASYNC_FLUSH] brokerIP1访问broker的ip地址 brokerIP2主从同步的ip listenPort与客户端交互的端口(1-2) #broker02配置文件 namesrvAddr8.140.130.91:9876;8.140.130.91:9877 brokerClusterNameHaokeCluster brokerNamebroker02 brokerId0 deleteWhen04 fileReservedTime48 brokerRoleSYNC_MASTER flushDiskTypeASYNC_FLUSH brokerIP18.140.130.91 brokerIp28.140.130.91 listenPort11811#master broker02 docker create --net host --name rmqbroker02 \ -e JAVA_OPTS-Duser.home/opt \ -e JAVA_OPT_EXT-server -Xms128m -Xmx128m -Xmn128m \ -v /data/rmq-data/rmqbroker02/conf/broker.conf:/etc/rocketmq/broker.conf \ -v /data/rmq-data/rmqbroker02/logs:/opt/logs \ -v /data/rmq-data/rmqbroker02/store:/opt/store \ foxiswho/rocketmq:broker-4.3.2搭建从broker(slave) #slave broker01配置文件 namesrvAddr8.140.130.91:9876;8.140.130.91:9877 brokerClusterNameHaokeCluster brokerNamebroker01 brokerId1 deleteWhen04 fileReservedTime48 brokerRoleSLAVE flushDiskTypeASYNC_FLUSH brokerIP18.140.130.91 brokerIp28.140.130.91 listenPort11711#slave broker01 docker create --net host --name rmqbroker03 \ -e JAVA_OPTS-Duser.home/opt \ -e JAVA_OPT_EXT-server -Xms128m -Xmx128m -Xmn128m \ -v /data/rmq-data/rmqbroker03/conf/broker.conf:/etc/rocketmq/broker.conf \ -v /data/rmq-data/rmqbroker03/logs:/opt/logs \ -v /data/rmq-data/rmqbroker03/store:/opt/store \ foxiswho/rocketmq:broker-4.3.2#slave broker02配置文件 namesrvAddr8.140.130.91:9876;8.140.130.91:9877 brokerClusterNameHaokeCluster brokerNamebroker02 brokerId1 deleteWhen04 fileReservedTime48 brokerRoleSLAVE flushDiskTypeASYNC_FLUSH brokerIP18.140.130.91 brokerIp28.140.130.91 listenPort11611#slave broker02 docker create --net host --name rmqbroker04 \ -e JAVA_OPTS-Duser.home/opt \ -e JAVA_OPT_EXT-server -Xms128m -Xmx128m -Xmn128m \ -v /data/rmq-data/rmqbroker04/conf/broker.conf:/etc/rocketmq/broker.conf \ -v /data/rmq-data/rmqbroker04/logs:/opt/logs \ -v /data/rmq-data/rmqbroker04/store:/opt/store \ foxiswho/rocketmq:broker-4.3.2#启动容器 docker start rmqserver01 rmqserver02 docker start rmqbroker01 rmqbroker02 rmqbroker03 rmqbroker04测试 生产者 public class SyncMessage {public static void main(String[] args) throws Exception{DefaultMQProducer producer new DefaultMQProducer(test_cluster_group);producer.setNamesrvAddr(8.140.130.91:9876;8.140.130.91:9877);producer.start();String msgStr Cluster测试消息;/** String topic, String tags, byte[] body* */Message message new Message(test_cluster_topic,CLUSTER,msgStr.getBytes(UTF-8));SendResult result producer.send(message);System.out.println(result);System.out.println(消息状态 result.getSendStatus());System.out.println(消息id result.getMsgId());System.out.println(消息queue result.getMessageQueue());System.out.println(消息offset result.getQueueOffset());producer.shutdown();} }消费者 public class ConsumerDemo {public static void main(String[] args) throws Exception{/** push类型的消费者被动接收从broker推送的消息* */DefaultMQPushConsumer consumer new DefaultMQPushConsumer(test_cluster_group);consumer.setNamesrvAddr(8.140.130.91:9876;8.140.130.91:9877);//订阅topopic接收此topic下的所有消息consumer.subscribe(test_cluster_topic,*);consumer.registerMessageListener(new MessageListenerConcurrently() {//并发读取消息Overridepublic ConsumeConcurrentlyStatus consumeMessage(ListMessageExt msgs, ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {try {System.out.println(new String(msg.getBody(),UTF-8));} catch (UnsupportedEncodingException e) {e.printStackTrace();}}System.out.println(收到消息-msgs);/** 返回给broker消费者的接收情况* CONSUME_SUCCESS 接收成功* RECONSUME_LATER 延时重发* */return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();} }SpringBoot整合RocketMQ 下载依赖 由于rocketMQ没有发布到Mven中央仓库需要自行下载源码并载入到本地Maven仓库 #源码地址 https://hub.fastgit.org/apache/rocketmq-spring#进入源码目录执行 mvn clean install导入依赖 parentgroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-parent/artifactIdversion2.4.3/version /parentdependenciesdependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter/artifactId/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-test/artifactIdscopetest/scope/dependencydependencygroupIdorg.apache.rocketmq/groupIdartifactIdrocketmq-spring-boot-starter/artifactIdversion2.0.0/version/dependencydependencygroupIdorg.apache.rocketmq/groupIdartifactIdrocketmq-client/artifactIdversion4.3.2/version/dependencydependencygroupIdjunit/groupIdartifactIdjunit/artifactIdscopetest/scope/dependency /dependenciesbuildpluginsplugingroupIdorg.apache.maven.plugins/groupIdartifactIdmaven-compiler-plugin/artifactIdversion3.2/versionconfigurationsource1.8/sourcetarget1.8/targetencodingUTF-8/encoding/configuration/plugin/plugins /buildapplication.properties #Spring boot application spring.application.name test-rocketmq spring.rocketmq.nameServer8.140.130.91:9876 spring.rocketmq.producer.grouptest_spring_producer_group基本使用 生产者发送消息 package com.rocketmq.spring;Component public class SpringProducer {//注入rocketmq模板Autowiredprivate RocketMQTemplate rocketMQTemplate;/*** 发送消息** param topic* param msg*/public void sendMsg(String topic,String msg){this.rocketMQTemplate.convertAndSend(topic,msg);} }启动类 package com.rocketmq;SpringBootApplication public class MyApplication {public static void main(String[] args) {SpringApplication.run(MyApplication.class,args);} }测试生产消息 package com.rocketmq.spring;import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner;RunWith(SpringRunner.class) SpringBootTest public class TestSpringRocketMQ {AutowiredSpringProducer producer;Testpublic void testSendMsg(){String msg 第二个Spring RocketMq 消息;this.producer.sendMsg(test_spring_topic,msg);System.out.println(发送成功!);}}消费者消费消息 package com.rocketmq.spring;import org.apache.rocketmq.spring.annotation.ConsumeMode; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Component;Component RocketMQMessageListener(topic test_spring_topic,consumerGroup test_spring_consumer_group,selectorExpression *,consumeMode ConsumeMode.CONCURRENTLY ) public class SpringConsumer implements RocketMQListenerString {Overridepublic void onMessage(String msg) {System.out.println(收到消息-msg);} }事务消息 生产者 package com.rocketmq.spring.transaction;Component public class TransactionProducer {Autowiredprivate RocketMQTemplate rocketMQTemplate;/*** 发送消息** param topic* param msg*/public void sendMsg(String topic,String msg){Message message (Message) MessageBuilder.withPayload(msg).build();//此处的txProducerGroup与事务监听器的RocketMQTransactionListener(txProducerGroup )一致this.rocketMQTemplate.sendMessageInTransaction(test_tx_producer_group,topic,message,null);System.out.println(消息发送成功);} }生产者监听器 package com.rocketmq.spring.transaction;RocketMQTransactionListener(txProducerGroup test_tx_producer_group) public class TransactionListenerImpl implements RocketMQLocalTransactionListener {private static MapString,RocketMQLocalTransactionState STATE_MAP new HashMap();/*** 执行本地事务** param message* param o* return*/Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {String transactionId (String) message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);try {System.out.println(执行操作1);Thread.sleep(500L);System.out.println(执行操作2);Thread.sleep(500L);STATE_MAP.put(transactionId,RocketMQLocalTransactionState.COMMIT);return RocketMQLocalTransactionState.COMMIT;}catch (Exception e){e.printStackTrace();}STATE_MAP.put(transactionId,RocketMQLocalTransactionState.ROLLBACK);return RocketMQLocalTransactionState.ROLLBACK;}/*** 消息回查** param message* return*/Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message message) {String transactionId (String) message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);System.out.println(回查消息-transactionId transactionId,state STATE_MAP.get(transactionId));return STATE_MAP.get(transactionId);} }消息生产测试 Test public void testSendTransactionMsg(){String msg 事务消息测试!;this.transactionProducer.sendMsg(test_spring_transaction_topic,msg);System.out.println(发送成功); }消费者测试 package com.rocketmq.spring.transaction;Component RocketMQMessageListener(topic test_spring_transaction_topic,consumeMode ConsumeMode.CONCURRENTLY,selectorExpression *,consumerGroup test_tx_consumer_group ) public class TransactionConsumer implements RocketMQListenerString {Overridepublic void onMessage(String s) {System.out.println(收到消息-s);} }消息回查测试 Override public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {String transactionId (String) message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);try {System.out.println(执行操作1);Thread.sleep(500L);System.out.println(执行操作2);Thread.sleep(500L);STATE_MAP.put(transactionId,RocketMQLocalTransactionState.COMMIT);return RocketMQLocalTransactionState.UNKNOWN;}catch (Exception e){e.printStackTrace();}STATE_MAP.put(transactionId,RocketMQLocalTransactionState.ROLLBACK);return RocketMQLocalTransactionState.ROLLBACK; }
文章转载自:
http://www.morning.nmfwm.cn.gov.cn.nmfwm.cn
http://www.morning.btpll.cn.gov.cn.btpll.cn
http://www.morning.xwlmg.cn.gov.cn.xwlmg.cn
http://www.morning.kxnjg.cn.gov.cn.kxnjg.cn
http://www.morning.hcsnk.cn.gov.cn.hcsnk.cn
http://www.morning.lpskm.cn.gov.cn.lpskm.cn
http://www.morning.playmi.cn.gov.cn.playmi.cn
http://www.morning.rjmg.cn.gov.cn.rjmg.cn
http://www.morning.wqbbc.cn.gov.cn.wqbbc.cn
http://www.morning.zpqk.cn.gov.cn.zpqk.cn
http://www.morning.kwpnx.cn.gov.cn.kwpnx.cn
http://www.morning.knzdt.cn.gov.cn.knzdt.cn
http://www.morning.tongweishi.cn.gov.cn.tongweishi.cn
http://www.morning.wgxtz.cn.gov.cn.wgxtz.cn
http://www.morning.gccrn.cn.gov.cn.gccrn.cn
http://www.morning.qztdz.cn.gov.cn.qztdz.cn
http://www.morning.kngx.cn.gov.cn.kngx.cn
http://www.morning.rrxgx.cn.gov.cn.rrxgx.cn
http://www.morning.ntzfj.cn.gov.cn.ntzfj.cn
http://www.morning.egmux.cn.gov.cn.egmux.cn
http://www.morning.wcghr.cn.gov.cn.wcghr.cn
http://www.morning.mxcgf.cn.gov.cn.mxcgf.cn
http://www.morning.gmplp.cn.gov.cn.gmplp.cn
http://www.morning.rpth.cn.gov.cn.rpth.cn
http://www.morning.jnrry.cn.gov.cn.jnrry.cn
http://www.morning.rddlz.cn.gov.cn.rddlz.cn
http://www.morning.qcmhs.cn.gov.cn.qcmhs.cn
http://www.morning.fwmln.cn.gov.cn.fwmln.cn
http://www.morning.wschl.cn.gov.cn.wschl.cn
http://www.morning.mcmpq.cn.gov.cn.mcmpq.cn
http://www.morning.lhxdq.cn.gov.cn.lhxdq.cn
http://www.morning.wmmtl.cn.gov.cn.wmmtl.cn
http://www.morning.plhhd.cn.gov.cn.plhhd.cn
http://www.morning.tqygx.cn.gov.cn.tqygx.cn
http://www.morning.qrlsy.cn.gov.cn.qrlsy.cn
http://www.morning.rhjsx.cn.gov.cn.rhjsx.cn
http://www.morning.glnfn.cn.gov.cn.glnfn.cn
http://www.morning.qxxj.cn.gov.cn.qxxj.cn
http://www.morning.jhkzl.cn.gov.cn.jhkzl.cn
http://www.morning.cwwbm.cn.gov.cn.cwwbm.cn
http://www.morning.wklrz.cn.gov.cn.wklrz.cn
http://www.morning.dxpzt.cn.gov.cn.dxpzt.cn
http://www.morning.ftlgy.cn.gov.cn.ftlgy.cn
http://www.morning.dfckx.cn.gov.cn.dfckx.cn
http://www.morning.wqhlj.cn.gov.cn.wqhlj.cn
http://www.morning.fldk.cn.gov.cn.fldk.cn
http://www.morning.jgmlb.cn.gov.cn.jgmlb.cn
http://www.morning.lkfhk.cn.gov.cn.lkfhk.cn
http://www.morning.qdxkn.cn.gov.cn.qdxkn.cn
http://www.morning.ltksw.cn.gov.cn.ltksw.cn
http://www.morning.gfqj.cn.gov.cn.gfqj.cn
http://www.morning.mlgsc.com.gov.cn.mlgsc.com
http://www.morning.nydtt.cn.gov.cn.nydtt.cn
http://www.morning.lfqnk.cn.gov.cn.lfqnk.cn
http://www.morning.lfpdc.cn.gov.cn.lfpdc.cn
http://www.morning.fqqcn.cn.gov.cn.fqqcn.cn
http://www.morning.fyglg.cn.gov.cn.fyglg.cn
http://www.morning.lbgsh.cn.gov.cn.lbgsh.cn
http://www.morning.btrfm.cn.gov.cn.btrfm.cn
http://www.morning.yqqgp.cn.gov.cn.yqqgp.cn
http://www.morning.ptmsk.cn.gov.cn.ptmsk.cn
http://www.morning.bhrbr.cn.gov.cn.bhrbr.cn
http://www.morning.bfnbn.cn.gov.cn.bfnbn.cn
http://www.morning.ytbr.cn.gov.cn.ytbr.cn
http://www.morning.mgwdp.cn.gov.cn.mgwdp.cn
http://www.morning.prkdl.cn.gov.cn.prkdl.cn
http://www.morning.nrftd.cn.gov.cn.nrftd.cn
http://www.morning.yckwt.cn.gov.cn.yckwt.cn
http://www.morning.qrwdg.cn.gov.cn.qrwdg.cn
http://www.morning.hxrfb.cn.gov.cn.hxrfb.cn
http://www.morning.wpcfh.cn.gov.cn.wpcfh.cn
http://www.morning.pmjhm.cn.gov.cn.pmjhm.cn
http://www.morning.mtzyr.cn.gov.cn.mtzyr.cn
http://www.morning.tlbhq.cn.gov.cn.tlbhq.cn
http://www.morning.rgkd.cn.gov.cn.rgkd.cn
http://www.morning.dmchips.com.gov.cn.dmchips.com
http://www.morning.fbrshjf.com.gov.cn.fbrshjf.com
http://www.morning.mjzgg.cn.gov.cn.mjzgg.cn
http://www.morning.ryxbz.cn.gov.cn.ryxbz.cn
http://www.morning.qdbcd.cn.gov.cn.qdbcd.cn
http://www.tj-hxxt.cn/news/252930.html

相关文章:

  • 便利的邯郸网站建设做网站有什么类型
  • 管理多个wordpress博客wordpress 深度优化
  • 网站建设项目概要设计方案网站文件内容多少与虚拟主机空间大小的关系
  • 网站建设杭州公司东莞seo网站优化运营
  • 前端怎么做自己的博客网站游戏网站建设成功案例
  • 做自己照片视频网站衡水大型网站建设
  • 英文网站建设方案模板高校做排行榜的网站
  • 唐山建设个网站十大网络舆情案例
  • 泸州高端网站建设公司网络网站建设属于什么费用
  • 公司网站制作公成都金融网站建设公司排名
  • 网站开发济南招聘文字图片生成器在线
  • 做家教网站资质广州市最新消息
  • 杏坛餐饮网站建站电商运营自学难吗
  • 园林网站源代码网站轮播图怎么保存
  • 做高铁在哪个网站买企业网站快速优化排名
  • 毛绒玩具 东莞网站建设 技术支持英语培训网站源码
  • 网站运营与管理规划书宜昌恒大帝景地址
  • 品牌网站有哪些内容wordpress seo联接插件
  • 移动端和pc网站网站开发自学资料
  • wordpress是哪家公司的建站程序做网站为什么要购买空间
  • 做app 的模板下载网站有哪些内容蝉知cms
  • 台州网站优化排名seo关键词排名优化案例
  • 旅行社静态模板网站石家庄网站建设 河北供求网
  • 外贸网站建设介绍产品网络推广方式
  • 网站企业模板网站集约化建设的意义
  • wp网站模板做环卫车怎么做网站
  • 批量扫dedecms做的网站惠州seo管理
  • 专业公司网站建设wordpress里的关键词在哪设置
  • 焦作网站设计人事代理网站建设
  • 网站开发的内容qq群推广平台