网站建设电话销售话术,网站后台建设 招聘,公众号怎么制作滑动照片,百度知道电脑版网页入口文章目录 前言4 Kafka基准测试4.1 基于1个分区1个副本的基准测试4.2 基于3个分区1个副本的基准测试4.3 基于1个分区3个副本的基准测试 5 Java编程操作Kafka5.1 引入依赖5.2 向Kafka发送消息5.3 从Kafka消费消息5.4 异步使用带有回调函数的生产消息 6 幂等性6.1 幂等性介绍6.2 K… 文章目录 前言4 Kafka基准测试4.1 基于1个分区1个副本的基准测试4.2 基于3个分区1个副本的基准测试4.3 基于1个分区3个副本的基准测试 5 Java编程操作Kafka5.1 引入依赖5.2 向Kafka发送消息5.3 从Kafka消费消息5.4 异步使用带有回调函数的生产消息 6 幂等性6.1 幂等性介绍6.2 Kafka幂等性实现原理 7 Kafka事务7.1 Kafka事务介绍7.2 事务操作API7.3 Kafka事务编程7.3.1 需求7.3.2 创建Topic7.3.3 编写生产者7.3.4 创建消费者7.3.5 消费旧Topic数据并生产到新Topic7.3.6 测试7.3.7 模拟异常测试事务 前言
Kafka学习笔记(一)Linux环境基于Zookeeper搭建Kafka集群、Kafka的架构
4 Kafka基准测试
基准测试benchmark testing是一种测量和评估软件性能指标的活动。 我们可以通过基准测试了解到软件、硬件的性能水平主要测试负载的执行时间、传输速度、吞吐量、资源占用率等。
4.1 基于1个分区1个副本的基准测试
1创建1个分区1个副本的Topic 2生产消息基准测试
bin/kafka-producer-perf-test.sh --topic topic_1_1 --num-records 5000000 --throughput -1 --record-size 1000 --producer-props bootstrap.servers192.168.245.130:9092,192.168.245.131:9092,192.168.245.132:9092 acks1命令解释 bin/kafka-producer-perf-test.sh 性能测试脚本–topic Topic的名称–num-records 指定生产数据量默认5000W–throughput 指定吞吐量即限流-1不指定–record-size record数据大小字节–producer-props bootstrap.servers 指定Kafka集群地址acks1 ACK模式 执行以上命令结果如下 3消费消息基准测试
bin/kafka-consumer-perf-test.sh --broker-list 192.168.245.130:9092,192.168.245.131:9092,192.168.245.132:9092 --topic topic_1_1 --fetch-size 1048576 --messages 5000000 --timeout 100000命令解释 bin/kafka-consumer-perf-test.sh 消费消息基准测试脚本–broker-list 集群Broker列表–topic Topic的名称–fetch-size 每次拉取的数据大小–messages 总共要消费的消息个数–timeout 超时时间 执行以上命令结果如下 4.2 基于3个分区1个副本的基准测试
1创建3个分区1个副本的Topic 2生产消息基准测试
bin/kafka-producer-perf-test.sh --topic topic_3_1 --num-records 5000000 --throughput -1 --record-size 1000 --producer-props bootstrap.servers192.168.245.130:9092,192.168.245.131:9092,192.168.245.132:9092 acks1指标3分区1副本1分区1副本性能对比1分区1副本吞吐量10900.822140 records/sec8994.536718 records/sec提升↑吞吐速率10.40 MB/sec8.58 MB/sec提升↑平均延迟时间2508.37 ms avg latency3418.50 ms avg latency提升↑最大延迟时间47436.00 ms max latency50592.00 ms max latency
3消费消息基准测试
bin/kafka-consumer-perf-test.sh --broker-list 192.168.245.130:9092,192.168.245.131:9092,192.168.245.132:9092 --topic topic_3_1 --fetch-size 1048576 --messages 5000000 --timeout 100000指标3分区1副本1分区1副本性能对比1分区1副本data.consumed.in.MB 共计消费数据量4768.40214768.3716MB.sec 每秒消费数据量28.563721.1589提升↑data.consumed.in.nMsg 共计消费消息数量50000325000000nMsg.sec 每秒消费消息数量29951.251722186.7235提升↑
4.3 基于1个分区3个副本的基准测试
1创建1个分区3个副本的Topic 2生产消息基准测试
bin/kafka-producer-perf-test.sh --topic topic_1_3 --num-records 5000000 --throughput -1 --record-size 1000 --producer-props bootstrap.servers192.168.245.130:9092,192.168.245.131:9092,192.168.245.132:9092 acks1指标1分区3副本1分区1副本性能对比1分区1副本吞吐量4323.273652 records/sec8994.536718 records/sec下降↓吞吐速率4.12 MB/sec8.58 MB/sec下降↓平均延迟时间7533.70 ms avg latency3418.50 ms avg latency下降↓最大延迟时间32871.00 ms max latency50592.00 ms max latency
可见副本越多生产消息的性能反而下降。
3消费消息基准测试
bin/kafka-consumer-perf-test.sh --broker-list 192.168.245.130:9092,192.168.245.131:9092,192.168.245.132:9092 --topic topic_1_3 --fetch-size 1048576 --messages 5000000 --timeout 100000指标1分区3副本1分区1副本性能对比1分区1副本data.consumed.in.MB 共计消费数据量4768.37164768.3716MB.sec 每秒消费数据量46.950421.1589下降↓data.consumed.in.nMsg 共计消费消息数量50000005000000nMsg.sec 每秒消费消息数量49231.011622186.7235下降↓
同样副本越多消费消息的性能也下降。
5 Java编程操作Kafka
创建一个Maven项目测试Java变成操作Kafka。
5.1 引入依赖
!-- kafka_demo\pom.xml --!-- kafka客户端工具 --
dependencygroupIdorg.apache.kafka/groupIdartifactIdkafka-clients/artifactIdversion2.4.1/version
/dependency5.2 向Kafka发送消息
public class KafkaProducerTest {public static void main(String[] args) {// 1.创建用于连接Kafka的Properties配置Properties props new Properties();props.put(bootstrap.servers, 192.168.245.130:9092);props.put(acks, all);props.put(key.serializer, org.apache.kafka.common.serialization.StringSerializer);props.put(value.serializer, org.apache.kafka.common.serialization.StringSerializer);// 2.创建一个生产者对象KafkaProducerKafkaProducerString, String producer new KafkaProducer(props);// 3.调用send发送1-100消息到my_topic主题for(int i 0; i 100; i) {try {// 获取返回值Future该对象封装了返回值FutureRecordMetadata future producer.send(new ProducerRecord(my_topic, null, i ));// 调用一个Future.get()方法等待响应future.get();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}}// 4. 关闭生产者producer.close();}}执行以上代码查看此时my_topic主题中的消息 5.3 从Kafka消费消息
public class KafkaConsumerTest {public static void main(String[] args) throws InterruptedException {// 1.创建Kafka消费者配置Properties props new Properties();props.setProperty(bootstrap.servers, node-01:9092,node-02:9092,node-03:9092);// 消费者组可以使用消费者组将若干个消费者组织到一起共同消费Kafka中topic的数据// 每一个消费者需要指定一个消费者组如果消费者的组名是一样的表示这几个消费者是一个组中的props.setProperty(group.id, my_group);// 自动提交offsetprops.setProperty(enable.auto.commit, true);// 自动提交offset的时间间隔props.setProperty(auto.commit.interval.ms, 1000);// 拉取的key、value数据的props.setProperty(key.deserializer, org.apache.kafka.common.serialization.StringDeserializer);props.setProperty(value.deserializer, org.apache.kafka.common.serialization.StringDeserializer);// 2.创建Kafka消费者KafkaConsumerString, String kafkaConsumer new KafkaConsumer(props);// 3. 订阅要消费的主题// 指定消费者从哪个topic中拉取数据kafkaConsumer.subscribe(Arrays.asList(my_topic));// 4.使用一个while循环不断从Kafka的topic中拉取消息while(true) {// Kafka的消费者一次拉取一批的数据ConsumerRecordsString, String consumerRecords kafkaConsumer.poll(Duration.ofSeconds(5));System.out.println(消息总数 consumerRecords.count());// 5.将将记录record的offset、key、value都打印出来for (ConsumerRecordString, String consumerRecord : consumerRecords) {// 主题String topic consumerRecord.topic();// offset这条消息处于Kafka分区中的哪个位置long offset consumerRecord.offset();// key\valueString key consumerRecord.key();String value consumerRecord.value();System.out.println(topic: topic offset: offset key: key value: value);}Thread.sleep(1000);}}
}执行以上代码查看打印日志 5.4 异步使用带有回调函数的生产消息
如果想知道消息是否成功发送到Kafka或者成功发送消息到Kafka后执行一些其他动作就可以使用带有回调函数的发送方法来发送消息。
public static void main(String[] args) {// 1. 创建用于连接Kafka的Properties配置Properties props new Properties();props.put(bootstrap.servers, node-01:9092);props.put(acks, all);props.put(key.serializer, org.apache.kafka.common.serialization.StringSerializer);props.put(value.serializer, org.apache.kafka.common.serialization.StringSerializer);// 2. 创建一个生产者对象KafkaProducerKafkaProducerString, String producer new KafkaProducerString, String(props);// 3. 调用send发送1-100消息到指定Topic testfor(int i 1; i 100; i) {// 一、同步方式// 获取返回值Future该对象封装了返回值// FutureRecordMetadata future producer.send(new ProducerRecordString, String(my_topic, null, i ));// 调用一个Future.get()方法等待响应// future.get();// 二、带回调函数异步方式producer.send(new ProducerRecordString, String(my_topic, null, i ), new Callback() {Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if(exception ! null) {System.out.println(发送消息出现异常);}else {String topic metadata.topic();int partition metadata.partition();long offset metadata.offset();System.out.println(发送消息到Kafka中的名字为 topic 的主题第 partition 分区第 offset 条数据成功);}}});}// 4. 关闭生产者producer.close();
}执行以上代码查看打印日志 6 幂等性
6.1 幂等性介绍
在HTTP/1.1中对幂等性的定义是一次和多次请求某一个资源对于资源本身应该具有同样的结果网络超时等问题除外。也就是说任意多次请求执行对资源本身所产生的影响均与一次请求执行的影响相同。
实现幂等性的关键就是服务端可以区分请求是否重复过滤掉重复的请求。要区分请求是否重复有两个要素
唯一标识要想区分请求是否重复请求中就得有唯一标识。例如支付请求中订单号就是唯一标识。记录下已处理过的请求标识光有唯一标识还不够还需要记录下哪些请求是已经处理过的这样当收到新的请求时用新请求中的标识和处理记录进行比较如果处理记录中有相同的标识说明是重复交易拒绝掉。 如上图所示当再次发送的消息seq0和上次发送的消息seq0重复时不保存新的消息。
6.2 Kafka幂等性实现原理
为了实现生产者的幂等性Kafka引入了Producer ID即PID和Sequence Number。
PID每个新的Producer在初始化的时候会被分配一个唯一的PID这个PID对用户是不可见的。Sequence Numbler针对每个生产者对应PID发送到指定的Topic, Partition的消息都对应一个从0开始单调递增的Sequence Number。
而生产者想要实现幂等性只需要添加以下配置
// 实现幂等性
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);7 Kafka事务
7.1 Kafka事务介绍
通过事务机制Kafka可以实现对多个Topic的多个Partition的原子性的写入即处于同一个事务内的所有消息不管最终需要落地到哪个Topic 的哪个Partition最终结果都是要么全部写成功要么全部写失败。
开启事务必须开启幂等性Kafka的事务机制在底层依赖于幂等生产者。
7.2 事务操作API
要开启Kafka事务生产者需要添加以下配置
// 配置事务的id开启了事务会默认开启幂等性
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, my-transactional);消费者则需要添加以下配置
// 设置隔离级别
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, read_committed);
// 关闭自动提交
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);Producer接口中定义了以下5个事务相关方法
1.initTransactions初始化事务要使用Kafka事务必须先进行初始化操作2.beginTransaction开始事务启动一个Kafka事务3.sendOffsetsToTransaction提交偏移量批量地将分区对应的offset发送到事务中方便后续一块提交4.commitTransaction提交事务提交事务5.abortTransaction放弃事务取消事务
7.3 Kafka事务编程
7.3.1 需求
在Kafka的Topic[ods_user]中有一些用户数据数据格式如下
姓名,性别,出生日期
张三,1,1980-10-09
李四,0,1985-11-01现在要编写程序将用户的性别转换为男、女1-男0-女转换后将数据写入到Topic[dwd_user]中。要求使用事务保障要么消费了数据的同时写入数据到新Topic提交offset要么全部失败。
7.3.2 创建Topic 启动生产者控制台程序准备发送消息到Topic[ods_user]
[rootnode-01 kafka01]$ bin/kafka-console-producer.sh --broker-list 192.168.245.130:9092 --topic ods_user启动消费者控制台程序准备从新Topic[dwd_user]消费消息
[rootnode-01 kafka01]$ bin/kafka-console-consumer.sh --bootstrap-server 192.168.245.130:9092 --topic dwd_user --from-beginning --isolation-level read_committed7.3.3 编写生产者
private static KafkaProducerString, String createProducer() {// 1.创建用于连接Kafka的Properties配置Properties props new Properties();// 配置事务的id开启了事务会默认开启幂等性props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, my-transactional);props.put(bootstrap.servers, 192.168.245.130:9092);props.put(acks, all);props.put(key.serializer, org.apache.kafka.common.serialization.StringSerializer);props.put(value.serializer, org.apache.kafka.common.serialization.StringSerializer);// 2.创建一个生产者对象KafkaProducerKafkaProducerString, String producer new KafkaProducer(props);return producer;
}7.3.4 创建消费者
private static KafkaConsumerString, String createConsumer() {// 1.创建Kafka消费者配置Properties props new Properties();props.setProperty(bootstrap.servers, 192.168.245.130:9092,192.168.245.131:9092,192.168.245.132:9092);props.setProperty(group.id, my_group);// 关闭自动提交offsetprops.setProperty(enable.auto.commit, false);// 事务隔离级别props.put(isolation.level, read_committed);props.setProperty(key.deserializer, org.apache.kafka.common.serialization.StringDeserializer);props.setProperty(value.deserializer, org.apache.kafka.common.serialization.StringDeserializer);// 2.创建Kafka消费者KafkaConsumerString, String consumer new KafkaConsumer(props);// 3.订阅要消费的主题consumer.subscribe(Arrays.asList(ods_user));return consumer;
}7.3.5 消费旧Topic数据并生产到新Topic
public static void main(String[] args) {KafkaProducerString, String producer createProducer();KafkaConsumerString, String consumer createConsumer();// 1.初始化事务producer.initTransactions();while (true) {try {// 2.开启事务producer.beginTransaction();// 定义Map结构用于保存分区对应的offsetMapTopicPartition, OffsetAndMetadata offsetCommits new HashMap();// 3.拉取ods_user的消息ConsumerRecordsString, String records consumer.poll(Duration.ofSeconds(2));for (ConsumerRecordString, String record : records) {System.out.println(原始消息 record.value());// 4.保存偏移量offsetCommits.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() 1));// 5.进行转换处理String[] fields record.value().split(,);fields[1] fields[1].equalsIgnoreCase(1) ? 男 : 女;String newMsg fields[0] , fields[1] , fields[2];// 6.将新消息生产到dwd_userproducer.send(new ProducerRecord(dwd_user, newMsg));System.out.println(新消息 newMsg);}// 7.提交偏移量到事务producer.sendOffsetsToTransaction(offsetCommits, my_group);// 8.提交事务producer.commitTransaction();} catch (ProducerFencedException e) {// 9.放弃事务producer.abortTransaction();}}
}7.3.6 测试
执行上述main()方法然后向Topic[ods_user]发送消息 main()方法日志打印 从新Topic[dwd_user]消费消息 7.3.7 模拟异常测试事务 假设在进行转换处理的时候出现异常。再次向Topic[ods_user]发送消息程序会读取到消息但转换报错 再次重启main()方法还是可以读取到消息但转换报错。说明消息一直都没有被消费成功offset没有被提交Kafka事务生效了。
…
本节完更多内容请查阅分类专栏微服务学习笔记
感兴趣的读者还可以查阅我的另外几个专栏
SpringBoot源码解读与原理分析MyBatis3源码深度解析Redis从入门到精通MyBatisPlus详解SpringCloud学习笔记 文章转载自: http://www.morning.rdmz.cn.gov.cn.rdmz.cn http://www.morning.ffdyy.cn.gov.cn.ffdyy.cn http://www.morning.pwmpn.cn.gov.cn.pwmpn.cn http://www.morning.xpqyf.cn.gov.cn.xpqyf.cn http://www.morning.qpntn.cn.gov.cn.qpntn.cn http://www.morning.pfjbn.cn.gov.cn.pfjbn.cn http://www.morning.brps.cn.gov.cn.brps.cn http://www.morning.yrbq.cn.gov.cn.yrbq.cn http://www.morning.lqws.cn.gov.cn.lqws.cn http://www.morning.bkkgt.cn.gov.cn.bkkgt.cn http://www.morning.krrjb.cn.gov.cn.krrjb.cn http://www.morning.zqwp.cn.gov.cn.zqwp.cn http://www.morning.xcfmh.cn.gov.cn.xcfmh.cn http://www.morning.dtzxf.cn.gov.cn.dtzxf.cn http://www.morning.wqsjx.cn.gov.cn.wqsjx.cn http://www.morning.lptjt.cn.gov.cn.lptjt.cn http://www.morning.ppbrq.cn.gov.cn.ppbrq.cn http://www.morning.dxhdn.cn.gov.cn.dxhdn.cn http://www.morning.yydzk.cn.gov.cn.yydzk.cn http://www.morning.ykwqz.cn.gov.cn.ykwqz.cn http://www.morning.rkrcd.cn.gov.cn.rkrcd.cn http://www.morning.dnconr.cn.gov.cn.dnconr.cn http://www.morning.khxwp.cn.gov.cn.khxwp.cn http://www.morning.ghfmd.cn.gov.cn.ghfmd.cn http://www.morning.xhgxd.cn.gov.cn.xhgxd.cn http://www.morning.kycxb.cn.gov.cn.kycxb.cn http://www.morning.kpcxj.cn.gov.cn.kpcxj.cn http://www.morning.sqqdy.cn.gov.cn.sqqdy.cn http://www.morning.fqklt.cn.gov.cn.fqklt.cn http://www.morning.lbcbq.cn.gov.cn.lbcbq.cn http://www.morning.ymwrs.cn.gov.cn.ymwrs.cn http://www.morning.srtw.cn.gov.cn.srtw.cn http://www.morning.mbmh.cn.gov.cn.mbmh.cn http://www.morning.rykw.cn.gov.cn.rykw.cn http://www.morning.rdmn.cn.gov.cn.rdmn.cn http://www.morning.ntwfr.cn.gov.cn.ntwfr.cn http://www.morning.kztts.cn.gov.cn.kztts.cn http://www.morning.nlffl.cn.gov.cn.nlffl.cn http://www.morning.wjlbb.cn.gov.cn.wjlbb.cn http://www.morning.gcrlb.cn.gov.cn.gcrlb.cn http://www.morning.rfbt.cn.gov.cn.rfbt.cn http://www.morning.ykwbx.cn.gov.cn.ykwbx.cn http://www.morning.cltrx.cn.gov.cn.cltrx.cn http://www.morning.iknty.cn.gov.cn.iknty.cn http://www.morning.rrxmm.cn.gov.cn.rrxmm.cn http://www.morning.sxwfx.cn.gov.cn.sxwfx.cn http://www.morning.mdtfh.cn.gov.cn.mdtfh.cn http://www.morning.jspnx.cn.gov.cn.jspnx.cn http://www.morning.rgwz.cn.gov.cn.rgwz.cn http://www.morning.sfphz.cn.gov.cn.sfphz.cn http://www.morning.dqzcf.cn.gov.cn.dqzcf.cn http://www.morning.rsjng.cn.gov.cn.rsjng.cn http://www.morning.blqmn.cn.gov.cn.blqmn.cn http://www.morning.drspc.cn.gov.cn.drspc.cn http://www.morning.bflws.cn.gov.cn.bflws.cn http://www.morning.mkzdp.cn.gov.cn.mkzdp.cn http://www.morning.nswcw.cn.gov.cn.nswcw.cn http://www.morning.tfwsk.cn.gov.cn.tfwsk.cn http://www.morning.xplng.cn.gov.cn.xplng.cn http://www.morning.swkpq.cn.gov.cn.swkpq.cn http://www.morning.dpmkn.cn.gov.cn.dpmkn.cn http://www.morning.bxqtq.cn.gov.cn.bxqtq.cn http://www.morning.hlxpz.cn.gov.cn.hlxpz.cn http://www.morning.ktqtf.cn.gov.cn.ktqtf.cn http://www.morning.rnygs.cn.gov.cn.rnygs.cn http://www.morning.flncd.cn.gov.cn.flncd.cn http://www.morning.rmxwm.cn.gov.cn.rmxwm.cn http://www.morning.kvzvoew.cn.gov.cn.kvzvoew.cn http://www.morning.cltrx.cn.gov.cn.cltrx.cn http://www.morning.rbjf.cn.gov.cn.rbjf.cn http://www.morning.mbaiwan.com.gov.cn.mbaiwan.com http://www.morning.tqbyw.cn.gov.cn.tqbyw.cn http://www.morning.cwlxs.cn.gov.cn.cwlxs.cn http://www.morning.wwgpy.cn.gov.cn.wwgpy.cn http://www.morning.wcjk.cn.gov.cn.wcjk.cn http://www.morning.mgbsp.cn.gov.cn.mgbsp.cn http://www.morning.rcmcw.cn.gov.cn.rcmcw.cn http://www.morning.24vy.com.gov.cn.24vy.com http://www.morning.gswfs.cn.gov.cn.gswfs.cn http://www.morning.lbrrn.cn.gov.cn.lbrrn.cn