当前位置: 首页 > news >正文

wordpress双站 中英文微山做网站

wordpress双站 中英文,微山做网站,我找伟宏篷布我做的事ko家的网站,有什么网站有教师招聘考试题目做文章目录 一、消息消费实现二、消息消费过程1、消息拉取2、消息消费1#xff09;提交消费请求2#xff09;消费消息 一、消息消费实现 消息消费有2种实现#xff0c;分别为#xff1a;并发消费实现#xff08;ConsumeMessageConcurrentlyService#xff09;和顺序消费实现… 文章目录 一、消息消费实现二、消息消费过程1、消息拉取2、消息消费1提交消费请求2消费消息 一、消息消费实现 消息消费有2种实现分别为并发消费实现ConsumeMessageConcurrentlyService和顺序消费实现ConsumeMessageOrderlyService。本次以并发消费实现为切入进行探讨消息的消费流程。 二、消息消费过程 1、消息拉取 1在消息服务PullMessageService中完成将消息从远程服务器拉取到本地具体实现由DefaultMQPushConsumerImpl#pullMessage方法完成 //org.apache.rocketmq.client.impl.consumer.PullMessageService#pullMessageprivate void pullMessage(final PullRequest pullRequest) {//从MQClientInstance中获取内部实现类MQConsumerInnerfinal MQConsumerInner consumer this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());if (consumer ! null) {//强转换成PUSH消息消费服务然后消费消息DefaultMQPushConsumerImpl impl (DefaultMQPushConsumerImpl) consumer;impl.pullMessage(pullRequest);} else {log.warn(No matched consumer for the PullRequest {}, drop it, pullRequest);}}2DefaultMQPushConsumerImpl#pullMessage方法中定义了回调实现在成功拉取消息后先将消息放到processQueue中然后再提交消费请求DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest异步完成消息消费。 // org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage 回调部分代码PullCallback pullCallback new PullCallback() {Overridepublic void onSuccess(PullResult pullResult) {// ......//从服务器拉取到消息后回调 PullCallBack 回调方法后先将消息放入到 ProccessQueue中boolean dispatchToConsume processQueue.putMessage(pullResult.getMsgFoundList());// 然后把消息提交到消费线程池中执行// 也就是调用 ConsumeMessageService#submitConsumeRequest 开始进入到消息消费的事件中来DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(pullResult.getMsgFoundList(),processQueue,pullRequest.getMessageQueue(),dispatchToConsume);// ......}}}// ......};2、消息消费 1提交消费请求 pullMessage方法中回调提交消息消费submitConsumeRequest进入消息并发消费实现ConsumeMessageConcurrentlyService其实现代码如下 // org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#submitConsumeRequest方法Overridepublic void submitConsumeRequest(final ListMessageExt msgs,final ProcessQueue processQueue,final MessageQueue messageQueue,final boolean dispatchToConsume) {final int consumeBatchSize this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();if (msgs.size() consumeBatchSize) {ConsumeRequest consumeRequest new ConsumeRequest(msgs, processQueue, messageQueue);try {//异步线程池中执行this.consumeExecutor.submit(consumeRequest);} catch (RejectedExecutionException e) {//提交异常延迟5S再提交this.submitConsumeRequestLater(consumeRequest);}} else {//超过最大数量分批for (int total 0; total msgs.size(); ) {ListMessageExt msgThis new ArrayListMessageExt(consumeBatchSize);for (int i 0; i consumeBatchSize; i, total) {if (total msgs.size()) {msgThis.add(msgs.get(total));} else {break;}}ConsumeRequest consumeRequest new ConsumeRequest(msgThis, processQueue, messageQueue);try {this.consumeExecutor.submit(consumeRequest);} catch (RejectedExecutionException e) {for (; total msgs.size(); total) {msgThis.add(msgs.get(total));}this.submitConsumeRequestLater(consumeRequest);}}}}submitConsumeRequest方法中 先获取单次消费消息最大条数consumeBatchSize默认1条如果本次提交消息条数小于等于单次消费消息最大条数则直接创建ConsumeRequest并提交到线程池consumeExecutor中执行如果超过单次消费消息最大条数则按consumeBatchSize分割分批提交 2消费消息 ConsumeMessageConcurrentlyService中创建消息消费请求线程ConsumeRequest然后提交到线程池。 // org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService.ConsumeRequest#runOverridepublic void run() {//在进行消息重新负载时如果该消息队列被分配给消费组内其他消费者drop设置为true阻止消费者消费不属于自己的消息队列if (this.processQueue.isDropped()) {log.info(the message queue not be able to consume, because its dropped. group{} {}, ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);return;}//类名.this:一般用于内部类需要使用其外部类的实例对象时候使用 ClassName.this 代表其外部类对象直接写this则代表内部类本身对象MessageListenerConcurrently listener ConsumeMessageConcurrentlyService.this.messageListener;ConsumeConcurrentlyContext context new ConsumeConcurrentlyContext(messageQueue);ConsumeConcurrentlyStatus status null;//恢复重试消息主题名// RocketMQ将消息存入 commitlog 文件时如果发现消息的延时级别 delayTimeLevel 大于0会//首先将重试主题存人在消息的属性中然后设置主题名称为 SCHEDULE TOPIC 以便时间到后重新参与消息消费defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());ConsumeMessageContext consumeMessageContext null;//执行钩子if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {consumeMessageContext new ConsumeMessageContext();consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());consumeMessageContext.setProps(new HashMapString, String());consumeMessageContext.setMq(messageQueue);consumeMessageContext.setMsgList(msgs);consumeMessageContext.setSuccess(false);ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);}long beginTimestamp System.currentTimeMillis();boolean hasException false;ConsumeReturnType returnType ConsumeReturnType.SUCCESS;try {if (msgs ! null !msgs.isEmpty()) {for (MessageExt msg : msgs) {MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));}}//内部消息监听器消费消息status listener.consumeMessage(Collections.unmodifiableList(msgs), context);} catch (Throwable e) {log.warn(String.format(consumeMessage exception: %s Group: %s Msgs: %s MQ: %s,RemotingHelper.exceptionSimpleDesc(e),ConsumeMessageConcurrentlyService.this.consumerGroup,msgs,messageQueue), e);hasException true;}long consumeRT System.currentTimeMillis() - beginTimestamp;if (null status) {if (hasException) {returnType ConsumeReturnType.EXCEPTION;} else {returnType ConsumeReturnType.RETURNNULL;}} else if (consumeRT defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {returnType ConsumeReturnType.TIME_OUT;} else if (ConsumeConcurrentlyStatus.RECONSUME_LATER status) {returnType ConsumeReturnType.FAILED;} else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS status) {returnType ConsumeReturnType.SUCCESS;}if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());}if (null status) {log.warn(consumeMessage return null, Group: {} Msgs: {} MQ: {},ConsumeMessageConcurrentlyService.this.consumerGroup,msgs,messageQueue);status ConsumeConcurrentlyStatus.RECONSUME_LATER;}//后置钩子if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {consumeMessageContext.setStatus(status.toString());consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS status);ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);}ConsumeMessageConcurrentlyService.this.getConsumerStatsManager().incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);//同步消息消费状态和offsetif (!processQueue.isDropped()) {ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);} else {log.warn(processQueue is dropped without process consume result. messageQueue{}, msgs{}, messageQueue, msgs);}}ConsumeRequest线程中执行步骤如下 判断消费的队列是否dropped如果为true则停止直接终止该消费请求恢复重试消息的topic和namespace如果存在钩子函数则执行前置钩子函数 ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext)调用消息监听器消费消息listener.consumeMessage(io.openmessaging.rocketmq.consumer.PushConsumerImpl.MessageListenerImpl)如果存在后置钩子则执行后置钩子函数消息消费结果处理
http://www.tj-hxxt.cn/news/226965.html

相关文章:

  • asp网站目录权限17一起广州做网站
  • 外贸视频网站湛江seo网站管理
  • 交通建设门户网站ps设计教程网
  • 机械制造设备类企业网站织梦模板榆林市住房和城市建设局网站
  • 电子商务网站规划建设方案手机优化软件
  • 捷信做单官方网站vc6.0做网站
  • 社交网站模板精美网页设计模板
  • 孝感市门户网站网站备案人授权书
  • 福州市建设厅网站如何构建自己的网站
  • 中小型企业网站模板个人简历模板下载word格式
  • 福田住房和建设局网站官网浙江专业网站seo
  • 网站建设及推广人员网站动态效果怎么做
  • 网站开发 自定义首页显示南京的电商网站设计
  • 中国城乡与建设部网站做家具定制的设计网站
  • dw做的网站链接不会跳转唐山彩钢中企动力提供网站建设
  • 2345电视剧网站免费为何用wdcp建立网站连不上ftp
  • 去哪个网站可以做写手建设网站用什么服务器
  • 网站建设如何搞活动咸阳做网站开发公司哪家好
  • 如何免费制作网站进服务器编辑网站怎么做
  • 分类网站开发漳州网站开发制作棋牌
  • 内江网站怎么做seo美了一站式变美共享平台
  • 网站推广软件工具百度广告推广
  • 建网站首选公司做羞羞的事情的网站
  • 开发网站公司推荐如何建设影视网站首页
  • 做网站建设工资高吗网站结构框架图怎么做
  • 什么建设网站好短视频app开发软件
  • 网站推广策划方案书ps网站首页直线教程
  • 沧浪企业建设网站电话建设三库一平台
  • 湖南长沙网站建设公司电话三个字的洋气商标名字
  • 网站将要准备建设的内容新手做网站视频讲解