当前位置: 首页 > news >正文

对单位网站建设的建议如何做google推广

对单位网站建设的建议,如何做google推广,怎么在阿里巴巴网站做公司,专门做金融的招聘网站文章目录 1. 前言2. 消费者启动2.1 DefaultMQPushConsumer#start 启动消费者2.2 DefaultMQPushConsumerImpl#start2.2.1 checkConfig 检查消费者配置2.2.2 copySubscription 拷贝订阅关系2.2.3 加载消费偏移量2.2.4 更新消费者订阅 topic 路由信息2.2.5 checkClientInBroker 校… 文章目录 1. 前言2. 消费者启动2.1 DefaultMQPushConsumer#start 启动消费者2.2 DefaultMQPushConsumerImpl#start2.2.1 checkConfig 检查消费者配置2.2.2 copySubscription 拷贝订阅关系2.2.3 加载消费偏移量2.2.4 更新消费者订阅 topic 路由信息2.2.5 checkClientInBroker 校验 SQL92 表达式2.2.6 broker 处理 CHECK_CLIENT_CONFIG 请求 3. 小结 本文章基于 RocketMQ 4.9.3 1. 前言 【RocketMQ】- 源码系列目录【RocketMQ 生产者消费者】- 同步、异步、单向发送消费消息 上面几篇文章我们探讨了生产者的启动和发送消息的源码这篇文章开始我们就来看下消费者的启动源码也开始消费者的源码分析。 2. 消费者启动 在上面文章《同步、异步、单向发送消费消息》中我们也讲述了生产者如何发送消息消费者如何消费消息当然在文章中我们还是以 Push 模式为主那所谓的消费者 Push 模式就是服务端主动推送消息给客户端其实说是服务端推送实际上 Push 就是 Pull 的一层封装 Pull 模式是用户自己去拉取消息Push 模式就是客户端帮用户控制拉取频率消费者需要做的就是消费当然了这样坏处就是客户端如果没有做好流控就有可能导致消息堆积导致系统崩溃消费者的源码还是以 Push 为主也就是 DefaultMQPushConsumer。 2.1 DefaultMQPushConsumer#start 启动消费者 /*** 启动消费者** throws MQClientException if there is any client error.*/ Override public void start() throws MQClientException {// 根据命名空间设置消费者组setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));// 默认消费者启动this.defaultMQPushConsumerImpl.start();if (null ! traceDispatcher) {try {// 消息轨迹追踪服务启动traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());} catch (MQClientException e) {log.warn(trace dispatcher start failed , e);}} }启动消费者的逻辑和生产者差不多直接看 defaultMQPushConsumerImpl 的 start 方法。 2.2 DefaultMQPushConsumerImpl#start /** * 消费者启动 * throws MQClientException */ public synchronized void start() throws MQClientException {// 根据不同状态走不同逻辑switch (this.serviceState) {// 消费服务刚刚创建出来就是这个状态case CREATE_JUST:log.info(the consumer [{}] start beginning. messageModel{}, isUnitMode{}, this.defaultMQPushConsumer.getConsumerGroup(),this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());// 首先修改下服务状态为 START_FAILED, 等启动成功之后再修改服务状态为 RUNNINGthis.serviceState ServiceState.START_FAILED;// 校验消费者配置this.checkConfig();// 拷贝订阅关系this.copySubscription();// 如果是集群消费模式如果 instanceName 为默认值 DEFAULT那么改成 UtilAll.getPid() # System.nanoTime()// 简单来说就是集群模式下消费者的实例名称修改成进程 PID#当前时间, 也就是永远不会重复, 这玩意就是用来构建 clientID 的, 而// clientID 和消费者的消息队列分配有关, 一个 topic 会将消息队列分配给对应集群下面的消费者, 根据不同的策略进行分配, 如果是// 广播模式, 由于每一个消费者都会消费同一条消息, 自然就不需要考虑这些, clientID 有大用, 这里重点计下if (this.defaultMQPushConsumer.getMessageModel() MessageModel.CLUSTERING) {this.defaultMQPushConsumer.changeInstanceNameToPID();}// 获取 MQClientInstance在里面会根据 clientId 获取或者创建 MQClientInstance 实例this.mQClientFactory MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);// 设置负载均衡相关的属性: 消费者组、消费模式、分配逻辑、消费者客户端this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);// 消息拉取服务, 在里面去进行消息拉取以及处理消息拉取的结果this.pullAPIWrapper new PullAPIWrapper(mQClientFactory,this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);// 根据消息模式设置不同的 OffsetStore用于实现消费者的消息消费偏移量 offset 的管理if (this.defaultMQPushConsumer.getOffsetStore() ! null) {this.offsetStore this.defaultMQPushConsumer.getOffsetStore();} else {switch (this.defaultMQPushConsumer.getMessageModel()) {case BROADCASTING:// 广播模式会将消费者的消费进度存储到本地this.offsetStore new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());break;case CLUSTERING:// 如果是集群模式消费者的消费进度会存储到 broker 上面this.offsetStore new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());break;default:break;}// 设置消费进度管理类 OffsetStorethis.defaultMQPushConsumer.setOffsetStore(this.offsetStore);}// 加载消费偏移量LocalFileOffsetStore 会加载本地磁盘中的数据RemoteBrokerOffsetStore 没有实现, RemoteBrokerOffsetStore// 的消费偏移量是存储到 broker 的, 所以不会在这里加载this.offsetStore.load();// 如果消费者的监听器是 MessageListenerOrderly, 说明这个消费者是顺序消费, 那么消费的逻辑就通过 ConsumeMessageOrderlyService 完成if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {this.consumeOrderly true;this.consumeMessageService new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {// 如果消费者的监听器是 MessageListenerConcurrently, 说明这个消费者是并发消费, 那么消费的逻辑就通过 ConsumeMessageConcurrentlyService 完成this.consumeOrderly false;this.consumeMessageService new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());}// 启动消息消费服务this.consumeMessageService.start();// 将当前消费者注册到 MQClientInstance 的 consumerTable 中key 是消费者组这里也透露出一件事一个进程里面的消费// 者必须都是不同消费者组的boolean registerOK mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);if (!registerOK) {// 这里就是发现了有多个 DefaultMQConsumerImpl 往一个消费者组里面配置this.serviceState ServiceState.CREATE_JUST;// 关闭消费服务this.consumeMessageService.shutdown(defaultMQPushConsumer.getAwaitTerminationMillisWhenShutdown());// 抛出异常throw new MQClientException(The consumer group[ this.defaultMQPushConsumer.getConsumerGroup() ] has been created before, specify another name please. FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),null);}// 启动 MQClientInstance 客户端实例mQClientFactory.start();log.info(the consumer [{}] start OK., this.defaultMQPushConsumer.getConsumerGroup());// 设置消费者服务状态为 RUNNINGthis.serviceState ServiceState.RUNNING;break;case RUNNING:case START_FAILED:case SHUTDOWN_ALREADY:throw new MQClientException(The PushConsumer service state not OK, maybe started once, this.serviceState FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),null);default:break;}// 从 nameserver 中获取当前消费者订阅的 topic 路由信息并尝试更新本地缓存this.updateTopicSubscribeInfoWhenSubscriptionChanged();// 随机挑选一个 broker 发送请求, 判断 broker 是否支持 SQL92 过滤以及通过 SqlFilter 校验 SQL92 表达式是否正确this.mQClientFactory.checkClientInBroker();// 发送心跳信息给 broker, 当消费者把自己的信息上报给 broker 之后, broker 会通知其他消费者进行负载均衡this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();// 唤醒负载均衡服务, 主动进行重平衡this.mQClientFactory.rebalanceImmediately(); }跟生产者一样首先需要检查下消费者配置也就是 checkConfig 方法这里面会去校验一些消费者的属性是否合法当然里面的内容有点多留到后面再去讲。 然后下面是 copySubscription这个方法用于拷贝订阅关系DefaultMQPushConsumer 里面有一个属性 subscription这个就是消费者订阅关系这个方法会把订阅关系拷贝到 RebalanceImpl 下面的 subscriptionInner 里面消费者重平衡分配队列就是通过 RebalanceImpl 来实现的且注意每个消费者都有自己的 RebalanceImplRebalanceImpl 是一个 protected 属性然后顺便在 copySubscription 中将这个消费者所在消费者组的重传 topic 也设置到 subscriptionInner 中这个也是后面再细说。 接下来如果是集群消费模式如果 instanceName 为默认值 “DEFAULT”那么改成 UtilAll.getPid() “#” System.nanoTime()简单来说就是集群模式下消费者的实例名称修改成进程 PID#当前时间也就是永远不会重复这玩意就是用来构建 clientID 的而 clientID 和消费者的消息队列分配有关一个 topic 会将消息队列分配给对应集群下面的消费者根据不同的策略进行分配如果是广播模式由于每一个消费者都会消费同一条消息自然就不需要考虑这些。 接下来获取 MQClientInstance在里面会根据 clientId 获取或者创建 MQClientInstance 实例注意这里如果生产者和消费者运行在同一个进程中比如启动一个 jar 包这个 jar 包里面生产者消费者也不会共用一个 MQClientInstance因为这个版本的 RocketMQ 设置 instanceName 的时候加上了当前时间所以获取的 MQClientInstance 也是不同的对于 MQClientInstance 如果想要了解更多的可以看这篇文章【RocketMQ 生产者和消费者】- 生产者启动源码-创建 MQClientInstance2。 下面是负载均衡的一些相关属性消费者组、消费模式、分配逻辑、消费者客户端负载均衡是消费者很重要的一个功能决定了消费者可以消费这个 topic 下面的哪些队列关于负载均衡的原理后面文章会去细说。 创建消息拉取服务 pullAPIWrapper在里面去进行消息拉取以及处理消息拉取的结果消息拉取服务是消费者的重要服务首先消费者不可能每一次拉取消息都向 broker 实时请求 所以消费者都会有一个定时任务定时拉取消息下来存储到本地消息集合中消费消息都是消费本地消息集合的。 下面根据消息模式设置不同的 OffsetStore用于实现消费者的消息消费偏移量 offset 的管理注意下这里消费者的偏移量值得是下一条要消费的消息在 ConsumeQueue 中的索引下标如果要找到具体的消息在 ConsumeQueue 的位置还需要 * 20。 接下来继续最重要的来了如果消费者的监听器是 MessageListenerOrderly, 说明这个消费者是顺序消费, 那么消费的逻辑就通过 ConsumeMessageOrderlyService 完成也就是创建 ConsumeMessageOrderlyService而如果消费者的监听器是 MessageListenerConcurrently, 说明这个消费者是并发消费, 那么消费的逻辑就通过 ConsumeMessageConcurrentlyService 完成这两个消费模式有什么区别后面讲里面的消费源码的时候也会说的接下来通过 this.consumeMessageService.start() 启动消费服务开始消费消息。 将当前消费者注册到 MQClientInstance 的 consumerTable 中key 是消费者组这里也透露出一件事一个进程里面的消费者必须都是不同消费者组的。 最后通过 mQClientFactory.start() 启动客户端实例设置消费者服务状态为 RUNNING使用 updateTopicSubscribeInfoWhenSubscriptionChanged 方法从 nameserver 中获取当前消费者订阅的 topic 路由信息并尝试更新本地缓存同时随机挑选一个 broker 发送请求, 判断 broker 是否支持 SQL92 过滤以及通过 SqlFilter 校验 SQL92 表达式是否正确调用 sendHeartbeatToAllBrokerWithLock 上报心跳信息消费者的心跳信息就是订阅信息。最后唤醒负载均衡服务, 主动进行重平衡。 上面的流程还是比较多的重点是这个方法跟生产者启动的确实有点像下面来单个看下里面的逻辑。 2.2.1 checkConfig 检查消费者配置 上面也说过了 checkConfig 会检查消费者启动的一些配置信息首先就是校验消费者组是否合法比如不能为空长度不能超过 255是否包含了非法字符合法的字符只有下面中括号里面这些 [%|a-zA-Z0-9_-]。 // 校验消费者组属性是否合法长度合法、字符合法 Validators.checkGroup(this.defaultMQPushConsumer.getConsumerGroup());// 如果消费者组为空, 抛出异常 if (null this.defaultMQPushConsumer.getConsumerGroup()) {throw new MQClientException(consumerGroup is null FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),null); }接下来判断如果消费者组等于 DEFAULT_CONSUMER, 也抛出异常, 这是系统默认组名用户不能设置消费者组成这个。 // 如果消费者组等于 DEFAULT_CONSUMER, 也抛出异常, 这是系统默认组名 if (this.defaultMQPushConsumer.getConsumerGroup().equals(MixAll.DEFAULT_CONSUMER_GROUP)) {throw new MQClientException(consumerGroup can not equal MixAll.DEFAULT_CONSUMER_GROUP , please specify another one. FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),null); }然后判断如果 messageModel 为空, 也抛出异常, messageModel 是 CLUSTERING 或者 BROADCASTING代表集群模式还是广播模式当然默认就是集群模式也就是不设置的话默认就是集群。 // 如果 messageModel 为空, 也抛出异常, messageModel 是 CLUSTERING 或者 BROADCASTING if (null this.defaultMQPushConsumer.getMessageModel()) {throw new MQClientException(messageModel is null FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),null); }接下来判断消费者的消费点位不能为空消费者启动默认消费模式是 CONSUME_FROM_LAST_OFFSET也就是默认从上次消费的点位开始继续消费。消费点位意思是消费者从哪个地方开始消费。 // 如果 messageModel 为空, 也抛出异常, messageModel 是 CLUSTERING 或者 BROADCASTING if (null this.defaultMQPushConsumer.getMessageModel()) {throw new MQClientException(messageModel is null FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),null); }然后下面判断 consumeTimestamp 属性一个新的订阅组第一次启动从指定时间点开始消费和 consumer.setConsumeTimestamp() 配合使用默认是启动的时间往前退半个小时当消费模式使用 CONSUME_FROM_TIMESTAMP 的时候会用到这玩意来计算下一次消费者从哪里开始拉取消息。 // 一个新的订阅组第一次启动从指定时间点开始消费和 consumer.setConsumeTimestamp() 配合使用默认是半个小时以前 Date dt UtilAll.parseDate(this.defaultMQPushConsumer.getConsumeTimestamp(), UtilAll.YYYYMMDDHHMMSS); if (null dt) {throw new MQClientException(consumeTimestamp is invalid, the valid format is yyyyMMddHHmmss,but received this.defaultMQPushConsumer.getConsumeTimestamp() FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), null); }接着判断消费者的 MessageQueue 分配策略这玩意默认是 AllocateMessageQueueAveragely用来消费者重平衡分配队列。 // 消费者的 MessageQueue 分配策略 if (null this.defaultMQPushConsumer.getAllocateMessageQueueStrategy()) {throw new MQClientException(allocateMessageQueueStrategy is null FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),null); }继续判断消费者的订阅信息是否为空消费者订阅消息肯定不能为空的毕竟消费者需要依靠订阅信息来获取消息消费不过这个配置项应该不用了但是这里也不用管因为这个属性默认就是不为空的。 /*** 消费者订阅信息 topic - 订阅表达式*/ private MapString /* topic */, String /* sub expression */ subscription new HashMapString, String();// 消费者的订阅信息不能为空 if (null this.defaultMQPushConsumer.getSubscription()) {throw new MQClientException(subscription is null FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),null); }然后判断消息监听器当有消息的时候就可以回调不过一般都是实现这玩意的子接口 MessageListenerOrderly回调 consumeMessage 方法。 // 消息监听器, 当有消息的时候就可以回调, 不过一般都是实现这玩意的子接口 MessageListenerOrderly, 回调 consumeMessage 方法 if (null this.defaultMQPushConsumer.getMessageListener()) {throw new MQClientException(messageListener is null FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),null); }消息监听器可以通过 register 方法注册。 接下来判断消息监听器有没有设置成 MessageListenerOrderly 或者 MessageListenerConcurrently 的子类分别是顺序消费和并发消费。 // 这里就是判断消息监听器是 MessageListenerOrderly 或者 MessageListenerConcurrently 的子类, 分别是顺序消费和并发消费 boolean orderly this.defaultMQPushConsumer.getMessageListener() instanceof MessageListenerOrderly; boolean concurrently this.defaultMQPushConsumer.getMessageListener() instanceof MessageListenerConcurrently; if (!orderly !concurrently) {throw new MQClientException(messageListener must be instanceof MessageListenerOrderly or MessageListenerConcurrently FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),null); }接着往下看下面就是消费者的线程数判断消费者消费线程最小值在 [1, 1000] 之间消费者线程最大值在 [1, 1000] 之间且消费者线程最小值不能大于消费者线程最大值。 // 消费者线程最小值在 [1, 1000] 之间 if (this.defaultMQPushConsumer.getConsumeThreadMin() 1|| this.defaultMQPushConsumer.getConsumeThreadMin() 1000) {throw new MQClientException(consumeThreadMin Out of range [1, 1000] FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),null); }// 消费者线程最大值在 [1, 1000] 之间 if (this.defaultMQPushConsumer.getConsumeThreadMax() 1 || this.defaultMQPushConsumer.getConsumeThreadMax() 1000) {throw new MQClientException(consumeThreadMax Out of range [1, 1000] FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),null); }// 消费者线程最小值不能大于消费者线程最大值 if (this.defaultMQPushConsumer.getConsumeThreadMin() this.defaultMQPushConsumer.getConsumeThreadMax()) {throw new MQClientException(consumeThreadMin ( this.defaultMQPushConsumer.getConsumeThreadMin() ) is larger than consumeThreadMax ( this.defaultMQPushConsumer.getConsumeThreadMax() ),null); }接下来判断并发消费的最大拉取跨度要在 [1, 65535] 之间并发消费拉取消息的跨度意思是消费者会把消息存储到一个 msgTree 中里面消息的偏移量跨度如果默认超过 2000那么 RocketMQ 就会先暂停拉取消息, 避免消息堆积过多用户可以自己设置不设置默认就是 2000。 // 并发消费的最大拉取跨度要在 [1, 65535] 之间 if (this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan() 1|| this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan() 65535) {throw new MQClientException(consumeConcurrentlyMaxSpan Out of range [1, 65535] FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),null); }除了上面的拉取跨度还有流控阈值消息的流控主要是为了防止消费者处理消息的速度跟不上消息的生产速度导致消息在消费者端过度堆积从而影响系统性能甚至导致系统崩溃默认情况下每个消息队列会缓存 1000 条消息所以消费者最多会将 1000 条消息缓存起来等待处理但是消息拉取也是要结合 pullBatchSize 的这个参数是消费者一次拉取的消息数量假设是 200当缓存的消息数量达到 900 条时再次拉取 200 条消息此时缓存的消息数量就会达到 1100 条超过了 1000 条的限制, 所以这里要用的话得考虑好这两个参数的关系。这里检查这个参数也是需要在 [165535] 之间。 // 消息队列流控阈值需要在 [1, 65535] 之间 if (this.defaultMQPushConsumer.getPullThresholdForQueue() 1 || this.defaultMQPushConsumer.getPullThresholdForQueue() 65535) {throw new MQClientException(pullThresholdForQueue Out of range [1, 65535] FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),null); }上面是消息队列的流控下面还有 topic 层面的流控所谓主题层面的流控就是对消费者从整个主题中拉取并缓存的消息总量进行限制。其默认值为 -1意味着不做限制即消费者可以无限制地从该主题中拉取消息并缓存若 pullThresholdForTopic 被设置为一个非 -1 的有效数值也就是启用了主题级别的流控那么上面的 pullThresholdForQueue队列级别的流控阈值会被覆盖, 新的 pullThresholdForQueue 值会根据 pullThresholdForTopic 的值和分配给该消费者的消息队列数量重新计算得出。 // topic 层面的流控, 默认是 -1, 就是不限制从某个 topic 上拉取的消息数, 范围是 [1, 6553500] if (this.defaultMQPushConsumer.getPullThresholdForTopic() ! -1) {if (this.defaultMQPushConsumer.getPullThresholdForTopic() 1 || this.defaultMQPushConsumer.getPullThresholdForTopic() 6553500) {throw new MQClientException(pullThresholdForTopic Out of range [1, 6553500] FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),null);} }除了流控还有消息缓存的总大小这里是从队列层面对缓存的消息大小做限制, 但是由于消费者一次可能会拉取多个消息即批量拉取, 在拉取瞬间消息队列中缓存的消息大小可能会超过 100 MiB例如当缓存的消息大小接近 100 MiB 时若一次拉取的消息大小较大就会使瞬时的缓存消息大小超过这个限制这里是校验消费者拉取的消息缓存的总大小不能超过 pullThresholdSizeForQueue, 范围是 [1, 1024] MB。 // 消费者拉取的消息缓存的总大小不能超过 pullThresholdSizeForQueue, 范围是 [1, 1024] MB if (this.defaultMQPushConsumer.getPullThresholdSizeForQueue() 1 || this.defaultMQPushConsumer.getPullThresholdSizeForQueue() 1024) {throw new MQClientException(pullThresholdSizeForQueue Out of range [1, 1024] FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),null); }上面是队列层面的当然处理队列层面的还有 topic 层面的topic 层面的限制就是 -1也就是默认不限制这个 topic 下面的消息缓存大小。 // 这里就是 topic 层面去限制缓存中拉取的消息大小, 默认是 -1 就是不限 if (this.defaultMQPushConsumer.getPullThresholdSizeForTopic() ! -1) {// 如果限制, 这个值的范围需要在 [1, 102400] MBif (this.defaultMQPushConsumer.getPullThresholdSizeForTopic() 1 || this.defaultMQPushConsumer.getPullThresholdSizeForTopic() 102400) {throw new MQClientException(pullThresholdSizeForTopic Out of range [1, 102400] FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),null);} }消息拉取需要有一定的时间间隔下面的 pullInterval 就是用来控制时间间隔的当然默认是 0就是立刻提交消息拉取请求后面讲消息拉取源码的时候也会讲到这里先有个概念。 // 消息拉取时间间隔需要在 [0, 65535] if (this.defaultMQPushConsumer.getPullInterval() 0 || this.defaultMQPushConsumer.getPullInterval() 65535) {throw new MQClientException(pullInterval Out of range [0, 65535] FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),null); }然后就是一次性可以批量消费多少条消息消费者批量消费的消息数在 [1, 1024] 之间。 // 消费者批量消费的消息数在 [1, 1024] 之间 if (this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize() 1|| this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize() 1024) {throw new MQClientException(consumeMessageBatchMaxSize Out of range [1, 1024] FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),null); }最后是消费者批量拉取消息的数量也就是上面有提到的 pullBatchSize配合上面的 pullThresholdSizeForQueue 来使用数量需要在 [1, 1024] 之间。 // 消费者批量拉取消息的数量需要在 [1, 1024] 之间 if (this.defaultMQPushConsumer.getPullBatchSize() 1 || this.defaultMQPushConsumer.getPullBatchSize() 1024) {throw new MQClientException(pullBatchSize Out of range [1, 1024] FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),null); }2.2.2 copySubscription 拷贝订阅关系 这里的拷贝订阅关系其实是将 DefaultMQPushConsumer 中的 subscription 拷贝到 RebalanceImpl 的 subscriptionInner 属性中但是 subscription 看了下源码里面设置的方法就是通过 set 方法去设置而这个 set 方法已经是弃用了所以这个方法的前面这段可以不看了。 下面来看下这个方法的源码。 /*** 拷贝订阅关系* throws MQClientException*/ private void copySubscription() throws MQClientException {try {// 获取消费者的订阅表达式, 这里的表达式可以通过 consumer.setSubscription(xxx); 设置, 但是这个方法已经是 DeprecatedMapString, String sub this.defaultMQPushConsumer.getSubscription();if (sub ! null) {// 遍历所有订阅表达式for (final Map.EntryString, String entry : sub.entrySet()) {// topic 名final String topic entry.getKey();// 订阅表达式final String subString entry.getValue();// 创建订阅信息SubscriptionData subscriptionData FilterAPI.buildSubscriptionData(topic, subString);// 添加到 subscriptionInner 订阅信息集合中this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);}}// 设置消息监听器, 消费者可以通过 consumer.setMessageListener(); 来设置监听器if (null this.messageListenerInner) {this.messageListenerInner this.defaultMQPushConsumer.getMessageListener();}// 判断消费者的消费模式switch (this.defaultMQPushConsumer.getMessageModel()) {case BROADCASTING:// 广播模式, 广播模式下所有消费者都会消费同一条消息, 所以没有什么重传的, 万一有一个消费失败重传导致其他消费者又重新消费一遍就完蛋了break;case CLUSTERING:// 集群模式会把对这个消费者组的重传 topic 创建订阅信息, 当然是订阅所有 TAG 了final String retryTopic MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());SubscriptionData subscriptionData FilterAPI.buildSubscriptionData(retryTopic, SubscriptionData.SUB_ALL);// 设置到 subscriptionInner 里面this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);break;default:break;}} catch (Exception e) {throw new MQClientException(subscription exception, e);} }我们主要关注的还是后面的源码首先就是设置消息的监听器这个监听器在创建消费者的时候就会设置了这里是将 DefaultMQPushConsumer 的 messageListenerInner 设置到 DefaultMQPushConsumerImpl 的 messageListenerInner这个监听器就是用来创建并发ConsumeMessageConcurrentlyService还是顺序ConsumeMessageOrderlyService消费的类时候传进去的。 最后判断消费者的消费模式来设置重传 topic %RETRY% consumerGroup的订阅信息如果消费者是广播模式广播模式下所有消费者都会消费同一条消息所以没有什么重传的万一有一个消费失败重传导致其他消费者又重新消费一遍就重复消费了。但是在集群模式会对这个消费者的消费者组创建一个订阅信息注意重传 topic 是消费者组维度的而不是消费者维度也就是说一个消费者组里面的消费者都会共用一个重传 topic 来存储消息重传 topic 肯定是默认订阅所有 tag 了。 2.2.3 加载消费偏移量 this.offsetStore.load() 用于加载消费偏移量LocalFileOffsetStore 会加载本地磁盘中的数据RemoteBrokerOffsetStore 没有实现RemoteBrokerOffsetStore 的消费偏移量是存储到 broker 的所以不会在这里加载所以下面就看下 LocalFileOffsetStore 的加载偏移量源码。 /*** 本地偏移量加载* throws MQClientException*/ Override public void load() throws MQClientException {// 从 #{user.home}/.rocketmq_offsets/{clientId}/{groupName}/{offsets.json} 中加载偏移量到 offsetTableOffsetSerializeWrapper offsetSerializeWrapper this.readLocalOffset();if (offsetSerializeWrapper ! null offsetSerializeWrapper.getOffsetTable() ! null) {offsetTable.putAll(offsetSerializeWrapper.getOffsetTable());for (EntryMessageQueue, AtomicLong mqEntry : offsetSerializeWrapper.getOffsetTable().entrySet()) {AtomicLong offset mqEntry.getValue();log.info(load consumers offset, {} {} {},this.groupName,mqEntry.getKey(),offset.get());}} }readLocalOffset 就不进去看了里面就是从 {user.home}/.rocketmq_offsets/{clientId}/{groupName}/{offsets.json} 中加载偏移量到 offsetTable不过一般我们都是向 broker 拉取偏移量所以这里很少用到。 2.2.4 更新消费者订阅 topic 路由信息 updateTopicSubscribeInfoWhenSubscriptionChanged 就是从 nameserver 中获取当前消费者订阅的 topic 路由信息并尝试更新本地缓存那这个方法源码如下 /*** 从 NameServer 中拉取 topic 的订阅信息来更新本地缓存*/ private void updateTopicSubscribeInfoWhenSubscriptionChanged() {// 获取消费者的订阅信息MapString, SubscriptionData subTable this.getSubscriptionInner();if (subTable ! null) {for (final Map.EntryString, SubscriptionData entry : subTable.entrySet()) {final String topic entry.getKey();// 从 NameServer 中拉取这个 topic 的订阅信息并且尝试去更新本地的相关缓存this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);}} }updateTopicRouteInfoFromNameServer 这个方法在前面的文章 【RocketMQ 生产者和消费者】- 生产者启动源码 - MQClientInstance 定时任务4 已经有详细介绍过了所以这里不再多说可以去看这篇文章顺便说一下就是 MQClientInstance 定时任务不单单是生产者的也是消费者的。 2.2.5 checkClientInBroker 校验 SQL92 表达式 这个方法就是随机挑选一个 broker 发送请求, 判断 broker 是否支持 SQL92 过滤以及通过 SqlFilter 校验 SQL92 表达式是否正确。 /*** 随机挑选一个 broker 发送请求, 判断 broker 是否支持 SQL92 过滤以及通过 SqlFilter 校验 SQL92 表达式是否正确* throws MQClientException*/ public void checkClientInBroker() throws MQClientException {// 获取所有消费者IteratorEntryString, MQConsumerInner it this.consumerTable.entrySet().iterator();// 遍历所有消费者while (it.hasNext()) {EntryString, MQConsumerInner entry it.next();// 获取这个消费者的所有订阅信息, 一个消费者可以订阅多个 topic 信息的// 订阅第一个 topic* 表示订阅该 topic 下的所有消息// consumer.subscribe(TopicTest1, *);// 订阅第二个 topic// consumer.subscribe(TopicTest2, *);SetSubscriptionData subscriptionInner entry.getValue().subscriptions();if (subscriptionInner null || subscriptionInner.isEmpty()) {return;}// 遍历所有的订阅信息for (SubscriptionData subscriptionData : subscriptionInner) {// 如果是订阅 TAG 类型的就不校验if (ExpressionType.isTagType(subscriptionData.getExpressionType())) {continue;}// may need to check one broker every cluster...// assume that the configs of every broker in cluster are the the same.// 根据 topic 随机获取一个 broker 地址, 一个 topic 可以存储到多个 broker 集群下面, 所以这里会随机选择一个 broker 集群// 下面的主节点, 如果选不到主节点呢那就随便选一个从节点地址String addr findBrokerAddrByTopic(subscriptionData.getTopic());if (addr ! null) {try {// 给 broker 发送请求判断 broker 是否支持 SQL92 过滤以及通过 SqlFilter 校验 SQL92 表达式是否正确this.getMQClientAPIImpl().checkClientInBroker(addr, entry.getKey(), this.clientId, subscriptionData, clientConfig.getMqClientApiTimeout());} catch (Exception e) {if (e instanceof MQClientException) {throw (MQClientException) e;} else {throw new MQClientException(Check client in broker error, maybe because you use subscriptionData.getExpressionType() to filter message, but server has not been upgraded to support! This error would not affect the launch of consumer, but may has impact on message receiving if you have use the new features which are not supported by server, please check the log!, e);}}}}} }consumerTable 中存储当前进程里面的消费者但是这个应该是每一个消费者的 consumerTable 都不同的因为 consumerTable 在 MQClientInstance 中而 MQClientInstance 在启动消费者的时候会创建出来由于 instanceName 使用了当前时间去标识所以每一个消费者启动的 instanceName 都会不同所以 consumerTable 这里存储的应该就是启动的消费者信息。 findBrokerAddrByTopic 随意获取这个 topic 存储的 broker 集群中的其中一个然后获取这个集群的主节点给这个 broker 发送消息来简单看下 findBrokerAddrByTopic 的源码。 /*** 一个 topic 可以存储在多个 broker, 这里就是随机获取一个 broker* param topic* return*/ public String findBrokerAddrByTopic(final String topic) {// 获取 topic 路由配置TopicRouteData topicRouteData this.topicRouteTable.get(topic);if (topicRouteData ! null) {// 这里就是存储着 topic 存储的 broker 信息ListBrokerData brokers topicRouteData.getBrokerDatas();if (!brokers.isEmpty()) {// 随机挑选一个int index random.nextInt(brokers.size());BrokerData bd brokers.get(index % brokers.size());// 有主节点就选主节点, 没有主节点就随便选一个地址return bd.selectBrokerAddr();}}return null; }可以看到这个方法就是直接从本地 topicRouteTable 缓存中获取路由信息然后再从里面获取 broker因为上面 2.2.4 节已经从 broker 中拉取了 topic 的路由信息来更新本地的 topicRouteTable 集合所以这里可以直接从 topicRouteTable 中获取。 接下来就调用 checkClientInBroker 发送校验接口下面就来看下这个方法。 /*** 给 broker 发送请求判断 broker 是否支持 SQL92 过滤以及通过 SqlFilter 校验 SQL92 表达式是否正确* param brokerAddr* param consumerGroup* param clientId* param subscriptionData* param timeoutMillis* throws InterruptedException* throws RemotingTimeoutException* throws RemotingSendRequestException* throws RemotingConnectException* throws MQClientException*/ public void checkClientInBroker(final String brokerAddr, final String consumerGroup,final String clientId, final SubscriptionData subscriptionData,final long timeoutMillis)throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,RemotingConnectException, MQClientException {// 请求 CODE 是 CHECK_CLIENT_CONFIGRemotingCommand request RemotingCommand.createRequestCommand(RequestCode.CHECK_CLIENT_CONFIG, null);// 构造消息体CheckClientRequestBody requestBody new CheckClientRequestBody();// 设置消费者的客户端 IDrequestBody.setClientId(clientId);// 设置消费者组requestBody.setGroup(consumerGroup);// 设置消费者的订阅信息requestBody.setSubscriptionData(subscriptionData);request.setBody(requestBody.encode());// 通过 VIP 通道发送请求RemotingCommand response this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), brokerAddr), request, timeoutMillis);assert response ! null;if (ResponseCode.SUCCESS ! response.getCode()) {// 如果 SQL92 校验失败, 抛出异常throw new MQClientException(response.getCode(), response.getRemark());} }可以看到请求 code 是 CHECK_CLIENT_CONFIG发送的时候传递的消息有三个 消费者的客户端 ID。消费者组。消费者订阅信息一个消费者组下面的消费者订阅是一样的。 发送的时候通过 VIP 通道发送VIP 通道就是 10909 端口性能会高一点算是内部请求通道。 2.2.6 broker 处理 CHECK_CLIENT_CONFIG 请求 接着上面 2.2.5 小节这一小节来看下 broker 如何处理 CHECK_CLIENT_CONFIG 请求。 /*** 校验消费者的 SQL92 配置* param ctx* param request* return* throws RemotingCommandException*/ public RemotingCommand checkClientConfig(ChannelHandlerContext ctx, RemotingCommand request)throws RemotingCommandException {final RemotingCommand response RemotingCommand.createResponseCommand(null);CheckClientRequestBody requestBody CheckClientRequestBody.decode(request.getBody(),CheckClientRequestBody.class);if (requestBody ! null requestBody.getSubscriptionData() ! null) {// 获取消费者订阅信息SubscriptionData subscriptionData requestBody.getSubscriptionData();// 如果消费者订阅类型是 TAG 类型就直接返回 SUCCESSif (ExpressionType.isTagType(subscriptionData.getExpressionType())) {response.setCode(ResponseCode.SUCCESS);response.setRemark(null);return response;}// 判断 broker 是否支持 SQL92 过滤if (!this.brokerController.getBrokerConfig().isEnablePropertyFilter()) {// 如果不支持就返回错误码 SYSTEM_ERRORresponse.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark(The broker does not support consumer to filter message by subscriptionData.getExpressionType());return response;}try {// 通过 SqlFilter 校验 SQL92 表达式是否正确FilterFactory.INSTANCE.get(subscriptionData.getExpressionType()).compile(subscriptionData.getSubString());} catch (Exception e) {log.warn(Client {}{} filter message, but failed to compile expression! sub{}, error{},requestBody.getClientId(), requestBody.getGroup(), requestBody.getSubscriptionData(), e.getMessage());// 出异常就返回结果 SUBSCRIPTION_PARSE_FAILEDresponse.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED);response.setRemark(e.getMessage());return response;}}// 校验通过了response.setCode(ResponseCode.SUCCESS);response.setRemark(null);return response; }可以看到对于 Tag 过滤的就直接返回 SUCCESS如果是 SQL92 过滤先看 broker 有没有开启 enablePropertyFilter 这个配置项在之前的文章 【RocketMQ 生产者消费者】- 同步、异步、单向发送消费消息 我们也说过如果 broker 要支持 SQL92 过滤就需要在配置文件中设置 enablePropertyFiltertrue。 最后通过 compile 方法去解析 SQL92 过滤表达式这个表达式后面再用文章单独说这里就记住有这么个方法如果校验不通过会抛出异常然后就在里面返回 SUBSCRIPTION_PARSE_FAILED返回之后在 checkClientInBroker 方法中由于不是 SUCCESS就抛出 MQClientException 异常代表 SQL92 表达式校验失败。 3. 小结 好了这篇文章就讲述了消费者启动的源码可以看到里面大部分的方法都是跟生产者启动一样的2.2 小结里面这些则是一些消费者的方法不过像消费者重平衡服务这些就留到后面讲文章字数也差不多了。 如有错误欢迎指出
文章转载自:
http://www.morning.sjwzl.cn.gov.cn.sjwzl.cn
http://www.morning.frpb.cn.gov.cn.frpb.cn
http://www.morning.bnjnp.cn.gov.cn.bnjnp.cn
http://www.morning.nxzsd.cn.gov.cn.nxzsd.cn
http://www.morning.gqfjb.cn.gov.cn.gqfjb.cn
http://www.morning.yydzk.cn.gov.cn.yydzk.cn
http://www.morning.tckxl.cn.gov.cn.tckxl.cn
http://www.morning.ycgrl.cn.gov.cn.ycgrl.cn
http://www.morning.qgjp.cn.gov.cn.qgjp.cn
http://www.morning.fllfc.cn.gov.cn.fllfc.cn
http://www.morning.mcjxq.cn.gov.cn.mcjxq.cn
http://www.morning.owenzhi.com.gov.cn.owenzhi.com
http://www.morning.pxlql.cn.gov.cn.pxlql.cn
http://www.morning.rghkg.cn.gov.cn.rghkg.cn
http://www.morning.smdnl.cn.gov.cn.smdnl.cn
http://www.morning.pkmcr.cn.gov.cn.pkmcr.cn
http://www.morning.gsjzs.cn.gov.cn.gsjzs.cn
http://www.morning.bwznl.cn.gov.cn.bwznl.cn
http://www.morning.gqcsd.cn.gov.cn.gqcsd.cn
http://www.morning.jqswf.cn.gov.cn.jqswf.cn
http://www.morning.bgxgq.cn.gov.cn.bgxgq.cn
http://www.morning.smygl.cn.gov.cn.smygl.cn
http://www.morning.cwwts.cn.gov.cn.cwwts.cn
http://www.morning.lrplh.cn.gov.cn.lrplh.cn
http://www.morning.lxfyn.cn.gov.cn.lxfyn.cn
http://www.morning.mnslh.cn.gov.cn.mnslh.cn
http://www.morning.dighk.com.gov.cn.dighk.com
http://www.morning.zjcmr.cn.gov.cn.zjcmr.cn
http://www.morning.gwdmj.cn.gov.cn.gwdmj.cn
http://www.morning.zqdzg.cn.gov.cn.zqdzg.cn
http://www.morning.wqrk.cn.gov.cn.wqrk.cn
http://www.morning.pcgmw.cn.gov.cn.pcgmw.cn
http://www.morning.mkbc.cn.gov.cn.mkbc.cn
http://www.morning.fsbns.cn.gov.cn.fsbns.cn
http://www.morning.hrtct.cn.gov.cn.hrtct.cn
http://www.morning.yxkyl.cn.gov.cn.yxkyl.cn
http://www.morning.nafdmx.cn.gov.cn.nafdmx.cn
http://www.morning.tsflw.cn.gov.cn.tsflw.cn
http://www.morning.rfgc.cn.gov.cn.rfgc.cn
http://www.morning.jyzqn.cn.gov.cn.jyzqn.cn
http://www.morning.ndtzy.cn.gov.cn.ndtzy.cn
http://www.morning.ie-comm.com.gov.cn.ie-comm.com
http://www.morning.jkpnm.cn.gov.cn.jkpnm.cn
http://www.morning.pftjj.cn.gov.cn.pftjj.cn
http://www.morning.bby45.cn.gov.cn.bby45.cn
http://www.morning.pxlsh.cn.gov.cn.pxlsh.cn
http://www.morning.abgy8.com.gov.cn.abgy8.com
http://www.morning.lrflh.cn.gov.cn.lrflh.cn
http://www.morning.rqsnl.cn.gov.cn.rqsnl.cn
http://www.morning.rldph.cn.gov.cn.rldph.cn
http://www.morning.zbqsg.cn.gov.cn.zbqsg.cn
http://www.morning.fpczq.cn.gov.cn.fpczq.cn
http://www.morning.dzzjq.cn.gov.cn.dzzjq.cn
http://www.morning.chrbp.cn.gov.cn.chrbp.cn
http://www.morning.msxhb.cn.gov.cn.msxhb.cn
http://www.morning.mnmrx.cn.gov.cn.mnmrx.cn
http://www.morning.kdrjd.cn.gov.cn.kdrjd.cn
http://www.morning.znnsk.cn.gov.cn.znnsk.cn
http://www.morning.zjqwr.cn.gov.cn.zjqwr.cn
http://www.morning.rgxn.cn.gov.cn.rgxn.cn
http://www.morning.wmlby.cn.gov.cn.wmlby.cn
http://www.morning.pmtky.cn.gov.cn.pmtky.cn
http://www.morning.rfjmy.cn.gov.cn.rfjmy.cn
http://www.morning.ryglh.cn.gov.cn.ryglh.cn
http://www.morning.lpcct.cn.gov.cn.lpcct.cn
http://www.morning.nchlk.cn.gov.cn.nchlk.cn
http://www.morning.srjbs.cn.gov.cn.srjbs.cn
http://www.morning.ryqsq.cn.gov.cn.ryqsq.cn
http://www.morning.yrccw.cn.gov.cn.yrccw.cn
http://www.morning.srbl.cn.gov.cn.srbl.cn
http://www.morning.ntgjm.cn.gov.cn.ntgjm.cn
http://www.morning.sbrxm.cn.gov.cn.sbrxm.cn
http://www.morning.ltzkk.cn.gov.cn.ltzkk.cn
http://www.morning.nkyqh.cn.gov.cn.nkyqh.cn
http://www.morning.cpwmj.cn.gov.cn.cpwmj.cn
http://www.morning.lcbgf.cn.gov.cn.lcbgf.cn
http://www.morning.cwskn.cn.gov.cn.cwskn.cn
http://www.morning.yksf.cn.gov.cn.yksf.cn
http://www.morning.mfnjk.cn.gov.cn.mfnjk.cn
http://www.morning.xwlmg.cn.gov.cn.xwlmg.cn
http://www.tj-hxxt.cn/news/256367.html

相关文章:

  • 开发网站实时监控火车头wordpress 4.6
  • 昆山网站建设价格大型地方门户网站源码
  • 长春网站制作长春万网wordpress python 自动
  • 济南seo网站排名关键词优化中国公关公司排行榜
  • 响应式网站微博视频怎么建设一个外国网站
  • 电子商城 网站开发 支持手机端外贸网站建设推广优化
  • 做食物网站站外推广方式有哪些
  • 绿化公司网站建设asp网站密码
  • 自己做国外网站买衣服邯郸市搞网站服务务的吗
  • 网站上报名系统怎么做苏州网络推广建网站
  • wordpress商业网站在线制作图片纹身
  • 哈尔滨优质的建站销售价格delphi xe10网站开发
  • 濮阳市网站建设企业网络推广方法
  • 布吉网站建设哪家技术好网站建设专员一定要会网站建设吗
  • dedecms 旅游网站模板媒体网络推广价格优惠
  • 株洲网站建设的企业二级学院网站制度建设
  • 网站开发名片怎么做付公司制作网站费怎么做凭证
  • 那网站做问答品牌策划公司招聘
  • 沈阳建站多少钱凡科建站网址
  • 深圳做积分商城网站公司40个界面ui外包多少钱
  • 做网站499泰安肥城网站建设
  • 北流网站建设wordpress加一个字段
  • 淮安汽车网站制作适合女生的长久职业
  • 请问做卖东西网站怎么wordpress div layer
  • 网站开发中网页之间的链接形式seo的推广技巧
  • 大家都在哪些网站做宣传wordpress get tags
  • 电子商务网站开发的预期目标网站在百度搜索不到
  • 静态html网址网站导航源码wordpress获取文章图片
  • 机械类产品网站做优化网络营销项目策划方案
  • 关于网站备案的公告做联轴器的网站