建设网站的功能定位是什么意思,免费网站大全下载,网站功能,北京南站核酸检测地点文章目录前言1、Kafka 系统架构1.1、Producer 生产者1.2、Consumer 消费者1.3、Consumer Group 消费者群组1.4、Topic 主题1.5、Partition 分区1.6、Log 日志存储1.7、Broker 服务器1.8、Offset 偏移量1.9、Replication 副本1.10、Zookeeper2、Kafka 环境搭建2.1、下载 Kafka2.…
文章目录前言1、Kafka 系统架构1.1、Producer 生产者1.2、Consumer 消费者1.3、Consumer Group 消费者群组1.4、Topic 主题1.5、Partition 分区1.6、Log 日志存储1.7、Broker 服务器1.8、Offset 偏移量1.9、Replication 副本1.10、Zookeeper2、Kafka 环境搭建2.1、下载 Kafka2.2、修改 Kafka 配置 config/server.properties2.3、运行下面有用 AdminClient API 操作3、Kafka 的 JAVA 实现3.1、AdminClient API3.2、Kafka 生产者 JAVA 代码3.3、Kafka 消费者 JAVA 代码3.4、Spring Boot 实现消费者对 Kafka 的监听4、Kafka 幂等性4.1、幂等性要解决的问题4.2、Kafka 是怎么保证幂等性的4.3、Kafka 幂等性的局限性5、Kafka 常见问题5.1、Kafka 消息丢失5.2、Kafka 消息重复消费5.3、kafka 为什么快/吞吐量大前言 
官网http://kafka.apache.org Kafka 是由 Apache 软件基金会开发的一个开源流处理平台由Scala和ava编写。Kafka是一种高吞吐量的分布式发布订阅消息系统它可以处理消费者在网站中的所有动作流数据。 1、Kafka 系统架构 1.1、Producer 生产者 生产者用于创建消息。生产者在默认情况下把消息均衡地分布到主题的所有分区上而并不关心特定消息会被写到哪个分区。不过在某些情况下生产者会把消息直接写到指定的分区。 
消息发送 
public FutureRecordMetadata send(ProducerRecordK, V record);
public FutureRecordMetadata send(ProducerRecordK, V record, Callback callback);// 发后即忘fire-and-forget
producer.send(record);
// 同步sync
producer.send(record).get();
// 异步async
producer.send(record, new Callback(){
});消息在通过send()方法发往broker的过程中有可能需要经过拦截器、序列化器、和分区器的一系列作用之后才能被真正地发往broker。 
生产者架构图 序列化 
生产者需要用序列化器把对象转换成字节数组才能通过网络发送给Kafka。消费者需要用反序列化区把从Kafka中收到的字节数组转换成相应的对象。自带的有StringSerializerByteArray、ByteBuffer、Bytes、Double、Integer、Long等还可以自定义序列化器。分区器 
如果消息中没有指定partition字段那么就需要依赖分区器根据key这个字段来计算partition的值。也可以自定义分区器。拦截器 
生产者拦截器既可以用来在消息发送前做一些准备工作比如按照某个规则过滤不符合要求的消息、修改消息的内容等也可以用来在发送回调逻辑前做一些定制化的需求比如统计类工作。通过自定义实现ProducerInterceptor接口来使用。整个生产者客户端由两个线程协调运行这两个线程分别为主线程和Sender发送线程 
① 在主线程中由KafkaProducer创建消息然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累加器RecordAccumulator中。消息累加器RecordAccumulator主要用来缓存消息以便Sender线程可以批量发送进而减少网络传输的资源消耗以提升性能。RecordAccumulator的缓存大小可以通过buffer.memory配置。 
在RecordAccumulator的内部为每个分区都维护了一个双端队列主线程发送过来的消息ProducerRecord都会被追加到某个双端队列Dqueue中队列中的内容就是ProducerBatch即Dqueue ProducerBatch 。一个ProducerBatch 包含多个ProducerRecord。当一条消息ProducerRecord流入RecordAccumulator如果这条消息小于batch.size参数大小则以batch.size参数大小创建ProducerBatch否则以消息的实际大小创建ProducerBatch。 
② Sender发送线程负责从消息累加器RecordAccumulator中获取消息并将其发送到Kafka中。后续Sender从缓存中获取消息进行转换发送到broker。在发送前还会保存到InFlightRequests中作用是缓存已经发送出去但还没有收到响应的请求缓存数量由max.in.flight.requests.per.connection参数确定默认是5表示每个连接最多缓存5个未响应的请求。 
1.2、Consumer 消费者 消费者消息的订阅者可以订阅一个或多个主题并且依据消息生产的顺序读取他们消费者通过检查消息的偏移量来区分已经读取过的消息。消费者一定属于某一个特定的消费组。 
订阅主题和分区 
通过subscribe()方法订阅主题具有消费者自动再均衡的功能在多个消费者的情况下可以根据分区分配政策来自动分配各个消费者与分区的关系以实现消费者负载均衡和故障自动转移。而通过assign()方法则没有。 
消息消费 
Kafka中的消息是基于拉模式的。Kafka中的消息消费是一个不断轮询的过程消费者所要做的就是重复地调用poll()方法而poll()方法返回的是所订阅的主题分区上的一组消息。如果没有消息则返回空。 
public ConsumerRecordsK, V (final Duration timeout)timeout用于控制poll()方法的阻塞时间没有消息时会阻塞。 
位移提交 
Kafka中的每条消息都有唯一的offset用来标识消息在分区中对应的位置。Kafka默认的消费唯一的提交方式是自动提交由enable.auto.commit配置默认为true。自动提交不是每一条消息提交一次而是定期提交周期由auto.commit.interval.ms配置默认为5秒。 自动提交可能发生消息重复或者丢失的情况Kafka还提供了手动提交的方式。enable.auto.commit配置为false开启手动提交。 
指定位移消费 
在Kafka中每当消费者查找不到所记录的消费位移时就会根据消费者客户端参数auto.offset.reset的配置来决定从何处开始进行消费。默认值为lastest表示从分区末尾开始消费消息earliest表示从起始开始消费none为不进行消费而是抛出异常。 
seek()可以从特定的位移处开始拉去消息得以追前消费或回溯消费。 
public void seek(TopicPartition partition, long offset)再均衡 
再均衡是指分区的所属权从一个消费者转移到另一个消费者的行为它为消费组具备高可用性和伸缩性提供保障使我们可以既方便又安全地删除消费组内的消费者或者往消费组内添加消费者。不过在再均衡发生期间消费组内的消费者是无法读取消息的。再均衡后也可能出现重复消费的情况。所以应尽量避免不必要的再均衡发生。1.3、Consumer Group 消费者群组 同一个消费者组中保证每个分区只能被一个消费者使用 不会出现多个消费者读取同一个分区的情况通过这种方式消费者可以消费包含大量消息的主题。而且如果某个消费者失效群组里的其他消费者可以接管失效悄费者的工作。 
1.4、Topic 主题 Kafka中 的消息是根据 Topic 进行分类的Topic 是支持多订阅的一个 Topic 可以有多个不同的订阅消息的消费者。Kafka 集群 Topic 的数量没有限制同一个 Topic 的数据会被划分在同一个目录下一个 Topic 可以包含 1 至多个分区所有分区的消息加在一起就是一个 Topic 的所有消息。 
1.5、Partition 分区 在Kafka中每个 Topic 至少有一个 Partition 一个 topic 可以包含多个 分区partitiontopic 消息保存在各个 partition 上由于一个 topic 能被分到多个分区上给 kafka 提供给了并行的处理能力这也正是 kafka 高吞吐的原因之一。 每一个分区都是一个顺序的、不可变的消息队列 并且可以持续的添加。分区中的消息都被分了一个序列号称之为偏移量(offset)消息在日志中的位置可以理解是消息在 partition 上的偏移量也是代表消息的唯一序号。 
分区策略 
分区策略说明轮询策略按顺序轮流将每条数据分配到每个分区中随机策略每次都随机地将消息分配到每个分区按键保存策略生产者发送数据的时候可以指定一个key计算这个key的hashCodet值按照hashCodel的值对不同消息进行存储
如果 topic 有多个 partition消费数据时就不能保证数据的顺序。严格保证消息的消费顺序的场景下需要将 partition 数目设为1。 
1.6、Log 日志存储 
一个分区对应一个日志文件Log为了防止Log过大Kafka又引入了日志分段LogSegment的概念将Log切分为多个LogSegment便于消息的维护和清理。Log在物理上只以命名为topic-partitiom文件夹的形式存储而每个LogSegment对应磁盘上的一个日志文件和两个索引文件以及可能的其他文件。 LogSegment文件由两部分组成分别为“.index”文件和“.log”文件分别表示为segment的索引文件和数据文件 
partition全局的第一个segment从0开始后续每个segment文件名为上一个segment文件最后一条消息的offset值数值大小为64位20位数据字符长度没有数字用0填充 
消息压缩 
一条消息通常不会太大Kafka是批量消息压缩通过compression.type配置默认为producer还可以配置为gzip、snappy、lz4uncompressed表示不压缩。 
日志索引 
Kafka中的索引文件以稀疏索引的方式构造消息的索引它并不保证每个消息在索引文件中都有对应的索引项。每当写入一定量log.index.interval.bytes指定默认4KB的消息时偏移量索引文件和时间戳索引文件分别增加一个偏移量索引文件项和时间戳索引文件项。稀疏索引通过MappedByteBuffer将索引文件映射到内存中以加快索引的查询速度。 
日志清理 
Kafka提供两种日志清理策略 
日志删除按照一定的保留策略基于时间、日志大小或日志起始偏移量直接删除不符合条件的日志分段。日志压缩针对每个消息的key进行整合对于有相同key的不同value值只保留最后一个版本。 
页缓存 
页缓存是把磁盘中的数据缓存到内存中把对磁盘的访问变为对内存的访问减少对磁盘IO的操作。零拷贝 
所谓的零拷贝是将数据直接从磁盘文件复制到网卡设备中而不需要经由应用程序之手。减少了数据拷贝的次数和内核和用户模式之间的上下文切换。对于Linux操作系统而言底层依赖于sendfile()方法实现。一般的数据流程磁盘 - 内核 - 应用 - Socket - 网卡数据复制4次上下文切换4次。 流程步骤 
操作系统将数据从磁盘文件中读取到内核空间的页面缓存。应用程序将数据从内核空间读入用户空间缓冲区。应用程序将读到数据写回内核空间并放入socket缓冲区。操作系统将数据从socket缓冲区复制到网卡接口此时数据才能通过网络发送。 
通过网卡直接去访问系统的内存就可以实现现绝对的零拷贝了。这样就可以最大程度提高传输性能。通过“零拷贝”技术我们可以去掉那些没必要的数据复制操作 同时也会减少上下文切换次数。 通过上图可以看到零拷贝技术只用将磁盘文件的数据复制到页面缓存中一次然后将数据从页面缓存直接发送到网络中发送给不同的订阅者时都可以使用同一个页面缓存避免了重复复制操作。 
1.7、Broker 服务器 一个独立的 Kafka 服务器被称为broker broker接收来自生产者的消息为消息设置偏移量并提交消息到磁盘保存。 
如果broker端配置参数auto.create.topics.enable设置为true默认为true那么当生产者向一个尚未创建的主题发送消息时会自动创建一个分区数为num.partitions默认为1、副本因子为default.replication.factor默认值为1的主题。分区和分区副本都对应一个日志文件不是分区数越多吞吐量就越大超过阈值会使Kafka报错或系统崩溃。分区只能增加不能减少。 
1.8、Offset 偏移量 消息的唯一标识是连续的序列号偏移量决定读取数据的位置不会有线程安全的问题消费者通过偏移量来决定下次读取的消息。消息被消费之后并不被马上删除这样多个业务就可以重复使用 kafkal 的消息我们某一个业务也可以通过修改偏移量达到重新读取消息的目的偏移量由用户控制。消息最终还是会被删除的默认生命周期为1周(7*24小时)。 
1.9、Replication 副本 每个 partition 有多个副本其中有且仅有一个作为 LeaderLeader 是当前负责数据的读写的 partition。 producer 和 consumer 只跟 leader 交互。Follower 跟随 Leader所有写请求都通过 Leader 路由数据变更会广播给所有 FollowerFollower 与 Leader 保持数据同步。如果 Leader 失效则从 Follower 中选举出一个新的 Leader。 AR、ISR、OSR 
分区中的所有副本统称为AR。所有与leader副本保持一定同步程度的副本组成。与leader副本同步滞后过多的副本组成OSR。AR  ISR OSR。正常情况 应该ARISROSR集合为空。 
Kafka 副本 Leader 选举原理的理解 
① Kafka要先从所有Broker中选出唯一的一个Controller。 所有的Broker会尝试在Zookeeper中创建临时节点/controller谁先创建成功谁就是Controller。那如果Controller挂掉或者网络出现问题ZooKeeper上的临时节点就会消失。其他的Broker通过Watch监听到Controller下线的消息后继续按照先到先得的原则竞选Controller。这个Controller就相当于选举委员会的主席。 
② Controller确定以后就可以开始做分区选主的事情。接下来就是找候选人。显然每个Replication副本都想推荐自己但不是所有的副本都有竞选资格。只有在ISR保持心跳同步的副本才有资格参与竞选。就好比是皇帝每天着急皇子们开早会只有每天来打卡的皇子才能加入ISR。那些请假的、迟到的没有资格参与选举。接下来就是Leader选举就相当于要在众多皇子中选出太子。它的选举算法和微软的PacificA算法最相近。大致意思就是默认是让ISR中第一个Replica变成Leader。比如ISR是1、5、9优先让1成为Leader。这个跟中国古代皇帝传位是一样的优先传给皇长子。 
1.10、Zookeeper 2、Kafka 环境搭建 
2.1、下载 Kafka 
你可以在kafka官网 http://kafka.apache.org/downloads下载到最新的kafka安装包选择下载二进制版本的tgz文件。 
2.2、修改 Kafka 配置 config/server.properties 
修改配置文件config/server.properties 
#broker.id属性在kafka集群中必须要是唯一
broker.id 0
#kafka部署的机器ip和提供服务的端口号
listenersPLAINTEXT://127.0.0.1:9092
#kafka的消息存储文件
log.dir/usr/local/data/kafka-logs
#kafka连接zookeeper的地址
zookeeper.connect 127.0.0.1:2181server.properties核心配置详解 
PropertyDefaultDescriptionbroker.id0每个broker都可以用⼀个唯⼀的非负整数id进⾏标识log.dirs/tmp/kafka-logskafka存放数据的路径。这个路径并不是唯⼀的可以是多个路径之间只需要使用逗号分隔即可listenersPLAINTEXT://127.0.0.1:9092server接受客户端连接的端⼝ip配置kafka本机ip即可zookeeper.connectlocalhost:2181zooKeeper连接log.retention.hours168每个日志文件删除之前保存的时间num.partitions1创建topic的默认分区数default.replication.factor1自动创建topic的默认副本数量建议设置为⼤于等于2min.insync.replicas1producer 发送数据服务端的响应级别delete.topic.enablefalse是否允许删除主题
2.3、运行下面有用 AdminClient API 操作 
启动zookeeper 
sh zookeeper-server-start.sh config/zookeeper.properties启动kafka 
sh kafka-server-start.sh config/server.properties创建一个topic 
sh kafka-topic.sh --create --topic topic_name --zookeeper udp01:2181 --partitions 3 --replication-factor 1参数解释: 
partition指定当前创建的kafka topic的分区数量不指定默认为1。replication-factor 知道每个分区的复制因子不指定默认为1。 
创建topic还有一种是自动创建当你往一个不存在的topic里面输入数据的时候他会自动创建一个默认配置的topic这种方式需要在server.properties配置文件中加上auto.create.topics.enableture。 
查看已经创建的topic 
sh kafka-topics.sh --list --zookeeper udp01:2181查看某个的topic的信息 
sh kafka-topics.sh --describe --zookeeper udp01:2181  --topic topic_name修改toipc信息 
sh kafka-topics.sh  --zookeeper udp01:2181  --alter  --topic topic_name --partitions/config/delete-config其中 partitions的数据只能比修改之前的大不能小。 
删除topic 
sh kafka-topics.sh --delete --topic topic_name --zookeeper udp01::2181删除有2种一种是标记删除但实际还是存在的一种是真的删除。要真的删除也有2种方式 
删除本地磁盘以及zk上的相关topic信息。所属目录为/brokers/topics配置server.properties文件中的delete.topic.enable为ture需要重启kafka才会生效在执行delete命令则会将topic删掉。 
向topic中传数据 
sh kafka-console-producer.sh --broker-list udp01:9092 --topic topic_name消费topic 
sh kafka-console-consumer.sh --bootstrap-server udp01:9092 --topic topic_name --from-beginning3、Kafka 的 JAVA 实现 
3.1、AdminClient API 
package com.example.canal.YangKafka;import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.config.ConfigResource;import java.util.*;public class AdminClientYang {public final static String TOPIC_NAME  yangTest;public static AdminClient adminClient() {Properties properties  new Properties();properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, 127.0.0.1:9999);AdminClient adminClient  AdminClient.create(properties);return adminClient;}public static void main(String[] args) throws Exception {AdminClient adminClient  AdminClientYang.adminClient();System.out.println(adminClient :   adminClient);createTopic();}/*** 创建Topic实例*/public static void createTopic() {AdminClient adminClient  AdminClientYang.adminClient();// 副本因子Short re  1;NewTopic newTopic  new NewTopic(TOPIC_NAME, 1, re);CreateTopicsResult createTopicsResult  adminClient.createTopics(Arrays.asList(newTopic));System.out.println(CreateTopicsResult :   createTopicsResult);adminClient.close();}/*** 获取Topic列表*/public static void topicList() throws Exception {AdminClient adminClient  adminClient();// 是否查看internal选项ListTopicsOptions options  new ListTopicsOptions();// 设置我们是否应该列出内部topicoptions.listInternal(true);// 列出集群中可用的topicListTopicsResult listTopicsResult  adminClient.listTopics(options);// 返回一个topic名称集合的future(这里是KafkaFuture)SetString names  listTopicsResult.names().get();// 返回一个KafkaFuture它产生一个 TopicListing 对象的集合CollectionTopicListing topicListings  listTopicsResult.listings().get();// 返回一个KafkaFuture它产生一个topic名称到 TopicListing 对象的映射。KafkaFutureMapString, TopicListing mapKafkaFuture  listTopicsResult.namesToListings();// 打印namesnames.stream().forEach(System.out::println);System.out.println(---------------------------topic列表-------------------------);// 打印topicListingstopicListings.stream().forEach((topicList) - {System.out.println(topicList);});System.out.println(---------------------------topic列表-------------------------);}/*** 删除topic*/public static void delTopics() throws Exception {AdminClient adminClient  adminClient();// 删除一批topic。// 此操作不是事务性的因此它可能对某些主题成功而对另一些主题则失败。// DeleteTopicsResult返回成功后所有代理可能需要几秒钟才能意识到主题已消失。 在此期间// listTopics()和describeTopics(Collection)可能会继续返回有关已删除主题的信息。DeleteTopicsResult deleteTopicsResult  adminClient.deleteTopics(Arrays.asList(TOPIC_NAME));deleteTopicsResult.all().get();}/*** 描述topic* name: yibo_topic* desc: (nameyibo_topic,*      internalfalse,*      partitions*          (partition0,*          leader192.168.174.128:9092 (id: 0 rack: null),*          replicas192.168.174.128:9092 (id: 0 rack: null),*          isr192.168.174.128:9092 (id: 0 rack: null)),*          authorizedOperationsnull)* throws Exception*/public static void describeTopic() throws Exception {AdminClient adminClient  adminClient();// 描述集群中的一些topic。DescribeTopicsResult describeTopicsResult  adminClient.describeTopics(Arrays.asList(TOPIC_NAME));MapString, TopicDescription stringTopicDescriptionMap  describeTopicsResult.all().get();SetMap.EntryString, TopicDescription entries  stringTopicDescriptionMap.entrySet();System.out.println(----------------------------topic信息-----------------------------);entries.stream().forEach((entry) - {System.out.println(name   entry.getKey()   , desc:   entry.getValue());});System.out.println(----------------------------topic信息-----------------------------);}/*** 查询配置信息*/public static void describeConfig() throws Exception {AdminClient adminClient  adminClient();ConfigResource configResource  new ConfigResource(ConfigResource.Type.TOPIC, TOPIC_NAME);// 获取指定资源的配置DescribeConfigsResult describeConfigsResult  adminClient.describeConfigs(Arrays.asList(configResource));MapConfigResource, Config configResourceConfigMap  describeConfigsResult.all().get();System.out.println(----------------------------配置信息-----------------------------);configResourceConfigMap.entrySet().stream().forEach((entry) - {System.out.println(configResource :   entry.getKey()   , Config :   entry.getValue());});System.out.println(----------------------------配置信息-----------------------------);}/*** 修改配置信息 老版API*/public static void alterConfig() throws Exception {AdminClient adminClient  adminClient();MapConfigResource, CollectionAlterConfigOp configMaps  new HashMap();// 具有配置的资源的类需要提供type和名称 Type是他内部维护的枚举类共有四种类型BROKER_LOGGER((byte) 8), BROKER((byte) 4), TOPIC((byte) 2),// UNKNOWN((byte) 0)ConfigResource configResource  new ConfigResource(ConfigResource.Type.TOPIC, TOPIC_NAME);// 包含名称、值和操作类型的更改配置条目的类 ,需要注入ConfigEntry和操作类型同样OpType是个枚举类AlterConfigOp alterConfigOp new AlterConfigOp(new ConfigEntry(preallocate, false), AlterConfigOp.OpType.SET);configMaps.put(configResource, Arrays.asList(alterConfigOp));// 逐步更新指定资源的配置AlterConfigsResult alterConfigsResult  adminClient.incrementalAlterConfigs(configMaps);alterConfigsResult.all().get();}/*** 增加partitions数量*/public static void incrPartitions(int partitions) throws Exception {AdminClient adminClient  adminClient();MapString, NewPartitions partitionsMap  new HashMap();NewPartitions newPartitions  NewPartitions.increaseTo(partitions);partitionsMap.put(TOPIC_NAME, newPartitions);CreatePartitionsResult createPartitionsResult  adminClient.createPartitions(partitionsMap);createPartitionsResult.all().get();}
}3.2、Kafka 生产者 JAVA 代码 
package com.example.canal.YangKafka;import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;/*** 生产者*/
public class ProducerdemoYang {public static void main(String[] args) {// kafka 配置Properties properties  new Properties();/*** 用于建立与 kafka 集群连接的 host/port*/properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 127.0.0.1:9999);/*** producer 需要 server 接收到数据之后发出的确认接收的信号此项配置就是指 procuder需要多少个这样的确认信号。此配置实际上代表了数据备份的可用性。以下设置为常用选项* 1acks0生产者在成功写入消息之前不会等待任何来自服务器的响应消息传递过程中有可能丢失其实就是保证消息不会重复发送或者重复消费但是速度最快。同时重试配置不会发生作用。* 2acks1默认值只要集群首领节点收到消息生产者就会收到一个来自服务器的成功响应。* 3acksall只有当所有参与赋值的节点全部收到消息时生产者才会收到一个来自服务器的成功响应。*/properties.put(ProducerConfig.ACKS_CONFIG, 1);/*** 如果请求失败生产者会自动重试如果启用重试则会有重复消息的可能性本次采用手动重试*/// properties.put(ProducerConfig.RETRIES_CONFIG, 3);/*** 当多个消息发送到相同分区时生产者会将消息打包到一起以减少请求交互. 而不是一条条发送设置批量发送消息的大小默认值是16384即16kb就是说一个batch满了16kb就发送出去* 比如说kafka里的消息5秒钟Batch才凑满了16KB才能发送出去。那这些消息的延迟就是5秒钟*/properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);/*** 有的时刻消息比较少过了很久比如5min也没有凑够16KB这样延时就很大所以需要一个参数. 再设置一个时间到了这个时间即使数据没达到16KB也将这个批次发送出去*/properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);/*** 生产者内存缓冲区的大小如果数据产生速度大于向 broker 发送的速度将会耗尽这个缓存空间*/properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);/*** 该配置控制 KafkaProducers send()partitionsFor()inittransaction* ()sendOffsetsToTransaction()commitTransaction() 和abortTransaction()方法将阻塞。对于send()此超时限制了获取元数据和分配缓冲区的总等待时间*/properties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5000);/*** 将消息发送到kafka server, 所以肯定需要用到序列化的操作我们这里发送的消息是string类型的所以使用string的序列化类*/properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 生产者对象KafkaProducerString, String producer  new KafkaProducerString, String(properties);String test  hello yang;// 开启幂等properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true);// 封装发送的消息ProducerRecordString, String record  new ProducerRecordString, String(yangTest, test);// 封装发送的消息指定 key消息将会被同一分区处理// ProducerRecordString, String record  new ProducerRecordString, String(test-topic, 10086, test);// 同步发送消息// producer.send(record);// 异步发送消息producer.send(record, new Callback() {Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception  null) {System.out.println(发送成功);} else {System.out.println(发送失败);}if (metadata ! null) {System.out.println(异步方式发送消息结果  topic‐  metadata.topic()  |partition‐  metadata.partition()  |offset‐  metadata.offset());}}});// 关闭消息通道必须关闭否则消息发送不成功producer.close();}}3.3、Kafka 消费者 JAVA 代码 
package com.example.canal.YangKafka;import com.fasterxml.jackson.databind.JsonDeserializer;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.*;/*** 消费者*/
public class ConsumerdemoYang {public static void main(String[] args) {// kafka 配置Properties properties  new Properties();/*** 用于建立与 kafka 集群连接的 host/port*/properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 127.0.0.1:9999);/*** 消费者组*/properties.put(ConsumerConfig.GROUP_ID_CONFIG, group2);/*** 是否自动提交偏移量默认值是true为了避免出现重复数据和数据丢失可以把它设置为false然后手动提交偏移量*/// properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);/*** 自动提交的时间间隔自动提交开启时生效*/properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 2000);/*** 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理 earliest当各分区下有已提交的offset时从提交的offset开始消费无提交的offset时从头开始消费分区的记录* latest(默认)当各分区下有已提交的offset时从提交的offset开始消费无提交的offset时消费新产生的该分区下的数据在消费者启动之后生成的记录* none当各分区都存在已提交的offset时从提交的offset开始消费只要有一个分区不存在已提交的offset则抛出异常*/properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, earliest);/*** 当broker多久没有收到consumer的心跳请求后就触发reBalance默认值是10s*/properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);/*** 序列化建议使用Json这种序列化方式可以无需额外配置传输实体类*/// properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,// org.apache.kafka.common.serialization.StringSerializer);// properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,// org.apache.kafka.common.serialization.StringSerializer);properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 消费者对象KafkaConsumerString, String consumer  new KafkaConsumerString, String(properties);// 订阅主题consumer.subscribe(Collections.singletonList(yangTest));// 消费指定分区// consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));// 消息回溯消费// consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));// 指定offset消费// consumer.seek(new TopicPartition(TOPIC_NAME, 0), 10);// 从指定时间点开始消费从1小时前开始消费// ListPartitionInfo topicPartitions  consumer.partitionsFor(TOPIC_NAME);// long fetchDataTime  new Date().getTime() - (1000 * 60 * 60);// MapTopicPartition, Long map  new HashMap();// for (PartitionInfo par : topicPartitions) {// map.put(new TopicPartition(topicName, par.partition()), fetchDataTime);// }// MapTopicPartition, OffsetAndTimestamp parMap  consumer.offsetsForTimes(map);// for (Map.EntryTopicPartition, OffsetAndTimestamp entry : parMap.entrySet()) {// TopicPartition key  entry.getKey();// OffsetAndTimestamp value  entry.getValue();// if (key  null || value  null)// continue;// Long offset  value.offset();// System.out.println(partition‐  key.partition()  |offset‐  offset);// System.out.println();// // 根据消费里的timestamp确定offset// if (value ! null) {// consumer.assign(Arrays.asList(key));// consumer.seek(key, offset);// }// }// while (true) {// ConsumerRecordsString, String consumerRecords  consumer.poll(Duration.ofMillis(1000));// for (ConsumerRecordString, String consumerRecord : consumerRecords) {// System.out.println(consumerRecord.key());// System.out.println(consumerRecord.value());// }// }while (true) {/*** poll() API 是拉取消息的长轮询 比如设置了1000毫秒 并不是在这1秒钟内只拉取一次 而是当没有拉取到数据时 会多次拉取数据 直到拉取到数据 然后继续循环*/ConsumerRecordsString, String records  consumer.poll(Duration.ofMillis(1000));for (ConsumerRecordString, String record : records) {System.out.printf(收到消息partition  %d,offset  %d, key  %s, value  %s%n, record.partition(),record.offset(), record.key(), record.value());}if (records.count()  0) {// 手动同步提交offset当前线程会阻塞直到offset提交成功// 一般使用同步提交因为提交之后一般也没有什么逻辑代码了// consumer.commitSync();// 手动异步提交offset当前线程提交offset不会阻塞可以继续处理后面的程序逻辑consumer.commitAsync(new OffsetCommitCallback() {Overridepublic void onComplete(MapTopicPartition, OffsetAndMetadata offsets, Exception exception) {if (exception ! null) {System.err.println(Commit failed for   offsets);System.err.println(Commit failed exception:   exception.getStackTrace());}}});}}}
} 
3.4、Spring Boot 实现消费者对 Kafka 的监听 
application.properties 
# kafka
spring.kafka.bootstrap-servers127.0.0.1:9999
# 消费者组
spring.kafka.consumer.group-idYang
# 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理
spring.kafka.consumer.auto-offset-resetearliest
# 是否开启自动提交
spring.kafka.consumer.enable-auto-committrue
# 动提交的时间间隔自动提交开启时生效
spring.kafka.consumer.auto-commit-interval100监听类 
package com.example.canal.YangKafka;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;Component
public class TopicListener {KafkaListener(topics  yangTest)public void listen(ConsumerRecord?, ? record) {System.out.println(kafka-message: key--  record.key()  ,value--  record.value().toString());}
}4、Kafka 幂等性 
kafka默认情况下提供的是至少一次的可靠性保障。即broker保障已提交的消息的发送但是遇上某些意外情况如网络抖动超时等问题导致Producer没有收到broker返回的数据ack则Producer会继续重试发送消息从而导致消息重复发送。 
如果我们禁止Producer的失败重试发送功能消息要么写入成功要么写入失败但绝不会重复发送。这样就是最多一次的消息保障模式。但对于消息组件排除特殊业务场景我们追求的一定是精确一次的消息保障模式。kafka通过 幂等性Idempotence和事务Transaction 的机制提供了这种精确的消息保障。 
4.1、幂等性要解决的问题 
在 0.11.0 之前Kafka 通过 Producer 端和 Server 端的相关配置可以做到 数据不丢 也就是 at least once但是在一些情况下可能会导致数据重复比如网络请求延迟等导致的重试操作在发送请求重试时 Server 端并不知道这条请求是否已经处理没有记录之前的状态信息所以就会有可能导致数据请求的重复发送这是 Kafka 自身的机制异常时请求重试机制导致的数据重复。 
对于大多数应用而言数据保证不丢是可以满足其需求的但是对于一些其他的应用场景比如支付数据等它们是要求精确计数的这时候如果上游数据有重复下游应用只能在消费数据时进行相应的去重操作应用在去重时最常用的手段就是根据唯一 id 键做 check 去重。 
在这种场景下因为上游生产导致的数据重复问题会导致所有有精确计数需求的下游应用都需要做这种复杂的、重复的去重处理。试想一下如果在发送时系统就能保证 exactly once这对下游将是多么大的解脱。这就是幂等性要解决的问题主要是解决数据重复的问题正如前面所述数据重复问题通用的解决方案就是加唯一 id然后根据 id 判断数据是否重复Producer 的幂等性也是这样实现的。 
4.2、Kafka 是怎么保证幂等性的 
Kafka为了实现幂等性它在底层设计架构中引入了ProducerID和SequenceNumber。 
ProducerID在每个新的Producer初始化时会被分配一个唯一的ProducerID这个ProducerID对客户端使用者是不可见的。SequenceNumber对于每个ProducerIDProducer发送数据的每个Topic和Partition都对应一个从0开始单调递增的SequenceNumber值。 当Producer发送消息(x2,y2)给Broker时Broker接收到消息并将其追加到消息流中。此时Broker返回Ack信号给Producer时发生异常导致Producer接收Ack信号失败。对于Producer来说会触发重试机制将消息(x2,y2)再次发送但是由于引入了幂等性在每条消息中附带了PIDProducerID和SequenceNumber。相同的PID和SequenceNumber发送给Broker而之前Broker缓存过之前发送的相同的消息那么在消息流中的消息就只有一条(x2,y2)不会出现重复发送的情况。 
开启幂等性配置 
props.put(enable.idempotence, ture)
//或者
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG true)4.3、Kafka 幂等性的局限性 
开启enable.idempotence后kafka就会自动帮你做好消息去重的一系列工作。底层具体实现原理很简单就是用空间换时间的优化思路即在broker端多存一些字段来标识数据的唯一性。当Producer发送了具有相同字段值的消息后broker会进行匹配去重丢弃重复的数据。实际的代码没这么简单但大致是这么个处理逻辑。 
官方的这个幂等实现看似简单高效但也存在他的局限性。他只能保证单分区上的幂等性即一个幂等性Producer只能够保证某个topic的一个分区上不出现重复消息无法实现多分区的幂等。此外如果Producer重启也会导致幂等重置。 
事务 
对于多分区保证幂等的场景则需要事务特性来处理了。kafka的事务跟我们常见数据库事务概念差不多也是提供经典的ACID即原子Atomicity、一致性 (Consistency)、隔离性 (Isolation) 和持久性 (Durability)。 
事务Producer保证消息写入分区的原子性即这批消息要么全部写入成功要么全失败。此外Producer重启回来后kafka依然保证它们发送消息的精确一次处理。事务特性的配置也很简单 
和幂等Producer一样开启enable.idempotence  true设置Producer端参数transctional.id事务Producer的代码稍微也有点不一样需要调一些事务处理的API。数据的发送需要放在beginTransaction和commitTransaction之间。Consumer端的代码也需要加上isolation.level参数用以处理事务提交的数据。示例代码: 
提供唯一的 transactionalId 
properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, “transacetionId”);producer.initTransactions();
try {producer.beginTransaction();producer.send(record1);producer.send(record2);producer.commitTransaction();
} catch (KafkaException e) {producer.abortTransaction();
}事务Producer虽然在多分区的数据处理上保证了幂等但是处理性能上相应的是会有一些下降的。 
5、Kafka 常见问题 
5.1、Kafka 消息丢失 
1. 生产过程丢失消息 
解决方案重发就行了。 
由于kafka为了提高性能采用了异步发送消息。我们只有获取到发送结果才能确保消息发送成功。 有两个方案可以获取发送结果。一种是kafka把发送结果封装在Future对象中我可以使用Future的get方法同步阻塞获取结果。 
FutureRecordMetadata future  producer.send(new ProducerRecord(topic, message));
try {RecordMetadata recordMetadata  future.get();if (recordMetadata ! null) {System.out.println(发送成功);}
} catch (Exception e) {e.printStackTrace();
}另一种是使用kafka的callback函数获取返回结果。 
producer.send(new ProducerRecord(topic, message), new Callback() {Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception  null) {System.out.println(发送成功);} else {System.out.println(发送失败);}}
});如果发送失败了有两种重试方案 
自动重试 kafka支持自动重试设置参数如下当集群Leader选举中或者Follower数量不足等原因返回失败时就可以自动重试。# 设置重试次数为3 retries  3 # 设置重试间隔为100msretry.backoff.ms  100一般我们不会用kafka自动重试因为超过重试次数还是会返回失败还需要我们手动重试。手动重试 在catch逻辑或else逻辑中再调用一次send方法。如果还不成功怎么办 在数据库中建一张异常消息表把失败消息存入表中然后搞个异步任务重试便于控制重试次数和间隔时间。 
2. 服务端持久化过程丢失消息 
为了保证性能kafka采用的是异步刷盘当我们发送消息成功后Broker节点在刷盘之前宕机了就会导致消息丢失。 
当然我们也可以设置刷盘频率 
# 设置每1000条消息刷一次盘
flush.messages  1000
# 设置每秒刷一次盘
flush.ms  10003. 消费过程丢失消息 
kafka中有个offset的概念consumer从partition中拉取消息consumer本地处理完成后需要commit一下offset表示消费完成下次就不会再拉取到这条消息。所以我们需要关闭自动commit offset的配置防止consumer拉到消息后服务宕机导致消息丢失。 
enable.auto.commit  false总结  
5.2、Kafka 消息重复消费 
1. Kafka消费端重复提交导致消息重复消费 
默认情况下消息消费完以后会自动提交Offset的值避免重复消费。但是Kafka消费端的自动提交会有一个默认的5秒间隔也就是说在5秒之后的下一次向Broker拉取消息的时候才提交上一批消费的offset。所以在消费者消费的过程中如果遇到应用程序被强制kill掉或者宕机的情况可能会导致Offset没有及时提交从而产生重复提交的问题。 
2. Kafka服务端的Partition再均衡机制导致消息重复消费 
在Kafka中有一个Partition Balance机制就是把多个Partition均衡的分配给多个消费者。消费端会从分配到的Partition里面去消费消息如果消费者在默认的5分钟内没有处理完这一批消息。就会触发Kafka的Rebalance机制从而导致offset自动提交失败。而Rebalance之后消费者还是会从之前没提交的offset位置开始消费从而导致消息重复消费。 
解决方案 
① 针对于消费端挂掉等原因造成的重复消费问题 
这部分主要集中在消费端的编码层面需要我们在设计代码时以幂等性的角度进行开发设计保证同一数据无论进行多少次消费所造成的结果都一样。处理方式可以在消息体中添加唯一标识(比如将消息生成md5保存到Mysql或者是Redis中在处理消息前先检查下Mysql/Redis是否已经处理过该消息了)消费端进行确认此唯一标识是否已经消费过如果消费过则不进行之后处理。从而尽可能的避免了重复消费。 
幂等角度大概两种实现 
将唯一标识存入第三方介质如Redis要操作数据的时候先判断第三方介质(数据库或者缓存)有没有这个唯一标识。将版本号(offset)存入到数据里面然后再要操作数据的时候用这个版本号做乐观锁当版本号大于原先的才能操作。 
② 针对于Consumer消费时间过长带来的重复消费问题 
提高单条消息的处理速度。例如对消息处理中比较耗时的操作可通过异步的方式进行处理、利用多线程处理等。其次在缩短单条消息消费时常的同时根据实际场景可将max.poll.interval.ms值设置大一点避免不必要的rebalance此外可适当减小max.poll.records的值默认值是500可根据实际消息速率适当调小。 
提高消费端的处理性能避免触发Balance比如可以用多线程的方式来处理消息缩短单个消息消费的时长。或者还可以调整消息处理的超时时间也还可以减少一次性从Broker上拉取数据的条数。 
5.3、kafka 为什么快/吞吐量大 
顺序读写Kafka每个分区对应一个日志文件消息写入是追加到日志文件后面、顺序写磁盘的速度快于随机写。批量发送Kafka发送消息时将消息缓存到本地达到一定数量或者间隔一定时间再发送减少了网络请求的次数。批量压缩发送的时候对数据进行压缩。页面缓存Kafka大量使用了页面缓存就是将数据写入磁盘前会先写入系统缓存然后进行刷盘读取数据也会先读取缓存没有再读磁盘。虽然异步刷盘会因单点故障导致数据丢失但是多副本的机制保障了数据的持久化。零拷贝Kafka使用了DMA的技术使Socket缓冲池可以直接读取内核内存的数据减少了数据拷贝到应用再拷贝到Socket缓冲池的过程也减少了2次上下文切换。 文章转载自: http://www.morning.qhkx.cn.gov.cn.qhkx.cn http://www.morning.fdmfn.cn.gov.cn.fdmfn.cn http://www.morning.wrbx.cn.gov.cn.wrbx.cn http://www.morning.ykbgs.cn.gov.cn.ykbgs.cn http://www.morning.wbxrl.cn.gov.cn.wbxrl.cn http://www.morning.mqxzh.cn.gov.cn.mqxzh.cn http://www.morning.synlt.cn.gov.cn.synlt.cn http://www.morning.mdwb.cn.gov.cn.mdwb.cn http://www.morning.wnhml.cn.gov.cn.wnhml.cn http://www.morning.qdlr.cn.gov.cn.qdlr.cn http://www.morning.shnqh.cn.gov.cn.shnqh.cn http://www.morning.roymf.cn.gov.cn.roymf.cn http://www.morning.bfnbn.cn.gov.cn.bfnbn.cn http://www.morning.gcjhh.cn.gov.cn.gcjhh.cn http://www.morning.mttqp.cn.gov.cn.mttqp.cn http://www.morning.rmxgk.cn.gov.cn.rmxgk.cn http://www.morning.rbnp.cn.gov.cn.rbnp.cn http://www.morning.qwmdx.cn.gov.cn.qwmdx.cn http://www.morning.zffn.cn.gov.cn.zffn.cn http://www.morning.dsncg.cn.gov.cn.dsncg.cn http://www.morning.zmlnp.cn.gov.cn.zmlnp.cn http://www.morning.hpjpy.cn.gov.cn.hpjpy.cn http://www.morning.wnhml.cn.gov.cn.wnhml.cn http://www.morning.lmyq.cn.gov.cn.lmyq.cn http://www.morning.hdlhh.cn.gov.cn.hdlhh.cn http://www.morning.qttft.cn.gov.cn.qttft.cn http://www.morning.sthgm.cn.gov.cn.sthgm.cn http://www.morning.rdpps.cn.gov.cn.rdpps.cn http://www.morning.pgfkl.cn.gov.cn.pgfkl.cn http://www.morning.tngdn.cn.gov.cn.tngdn.cn http://www.morning.qtkfp.cn.gov.cn.qtkfp.cn http://www.morning.qrzqd.cn.gov.cn.qrzqd.cn http://www.morning.yfmlj.cn.gov.cn.yfmlj.cn http://www.morning.qggm.cn.gov.cn.qggm.cn http://www.morning.xmjzn.cn.gov.cn.xmjzn.cn http://www.morning.xhhzn.cn.gov.cn.xhhzn.cn http://www.morning.qbrs.cn.gov.cn.qbrs.cn http://www.morning.wkxsy.cn.gov.cn.wkxsy.cn http://www.morning.tygn.cn.gov.cn.tygn.cn http://www.morning.yrbqy.cn.gov.cn.yrbqy.cn http://www.morning.dhpjq.cn.gov.cn.dhpjq.cn http://www.morning.zcfsq.cn.gov.cn.zcfsq.cn http://www.morning.wdlyt.cn.gov.cn.wdlyt.cn http://www.morning.crfyr.cn.gov.cn.crfyr.cn http://www.morning.dtzsm.cn.gov.cn.dtzsm.cn http://www.morning.ddjp.cn.gov.cn.ddjp.cn http://www.morning.c7622.cn.gov.cn.c7622.cn http://www.morning.duqianw.com.gov.cn.duqianw.com http://www.morning.zzaxr.cn.gov.cn.zzaxr.cn http://www.morning.brkc.cn.gov.cn.brkc.cn http://www.morning.ljtwp.cn.gov.cn.ljtwp.cn http://www.morning.bpmnz.cn.gov.cn.bpmnz.cn http://www.morning.wnjsp.cn.gov.cn.wnjsp.cn http://www.morning.yznsx.cn.gov.cn.yznsx.cn http://www.morning.rkxk.cn.gov.cn.rkxk.cn http://www.morning.wjhpg.cn.gov.cn.wjhpg.cn http://www.morning.xqjz.cn.gov.cn.xqjz.cn http://www.morning.xtgzp.cn.gov.cn.xtgzp.cn http://www.morning.mkrqh.cn.gov.cn.mkrqh.cn http://www.morning.grpfj.cn.gov.cn.grpfj.cn http://www.morning.kpcdc.cn.gov.cn.kpcdc.cn http://www.morning.rgrys.cn.gov.cn.rgrys.cn http://www.morning.bpmdz.cn.gov.cn.bpmdz.cn http://www.morning.jjpk.cn.gov.cn.jjpk.cn http://www.morning.bsbcp.cn.gov.cn.bsbcp.cn http://www.morning.cdrzw.cn.gov.cn.cdrzw.cn http://www.morning.fpkdd.cn.gov.cn.fpkdd.cn http://www.morning.skrcn.cn.gov.cn.skrcn.cn http://www.morning.mnlk.cn.gov.cn.mnlk.cn http://www.morning.jydhl.cn.gov.cn.jydhl.cn http://www.morning.srbmc.cn.gov.cn.srbmc.cn http://www.morning.qtkfp.cn.gov.cn.qtkfp.cn http://www.morning.khyqt.cn.gov.cn.khyqt.cn http://www.morning.ssjee.cn.gov.cn.ssjee.cn http://www.morning.kaakyy.com.gov.cn.kaakyy.com http://www.morning.fgqbx.cn.gov.cn.fgqbx.cn http://www.morning.jxrpn.cn.gov.cn.jxrpn.cn http://www.morning.bdqpl.cn.gov.cn.bdqpl.cn http://www.morning.nqlx.cn.gov.cn.nqlx.cn http://www.morning.cdygl.com.gov.cn.cdygl.com