wordpress双站 中英文,微山做网站,我找伟宏篷布我做的事ko家的网站,有什么网站有教师招聘考试题目做文章目录 一、消息消费实现二、消息消费过程1、消息拉取2、消息消费1#xff09;提交消费请求2#xff09;消费消息 一、消息消费实现 消息消费有2种实现#xff0c;分别为#xff1a;并发消费实现#xff08;ConsumeMessageConcurrentlyService#xff09;和顺序消费实现… 文章目录 一、消息消费实现二、消息消费过程1、消息拉取2、消息消费1提交消费请求2消费消息 一、消息消费实现 消息消费有2种实现分别为并发消费实现ConsumeMessageConcurrentlyService和顺序消费实现ConsumeMessageOrderlyService。本次以并发消费实现为切入进行探讨消息的消费流程。
二、消息消费过程
1、消息拉取 1在消息服务PullMessageService中完成将消息从远程服务器拉取到本地具体实现由DefaultMQPushConsumerImpl#pullMessage方法完成
//org.apache.rocketmq.client.impl.consumer.PullMessageService#pullMessageprivate void pullMessage(final PullRequest pullRequest) {//从MQClientInstance中获取内部实现类MQConsumerInnerfinal MQConsumerInner consumer this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());if (consumer ! null) {//强转换成PUSH消息消费服务然后消费消息DefaultMQPushConsumerImpl impl (DefaultMQPushConsumerImpl) consumer;impl.pullMessage(pullRequest);} else {log.warn(No matched consumer for the PullRequest {}, drop it, pullRequest);}}2DefaultMQPushConsumerImpl#pullMessage方法中定义了回调实现在成功拉取消息后先将消息放到processQueue中然后再提交消费请求DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest异步完成消息消费。
// org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage 回调部分代码PullCallback pullCallback new PullCallback() {Overridepublic void onSuccess(PullResult pullResult) {// ......//从服务器拉取到消息后回调 PullCallBack 回调方法后先将消息放入到 ProccessQueue中boolean dispatchToConsume processQueue.putMessage(pullResult.getMsgFoundList());// 然后把消息提交到消费线程池中执行// 也就是调用 ConsumeMessageService#submitConsumeRequest 开始进入到消息消费的事件中来DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(pullResult.getMsgFoundList(),processQueue,pullRequest.getMessageQueue(),dispatchToConsume);// ......}}}// ......};2、消息消费
1提交消费请求 pullMessage方法中回调提交消息消费submitConsumeRequest进入消息并发消费实现ConsumeMessageConcurrentlyService其实现代码如下
// org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#submitConsumeRequest方法Overridepublic void submitConsumeRequest(final ListMessageExt msgs,final ProcessQueue processQueue,final MessageQueue messageQueue,final boolean dispatchToConsume) {final int consumeBatchSize this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();if (msgs.size() consumeBatchSize) {ConsumeRequest consumeRequest new ConsumeRequest(msgs, processQueue, messageQueue);try {//异步线程池中执行this.consumeExecutor.submit(consumeRequest);} catch (RejectedExecutionException e) {//提交异常延迟5S再提交this.submitConsumeRequestLater(consumeRequest);}} else {//超过最大数量分批for (int total 0; total msgs.size(); ) {ListMessageExt msgThis new ArrayListMessageExt(consumeBatchSize);for (int i 0; i consumeBatchSize; i, total) {if (total msgs.size()) {msgThis.add(msgs.get(total));} else {break;}}ConsumeRequest consumeRequest new ConsumeRequest(msgThis, processQueue, messageQueue);try {this.consumeExecutor.submit(consumeRequest);} catch (RejectedExecutionException e) {for (; total msgs.size(); total) {msgThis.add(msgs.get(total));}this.submitConsumeRequestLater(consumeRequest);}}}}submitConsumeRequest方法中
先获取单次消费消息最大条数consumeBatchSize默认1条如果本次提交消息条数小于等于单次消费消息最大条数则直接创建ConsumeRequest并提交到线程池consumeExecutor中执行如果超过单次消费消息最大条数则按consumeBatchSize分割分批提交
2消费消息 ConsumeMessageConcurrentlyService中创建消息消费请求线程ConsumeRequest然后提交到线程池。
// org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService.ConsumeRequest#runOverridepublic void run() {//在进行消息重新负载时如果该消息队列被分配给消费组内其他消费者drop设置为true阻止消费者消费不属于自己的消息队列if (this.processQueue.isDropped()) {log.info(the message queue not be able to consume, because its dropped. group{} {}, ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);return;}//类名.this:一般用于内部类需要使用其外部类的实例对象时候使用 ClassName.this 代表其外部类对象直接写this则代表内部类本身对象MessageListenerConcurrently listener ConsumeMessageConcurrentlyService.this.messageListener;ConsumeConcurrentlyContext context new ConsumeConcurrentlyContext(messageQueue);ConsumeConcurrentlyStatus status null;//恢复重试消息主题名// RocketMQ将消息存入 commitlog 文件时如果发现消息的延时级别 delayTimeLevel 大于0会//首先将重试主题存人在消息的属性中然后设置主题名称为 SCHEDULE TOPIC 以便时间到后重新参与消息消费defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());ConsumeMessageContext consumeMessageContext null;//执行钩子if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {consumeMessageContext new ConsumeMessageContext();consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());consumeMessageContext.setProps(new HashMapString, String());consumeMessageContext.setMq(messageQueue);consumeMessageContext.setMsgList(msgs);consumeMessageContext.setSuccess(false);ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);}long beginTimestamp System.currentTimeMillis();boolean hasException false;ConsumeReturnType returnType ConsumeReturnType.SUCCESS;try {if (msgs ! null !msgs.isEmpty()) {for (MessageExt msg : msgs) {MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));}}//内部消息监听器消费消息status listener.consumeMessage(Collections.unmodifiableList(msgs), context);} catch (Throwable e) {log.warn(String.format(consumeMessage exception: %s Group: %s Msgs: %s MQ: %s,RemotingHelper.exceptionSimpleDesc(e),ConsumeMessageConcurrentlyService.this.consumerGroup,msgs,messageQueue), e);hasException true;}long consumeRT System.currentTimeMillis() - beginTimestamp;if (null status) {if (hasException) {returnType ConsumeReturnType.EXCEPTION;} else {returnType ConsumeReturnType.RETURNNULL;}} else if (consumeRT defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {returnType ConsumeReturnType.TIME_OUT;} else if (ConsumeConcurrentlyStatus.RECONSUME_LATER status) {returnType ConsumeReturnType.FAILED;} else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS status) {returnType ConsumeReturnType.SUCCESS;}if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());}if (null status) {log.warn(consumeMessage return null, Group: {} Msgs: {} MQ: {},ConsumeMessageConcurrentlyService.this.consumerGroup,msgs,messageQueue);status ConsumeConcurrentlyStatus.RECONSUME_LATER;}//后置钩子if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {consumeMessageContext.setStatus(status.toString());consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS status);ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);}ConsumeMessageConcurrentlyService.this.getConsumerStatsManager().incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);//同步消息消费状态和offsetif (!processQueue.isDropped()) {ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);} else {log.warn(processQueue is dropped without process consume result. messageQueue{}, msgs{}, messageQueue, msgs);}}ConsumeRequest线程中执行步骤如下
判断消费的队列是否dropped如果为true则停止直接终止该消费请求恢复重试消息的topic和namespace如果存在钩子函数则执行前置钩子函数 ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext)调用消息监听器消费消息listener.consumeMessage(io.openmessaging.rocketmq.consumer.PushConsumerImpl.MessageListenerImpl)如果存在后置钩子则执行后置钩子函数消息消费结果处理