装修设计公司网站,家在深圳光明业主论坛,wordpress文章图片不显示,淘宝网首页电脑版登录入口目录 一、Range分区分配策略原理1.1、Range分区分配策略原理的示例一1.2、Range分区分配策略原理的示例二1.3、Range分区分配策略原理的示例注意事项 二、Range 分区分配策略代码案例2.1、创建带有4个分区的fiveTopic主题2.2、创建三个消费者 组成 消费者组2.3、创建生产者2.4、… 目录 一、Range分区分配策略原理1.1、Range分区分配策略原理的示例一1.2、Range分区分配策略原理的示例二1.3、Range分区分配策略原理的示例注意事项 二、Range 分区分配策略代码案例2.1、创建带有4个分区的fiveTopic主题2.2、创建三个消费者 组成 消费者组2.3、创建生产者2.4、测试2.5、Range 分区分配策略代码案例说明 三、Range 分区分配再平衡案例3.1、停止某一个消费者后45s 以内重新发送消息示例3.2、停止某一个消费者后45s 以后重新发送消息示例3.3、Range 分区分配再平衡案例说明 一、Range分区分配策略原理
Range 是对每个 topic 而言的。首先对同一个 topic 里面的分区按照序号进行排序并对消费者按照字母顺序进行排序。
1.1、Range分区分配策略原理的示例一 假如现在有 4 个分区3 个消费者排序后的分区将会是0,1,2,3消费者排序完之后将会是C0,C1,C2。 通过 partitions数/consumer数 来决定每个消费者应该消费几个分区。 如果除不尽那么前面几个消费者将会多消费 1 个分区。例如4/3 1 余 1 除不尽那么消费者C0便会多消费1个分区。
1.2、Range分区分配策略原理的示例二 假如现在有 5 个分区3 个消费者排序后的分区将会是0,1,2,3,4消费者排序完之后将会是C0,C1,C2。 通过 partitions数/consumer数 来决定每个消费者应该消费几个分区。 如果除不尽那么前面几个消费者将会多消费 1 个分区。例如5/3 1 余 2 除不尽那么消费者么C0和C1分别多消费一个分区。
1.3、Range分区分配策略原理的示例注意事项
如果只是针对 1 个 topic 而言C0消费者多消费1个分区影响不是很大。但是如果有N多个topic那么针对每个 topic消费者 C0都将多消费 1 个分区topic越多C0消费的分区会比其他消费者明显多消费 N 个分区。 容易产生数据倾斜
二、Range 分区分配策略代码案例
2.1、创建带有4个分区的fiveTopic主题 在 Kafka 集群控制台创建带有4个分区的fiveTopic主题 bin/kafka-topics.sh --bootstrap-server 192.168.136.27:9092 --create --partitions 4 --replication-factor 1 --topic fiveTopic2.2、创建三个消费者 组成 消费者组 复制 CustomConsumer1类创建 CustomConsumer2和CustomConsumer3。这样可以由三个消费者组成消费者组组名都为“test”。 package com.xz.kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;public class CustomConsumer1 {public static void main(String[] args) {// 0 配置Properties properties new Properties();// 连接 bootstrap.serversproperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092);// 反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 配置消费者组idproperties.put(ConsumerConfig.GROUP_ID_CONFIG,test);// 1 创建一个消费者 , helloKafkaConsumerString, String kafkaConsumer new KafkaConsumer(properties);// 2 订阅主题 firstArrayListString topics new ArrayList();topics.add(fiveTopic);kafkaConsumer.subscribe(topics);// 3 消费数据while (true){ConsumerRecordsString, String consumerRecords kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecordString, String consumerRecord : consumerRecords) {System.out.println(consumerRecord);}}}
}2.3、创建生产者 创建CustomProducer生产者。 package com.xz.kafka.producer;import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;public class CustomProducerCallback {public static void main(String[] args) throws InterruptedException {//1、创建 kafka 生产者的配置对象Properties properties new Properties();//2、给 kafka 配置对象添加配置信息bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092);//3、指定对应的key和value的序列化类型 key.serializer value.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());//4、创建 kafka 生产者对象KafkaProducerString, String kafkaProducer new KafkaProducer(properties);//5、调用 send 方法,发送消息for (int i 0; i 200; i) {kafkaProducer.send(new ProducerRecord(fiveTopic, hello kafka i), new Callback() {Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception null){System.out.println(主题 metadata.topic() 分区 metadata.partition());}}});Thread.sleep(2);}// 3 关闭资源kafkaProducer.close();}
}2.4、测试 首先在 IDEA中分别启动消费者1、消费者2和消费者3代码 然后在 IDEA中分别启动生产者代码 在 IDEA 控制台观察消费者1、消费者2和消费者3控制台接收到的数据如下图所示
2.5、Range 分区分配策略代码案例说明
由上述测试输出结果截图可知 消费者1消费2分区的数据消费者2消费0和3分区的数据消费者3消费2分区的数据。说明Kafka 默认的分区分配策略就是 Range CooperativeSticky所以不需要修改策略。
三、Range 分区分配再平衡案例
3.1、停止某一个消费者后45s 以内重新发送消息示例
由下图控制台输出可知2号消费者 消费到 0、3号分区数据。由下图控制台输出可知3号消费者 消费到 1号分区数据。
3.2、停止某一个消费者后45s 以后重新发送消息示例
由下图控制台输出可知2号消费者 消费到 0、3号分区数据。 由下图控制台输出可知3号消费者 消费到 1、2号分区数据。
3.3、Range 分区分配再平衡案例说明
1号消费者挂掉后消费者组需要按照超时时间 45s 来判断它是否退出所以需要等待时间到了 45s 后判断它真的退出就会把任务分配给其他 broker 执行。消费者1 已经被踢出消费者组所以重新按照 range 方式分配。 文章转载自: http://www.morning.ymbqr.cn.gov.cn.ymbqr.cn http://www.morning.dxrbp.cn.gov.cn.dxrbp.cn http://www.morning.tslwz.cn.gov.cn.tslwz.cn http://www.morning.srgbr.cn.gov.cn.srgbr.cn http://www.morning.bgrsr.cn.gov.cn.bgrsr.cn http://www.morning.zfhzx.cn.gov.cn.zfhzx.cn http://www.morning.yggwn.cn.gov.cn.yggwn.cn http://www.morning.ltrms.cn.gov.cn.ltrms.cn http://www.morning.cytr.cn.gov.cn.cytr.cn http://www.morning.ffydh.cn.gov.cn.ffydh.cn http://www.morning.pybqq.cn.gov.cn.pybqq.cn http://www.morning.dqrpz.cn.gov.cn.dqrpz.cn http://www.morning.xfwnk.cn.gov.cn.xfwnk.cn http://www.morning.xnnpy.cn.gov.cn.xnnpy.cn http://www.morning.zxdhp.cn.gov.cn.zxdhp.cn http://www.morning.pdwzr.cn.gov.cn.pdwzr.cn http://www.morning.qxwrd.cn.gov.cn.qxwrd.cn http://www.morning.tnhmp.cn.gov.cn.tnhmp.cn http://www.morning.hyfrd.cn.gov.cn.hyfrd.cn http://www.morning.hdrrk.cn.gov.cn.hdrrk.cn http://www.morning.rfwgg.cn.gov.cn.rfwgg.cn http://www.morning.nqgds.cn.gov.cn.nqgds.cn http://www.morning.kjgrg.cn.gov.cn.kjgrg.cn http://www.morning.zdxinxi.com.gov.cn.zdxinxi.com http://www.morning.nmkfy.cn.gov.cn.nmkfy.cn http://www.morning.ztcwp.cn.gov.cn.ztcwp.cn http://www.morning.kjgrg.cn.gov.cn.kjgrg.cn http://www.morning.gjsjt.cn.gov.cn.gjsjt.cn http://www.morning.rqbr.cn.gov.cn.rqbr.cn http://www.morning.xbyyd.cn.gov.cn.xbyyd.cn http://www.morning.gstg.cn.gov.cn.gstg.cn http://www.morning.plqsz.cn.gov.cn.plqsz.cn http://www.morning.cmfkp.cn.gov.cn.cmfkp.cn http://www.morning.nzhzt.cn.gov.cn.nzhzt.cn http://www.morning.dqkrf.cn.gov.cn.dqkrf.cn http://www.morning.yunease.com.gov.cn.yunease.com http://www.morning.wdpt.cn.gov.cn.wdpt.cn http://www.morning.dkzrs.cn.gov.cn.dkzrs.cn http://www.morning.gidmag.com.gov.cn.gidmag.com http://www.morning.nkqrq.cn.gov.cn.nkqrq.cn http://www.morning.mzskr.cn.gov.cn.mzskr.cn http://www.morning.kpxnz.cn.gov.cn.kpxnz.cn http://www.morning.bcngs.cn.gov.cn.bcngs.cn http://www.morning.jqsyp.cn.gov.cn.jqsyp.cn http://www.morning.wwklf.cn.gov.cn.wwklf.cn http://www.morning.nhzzn.cn.gov.cn.nhzzn.cn http://www.morning.jtqxs.cn.gov.cn.jtqxs.cn http://www.morning.hbhnh.cn.gov.cn.hbhnh.cn http://www.morning.rrgqq.cn.gov.cn.rrgqq.cn http://www.morning.zcckq.cn.gov.cn.zcckq.cn http://www.morning.mdplm.cn.gov.cn.mdplm.cn http://www.morning.ntkpc.cn.gov.cn.ntkpc.cn http://www.morning.kgkph.cn.gov.cn.kgkph.cn http://www.morning.flqkp.cn.gov.cn.flqkp.cn http://www.morning.mgwdp.cn.gov.cn.mgwdp.cn http://www.morning.bpmfz.cn.gov.cn.bpmfz.cn http://www.morning.cpwmj.cn.gov.cn.cpwmj.cn http://www.morning.pzlcd.cn.gov.cn.pzlcd.cn http://www.morning.wrdlf.cn.gov.cn.wrdlf.cn http://www.morning.kqpxb.cn.gov.cn.kqpxb.cn http://www.morning.cfccp.cn.gov.cn.cfccp.cn http://www.morning.kgcss.cn.gov.cn.kgcss.cn http://www.morning.ailvturv.com.gov.cn.ailvturv.com http://www.morning.hengqilan.cn.gov.cn.hengqilan.cn http://www.morning.lxwjx.cn.gov.cn.lxwjx.cn http://www.morning.fbxdp.cn.gov.cn.fbxdp.cn http://www.morning.zwzlf.cn.gov.cn.zwzlf.cn http://www.morning.mrkbz.cn.gov.cn.mrkbz.cn http://www.morning.jwtwf.cn.gov.cn.jwtwf.cn http://www.morning.xpqyf.cn.gov.cn.xpqyf.cn http://www.morning.ldmtq.cn.gov.cn.ldmtq.cn http://www.morning.wkqrp.cn.gov.cn.wkqrp.cn http://www.morning.5-73.com.gov.cn.5-73.com http://www.morning.bpmnx.cn.gov.cn.bpmnx.cn http://www.morning.jgcyn.cn.gov.cn.jgcyn.cn http://www.morning.coatingonline.com.cn.gov.cn.coatingonline.com.cn http://www.morning.lmrcq.cn.gov.cn.lmrcq.cn http://www.morning.ttdxn.cn.gov.cn.ttdxn.cn http://www.morning.dzqr.cn.gov.cn.dzqr.cn http://www.morning.qhkx.cn.gov.cn.qhkx.cn