查工程建设不良记录免费的网站,国外网站建设素材,网站为什么备案,郑州即将迎来全面解封kafka整理
一、kafka概述
kafka是apache旗下一款开源的顶级的消息队列的系统, 最早是来源于领英, 后期将其贡献给apache, 采用语言是scala.基于zookeeper, 启动kafka集群需要先启动zookeeper集群, 同时在zookeeper记录kafka相关的元数据
kafka本质上就是消息队列的中间件产品…kafka整理
一、kafka概述
kafka是apache旗下一款开源的顶级的消息队列的系统, 最早是来源于领英, 后期将其贡献给apache, 采用语言是scala.基于zookeeper, 启动kafka集群需要先启动zookeeper集群, 同时在zookeeper记录kafka相关的元数据
kafka本质上就是消息队列的中间件产品 ,kafka中消息数据是直接存储在磁盘上
kafka的特点:
可靠性可扩展性耐用性高性能
二、kafka的架构图 kafka cluster kafka的集群 brokerkafka的节点 producer生产者 consumer消费者 topic主题一个逻辑容器 shard分片分片的数量 replicas副本受节点的限制副本节点数 zookeeper:对kafka集群进行管理保存kafka的元数据信息
三、安装
3.1解压
[pxjpxj62 /opt/software]$tar -zxvf kafka_2.12-2.4.1.tgz -C /opt/app/3.2建软连接
[pxjpxj62 /opt/app]$ln -s kafka_2.12-2.4.1 kafka3.3修改 server.properties
[pxjpxj62 /opt/app/kafka/config]$vim server.properties 3.4启动与停止
前台启动: ./kafka-server-start.sh ../config/server.properties
后台启动: nohup ./kafka-server-start.sh ../config/server.properties 21
注意: 第一次启动, 建议先前台启动, 观察是否可以正常启动, 如果OK, ctrl C 退出, 然后挂载到后台
启动: ./start-kafka.sh 四、shell命令操作
4.1创建top
[pxjpxj62 /opt/app/kafka/bin]$./kafka-topics.sh --create --zookeeper pxj62:2181,pxj63:2181,pxj64:2181 --topic test01 --partitions 3 --replication-factor 2
Created topic test01.
[pxjpxj62 /opt/app/kafka/bin]$./kafka-topics.sh --create --zookeeper pxj62:2181,pxj63:2181,pxj64:2181 --topic test02 --partitions 3 --replication-factor 3
Created topic test02.4.2 查看当前有那些topic
[pxjpxj62 /opt/app/kafka/bin]$./kafka-topics.sh --list --zookeeper pxj62:2181,pxj63:2181,pxj64:2181
test01
test024.3 如何查看某一个topic的详细信息
[pxjpxj62 /opt/app/kafka/bin]$./kafka-topics.sh --describe --zookeeper pxj62:2181,pxj63:2181,pxj64:2181 --topic test01
Topic: test01 PartitionCount: 3 ReplicationFactor: 2 Configs: Topic: test01 Partition: 0 Leader: 2 Replicas: 2,0 Isr: 2,0Topic: test01 Partition: 1 Leader: 0 Replicas: 0,1 Isr: 0,1Topic: test01 Partition: 2 Leader: 1 Replicas: 1,2 Isr: 1,2
[pxjpxj62 /opt/app/kafka/bin]$./kafka-topics.sh --describe --zookeeper pxj62:2181,pxj63:2181,pxj64:2181 --topic test02
Topic: test02 PartitionCount: 3 ReplicationFactor: 3 Configs: Topic: test02 Partition: 0 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0Topic: test02 Partition: 1 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1Topic: test02 Partition: 2 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
[pxjpxj62 /opt/app/kafka/bin]$./kafka-topics.sh --create --zookeeper pxj62:2181,pxj63:2181,pxj64:2181 --topic test03 --partitions 3 --replication-factor 1
Created topic test03.
[pxjpxj62 /opt/app/kafka/bin]$./kafka-topics.sh --describe --zookeeper pxj62:2181,pxj63:2181,pxj64:2181 --topic test03
Topic: test03 PartitionCount: 3 ReplicationFactor: 1 Configs: Topic: test03 Partition: 0 Leader: 1 Replicas: 1 Isr: 1Topic: test03 Partition: 1 Leader: 2 Replicas: 2 Isr: 2Topic: test03 Partition: 2 Leader: 0 Replicas: 0 Isr: 04.4修改topic
[pxjpxj62 /opt/app/kafka/bin]$./kafka-topics.sh --alter --zookeeper pxj62:2181,pxj63:2181,pxj64:2181 --topic test01 --partitions 5
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
[pxjpxj62 /opt/app/kafka/bin]$./kafka-topics.sh --describe --zookeeper pxj62:2181,pxj63:2181,pxj64:2181 --topic test01
Topic: test01 PartitionCount: 5 ReplicationFactor: 2 Configs: Topic: test01 Partition: 0 Leader: 2 Replicas: 2,0 Isr: 2,0Topic: test01 Partition: 1 Leader: 0 Replicas: 0,1 Isr: 0,1Topic: test01 Partition: 2 Leader: 1 Replicas: 1,2 Isr: 2,1Topic: test01 Partition: 3 Leader: 2 Replicas: 2,1 Isr: 2,1Topic: test01 Partition: 4 Leader: 0 Replicas: 0,2 Isr: 0,2
[pxjpxj62 /opt/app/kafka/bin]$
注意只能调大分片的数量, 无法调小以及无法调整副本数量4.5删除topic
[pxjpxj62 /opt/app/kafka/bin]$./kafka-topics.sh --delete --zookeeper pxj62:2181,pxj63:2181,pxj64:2181 --topic test01
Topic test01 is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
[pxjpxj62 /opt/app/kafka/bin]$./kafka-topics.sh --describe --zookeeper pxj62:2181,pxj63:2181,pxj64:2181 --topic test01
Error while executing topic command : Topic test01 does not exist as expected
[2023-04-09 22:36:54,129] ERROR java.lang.IllegalArgumentException: Topic test01 does not exist as expectedat kafka.admin.TopicCommand$.kafka$admin$TopicCommand$$ensureTopicExists(TopicCommand.scala:484)at kafka.admin.TopicCommand$ZookeeperTopicService.describeTopic(TopicCommand.scala:390)at kafka.admin.TopicCommand$.main(TopicCommand.scala:67)at kafka.admin.TopicCommand.main(TopicCommand.scala)(kafka.admin.TopicCommand$)
[pxjpxj62 /opt/app/kafka/bin]$
4.6模拟一个生产者. 用于生产数据到topic中
[pxjpxj62 /opt/app/kafka/bin]$./kafka-console-producer.sh --broker-list pxj62:9092,pxj63:9092,pxj64:9092 --topic test02
pxj
pxj
jps
ll4.7消费者接收
[pxjpxj63 /opt/app/kafka/bin]$./kafka-console-consumer.sh --bootstrap-server pxj62:9092,pxj63:9092,pxj64:9092 --topic test02 --from-beginning
pxj
pxj
jps
ll五、kafkaAPI
5.1生产者
package com.ccj.pxj.kafka;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;import java.util.Properties;public class KafkaProducerTest {public static void main(String[] args) {// 1- 创建 生产者对象// 1.1 设置生产者相关的配置Properties props new Properties();props.put(bootstrap.servpackage com.ccj.pxj.kafka;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class KafkaConsumerTest {public static void main(String[] args) {// 1. 创建 kafka的消费者对象//1.1: 设置消费者的配置信息Properties props new Properties();props.setProperty(bootstrap.servers, pxj62:9092,pxj63:9092,pxj64:9092); // 指定 kafka地址props.setProperty(group.id, test); // 指定消费组 idprops.setProperty(enable.auto.commit, true); // 是否开启自动提交数据的偏移量props.setProperty(auto.commit.interval.ms, 1000); // 自动提交的间隔时间props.setProperty(key.deserializer, org.apache.kafka.common.serialization.StringDeserializer); // 设置key反序列类props.setProperty(value.deserializer, org.apache.kafka.common.serialization.StringDeserializer);// 设置value反序列化类//1.2: 创建kafka消费者对象KafkaConsumerString, String consumer new KafkaConsumer(props);//2.设置消费者监听那些Topicconsumer.subscribe(Arrays.asList(test02));//3. 消费数据: 一直在消费, 只要有数据,立马进行处理操作while (true) {//3.1: 获取消息数据, 参数表示等待(超时)的时间ConsumerRecordsString, String records consumer.poll(Duration.ofMillis(100));for (ConsumerRecordString, String record : records) {long offset record.offset(); // 偏移量信息String key record.key(); // 获取keyString value record.value(); // 获取valueint partition record.partition();// 从哪个分区读取的数据System.out.println(偏移量: offset ; key值:key ;value值: value ; 分区:partition);}}}
}
ers, pxj62:9092,pxj63:9092,pxj64:9092); // 指定kafka的地址props.put(acks, all); // 指定消息确认方案props.put(key.serializer, org.apache.kafka.common.serialization.StringSerializer);// key序列化类props.put(value.serializer, org.apache.kafka.common.serialization.StringSerializer); // value序列化类//1.2: 构建生产者ProducerString, String producer new KafkaProducer(props);//2. 发送数据for (int i 0; i 10; i) {//2.1 构建 数据的承载对象ProducerRecordString, String producerRecord new ProducerRecord(test02,Integer.toString(i));producer.send(producerRecord);}//3. 释放资源producer.close();}
}5.2 消费者
package com.ccj.pxj.kafka;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class KafkaConsumerTest {public static void main(String[] args) {// 1. 创建 kafka的消费者对象//1.1: 设置消费者的配置信息Properties props new Properties();props.setProperty(bootstrap.servers, pxj62:9092,pxj63:9092,pxj64:9092); // 指定 kafka地址props.setProperty(group.id, test); // 指定消费组 idprops.setProperty(enable.auto.commit, true); // 是否开启自动提交数据的偏移量props.setProperty(auto.commit.interval.ms, 1000); // 自动提交的间隔时间props.setProperty(key.deserializer, org.apache.kafka.common.serialization.StringDeserializer); // 设置key反序列类props.setProperty(value.deserializer, org.apache.kafka.common.serialization.StringDeserializer);// 设置value反序列化类//1.2: 创建kafka消费者对象KafkaConsumerString, String consumer new KafkaConsumer(props);//2.设置消费者监听那些Topicconsumer.subscribe(Arrays.asList(test02));//3. 消费数据: 一直在消费, 只要有数据,立马进行处理操作while (true) {//3.1: 获取消息数据, 参数表示等待(超时)的时间ConsumerRecordsString, String records consumer.poll(Duration.ofMillis(100));for (ConsumerRecordString, String record : records) {long offset record.offset(); // 偏移量信息String key record.key(); // 获取keyString value record.value(); // 获取valueint partition record.partition();// 从哪个分区读取的数据System.out.println(偏移量: offset ; key值:key ;value值: value ; 分区:partition);}}}
}
SLF4J: Failed to load class org.slf4j.impl.StaticLoggerBinder.
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
偏移量:1; key值:null;value值:0; 分区:1
偏移量:2; key值:null;value值:1; 分区:1
偏移量:3; key值:null;value值:2; 分区:1
偏移量:4; key值:null;value值:3; 分区:1
偏移量:5; key值:null;value值:4; 分区:1
偏移量:6; key值:null;value值:5; 分区:1
偏移量:7; key值:null;value值:6; 分区:1
偏移量:8; key值:null;value值:7; 分区:1
偏移量:9; key值:null;value值:8; 分区:1
偏移量:10; key值:null;value值:9; 分区:1
六、kafka的核心原理
6.1kafka的分区和副本
分区 topic可以理解为是一个大的容器(逻辑), 分片相当于将topic划分为多个小容器, 将这些小容器分布在不同的broker上, 进行分布式存储, 分片的数量不受节点数量限制作用:1- 提升吞吐量, 前提 kafka节点充足下2- 解决单台节点存储有限的问题, 可以通过分片实现分布式存储3- 提高并发能力副本
对topic中每一个分片构建多个副本, 从而保证数据不能丢失, 副本的数量最多与节点数量是相等, 一般来说副本为 1~3个
作用:提升数据可靠性, 防止数据丢失6.2kafka数据传输过程
三阶段
第一阶段生产者将数据生产到集群的broket端
第二阶段broker将数据存储
第三阶段消费者从broker端消费数据
6.3生产者如何保证数据不丢失
对于kafka主要采用ack认证机制处理的
0生产者只管发送到broket端不管broker的响应
1生产者只管发送到broket端需要等待对应接受分片的主副本接收到数据后给予响应认为数据发送成功
-1:ALL生产者只管发送到broket端需要等待对应接受分片所有的 副本接收到数据后给予响应认为数据发送成功
效率01-1
安全-110
ack模式的选择根据生产需求确定
props.put(“acks”,all)6.3如果broker端迟迟没有给予响应如何解决
采用先等待超时时间再重试的策略一般重试3次如果重试后依然没有给予响应此时让程序直接报错。通知相关人员处理即可
6.4宽带占用如何解决
可以引入缓存池采用异步发送方案生产者将数据在发送数据时候底层会将这个数据保存到缓存池中当池子中数据达到一批数据大小后将达一批数据直接发送到broker此时broker针对这一批数据给予一次性响应即可批量发送数据6.5 采用批量发送数据如果发送一批数据到broker端broker端又没有给予响应此时缓存池中数据满了如何解决呢
解决方案
1.丢弃缓存池中数据报异常适用于数据不重要或者可以重读的消息总数据
2.在写入缓冲池的时候需要将数据在其他的地方也持久存储一份发送成功一批数据将持久化地方数据删除一部分以保证在出现此问题后数据依然存在下次启动的时候优先从持久化容器中读取即可七、安装 kafka-eagle
7.1.解压
7.2环境变量
[pxjpxj62 /home/pxj]$vim .bashrc
export PS1[\u\h pwd]\$
export JAVA_HOME/usr/java/jdk1.8.0_141
export PATH$JAVA_HOME/bin:$PATH
export HADOOP_HOME/opt/app/hadoop
export ZOOKEEPER_HOME/opt/app/zookeeper
export KAFKA_HOME/opt/app/kafka
export KE_HOME/opt/app/kafka-eagle
export PATH${HADOOP_HOME}/bin:${HADOOP_HOME}/sbin:${ZOOKEEPER_HOME}/bin:${KAFKA_HOME}/bin:${KE_HOME}/bin:$PATH[pxjpxj62 /home/pxj]$source .bashrc7.3配置 kafka_eagle。
使用vi打开conf目录下的system-config.propertie
[pxjpxj62 /opt/app/kafka-eagle/conf]$vim system-config.properties
kafka.eagle.zk.cluster.aliascluster1
cluster1.zk.listpxj62:2181,pxj63:2181,pxj64:2181
#cluster2.zk.listxdn10:2181,xdn11:2181,xdn12:2181
# kafka metrics, 30 days by default
######################################
kafka.eagle.metrics.chartstrue
kafka.eagle.metrics.retain30# kafka sqlite jdbc driver address
######################################
#kafka.eagle.driverorg.sqlite.JDBC
#kafka.eagle.urljdbc:sqlite:/hadoop/kafka-eagle/db/ke.db
#kafka.eagle.usernameroot
#kafka.eagle.passwordwww.kafka-eagle.org######################################
# kafka mysql jdbc driver address
######################################
kafka.eagle.drivercom.mysql.jdbc.Driver
kafka.eagle.urljdbc:mysql://pxj63:3306/ke?useUnicodetruecharacterEncodingUTF-8zeroDateTimeBehaviorconvertToNull
kafka.eagle.usernameroot
kafka.eagle.passwordI LOVE PXJ7.4配置JAVA_HOME
在24行加入
export JAVA_HOME/usr/java/jdk1.8.0_1417.5授权运行
[pxjpxj62 /opt/app/kafka-eagle/bin]$chmod x ke.sh 7.6启动
[pxjpxj62 /opt/app/kafka-eagle/bin]$./ke.sh start7.7访问web
http://pxj62:8048/ke八、同步发送
package com.ccj.pxj.kafka;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;public class KafkaProducerSyncTest {public static void main(String[] args) {Properties propsnew Properties();props.put(bootstrap.servers, pxj62:9092,pxj63:9092,pxj64:9092); // 指定kafka的地址props.put(acks, all); // 指定消息确认方案props.put(key.serializer, org.apache.kafka.common.serialization.StringSerializer);// key序列化类props.put(value.serializer, org.apache.kafka.common.serialization.StringSerializer); // value序列化类//构造生产者KafkaProducerString,String producer new KafkaProducer(props);
// 2.发送数据for (int i 0; i 10 ; i) {
// 构建 数据承载对象ProducerRecordString, String producerRecord new ProducerRecord(test01, i_02);// 使用get 其实就是同步方式, 会当发送后, 会一直等待响应, 如果长时间没有响应, 就会重试, 如果依然没有, 直接报错// get支持自定义超时的时间try{producer.send(producerRecord).get();}catch (Exception e){e.printStackTrace();}}producer.close();}
}
九、异步发送
package com.ccj.pxj.kafka;import org.apache.kafka.clients.producer.*;import java.util.Properties;public class KafkaProducerAsyncTest {public static void main(String[] args) {// 1- 创建 生产者对象// 1.1 设置生产者相关的配置Properties props new Properties();props.put(bootstrap.servers, pxj62:9092,pxj63:9092,pxj64:9092); // 指定kafka的地址props.put(acks, all); // 指定消息确认方案props.put(key.serializer, org.apache.kafka.common.serialization.StringSerializer);// key序列化类props.put(value.serializer, org.apache.kafka.common.serialization.StringSerializer); // value序列化类//1.2: 构建生产者ProducerString, String producer new KafkaProducer(props);
// 2.发送数据for (int i 0; i 10; i) {ProducerRecordString, String producerRecord new ProducerRecord(test01, i_22);producer.send(producerRecord, new Callback() {Overridepublic void onCompletion(RecordMetadata metadata, Exception e) {// 此方法为回调函数的方式, 当进行异步发送的时候, 不管最终是成功了还是失败了, 都会回调此函数if(e!null){// 说明有异常, 发送失败了// 在此处, 编写发送失败的处理业务逻辑代码System.err.println(发送消息失败 e.getStackTrace());}if(metadata!null){if (metadata ! null) {System.out.println(异步方式发送消息结果 topic- metadata.topic() |partition- metadata.partition() |offset- metadata.offset());}}}});}//3. 释放资源producer.close();}
}十、消费者异步
package com.ccj.pxj.kafka;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;public class KafkaConsumerTest02 {public static void main(String[] args) {Properties propsnew Properties();// 1. 创建 kafka的消费者对象//1.1: 设置消费者的配置信息props.setProperty(bootstrap.servers, pxj62:9092,pxj63:9092,pxj64:9092); // 指定 kafka地址props.setProperty(group.id, test); // 指定消费组 idprops.setProperty(enable.auto.commit, false); // 是否开启自动提交数据的偏移量props.setProperty(key.deserializer, org.apache.kafka.common.serialization.StringDeserializer); // 设置key反序列类props.setProperty(value.deserializer, org.apache.kafka.common.serialization.StringDeserializer);// 设置value反序列化类
//创建消费者对象KafkaConsumerString, String consumer new KafkaConsumer(props);consumer.subscribe(Arrays.asList(test01));while(true){ConsumerRecordsString,String recordsconsumer.poll(Duration.ofMillis(100));for (ConsumerRecordString, String record : records) {long offset record.offset(); // 偏移量信息String key record.key(); // 获取keyString value record.value(); // 获取valueint partition record.partition();// 从哪个分区读取的数据System.out.println(偏移量: offset ; key值:key ;value值: value ; 分区:partition);// 当消息消费完成后, 提交偏移量信息 : 一定不要丢失提交偏移量的代码. 否则 会造成大量的重复消费问题consumer.commitSync(); // 同步提交consumer.commitAsync(); // 异步提交}}}
}十一、broker端如何保证数据不丢失
broker主要将消息数据存储下来, 那么如何保证数据不丢失呢?
多副本机制 生产者的ack为 -1消费偏移量数据是存储在哪里呢? 在kafka的老版本(kafka 0.8x下)是存储在zookeeper中, 在新版本中消费者消息偏移量信息是存储在broker端, 通过一个topic来存储的: __consumer_offset此topic具有50个分区, 1个副本如何修改默认的过期时间呢?
# server.properties的103行位置: 默认值为 168小时
log.retention.hours168# 设置一个log文件的大小, 默认为: 1073741824 (1GB)
log.segment.bytes1073741824十二、kafka的数据查询机制 查询过程
先确定这条消息在那个segment片段中到对应片段中找index文件, 根据offset查询消息数据在log文件的那个物理偏移量位置根据从index查询到的偏移量信息, 到 log文件顺序查询(磁盘查询方式)到对应范围下数据即可
磁盘的读写分为两种读写方式: 顺序读写 和 随机读写
顺序读写效率远远高于随机读写十三、kafka中生产者的数据分发策略
kafka生产者数据分发策略: 指的生产者在生产数据到达broker指定topic中, 最终这条数据被topic中哪一个分片接收到了, 这就是生产者分发机制
思考: 常见的分发策略
1) hash策略
2) 轮询策略
3) 指定分区策略
4) 确定每个分区范围分发那么kafka支持那些分发策略呢?
1) 粘性分区策略(老版本(2.4以前): 轮询)
2) hash取模策略
3) 指定分区策略
4) 自定义分区如何设置分发策略呢? 与 ProducerRecord 和 DefaultPartitioner关系很大1) 粘性分区策略(老版本(2.4以前): 轮询)# 当生成数据时候, 使用这个只需要传递value发送方案, 底层走的 粘性分区策略(老版本(2.4以前): 轮询)public ProducerRecord(String topic, V value) {this(topic, null, null, null, value, null);}# 为什么这么说呢? 原因是 DefaultPartitionerpublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {# 当 key为null的时候, 执行 stickyPartitionCache (粘性分区)if (keyBytes null) {return stickyPartitionCache.partition(topic, cluster);} ListPartitionInfo partitions cluster.partitionsForTopic(topic);int numPartitions partitions.size();// hash the keyBytes to choose a partitionreturn Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;}2) hash取模策略# 当发送数据的时候, 如果传递 k 和 v , 默认使用 hash取模分区方案, 根据key进行hash取模public ProducerRecord(String topic, K key, V value) {this(topic, null, null, key, value, null);}# 为什么这么说呢? 原因是 DefaultPartitionerpublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {# 当 key为null的时候, 执行 stickyPartitionCache (粘性分区)if (keyBytes null) {return stickyPartitionCache.partition(topic, cluster);} # 当key不为null的时候, 获取topic的所有分区, 然后根据key进行hash取模ListPartitionInfo partitions cluster.partitionsForTopic(topic);int numPartitions partitions.size();// hash the keyBytes to choose a partitionreturn Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;}3) 指定分区策略# 当发送数据的时候, 需要明确指出给那个partition发送数据 : ProducerRecord构造# 分片是从0开始的, 如果是三个分片: 0 1 2public ProducerRecord(String topic, Integer partition, K key, V value) {this(topic, partition, null, key, value, null);}此时这种分发策略 与 defaultPartitions 没有关系了4) 自定义分区策略: (抄. 官方源码DefaultPartitioner)4.1) 创建一个类, 实现Partitioner 接口4.2) 重写接口中的partition方法, 返回值表示分区的编号4.3) 按照业务逻辑实现方法中分区方案4.4) 告知给kafka, 使用新的分区方案当生产者开始发送数据, 如果只传递了value的数据, 此时kafka会采用粘性分区策略, 首先会先随机的选择一个分区, 然后尽可能的黏上这个分区, 将这一批数据全部写入到这一个分区中, 当下次请求再来的时候, 重新在随机选择一个分区(如果间隔时间比较短, 大概率会黏住上一个分区), 再黏住这个分区, 将数据写入到这个分区下, 这种分区方案称为粘性分区策略粘性分区是kafka2.4.x及以上版本支持的一种全新的分区策略 在2.4以下的版本中, 采用的轮询方案老版本轮询:当生产者准备好一批数据后, 将这一批数据写入到某一个topic中, 如果采用轮询方案, 需要将这一批数据分为多个小批次, 分别对应不同的分片,将各个小批次的数据发送给对应的分片下即可, 而这种操作需要额外在一批数据上再次进行分批处理, 导致生产效率下降, 所以说在新版本中, 将其替换为粘性分区参数: partitioner.class :默认值: org.apache.kafka.clients.producer.internals.DefaultPartitioner通过生产者的properties对象, 重新设置一下partitioner.class 参数即可什么是粘性分区策略:
当生产者开始发送数据, 如果只传递了value的数据, 此时kafka会采用粘性分区策略, 首先会先随机的选择一个分区, 然后尽可能的黏上这个分区, 将这一批数据全部写入到这一个分区中, 当下次请求再来的时候, 重新在随机选择一个分区(如果间隔时间比较短, 大概率会黏住上一个分区), 再黏住这个分区, 将数据写入到这个分区下, 这种分区方案称为粘性分区策略粘性分区是kafka2.4.x及以上版本支持的一种全新的分区策略 在2.4以下的版本中, 采用的轮询方案老版本轮询:当生产者准备好一批数据后, 将这一批数据写入到某一个topic中, 如果采用轮询方案, 需要将这一批数据分为多个小批次, 分别对应不同的分片,将各个小批次的数据发送给对应的分片下即可, 而这种操作需要额外在一批数据上再次进行分批处理, 导致生产效率下降, 所以说在新版本中, 将其替换为粘性分区十四、kafka的负载均衡机制 如果使用kafka模拟点对点 和 发布订阅 方式点对点: 一个消费只能被一个消费者所接收让所有监听这个topic的消费者都属于同一个消费者组内即可发布订阅: 一个消息可以被多个消费者所接收让所有监听这个topic的消费者都属于不同的消费者组内即可作者潘陈pxj 日期2023-04-30 文章转载自: http://www.morning.tllws.cn.gov.cn.tllws.cn http://www.morning.pcqxr.cn.gov.cn.pcqxr.cn http://www.morning.zjqwr.cn.gov.cn.zjqwr.cn http://www.morning.pcqxr.cn.gov.cn.pcqxr.cn http://www.morning.bkxnp.cn.gov.cn.bkxnp.cn http://www.morning.pbwcq.cn.gov.cn.pbwcq.cn http://www.morning.fddfn.cn.gov.cn.fddfn.cn http://www.morning.sjpht.cn.gov.cn.sjpht.cn http://www.morning.bcdqf.cn.gov.cn.bcdqf.cn http://www.morning.gwjqq.cn.gov.cn.gwjqq.cn http://www.morning.kzdwt.cn.gov.cn.kzdwt.cn http://www.morning.ryfpx.cn.gov.cn.ryfpx.cn http://www.morning.wddmr.cn.gov.cn.wddmr.cn http://www.morning.qywfw.cn.gov.cn.qywfw.cn http://www.morning.wwdlg.cn.gov.cn.wwdlg.cn http://www.morning.rgrz.cn.gov.cn.rgrz.cn http://www.morning.sdktr.com.gov.cn.sdktr.com http://www.morning.yrgb.cn.gov.cn.yrgb.cn http://www.morning.qfqld.cn.gov.cn.qfqld.cn http://www.morning.cljpz.cn.gov.cn.cljpz.cn http://www.morning.rxtxf.cn.gov.cn.rxtxf.cn http://www.morning.ghpld.cn.gov.cn.ghpld.cn http://www.morning.fcwb.cn.gov.cn.fcwb.cn http://www.morning.ndpzm.cn.gov.cn.ndpzm.cn http://www.morning.swsrb.cn.gov.cn.swsrb.cn http://www.morning.kpbgvaf.cn.gov.cn.kpbgvaf.cn http://www.morning.pcxgj.cn.gov.cn.pcxgj.cn http://www.morning.prlgn.cn.gov.cn.prlgn.cn http://www.morning.fbbmg.cn.gov.cn.fbbmg.cn http://www.morning.clndl.cn.gov.cn.clndl.cn http://www.morning.qrdkk.cn.gov.cn.qrdkk.cn http://www.morning.xysdy.cn.gov.cn.xysdy.cn http://www.morning.bwznl.cn.gov.cn.bwznl.cn http://www.morning.vehna.com.gov.cn.vehna.com http://www.morning.flchj.cn.gov.cn.flchj.cn http://www.morning.lzqnj.cn.gov.cn.lzqnj.cn http://www.morning.jqrp.cn.gov.cn.jqrp.cn http://www.morning.kxsnp.cn.gov.cn.kxsnp.cn http://www.morning.sbqrm.cn.gov.cn.sbqrm.cn http://www.morning.snmsq.cn.gov.cn.snmsq.cn http://www.morning.wbxtx.cn.gov.cn.wbxtx.cn http://www.morning.qxwrd.cn.gov.cn.qxwrd.cn http://www.morning.rtzd.cn.gov.cn.rtzd.cn http://www.morning.zgqysw.cn.gov.cn.zgqysw.cn http://www.morning.nwllb.cn.gov.cn.nwllb.cn http://www.morning.bjsites.com.gov.cn.bjsites.com http://www.morning.rqwwm.cn.gov.cn.rqwwm.cn http://www.morning.cwyfs.cn.gov.cn.cwyfs.cn http://www.morning.ykrck.cn.gov.cn.ykrck.cn http://www.morning.dhqyh.cn.gov.cn.dhqyh.cn http://www.morning.zdnrb.cn.gov.cn.zdnrb.cn http://www.morning.pcbfl.cn.gov.cn.pcbfl.cn http://www.morning.grcfn.cn.gov.cn.grcfn.cn http://www.morning.qqtzn.cn.gov.cn.qqtzn.cn http://www.morning.jbtwq.cn.gov.cn.jbtwq.cn http://www.morning.hkswt.cn.gov.cn.hkswt.cn http://www.morning.qbnfc.cn.gov.cn.qbnfc.cn http://www.morning.cbnxq.cn.gov.cn.cbnxq.cn http://www.morning.bfjyp.cn.gov.cn.bfjyp.cn http://www.morning.wgbsm.cn.gov.cn.wgbsm.cn http://www.morning.fsbns.cn.gov.cn.fsbns.cn http://www.morning.lywpd.cn.gov.cn.lywpd.cn http://www.morning.gpmrj.cn.gov.cn.gpmrj.cn http://www.morning.fkyqt.cn.gov.cn.fkyqt.cn http://www.morning.jwskq.cn.gov.cn.jwskq.cn http://www.morning.dlurfdo.cn.gov.cn.dlurfdo.cn http://www.morning.fbpdp.cn.gov.cn.fbpdp.cn http://www.morning.nmngg.cn.gov.cn.nmngg.cn http://www.morning.xzlp.cn.gov.cn.xzlp.cn http://www.morning.mjtft.cn.gov.cn.mjtft.cn http://www.morning.qgzmz.cn.gov.cn.qgzmz.cn http://www.morning.nkkpp.cn.gov.cn.nkkpp.cn http://www.morning.zbnkt.cn.gov.cn.zbnkt.cn http://www.morning.rfbq.cn.gov.cn.rfbq.cn http://www.morning.zrlwl.cn.gov.cn.zrlwl.cn http://www.morning.nmfwm.cn.gov.cn.nmfwm.cn http://www.morning.rlxg.cn.gov.cn.rlxg.cn http://www.morning.hhxkl.cn.gov.cn.hhxkl.cn http://www.morning.dfkmz.cn.gov.cn.dfkmz.cn http://www.morning.xwbld.cn.gov.cn.xwbld.cn