当前位置: 首页 > news >正文

海南智能网站建设设计清远专业网站建设服务

海南智能网站建设设计,清远专业网站建设服务,wordpress怎么让手机端好看,网站建设群标签好写什么kafka进阶 消息顺序保证 Kafka它在设计的时候就是要保证分区下消息的顺序#xff0c;也就是说消息在一个分区中的顺序是怎样的#xff0c;那么消费者在消费的时候看到的就是什么样的顺序。 消费者和分区的对应关系 参考这篇文章。 分区文件 一个分区对应着log.dirs下的…kafka进阶 消息顺序保证 Kafka它在设计的时候就是要保证分区下消息的顺序也就是说消息在一个分区中的顺序是怎样的那么消费者在消费的时候看到的就是什么样的顺序。 消费者和分区的对应关系 参考这篇文章。 分区文件 一个分区对应着log.dirs下的一个子目录例如主题test1的0号分区其对应目录的内容为 ls test1-0 00000000000000000000.index 00000000000000000000.log 00000000000000000000.timeindex 分别有日志文件和索引文件。 Kafka解决查询效率的手段之一是将数据文件分段比如有100条Message它们的offset是从0到99。假设将数据文件分成5段第一段为0-19第二段为20-39以此类推每段放在一个单独的数据文件里面数据文件以该段中最小的offset命名。这样在查找指定offset的Message的时候用二分查找就可以定位到该Message在哪个段中。 数据文件分段使得可以在一个较小的数据文件中查找对应offset的Message了但是这依然需要顺序扫描才能找到对应offset的Message。为了进一步提高查找的效率Kafka为每个分段后的数据文件建立了索引文件文件名与数据文件的名字是一样的只是文件扩展名为.index。 索引文件中包含若干个索引条目每个条目表示数据文件中一条Message的索引。索引包含两个部分均为4个字节的数字分别为相对offset和position。 相对offset因为数据文件分段以后每个数据文件的起始offset不为0相对offset表示这条Message相对于其所属数据文件中最小的offset的大小。举例分段后的一个数据文件的offset是从20开始那么offset为25的Message在index文件中的相对offset就是25-20 5。存储相对offset可以减小索引文件占用的空间。 position表示该条Message在数据文件中的绝对位置。只要打开文件并移动文件指针到这个position就可以读取对应的Message了。 index文件中并没有为数据文件中的每条Message建立索引而是采用了稀疏存储的方式每隔一定字节的数据建立一条索引。这样避免了索引文件占用过多的空间从而可以将索引文件保留在内存中。但缺点是那些没有建立索引的Message也不能一次定位到其在数据文件中的位置从而需要做一次局部的顺序扫描但是这次顺序扫描的范围就很小了。 总的来说logindex的设计方式类似于hdfs上的MapFileMapFile也是通过“定位到大致位置”“局部顺序扫描”来快速定位的。 分区副本 分区有多个副本其中一个是leader副本leader副本通过特定的策略选举产生其他是follower副本。读写操作均由leader副本处理follower副本仅仅是从leader副本处把新的消息同步过来这个过程有一定延迟所以follower副本的消息可能略少于leader副本这在一定阈值范围内是可以容忍的。 kafka的主从副本机制与mysql的对比 1、由于只从leader副本处读写kafka的分区副本并不支持负载均衡而纯粹是一种高可靠设计。mysql的slave是可以分摊查询压力的。由此也可看出kafka分区副本的一致性保证要强于mysql的读写分离。 2、为保证生产者的效率leader和follower之间是异步同步不会因为某个follower太慢拖慢整个集群。这点跟mysql的master/slave异步同步是相似的。 3、leader宕机会从followerfollower副本一般会放到不同的机器上中选举新的leader可以认为是不存在单点问题的。mysql不会从slave中选举新的master而是通过双主双活近似于主备机措施来保证master可用。 消费者位置offset 消费者在消费的过程中需要记录自己消费了多少数据即消费位置信息。在Kafka中这个位置信息有个专门的术语位移(offset)。很多消息引擎都把这部分信息保存在服务器端(broker端)。这样做的好处当然是实现简单但会有三个主要的问题 broker从此变成有状态的会影响伸缩性需要引入应答机制(acknowledgement)来确认消费成功。由于要保存很多consumer的offset信息必然引入复杂的数据结构造成资源浪费。 而Kafka选择了不同的方式每个consumer group保存自己的位移信息那么只需要简单的一个整数表示位置就够了同时可以引入checkpoint机制定期持久化简化了应答机制的实现。 老版本0.8及之前的位移是提交到zookeeper中的目录结构是/consumers/group.id/offsets//但是zookeeper其实并不适合进行大批量的读写操作尤其是写操作。因此kafka提供了另一种解决方案增加consumer offsets topic将offset信息写入这个topic摆脱对zookeeper的依赖。__consumer_offsets中的消息保存了每个consumer group某一时刻提交的offset信息结构大概是 group id topic-partition offset 我们可以使用bin/kafka-consumer-groups.sh 来查看offset。 列出基于java consumer API访问的所有consumer group ./kafka-consumer-groups.sh --bootstrap-server xx.xx.xx.xx:9092 --list列出基于zk访问的consumer groupkafka的最新版本里已没有–zookeeper选项了 ./kafka-consumer-groups.sh --zookeeper localhost:2181 --list查看某consumer group的offset ./kafka-consumer-groups.sh --bootstrap-server xx.xx.xx.xx:9092 --describe --group uniquelip1 输出是这样的 TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID mykafka 0 0 0 0 - heyHaha1 0 1974 30887 28913 - 记录了该consumer组commit过注意不是消费过而是commit过的各topic下分区的当前位置CURRENT-OFFSET及最后一个消息的位置LOG-END-OFFSET简称LEO enable.auto.commit 特别注意enable.auto.commit默认是开启的所以如果要手工控制offset必须显式关闭 props.put(enable.auto.commit, false);建议显式关闭因为commit动作必须在消息真正被业务消费之后才能执行否则在异常情况下可能导致消息丢失。比如提前commit但此刻消息只处理了部分这时进程core掉即使随后进程重启由于offset已被commit成最新的值new-posnew-pos之前可能有部分消息已经无法处理了。 auto.offset.reset值含义解释 三个值earliest、latest、none earliest 当各分区下有已提交的offset即CURRENT-OFFSET有效下同时从提交的offset开始消费 无提交的offset时例如该topic尚未被当前consumer组消费从头开始消费 latest 当各分区下有已提交的offset时从提交的offset开始消费 无提交的offset时等待消费新产生的数据对已存在的历史数据会置之不理。注意若关闭了enable.auto.commit此时查看的offset的位置如下 TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID test4 0 - 5 - consumer-1-b4c5c879-c649-4198-9e4c-13735e9eaab4 /10.180.53.159 consumer-1 我们发现CURRENT-OFFSET的值是-并非一个有效值。 none topic各分区都存在已提交的offset时从offset后开始消费 只要有一个分区不存在已提交的offset则抛出异常 Undefined offset with no reset policy for partition: test4-0 经测试auto.offset.reset默认是latest。 offset小结 offset是一个客户端的概念而非服务端概念每个consumer组都有自己独立的offset互不干扰只是offset信息会存在服务端的磁盘里而已。 生产者已经往某个topic插入数据但consumer却从该topic取不到数据有两种可能 1、CURRENT-OFFSET等于LEO说明该consumer所在的组已经消费过消息了看看是否忽略了“enable.auto.commit默认是开启”的情况 2、consumer访问的是一个之前未访问过的topic该topic有历史数据但无新的数据到来由于auto.offset.reset默认是latestconsumer会挂住等待新的消息。 enable.auto.commit 默认为truekafka后台会每隔5s默认值可以通过auto_commit_interval_ms修改自动提交一次offset。所以如果用auto commit选项消费和commit之间实际上是有延迟的。 实际使用中consumer定期poll且auto.offset.resetlatest且开启了auto commit则有可能在消费之后commit之前consumer group就会因没有初始offset信息而被kafka后台干掉从而导致后续每次都poll不到消息。为解决该问题此时应该手工commit。 特别说明一下如果consumer group没有消耗任何消息就显式commit且auto.offset.resetlatest此时CURRENT-OFFSET会被设置为LEO。 consumer group consumer构成的组。组中的consumer会被分配到不同的partition上。因此一条消息只会被Consumer Group中的一个Consumer消费。 关于producer producer的send方法是异步的它会将ProducerRecord缓存起来然后由单独的sender线程批量发送。 KafkaProducer是线程安全的且按照官方文档多线程共享一个producer实例性能更好。 我们看KafkaProducer有一个close方法其中所做的事情包含 将缓存里的消息清空 关闭sender线程。 所以,KafkaProducer其实是一个运算资源的集合体。 性能测试脚本(0.8版本) ./kafka-producer-perf-test.sh --broker-list localhost:9092 --messages 1000000 --topics test1 --threads 6 --message-size 1000 --batch-size 200 --compression-codec 0 参数说明 --messages Long: count The number of messages to send or consume (default: 9223372036854775807) --threads Integer: number of threads Number of sending threads. (default: 1)可令线程数等于分区数--message-size Integer: size The size of each message. (default: 100) --batch-size Integer: size Number of messages to write in a single batch. (default: 200) --compression-codec Integer: If set, messages are sent compressed supported codec: NoCompressionCodec (default: 0) as 0, GZIPCompressionCodec as 1, SnappyCompressionCodec as 2, LZ4CompressionCodec as 3 kafka调优 producer总体架构 有几点需注意 producer的send只是把消息放到发送缓存里会有单独的发送线程把消息从socket发送给kafka服务器这也说明producer必然是线程安全的发送缓存使用了内存池机制它维护了一个freelist里面都是batch.size大小的内存块一旦涉及内存分配或回收只是简单的从这个freelist里pop或push内存块几乎不涉及java堆分配自然也无需gc发送线程会等到一个batch.size大小的内存块被消息填满才发送这样可提升produce效率。一个内存块对应一个分区所以同一时间可能有多个内存块从socket发送给不同的节点整个发送缓存的大小由buffer.memory参数指定。 produce调优总结 我们看produce的“远端执行”性能数据有几个结论 producer建议做成单例在多线程间共享多个producer实例会将发送的消息总量分摊掉造成的结果是一方面由于多producer的分摊消息不能及时发送因为每个producer有batch.size的约束另一方面多个producer的消息可能堆在一起发送产生大量的网络IO。指示发送缓存大小的batch.size不能设置太小否则严重影响发送效率。这点可跟写文件时的进程内缓存做一类比实际上batch.size的默认值为16k也是诸实现中写文件时进程内缓存的常用大小一般是8k或16k。acks1表示仅等待分区的leader副本返回而非所有的isr副本返回虽然效率提升了但可能存在单点风险。compression.typesnappy或lz4可极大提升produce效率压缩之后意味着相同的batch.size内存块可以发送更多的消息提升了吞吐量。 顺带说一下压缩的问题kafka producer支持三种压缩格式 gzip、snappy和lz4 压缩比方面三者的关系是 gzip lz4 snappy gzip压缩比最高。 压缩解压效率方面则反过来 gzip lz4 snappy snappy压缩解压效率最高。 其中gzip是jdk自带的lz4 是kafka自带的snappy则需要额外的snappy-java包才可支持。lz4和snappy压缩库的协议都是apache 2.0商业友好。 还有一些可能的性能提升点 控制消息的大小避免超过batch.size的值。因为producer内部有一个BufferPool消息所需内存空间的分配不是用new而是分配自该pool从而避免gc。一旦消息大小超过batch.size意味着无法使用pool只能new从而带来gc开销。 buffer.memory和batch.size 我们在producer config里指定的两个参数 props.put(buffer.memory, 33554432); props.put(batch.size, 16384);最终是用于构造BufferPool //totalSize就是buffer.memory, batchSize就是batch.size this.free new BufferPool(totalSize, batchSize, metrics, time, metricGrpName);则可知buffer.memory就是缓存池的内存总量而batchSize则是缓存池里每个内存块的大小这些内存块构成了一个free链表。BufferPool.allocate里计算可用空间的几行代码证明了这一点 //poolableSize就是batch.sizesize是待分配的大小 int freeListSize this.free.size() * this.poolableSize; //availableMemory是原始的、还未分配过的内存量 if (this.availableMemory freeListSize size) {// we have enough unallocated or pooled memory to immediately// satisfy the requestfreeUp(size);this.availableMemory - size;......return ByteBuffer.allocate(size); }这里有个疑问似乎producer将batch.size作为标准大小如果待分配的内存量不是这个标准值就new之关于这点也可从deallocate实现里取得印证 public void deallocate(ByteBuffer buffer, int size) {lock.lock();try {if (size this.poolableSize size buffer.capacity()) {buffer.clear();this.free.add(buffer);} else {this.availableMemory size;}Condition moreMem this.waiters.peekFirst();if (moreMem ! null)moreMem.signal();} finally {lock.unlock();}}只有待回收的内存量恰好等于batch.size才会放入free队列供后续复用。如不等的话只是简单的增加availableMemory的大小实际走的还是jvm的gc释放流程。 压缩 kafka自带支持的是gzip和lz4压缩snappy要额外的库支持,这是MemoryRecordsBuilder类里的相关代码 private static MemoizingConstructorSupplier snappyOutputStreamSupplier new MemoizingConstructorSupplier(new ConstructorSupplier() {Overridepublic Constructor get() throws ClassNotFoundException, NoSuchMethodException {return Class.forName(org.xerial.snappy.SnappyOutputStream).getConstructor(OutputStream.class, Integer.TYPE);}});private static MemoizingConstructorSupplier snappyInputStreamSupplier new MemoizingConstructorSupplier(new ConstructorSupplier() {Overridepublic Constructor get() throws ClassNotFoundException, NoSuchMethodException {return Class.forName(org.xerial.snappy.SnappyInputStream).getConstructor(InputStream.class);}}); 其中snappyOutputStreamSupplier用于producer的压缩snappyInputStreamSupplier则用于consumer的解压。 关于几种压缩算法有人做了验证结论如下 1不管对于大量数据或是少量数据压缩性能snappy都是最佳只是在数据量大的情况下压缩性能略慢于lz4。通过对比还是可以发现snappy要优于lz4难怪hadoop选用snappy。 2从压缩比来看无疑xz是最出色的而且解压速度相对于压缩比来说也是相当可观就是压缩太慢。追加高压缩比对解压速度有要求的可以使用看看。(xz和common xz其实是一样的) 3综合来看jdk gzip和common zip不论是压缩比和解压性能都不错对于解压和压缩比有要求的可以使用但仔细分析发现common gzip更善于处理大数据量的压缩。 4最后bzip2压缩比还可以但是压缩和解压速度都偏慢。 consumer总体架构 总体结构比较简单大致流程是 consumer先通过Coordinator与服务端交互完成rebalance操作多个consumer构成一个ConsumerGrouprebalance相当于组内的负载均衡rebalance之后哪些分区分配给哪个consumer就确定了这时fetcher要准备从服务端拿消息了但还有两个关键参数要告诉服务端一是分区上次提交的offset即我要从哪个位置开始读二是我最多要读多大数据量。前者由Coordinator从服务端的offset topic里获得,后者则由max.partition.fetch.bytes参数指定。 consumer调优总结 测试下来有几点 max.partition.fetch.bytes并非越大越好需均衡考虑producer的写入速度及网络IO的开销设置过大会导致等待时间较长且一次网络传输量较大 实测使用默认值1M较好。合理设置“心跳超时”和“连续poll调用间隔超时”kafka 0.10.1版本之前这两种监测是用同一个参数session.timeout.ms表示0.10.1之后则分别用session.timeout.ms心跳超时和max.poll.interval.ms连续poll调用间隔超时表示。0.10.1版本之后我们可将session.timeout.ms设小点默认是10s确保服务端尽快监测到consumer挂掉同时可将max.poll.interval.ms设大点默认300s争取更多的消息处理时间。但0.10.1版本之前我们不能为了更长的处理时间而把session.timeout.ms调的过大那样服务端无法快速监测到consumer挂掉的情况反而导致费时甚久的消息处理白做了。connections.max.idle.ms是连接空闲关闭时间但consumer似乎有重连机制即使我们故意延迟超出这个时间依然可以poll出数据 max.partition.fetch.bytes consumer向kafka server申请数据需构造FetchRequest这里有两个关键参数要告诉服务端一是分区offset即我要从哪个位置开始读二是我要读多大数据量。前者可从kafka的offset topic里获得我们可以得到最近一次consumer提交的该分区的位置。后者就是由max.partition.fetch.bytes指定。涉及的代码是 Fetcher.javaprivate MapNode, FetchRequest.Builder createFetchRequests() {// 获得kafka集群的信息Cluster cluster metadata.fetch();MapNode, LinkedHashMapTopicPartition, FetchRequest.PartitionData fetchable new LinkedHashMap();for (TopicPartition partition : fetchablePartitions()) {//找到leader因为只有leader副本支持读写Node node cluster.leaderFor(partition);if (node null) {metadata.requestUpdate();} else if (this.client.pendingRequestCount(node) 0) {// if there is a leader and no in-flight requests, issue a new fetchLinkedHashMapTopicPartition, FetchRequest.PartitionData fetch fetchable.get(node);if (fetch null) {fetch new LinkedHashMap();fetchable.put(node, fetch);}//得到该分区上次commit的位置并将其作为本次fetch的起始offset//fetchSize就是max.partition.fetch.bytes,//表示每次fetch的最大字节数long position this.subscriptions.position(partition);fetch.put(partition, new FetchRequest.PartitionData(position, this.fetchSize));session.timeout.ms和max.poll.interval.ms 两者的差异stackoverflow上有帖子解释总结下来有几点 kafka consumer有两种超时检查一是心跳监测、二是连续poll调用间隔监测前者预防consumer挂掉包括网络断链后者预防consumer处理的太慢。kafka 0.10.1版本之前这两种监测是用一个参数session.timeout.ms表示的0.10.1之后则分别用session.timeout.ms心跳监测超时和max.poll.interval.ms连续poll调用间隔超时表示。这样修改的原因是保证尽快检测到consumer挂掉的同时允许更长的消息处理耗时。修改的方法则是为心跳监测单独起一个线程跟消息处理线程隔离开。 为何要有两种超时监测机制我估计还是kafka的ConsumerGroup支持组内负载均衡的缘故上述两种超时间隔一旦达到服务端就认为该consumer会拖慢整体性能无论挂掉还是消息处理慢都会拖整个ConsumerGroup的后腿就会从ConsumerGroup中移除该consumer并启动consumer rebalance(注意rebalance由服务端触发且视情况有必要才触发)但如果仅仅是消息处理太慢而非网络连接问题我们不排除rebalance后重新选择旧的consumer的可能。 下面是在0.10.2下触发poll调用间隔超时的代码我们设置poll查询间隔超时为10s: private static KafkaConsumerString, String getConsumer(boolean fromBeginning){......Properties props new Properties();props.put(bootstrap.servers, ip :9092);props.put(group.id, uniquelip1);props.put(enable.auto.commit, false);props.put(session.timeout.ms, 10000);props.put(max.poll.interval.ms, 10000); //设置poll查询间隔超时为10sprops.put(key.deserializer, org.apache.kafka.common.serialization.StringDeserializer);props.put(value.deserializer, org.apache.kafka.common.serialization.StringDeserializer);KafkaConsumerString, String consumer new KafkaConsumerString, String(props);return consumer;}然后强行在poll和commit间sleep 20s从而触发“poll调用间隔超时”: private static void consume(String topic, int expectNum){boolean hasPrinted false;try (KafkaConsumerString, String consumer getConsumer(true)){int total 0;consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener(){public void onPartitionsRevoked(CollectionTopicPartition partitions){LOGGER.info(before rebalance,after consumer poll, topic:{},partitions.stream().map(x - x.toString()).collect(Collectors.joining(;)));}public void onPartitionsAssigned(CollectionTopicPartition partitions){LOGGER.info(after rebalance,before consumer poll, topic:{},partitions.stream().map(x - x.toString()).collect(Collectors.joining(;)));}});while (total expectNum){ConsumerRecordsString, String records consumer.poll(1000);total records.count();LOGGER.info(consume {} msgs now, total);if (!hasPrinted records.count() 0){System.out.println(records.iterator().next().value());hasPrinted true;}Util.safeSleep(20000);LOGGER.info(sleep 20s);try{consumer.commitSync();}catch(Exception e){e.printStackTrace(new LogPrintWriter(LOGGER));}}System.out.println(total total msgs consumed);}}打印错误如下 org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the timebetween subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeo ut or by reducing the maximum size of batches returned in poll() with max.poll.records.kafka 0.9版本下报的错则是 commit offsets exception: org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.我们看到这两段信息就是一字之差从0.9的“This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms”改为0.10的“This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms”。从侧面也印证了前面的说法。 注意我们的测试代码为consumer增加了ConsumerRebalanceListener可以侦测到服务端的consumer rebalance日志中会反复打印如下信息 before rebalance,after consumer poll, topic:test1-0;test1-5;test1-2;test1-1;test1-4;test1-3after rebalance,before consumer poll, topic:test1-0;test1-5;test1-2;test1-1;test1-4;test1-3而且我们的consumer在commit失败后下一次poll仍能查出数据仍是老的数据因commit失败了说明旧的consumer依然被服务端rebalance到了只要随后的消息处理不超时consumer仍能正常的消费后续消息。 还有一个疑问既然是“poll调用间隔超时”那为何会在commit时抛出异常而不是下次poll之时呢我们注意到rebalance动作其实是发生在commit之前的其实是在consumer的poll函数里这点从日志可以看出 2017-12-25 14:31:13 [ main ] - [ INFO ] Revoking previously assigned partitions [test1-0, test1-5, test1-2, test1-1, test1-4, test1-3] for group uniquelip1 2017-12-25 14:31:13 [ main ] - [ INFO ] before rebalance,after consumer poll, topic:test1-0;test1-5;test1-2;test1-1;test1-4;test1-3 2017-12-25 14:31:13 [ main ] - [ INFO ] (Re-)joining group uniquelip1 2017-12-25 14:31:13 [ main ] - [ INFO ] Successfully joined group uniquelip1 with generation 21 2017-12-25 14:31:13 [ main ] - [ INFO ] Setting newly assigned partitions [test1-0, test1-5, test1-2, test1-1, test1-4, test1-3] for group uniquelip1 2017-12-25 14:31:13 [ main ] - [ INFO ] after rebalance,before consumer poll, topic:test1-0;test1-5;test1-2;test1-1;test1-4;test1-3 2017-12-25 14:31:13 [ main ] - [ INFO ] consume 2500 msgs now 2017-12-25 14:31:33 [ main ] - [ INFO ] sleep 20s 2017-12-25 14:31:33 [ main ] - [ ERROR ] org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the timebetween subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeo ut or by reducing the maximum size of batches returned in poll() with max.poll.records. 2017-12-25 14:31:33 [ main ] - [ ERROR ] at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:698) 2017-12-25 14:31:33 [ main ] - [ ERROR ] at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:577) 2017-12-25 14:31:33 [ main ] - [ ERROR ] at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1091) 2017-12-25 14:31:33 [ main ] - [ ERROR ] at KafkaTest.consume(KafkaTest.java:229) 2017-12-25 14:31:33 [ main ] - [ ERROR ] at KafkaTest.main(KafkaTest.java:70)也就是说commit时server的rebalance已经发生了此时consumer和server的状态已经不一致了在server看来你这个consumer干的太慢我早就把你的活分给其他consumer这部分活就不让你干了如果你继续commit offset有可能导致多个consumer同时操作同一个分区这是kafka的基本原则所不允许的所以server端只能在commit时报错而不会容忍你的错误到下一次的poll。从最终的结果看commit失败意味着我这个consumer所做的一切消息处理都白做了。 如果我们已经合理的设置了连续poll调用间隔超时的值接下来只能通过 1、控制consume速度通过调小max.poll.records 2、优化消息处理效率 这两种方法来保证消息处理耗时在连续poll调用间隔超时之内了。 但如果这样做还不能完全避免“连续poll调用超时”又该如何应对 首先由于连续poll调用间隔超时必然导致服务端rebalance原本由consumer A处理的partition很可能被分配给其他的consumer这会导致consumerA commit失败一个分区同一时间只能被ConsumerGroup中的一个consumer消费则consumer A已经处理尚未commit的那部分消息也会被其他consumer重复处理。如果我们没法将“消息处理”“offset commit”放在一个事务中通过事务回滚来回退消息就只能依赖业务侧来提供去重机制例如全局唯一主键或者干脆允许消息重复。 同时我们也要考虑最极端的情况即消息处理出现大量积压导致反复出现“连续poll调用超时”此时建议调用consumer.unsubscribe让consumer退订停止从kafka拿消息,直到消息积压的情况有所改善再重新订阅拿消息。
http://www.tj-hxxt.cn/news/227116.html

相关文章:

  • 建设物业公司网站网页怎么制作超链接
  • 建站公司佛山网站建立定位企划
  • 重庆做网站公司有哪些沧州网络公司有哪些
  • 建设手机网站例seo发贴软件
  • 提供邯郸企业建网站网址怎么做快捷方式
  • 网络创作网站php语言做购物网站
  • 免费ui设计网站沧县做网站价格
  • 网站不用下载免费软件制作wordpress模板教程
  • 免费网站seo企业网站排名软件能优化
  • 网站建设技术咨询协议鲜花销售网站建设策划表
  • 网站备案接口wordpress悬浮搜索框
  • 上海翼成信息科技有限公司做的什么网站建设外贸网站公司
  • 济南做网站个人免费漫画软件
  • 权威的南通网站建设网络营销专业就业
  • 江西企业网站建设电话过年做哪个网站致富
  • 湖南省建设厅官方网站卓越亚马逊网站建设目的
  • 网上接网站开发类订单的平台百度seo视频教程
  • 全国有哪些做服装的网站网址大全123官方网站
  • 制作科技网站首页简历中建设网站的项目经历
  • 网站建设友汇做一个网站需要哪些
  • 临沂seo网站管理女孩子奖励自己的资料
  • 各大网站wordpress 管理系统
  • 深圳市专业的做网站深圳公司注册网上流程
  • 站酷网图片登封郑州网站建设
  • 易车网汽车之家福州seo推广外包
  • 赣州网站建设优化服务网络科技公司怎么赚钱
  • 哪个网站可以做室内设计做网站分辨率多少钱
  • 域名和网站建设网站开发技术课程设计总结
  • 网站搜索引擎优化主要方法学校网站 建设
  • 网站建设架构 服务器科学小制作