企业网站建设报价方案模板,演示网站,wordpress设置邮件提醒,wordpress站点统计小工具目录 初识Kafka基本概念安装与配置ZooKeeper安装与配置Kafka的安装与配置 生产与消费服务端参数配置 生产者客户端开发消息对象 ProducerRecord必要的参数配置发送消息序列化分区器生产者拦截器 原理分析整体架构元数据的更新 重要的生产者参数acksmax.request.sizeretries和re… 目录 初识Kafka基本概念安装与配置ZooKeeper安装与配置Kafka的安装与配置 生产与消费服务端参数配置 生产者客户端开发消息对象 ProducerRecord必要的参数配置发送消息序列化分区器生产者拦截器 原理分析整体架构元数据的更新 重要的生产者参数acksmax.request.sizeretries和retry.backoff.mscompression.typeconnections.max.idle.mslinger.msreceive.buffer.bytessend.buffer.bytesrequest.timeout.ms 消费者消费者与消费组客户端开发必要的参数配置订阅主题与分区消息消费位移提交 初识Kafka
Kafka之所以受到越来越多的青睐与它所“扮演”的三大角色是分不开的
消息系统Kafka 和传统的消息系统也称作消息中间件都具备系统解耦、冗余存储、流量削峰、缓冲、异步通信、扩展性、可恢复性等功能。与此同时Kafka 还提供了大多数消息系统难以实现的消息顺序性保障及回溯消费的功能。存储系统Kafka 把消息持久化到磁盘相比于其他基于内存存储的系统而言有效地降低了数据丢失的风险。也正是得益于Kafka 的消息持久化功能和多副本机制我们可以把Kafka作为长期的数据存储系统来使用只需要把对应的数据保留策略设置为“永久”或启用主题的日志压缩功能即可。流式处理平台Kafka 不仅为每个流行的流式处理框架提供了可靠的数据来源还提供了一个完整的流式处理类库比如窗口、连接、变换和聚合等各类操作。
基本概念
一个典型的 Kafka 体系架构包括若干 Producer、若干 Broker、若干 Consumer以及一个ZooKeeper集群。其中ZooKeeper是Kafka用来负责集群元数据的管理、控制器的选举等操作的。Producer将消息发送到BrokerBroker负责将收到的消息存储到磁盘中而Consumer负责从Broker订阅并消费消息。整个Kafka体系结构中引入了以下3个术语
Producer生产者也就是发送消息的一方。生产者负责创建消息然后将其投递到Kafka中。Consumer消费者也就是接收消息的一方。消费者连接到Kafka上并接收消息进而进行相应的业务逻辑处理。Broker服务代理节点。对于Kafka而言Broker可以简单地看作一个独立的Kafka服务节点或Kafka服务实例。大多数情况下也可以将Broker看作一台Kafka服务器前提是这台服务器上只部署了一个Kafka实例。一个或多个Broker组成了一个Kafka集群。一般而言我们更习惯使用首字母小写的broker来表示服务代理节点。
在Kafka中还有两个特别重要的概念—**主题Topic与分区Partition**。Kafka中的消息以主题为单位进行归类生产者负责将消息发送到特定的主题发送到Kafka集群中的每一条消息都要指定一个主题而消费者负责订阅主题并进行消费。
主题是一个逻辑上的概念它还可以细分为多个分区一个分区只属于单个主题很多时候也会把分区称为主题分区Topic-Partition。同一主题下的不同分区包含的消息是不同的分区在存储层面可以看作一个可追加的日志Log文件消息在被追加到分区日志文件的时候都会分配一个特定的偏移量offset。offset是消息在分区中的唯一标识Kafka通过它来保证消息在分区内的顺序性不过offset并不跨越分区也就是说Kafka保证的是分区有序而不是主题有序。
Kafka中的分区可以分布在不同的服务器broker上也就是说一个主题可以横跨多个broker以此来提供比单个broker更强大的性能。
每一条消息被发送到broker之前会根据分区规则选择存储到哪个具体的分区。如果分区规则设定得合理所有的消息都可以均匀地分配到不同的分区中。
Kafka 为分区引入了多副本Replica机制通过增加副本数量可以提升容灾能力。同一分区的不同副本中保存的是相同的消息在同一时刻副本之间并非完全一样副本之间是“一主多从”的关系其中leader副本负责处理读写请求follower副本只负责与leader副本的消息同步。副本处于不同的broker中当leader副本出现故障时从follower副本中重新选举新的leader副本对外提供服务。Kafka通过多副本机制实现了故障的自动转移当Kafka集群中某个broker失效时仍然能保证服务可用。
生产者和消费者只与leader副本进行交互而follower副本只负责消息的同步很多时候follower副本中的消息相对leader副本而言会有一定的滞后。
Kafka 消费端也具备一定的容灾能力。Consumer 使用拉Pull模式从服务端拉取消息并且保存消费的具体位置当消费者宕机后恢复上线时可以根据之前保存的消费位置重新拉取需要的消息进行消费这样就不会造成消息丢失。
分区中的所有副本统称为ARAssigned Replicas。所有与leader副本保持一定程度同步的副本包括leader副本在内组成ISRIn-Sync ReplicasISR集合是AR集合中的一个子集。消息会先发送到leader副本然后follower副本才能从leader副本中拉取消息进行同步同步期间内follower副本相对于leader副本而言会有一定程度的滞后。前面所说的“一定程度的同步”是指可忍受的滞后范围这个范围可以通过参数进行配置。与leader副本同步滞后过多的副本不包括leader副本组成**OSROut-of-Sync Replicas**由此可见ARISROSR。在正常情况下所有的follower 副本都应该与 leader 副本保持一定程度的同步即ARISROSR集合为空。
leader副本负责维护和跟踪ISR集合中所有follower副本的滞后状态当follower副本落后太多或失效时leader副本会把它从ISR集合中剔除。如果OSR集合中有follower副本“追上”了leader副本那么leader副本会把它从OSR集合转移至ISR集合。默认情况下当leader副本发生故障时只有在ISR集合中的副本才有资格被选举为新的leader而在OSR集合中的副本则没有任何机会不过这个原则也可以通过修改相应的参数配置来改变。
ISR与HW和LEO也有紧密的关系。HW是High Watermark的缩写俗称高水位它标识了一个特定的消息偏移量offset消费者只能拉取到这个offset之前的消息。 LEO是Log End Offset的缩写它标识当前日志文件中下一条待写入消息的offsetLEO的大小相当于当前日志分区中最后一条消息的offset值加1。分区ISR集合中的每个副本都会维护自身的LEO而ISR集合中最小的LEO即为分区的HW对消费者而言只能消费HW之前的消息。说白了就是没同步的消息还不允许被消费
由此可见Kafka 的复制机制既不是完全的同步复制也不是单纯的异步复制。事实上同步复制要求所有能工作的 follower 副本都复制完这条消息才会被确认为已成功提交这种复制方式极大地影响了性能。而在异步复制方式下follower副本异步地从leader副本中复制数据数据只要被leader副本写入就被认为已经成功提交。在这种情况下如果follower副本都还没有复制完而落后于leader副本突然leader副本宕机则会造成数据丢失。Kafka使用的这种ISR的方式则有效地权衡了数据可靠性和性能之间的关系。
安装与配置
ZooKeeper安装与配置
ZooKeeper是安装Kafka集群的必要组件Kafka通过ZooKeeper来实施对元数据信息的管理包括集群、broker、主题、分区等内容。不过新版本kafka已经摆脱了zk的依赖而是把元数据存储到了特定的topic里面
Kafka的安装与配置
修改broker的配置文件$KAFKA_HOME/conf/server.properties。主要关注以下几个配置参数即可 如果是单机模式那么修改完上述配置参数之后就可以启动服务。如果是集群模式那么只需要对单机模式的配置文件做相应的修改即可确保集群中每个broker的broker.id配置参数的值不一样以及listeners配置参数也需要修改为与broker对应的IP地址或域名之后就可以各自启动服务。注意在启动 Kafka 服务之前同样需要确保 zookeeper.connect参数所配置的ZooKeeper服务已经正确启动。
生产与消费
Kafka提供了许多实用的脚本工具存放在$KAFKA_HOME的bin目录下其中与主题有关的就是 kafka-topics.sh 脚本下面我们用它演示创建一个分区数为 4、副本因子为 3 的主题topic-demo示例如下 其中–zookeeper指定了Kafka所连接的ZooKeeper服务地址–topic指定了所要创建主题的名称–replication-factor 指定了副本因子–partitions 指定了分区个数–create是创建主题的动作指令。
服务端参数配置
下面挑选一些重要的服务端参数来做细致的说明这些参数都配置在$KAFKA_HOME/config/server.properties文件中。
zookeeper.connect该参数指明broker要连接的ZooKeeper集群的服务地址包含端口号没有默认值且此参数为必填项。可以配置为localhost2181如果ZooKeeper集群中有多个节点则可以用逗号将每个节点隔开类似于 localhost12181localhost22181localhost32181这种格式。最佳的实践方式是再加一个chroot路径这样既可以明确指明该chroot路径下的节点是为Kafka所用的也可以实现多个Kafka集群复用一套ZooKeeper集群这样可以节省更多的硬件资源。包含 chroot 路径的配置类似于 localhost12181localhost22181localhost32181/kafka这种如果不指定chroot那么默认使用ZooKeeper的根路径。listeners该参数指明broker监听客户端连接的地址列表即为客户端要连接broker的入口地址列表配置格式为protocol1//hostname1port1protocol2//hostname2port2其中protocol代表协议类型Kafka当前支持的协议类型有PLAINTEXT、SSL、SASL_SSL等如果未开启安全认证则使用简单的PLAINTEXT即可。hostname代表主机名port代表服务端口此参数的默认值为nullbroker.id该参数用来指定Kafka集群中broker的唯一标识默认值为-1。如果没有设置那么Kafka会自动生成一个。这个参数还和meta.properties文件及服务端参数broker.id.generation.enable和reserved.broker.max.id有关log.dir和log.dirsKafka 把所有的消息都保存在磁盘上而这两个参数用来配置Kafka 日志文件存放的根目录。一般情况下log.dir 用来配置单个根目录而 log.dirs 用来配置多个根目录以逗号分隔但是Kafka并没有对此做强制性限制也就是说log.dir和log.dirs都可以用来配置单个或多个根目录。log.dirs 的优先级比 log.dir 高但是如果没有配置log.dirs则会以 log.dir 配置为准。默认情况下只配置了 log.dir 参数其默认值为/tmp/kafka-logs。message.max.bytes该参数用来指定broker所能接收消息的最大值默认值为1000012B约等于976.6KB。如果 Producer 发送的消息大于这个参数所设置的值那么Producer就会报出RecordTooLargeException的异常。如果需要修改这个参数那么还要考虑max.request.size 客户端参数、max.message.bytestopic端参数等参数的影响。为了避免修改此参数而引起级联的影响建议在修改此参数之前考虑分拆消息的可行性。
生产者
客户端开发
消息对象 ProducerRecord
这里有必要单独说明的是构建的消息对象 ProducerRecord它并不是单纯意义上的消息它包含了多个属性原本需要发送的与业务相关的消息体只是其中的一个 value 属性比如“HelloKafka”只是ProducerRecord对象中的一个属性。ProducerRecord类的定义如下只截取成员变量 其中topic和partition字段分别代表消息要发往的主题和分区号。headers字段是消息的头部Kafka 0.11.x版本才引入这个属性它大多用来设定一些与应用相关的信息如无需要也可以不用设置。key是用来指定消息的键它不仅是消息的附加信息还可以用来计算分区号进而可以让消息发往特定的分区。前面提及消息以主题为单位进行归类而这个key可以让消息再进行二次归类同一个key的消息会被划分到同一个分区中有key的消息还可以支持日志压缩的功能。value是指消息体一般不为空如果为空则表示特定的消息—墓碑消息。timestamp是指消息的时间戳它有CreateTime和LogAppendTime两种类型前者表示消息创建的时间后者表示消息追加到日志文件的时间。
必要的参数配置
在Kafka生产者客户端KafkaProducer中有3个参数是必填的
bootstrap.servers该参数用来指定生产者客户端连接Kafka集群所需的broker地址清单具体的内容格式为host1port1host2port2可以设置一个或多个地址中间以逗号隔开此参数的默认值为“”。注意这里并非需要所有的broker地址因为生产者会从给定的broker里查找到其他broker的信息。不过建议至少要设置两个以上的broker 地址信息当其中任意一个宕机时生产者仍然可以连接到 Kafka集群上。key.serializer 和 value.serializerbroker 端接收的消息必须以字节数组byte[]的形式存在。key.serializer和value.serializer这两个参数分别用来指定key和value序列化操作的序列化器这两个参数无默认值。注意这里必须填写序列化器的全限定名如org.apache.kafka.common.serialization.StringSerializer。
KafkaProducer中的参数众多我们可以直接使用客户端中的org.apache.kafka.clients.producer.ProducerConfig类来做一定程度上的预防措施每个参数在 ProducerConfig 类中都有对应的名称。 KafkaProducer是线程安全的可以在多个线程中共享单个KafkaProducer实例也可以将KafkaProducer实例进行池化来供其他线程调用。
发送消息
创建生产者实例和构建消息之后就可以开始发送消息了。发送消息主要有三种模式**发后即忘fire-and-forget、同步sync及异步async**。 KafkaProducer 的 send方法并非是 void 类型而是 FutureRecordMetadata类型send方法有2个重载方法具体定义如下 KafkaProducer中一般会发生两种类型的异常可重试的异常和不可重试的异常。对于可重试的异常如果配置了 retries 参数那么只要在规定的重试次数内自行恢复了就不会抛出异常。retries参数的默认值为0。
同步发送的方式可靠性高要么消息被发送成功要么发生异常。如果发生异常则可以捕获并进行相应的处理而不会像“发后即忘”的方式直接造成消息的丢失。不过同步发送的方式的性能会差很多需要阻塞等待一条消息发送完之后才能发送下一条。
我们再来了解一下异步发送的方式一般是在send方法里指定一个Callback的回调函数Kafka在返回响应时调用该函数来实现异步的发送确认。有读者或许会有疑问send方法的返回值类型就是Future而Future本身就可以用作异步的逻辑处理。这样做不是不行只不过Future里的 get方法在何时调用以及怎么调用都是需要面对的问题消息不停地发送那么诸多消息对应的Future对象的处理难免会引起代码处理逻辑的混乱。使用Callback的方式非常简洁明了Kafka有响应时就会回调要么发送成功要么抛出异常。
对于同一个分区而言如果消息record1于record2之前先发送参考上面的示例代码那么KafkaProducer就可以保证对应的callback1在callback2之前调用也就是说回调函数的调用也可以保证分区有序。
通常一个KafkaProducer不会只负责发送单条消息更多的是发送多条消息在发送完这些消息之后需要调用KafkaProducer的close方法来回收资源。close方法会阻塞等待之前所有的发送请求完成后再关闭KafkaProducer。与此同时KafkaProducer还提供了一个带超时时间的close方法。
序列化
生产者需要用序列化器Serializer把对象转换成字节数组才能通过网络发送给Kafka。而在对侧消费者需要用反序列化器Deserializer把从 Kafka 中收到的字节数组转换成相应的对象所以说如果生产者和消费者序列化方式不一样就无法正确的解析消息。
如果 Kafka 客户端提供的几种序列化器都无法满足应用需求则可以选择使用如 Avro、JSON、Thrift、ProtoBuf和Protostuff等通用的序列化工具来实现或者使用自定义类型的序列化器来实现。
分区器
消息在通过send方法发往broker的过程中有可能需要经过拦截器Interceptor、序列化器Serializer和分区器Partitioner的一系列作用之后才能被真正地发往 broker。消息经过序列化之后就需要确定它发往的分区如果消息ProducerRecord中指定了partition字段那么就不需要分区器的作用因为partition代表的就是所要发往的分区号。
如果消息ProducerRecord中没有指定partition字段那么就需要依赖分区器根据key这个字段来计算partition的值分区器的作用就是为消息分配分区。
Kafka中提供的默认分区器是org.apache.kafka.clients.producer.internals.DefaultPartitioner它实现了org.apache.kafka.clients.producer.Partitioner接口如图所示 其中partition方法用来计算分区号返回值为int类型。partition方法中的参数分别表示主题、键、序列化后的键、值、序列化后的值以及集群的元数据信息通过这些信息可以实现功能丰富的分区器。close方法在关闭分区器的时候用来回收一些资源。
在默认分区器 DefaultPartitioner 的实现中close是空方法而在 partition方法中定义了主要的分区分配逻辑。如果key 不为 null那么默认的分区器会对 key 进行哈希采用MurmurHash2算法具备高运算性能及低碰撞率最终根据得到的哈希值来计算分区号拥有相同key的消息会被写入同一个分区。如果key为null那么消息将会以轮询的方式发往主题内的各个可用分区。
在不改变主题分区数量的情况下key与分区之间的映射可以保持不变。不过一旦主题中增加了分区那么就难以保证key与分区之间的映射关系了。
了使用 Kafka 提供的默认分区器进行分区分配还可以使用自定义的分区器只需同DefaultPartitioner一样实现Partitioner接口即可。默认的分区器在key为null时不会选择非可用的分区我们可以通过自定义的分区器DemoPartitioner来打破这一限制。
生产者拦截器
生产者拦截器既可以用来在消息发送前做一些准备工作比如按照某个规则过滤不符合要求的消息、修改消息的内容等也可以用来在发送回调逻辑前做一些定制化的需求比如统计类工作。
生产者拦截器的使用也很方便主要是自定义实现org.apache.kafka.clients.producer.ProducerInterceptor接口。ProducerInterceptor接口中包含3个方法。KafkaProducer在将消息序列化和计算分区之前会调用生产者拦截器的onSend方法来对消息进行相应的定制化操作KafkaProducer 会在消息被应答Acknowledgement之前或消息发送失败时调用生产者拦截器的 onAcknowledgement方法优先于用户设定的 Callback 之前执行。
KafkaProducer中不仅可以指定一个拦截器还可以指定多个拦截器以形成拦截链。拦截链会按照 interceptor.classes 参数配置的拦截器的顺序来一一执行配置的时候各个拦截器之间使用逗号隔开。
在拦截链中如果某个拦截器执行失败那么下一个拦截器会接着从上一个执行成功的拦截器继续执行。拦截器失败一个不影响其他的执行
原理分析
整体架构
生产者客户端的整体架构图 整个生产者客户端由两个线程协调运行这两个线程分别为主线程和Sender线程发送线程。在主线程中由KafkaProducer创建消息然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累加器RecordAccumulator也称为消息收集器 中。Sender 线程负责从RecordAccumulator中获取消息并将其发送到Kafka中。
RecordAccumulator 主要用来缓存消息以便 Sender 线程可以批量发送进而减少网络传输的资源消耗以提升性能。RecordAccumulator 缓存的大小可以通过生产者客户端参数buffer.memory 配置默认值为 33554432B即 32MB。如果生产者发送消息的速度超过发送到服务器的速度则会导致生产者空间不足这个时候KafkaProducer的send方法调用要么被阻塞要么抛出异常这个取决于参数max.block.ms的配置此参数的默认值为60000即60秒。
主线程中发送过来的消息都会被追加到RecordAccumulator的某个双端队列Deque中在 RecordAccumulator 的内部为每个分区都维护了一个双端队列队列中的内容就是ProducerBatch即 DequeProducerBatch。消息写入缓存时追加到双端队列的尾部Sender读取消息时从双端队列的头部读取。注意ProducerBatch不是ProducerRecordProducerBatch中可以包含一至多个 ProducerRecord。通俗地说ProducerRecord 是生产者中创建的消息而ProducerBatch是指一个消息批次ProducerRecord会被包含在ProducerBatch中这样可以使字节的使用更加紧凑。与此同时将较小的ProducerRecord拼凑成一个较大的ProducerBatch也可以减少网络请求的次数以提升整体的吞吐量。ProducerBatch和消息的具体格式有关如果生产者客户端需要向很多分区发送消息则可以将buffer.memory参数适当调大以增加整体的吞吐量。
消息在网络上都是以字节Byte的形式传输的在发送之前需要创建一块内存区域来保存对应的消息。在Kafka生产者客户端中通过java.io.ByteBuffer实现消息内存的创建和释放。不过频繁的创建和释放是比较耗费资源的在RecordAccumulator的内部还有一个BufferPool它主要用来实现ByteBuffer的复用以实现缓存的高效利用。不过BufferPool只针对特定大小的ByteBuffer进行管理而其他大小的ByteBuffer不会缓存进BufferPool中这个特定的大小由batch.size参数来指定默认值为16384B即16KB。我们可以适当地调大batch.size参数以便多缓存一些消息。
Sender 从 RecordAccumulator 中获取缓存的消息之后会进一步将原本分区DequeProducerBatch的保存形式转变成NodeList ProducerBatch的形式其中Node表示Kafka集群的broker节点。对于网络连接来说生产者客户端是与具体的broker节点建立的连接也就是向具体的 broker 节点发送消息而并不关心消息属于哪一个分区而对于 KafkaProducer的应用逻辑而言我们只关注向哪个分区中发送哪些消息所以在这里需要做一个应用逻辑层面到网络I/O层面的转换。
在转换成NodeListProducerBatch的形式之后Sender 还会进一步封装成NodeRequest的形式这样就可以将Request请求发往各个Node了这里的Request是指Kafka的各种协议请求对于消息发送而言就是指具体的ProduceRequest。
请求在从Sender线程发往Kafka之前还会保存到InFlightRequests中InFlightRequests保存对象的具体形式为 MapNodeIdDequeRequest它的主要作用是缓存了已经发出去但还没有收到响应的请求NodeId 是一个 String 类型表示节点的 id 编号。与此同时InFlightRequests还提供了许多管理类的方法并且通过配置参数还可以限制每个连接也就是客户端与Node之间的连接最多缓存的请求数。这个配置参数为max.in.flight.requests.per.connection默认值为 5即每个连接最多只能缓存 5 个未响应的请求超过该数值之后就不能再向这个连接发送更多的请求了除非有缓存的请求收到了响应Response。通过比较DequeRequest的size与这个参数的大小来判断对应的Node中是否已经堆积了很多未响应的消息如果真是如此那么说明这个 Node 节点负载较大或网络连接有问题再继续向其发送请求会增大请求超时的可能。
元数据的更新
InFlightRequests还可以获得leastLoadedNode即所有Node中负载最小的那一个。这里的负载最小是通过每个Node在InFlightRequests中还未确认的请求决定的未确认的请求越多则认为负载越大。对于图中的InFlightRequests 来说图中展示了三个节点Node0、Node1和Node2很明显Node1的负载最小。也就是说Node1为当前的leastLoadedNode。选择leastLoadedNode发送请求可以使它能够尽快发出避免因网络拥塞等异常而影响整体的进度。leastLoadedNode的概念可以用于多个应用场合比如元数据请求、消费者组播协议的交互。 KafkaProducer要将此消息追加到指定主题的某个分区所对应的leader副本之前首先需要知道主题的分区数量然后经过计算得出或者直接指定目标分区之后KafkaProducer需要知道目标分区的leader副本所在的broker 节点的地址、端口等信息才能建立连接最终才能将消息发送到 Kafka在这一过程中所需要的信息都属于元数据信息。
我们了解了bootstrap.servers参数只需要配置部分broker节点的地址即可不需要配置所有broker节点的地址因为客户端可以自己发现其他broker节点的地址这一过程也属于元数据相关的更新操作。与此同时分区数量及leader副本的分布都会动态地变化客户端也需要动态地捕捉这些变化。
元数据是指Kafka集群的元数据这些元数据具体记录了集群中有哪些主题这些主题有哪些分区每个分区的leader副本分配在哪个节点上follower副本分配在哪些节点上哪些副本在AR、ISR等集合中集群中有哪些节点控制器节点又是哪一个等信息。
当客户端中没有需要使用的元数据信息时比如没有指定的主题信息或者超过metadata.max.age.ms 时间没有更新元数据都会引起元数据的更新操作。客户端参数metadata.max.age.ms的默认值为300000即5分钟。元数据的更新操作是在客户端内部进行的对客户端的外部使用者不可见。当需要更新元数据时会先挑选出leastLoadedNode然后向这个Node发送MetadataRequest请求来获取具体的元数据信息。这个更新操作是由Sender线程发起的在创建完MetadataRequest之后同样会存入InFlightRequests之后的步骤就和发送消息时的类似。元数据虽然由Sender线程负责更新但是主线程也需要读取这些信息这里的数据同步通过synchronized和final关键字来保障。
重要的生产者参数
下面挑选一些重要的参数进行讲解:
acks
这个参数用来指定分区中必须要有多少个副本收到这条消息之后生产者才会认为这条消息是成功写入的。acks 是生产者客户端中一个非常重要的参数它涉及消息的可靠性和吞吐量之间的权衡。acks参数有3种类型的值都是字符串类型。
acks1。默认值即为1。生产者发送消息之后只要分区的leader副本成功写入消息那么它就会收到来自服务端的成功响应。如果消息无法写入leader副本比如在leader 副本崩溃、重新选举新的 leader 副本的过程中那么生产者就会收到一个错误的响应为了避免消息丢失生产者可以选择重发消息。如果消息写入leader副本并返回成功响应给生产者且在被其他follower副本拉取之前leader副本崩溃那么此时消息还是会丢失因为新选举的leader副本中并没有这条对应的消息。acks设置为1是消息可靠性和吞吐量之间的折中方案。acks0。生产者发送消息之后不需要等待任何服务端的响应。如果在消息从发送到写入Kafka的过程中出现某些异常导致Kafka并没有收到这条消息那么生产者也无从得知消息也就丢失了。在其他配置环境相同的情况下acks 设置为 0 可以达到最大的吞吐量。acks-1或acksall。生产者在消息发送之后需要等待ISR中的所有副本都成功写入消息之后才能够收到来自服务端的成功响应。在其他配置环境相同的情况下acks 设置为-1all可以达到最强的可靠性。但这并不意味着消息就一定可靠因为ISR中可能只有leader副本这样就退化成了acks1的情况。要获得更高的消息可靠性需要配合 min.insync.replicas 等参数的联动。
max.request.size
这个参数用来限制生产者客户端能发送的消息的最大值默认值为1048576B即 1MB。一般情况下这个默认值就可以满足大多数的应用场景了。并不建议盲目地增大这个参数的配置值尤其是在对Kafka整体脉络没有足够把控的时候。因为这个参数还涉及一些其他参数的联动比如broker端的message.max.bytes参数如果配置错误可能会引起一些不必要的异常。比如将broker端的message.max.bytes参数配置为10而max.request.size参数配置为20那么当我们发送一条大小为15B的消息时生产者客户端就会报出异常。
retries和retry.backoff.ms
retries参数用来配置生产者重试的次数默认值为0即在发生异常的时候不进行任何重试动作。消息在从生产者发出到成功写入服务器之前可能发生一些临时性的异常比如网络抖动、leader副本的选举等这种异常往往是可以自行恢复的生产者可以通过配置retries大于0的值以此通过内部重试来恢复而不是一味地将异常抛给生产者的应用程序。如果重试达到设定的次数那么生产者就会放弃重试并返回异常。不过并不是所有的异常都是可以通过重试来解决的比如消息太大超过max.request.size参数配置的值时这种方式就不可行了。
重试还和另一个参数retry.backoff.ms有关这个参数的默认值为100它用来设定两次重试之间的时间间隔避免无效的频繁重试。在配置 retries 和 retry.backoff.ms之前最好先估算一下可能的异常恢复时间这样可以设定总的重试时间大于这个异常恢复时间以此来避免生产者过早地放弃重试。
如果将acks参数配置为非零值并且max.in.flight.requests.per.connection参数配置为大于1的值那么就会出现错序的现象如果第一批次消息写入失败而第二批次消息写入成功那么生产者会重试发送第一批次的消息此时如果第一批次的消息写入成功那么这两个批次的消息就出现了错序。一般而言在需要保证消息顺序的场合建议把参数max.in.flight.requests.per.connection配置为1而不是把acks配置为0不过这样也会影响整体的吞吐。
compression.type
这个参数用来指定消息的压缩方式默认值为“none”即默认情况下消息不会被压缩。该参数还可以配置为“gzip”“snappy”和“lz4”。对消息进行压缩可以极大地减少网络传输量、降低网络I/O从而提高整体的性能。
connections.max.idle.ms
这个参数用来指定在多久之后关闭限制的连接默认值是540000ms即9分钟。
linger.ms
这个参数用来指定生产者发送 ProducerBatch 之前等待更多消息ProducerRecord加入ProducerBatch 的时间默认值为 0。生产者客户端会在 ProducerBatch 被填满或等待时间超过linger.ms 值时发送出去。增大这个参数的值会增加消息的延迟但是同时能提升一定的吞吐量。这个linger.ms参数与TCP协议中的Nagle算法有异曲同工之妙。
receive.buffer.bytes
这个参数用来设置Socket接收消息缓冲区SO_RECBUF的大小默认值为32768B即32KB。如果设置为-1则使用操作系统的默认值。如果Producer与Kafka处于不同的机房则可以适地调大这个参数值。
send.buffer.bytes
这个参数用来设置Socket发送消息缓冲区SO_SNDBUF的大小默认值为131072B即128KB。与receive.buffer.bytes参数一样如果设置为-1则使用操作系统的默认值。
request.timeout.ms
这个参数用来配置Producer等待请求响应的最长时间默认值为30000ms。请求超时之后可以选择进行重试。注意这个参数需要比broker端参数replica.lag.time.max.ms的值要大这样可以减少因客户端重试而引起的消息重复的概率。
消费者
消费者与消费组
消费者Consumer负责订阅Kafka中的主题Topic并且从订阅的主题上拉取消息。与其他一些消息中间件不同的是在Kafka的消费理念中还有一层消费组Consumer Group的概念每个消费者都有一个对应的消费组。当消息发布到主题后只会被投递给订阅它的每个消费组中的一个消费者。每个消费者只能消费所分配到的分区中的消息。换言之每一个分区只能被一个消费组中的一个消费者所消费。
消费者与消费组这种模型可以让整体的消费能力具备横向伸缩性我们可以增加或减少消费者的个数来提高或降低整体的消费能力。对于分区数固定的情况一味地增加消费者并不会让消费能力一直得到提升如果消费者过多出现了消费者的个数大于分区个数的情况就会有消费者分配不到任何分区。感觉这种设计纯粹是为了保证分区消息的有序性。以上分配逻辑都是基于默认的分区分配策略进行分析的可以通过消费者客户端参数partition.assignment.strategy 来设置消费者与订阅主题之间的分区分配策略。
消费组是一个逻辑上的概念它将旗下的消费者归为一类每一个消费者只隶属于一个消费组。每一个消费组都会有一个固定的名称消费者在进行消费前需要指定其所属消费组的名称这个可以通过消费者客户端参数group.id来配置默认值为空字符串。消费者并非逻辑上的概念它是实际的应用实例它可以是一个线程也可以是一个进程。同一个消费组内的消费者既可以部署在同一台机器上也可以部署在不同的机器上。
客户端开发
必要的参数配置
在Kafka消费者客户端KafkaConsumer中有4个参数是必填的
bootstrap.servers该参数的释义和生产者客户端KafkaProducer 中的相同用来 指 定 连 接 Kafka 集 群 所 需 的broker 地 址 清 单具 体 内 容 形 式 为host1port1host2post可以设置一个或多个地址中间用逗号隔开此参数的默认值为“”。注意这里并非需要设置集群中全部的broker地址消费者会从现有的配置中查找到全部的Kafka集群成员。这里设置两个以上的broker地址信息当其中任意一个宕机时消费者仍然可以连接到Kafka集群上group.id消费者隶属的消费组的名称默认值为“”。如果设置为空则会报出异常Exception in thread mainorg.apache.kafka.common.errors.InvalidGroupIdExceptionThe configured groupId is invalid。一般而言这个参数需要设置成具有一定的业务意义的名称。key.deserializer 和 value.deserializer与生产者客户端KafkaProducer中的key.serializer和value.serializer参数对应。消费者从broker端获取的消息格式都是字节数组byte[]类型所以需要执行相应的反序列化操作才能还原成原有的对象格式。这两个参数分别用来指定消息中key和value所需反序列化操作的反序列化器这两个参数无默认值。注意这里必须填写反序列化器类的全限定名比如示例中的org.apache.kafka.common.serialization.StringDeserializer单单指定StringDeserializer是错误的
订阅主题与分区
在创建好消费者之后我们就需要为该消费者订阅相关的主题了。一个消费者可以订阅一个或多个主题我们可以使用subscribe方法订阅了一个主题对于这个方法而言既可以以集合的形式订阅多个主题也可以以正则表达式的形式订阅特定模式的主题。
对于消费者使用集合的方式subscribeCollection来订阅主题而言比较容易理解订阅了什么主题就消费什么主题中的消息。如果前后两次订阅了不同的主题那么消费者以最后一次的为准。
如果消费者采用的是正则表达式的方式subscribePattern订阅在之后的过程中如果有人又创建了新的主题并且主题的名字与正则表达式相匹配那么这个消费者就可以消费到新添加的主题中的消息。如果应用程序需要消费多个主题并且可以处理不同的类型那么这种订阅方式就很有效。在Kafka 和其他系统之间进行数据复制时这种正则表达式的方式就显得很常见。可以进行正则名称匹配
消费者不仅可以通过KafkaConsumer.subscribe方法订阅主题还可以直接订阅某些主题的特定分区在KafkaConsumer中还提供了一个assign方法来实现这些功能。这个方法只接受一个参数partitions用来指定需要订阅的分区集合。这里补充说明一下TopicPartition类在Kafka的客户端中它用来表示分区这个类的部分内容如下所示 TopicPartition类只有2个属性topic和partition分别代表分区所属的主题和自身的分区编号这个类可以和我们通常所说的主题—分区的概念映射起来。KafkaConsumer 中的partitionsFor方法可以用来查询指定主题的元数据信息。其中PartitionInfo类型即为主题的分区元数据信息此类的主要结构如下 PartitionInfo类中的属性topic表示主题名称partition代表分区编号leader代表分区的leader副本所在的位置replicas代表分区的AR集合inSyncReplicas代表分区的ISR集合offlineReplicas代表分区的OSR集合。
既然有订阅那么就有取消订阅可以使用 KafkaConsumer 中的unsubscribe方法来取消主题的订阅。这个方法既可以取消通过 subscribeCollection方式实现的订阅也可以取消通过subscribePattern方式实现的订阅还可以取消通过 assignCollection方式实现的订阅。注意如果将subscribeCollection或assignCollection中的集合参数设置为空集合那么作用等同于unsubscribe方法因为新的订阅会覆盖旧的。
通过 subscribe方法订阅主题具有消费者自动再均衡的功能在多个消费者的情况下可以根据分区分配策略来自动分配各个消费者与分区的关系。当消费组内的消费者增加或减少时分区分配关系会自动调整以实现消费负载均衡及故障自动转移。而通过assign方法订阅分区时是不具备消费者自动均衡的功能的其实这一点从assign方法的参数中就可以看出端倪两种类型的subscribe都有ConsumerRebalanceListener类型参数的方法而assign方法却没有。
消息消费
Kafka中的消费是基于拉模式的。消息的消费一般有两种模式推模式和拉模式。推模式是服务端主动将消息推送给消费者而拉模式是消费者主动向服务端发起请求来拉取消息。
Kafka中的消息消费是一个不断轮询的过程消费者所要做的就是重复地调用poll方法而poll方法返回的是所订阅的主题分区上的一组消息。对于poll方法而言如果某些分区中没有可供消费的消息那么此分区对应的消息拉取的结果就为空如果订阅的所有分区中都没有可供消费的消息那么poll方法返回为空的消息集合。
注意到poll方法里还有一个超时时间参数timeout用来控制poll方法的阻塞时间在消费者的缓冲区里没有可用数据时会发生阻塞。注意这里 timeout 的类型是 Duration它是JDK8中新增的一个与时间有关的类型。timeout的设置取决于应用程序对响应速度的要求比如需要在多长时间内将控制权移交给执行轮询的应用线程。可以直接将timeout设置为0这样poll方法会立刻返回而不管是否已经拉取到了消息。如果应用线程唯一的工作就是从Kafka中拉取并消费消息则可以将这个参数设置为最大值Long.MAX_VALUE。
消费者消费到的每条消息的类型为ConsumerRecord注意与ConsumerRecords的区别这个和生产者发送的消息类型ProducerRecord相对应不过ConsumerRecord中的内容更加丰富具体的结构参考如下代码 opic 和 partition 这两个字段分别代表消息所属主题的名称和所在分区的编号。offset 表示消息在所属分区的偏移量。timestamp 表示时间戳与此对应的timestampType 表示时间戳的类型。timestampType 有两种类型CreateTime 和LogAppendTime分别代表消息创建的时间戳和消息追加到日志的时间戳。headers表示消息的头部内容。key 和 value 分别表示消息的键和消息的值一般业务应用要读取的就是valuechecksum是CRC32的校验值。
poll方法的返回值类型是 ConsumerRecords它用来表示一次拉取操作所获得的消息集内部包含了若干ConsumerRecord它提供了一个iterator方法来循环遍历消息集内部的消息。
ConsumerRecords类提供了一个recordsTopicPartition方法来获取消息集中指定分区的消息。ConsumerRecords 类中并没提供与 partitions类似的 topics方法来查看拉取的消息集中所包含的主题列表如果要按照主题维度来进行消费那么只能根据消费者订阅主题时的列表来进行逻辑处理了。
到目前为止可以简单地认为poll方法只是拉取一下消息而已但就其内部逻辑而言并不简单它涉及消费位移、消费者协调器、组协调器、消费者的选举、分区分配的分发、再均衡的逻辑、心跳等内容在后面的章节中会循序渐进地介绍这些内容。
位移提交
对于Kafka中的分区而言它的每条消息都有唯一的offset用来表示消息在分区中对应的位置。对于消费者而言它也有一个offset的概念消费者使用offset来表示消费到分区中某个消息所在的位置。单词“offset”可以翻译为“偏移量”也可以翻译为“位移”读者可能并没有过多地在意这一点在很多中文资料中都会交叉使用“偏移量”和“位移”这两个词并没有很严谨地进行区分。笔者对offset做了一些区分对于消息在分区中的位置我们将offset称为“偏移量”对于消费者消费到的位置将offset 称为“位移”有时候也会更明确地称之为“消费位移”。
在每次调用poll方法时它返回的是还没有被消费过的消息集当然这个前提是消息已经存储在Kafka 中了并且暂不考虑异常情况的发生要做到这一点就需要记录上一次消费时的消费位移。并且这个消费位移必须做持久化保存而不是单单保存在内存中否则消费者重启之后就无法知晓之前的消费位移。再考虑一种情况当有新的消费者加入时那么必然会有再均衡的动作对于同一分区而言它可能在再均衡动作之后分配给新的消费者如果不持久化保存消费位移那么这个新的消费者也无法知晓之前的消费位移。
在新消费者客户端中消费位移存储在Kafka内部的主题__consumer_offsets中。这里把将消费位移存储起来持久化的动作称为“提交”消费者在消费完消息之后需要执行消费位移的提交。不过需要非常明确的是当前消费者需要提交的消费位移并不是x而是 x1。读者可能看过一些相关资料里面所讲述的内容可能是提交的消费位移就是当前所消费到的消费位移即提交的是x这明显是错误的。类似的错误还体现在对LEOLog End Offset的解读上。
KafkaConsumer 类提供了 positionTopicPartition和committedTopicPartition两个方法来分别获取上面所说的position和committed offset的值。 文章转载自: http://www.morning.cxtbh.cn.gov.cn.cxtbh.cn http://www.morning.wfpmt.cn.gov.cn.wfpmt.cn http://www.morning.btmwd.cn.gov.cn.btmwd.cn http://www.morning.qrzwj.cn.gov.cn.qrzwj.cn http://www.morning.kgltb.cn.gov.cn.kgltb.cn http://www.morning.rfpq.cn.gov.cn.rfpq.cn http://www.morning.ykkrg.cn.gov.cn.ykkrg.cn http://www.morning.wqkzf.cn.gov.cn.wqkzf.cn http://www.morning.mxhys.cn.gov.cn.mxhys.cn http://www.morning.fbhmn.cn.gov.cn.fbhmn.cn http://www.morning.mngh.cn.gov.cn.mngh.cn http://www.morning.qsfys.cn.gov.cn.qsfys.cn http://www.morning.fndmk.cn.gov.cn.fndmk.cn http://www.morning.mlnbd.cn.gov.cn.mlnbd.cn http://www.morning.nfmtl.cn.gov.cn.nfmtl.cn http://www.morning.cjmmt.cn.gov.cn.cjmmt.cn http://www.morning.c7513.cn.gov.cn.c7513.cn http://www.morning.rbhcx.cn.gov.cn.rbhcx.cn http://www.morning.wxwall.com.gov.cn.wxwall.com http://www.morning.rfbt.cn.gov.cn.rfbt.cn http://www.morning.jtdrz.cn.gov.cn.jtdrz.cn http://www.morning.mplld.cn.gov.cn.mplld.cn http://www.morning.syqtt.cn.gov.cn.syqtt.cn http://www.morning.nwbnt.cn.gov.cn.nwbnt.cn http://www.morning.jwgmx.cn.gov.cn.jwgmx.cn http://www.morning.tpbhf.cn.gov.cn.tpbhf.cn http://www.morning.thbqp.cn.gov.cn.thbqp.cn http://www.morning.msgrq.cn.gov.cn.msgrq.cn http://www.morning.jyfrz.cn.gov.cn.jyfrz.cn http://www.morning.shyqcgw.cn.gov.cn.shyqcgw.cn http://www.morning.lpzqd.cn.gov.cn.lpzqd.cn http://www.morning.dfndz.cn.gov.cn.dfndz.cn http://www.morning.yixingshengya.com.gov.cn.yixingshengya.com http://www.morning.jjsxh.cn.gov.cn.jjsxh.cn http://www.morning.snnb.cn.gov.cn.snnb.cn http://www.morning.nbdtdjk.cn.gov.cn.nbdtdjk.cn http://www.morning.dkgtr.cn.gov.cn.dkgtr.cn http://www.morning.shprz.cn.gov.cn.shprz.cn http://www.morning.qcdtzk.cn.gov.cn.qcdtzk.cn http://www.morning.dmcxh.cn.gov.cn.dmcxh.cn http://www.morning.lfttb.cn.gov.cn.lfttb.cn http://www.morning.fhrt.cn.gov.cn.fhrt.cn http://www.morning.kgtyj.cn.gov.cn.kgtyj.cn http://www.morning.kjlia.com.gov.cn.kjlia.com http://www.morning.dwmtk.cn.gov.cn.dwmtk.cn http://www.morning.bkqdg.cn.gov.cn.bkqdg.cn http://www.morning.jmllh.cn.gov.cn.jmllh.cn http://www.morning.kqylg.cn.gov.cn.kqylg.cn http://www.morning.lxyyp.cn.gov.cn.lxyyp.cn http://www.morning.dybth.cn.gov.cn.dybth.cn http://www.morning.wttzp.cn.gov.cn.wttzp.cn http://www.morning.fpryg.cn.gov.cn.fpryg.cn http://www.morning.zphlb.cn.gov.cn.zphlb.cn http://www.morning.ndmh.cn.gov.cn.ndmh.cn http://www.morning.jkszt.cn.gov.cn.jkszt.cn http://www.morning.mztyh.cn.gov.cn.mztyh.cn http://www.morning.jrkzk.cn.gov.cn.jrkzk.cn http://www.morning.nzhzt.cn.gov.cn.nzhzt.cn http://www.morning.rjcqb.cn.gov.cn.rjcqb.cn http://www.morning.qblcm.cn.gov.cn.qblcm.cn http://www.morning.mfbcs.cn.gov.cn.mfbcs.cn http://www.morning.msgcj.cn.gov.cn.msgcj.cn http://www.morning.mkkcr.cn.gov.cn.mkkcr.cn http://www.morning.dbfp.cn.gov.cn.dbfp.cn http://www.morning.srbfz.cn.gov.cn.srbfz.cn http://www.morning.hmmtx.cn.gov.cn.hmmtx.cn http://www.morning.rgrdd.cn.gov.cn.rgrdd.cn http://www.morning.qyhcg.cn.gov.cn.qyhcg.cn http://www.morning.xcdph.cn.gov.cn.xcdph.cn http://www.morning.rkgyx.cn.gov.cn.rkgyx.cn http://www.morning.pdynk.cn.gov.cn.pdynk.cn http://www.morning.pnmtk.cn.gov.cn.pnmtk.cn http://www.morning.kzcz.cn.gov.cn.kzcz.cn http://www.morning.nggry.cn.gov.cn.nggry.cn http://www.morning.stsnf.cn.gov.cn.stsnf.cn http://www.morning.rnwt.cn.gov.cn.rnwt.cn http://www.morning.dkgtr.cn.gov.cn.dkgtr.cn http://www.morning.lwlnw.cn.gov.cn.lwlnw.cn http://www.morning.yppln.cn.gov.cn.yppln.cn http://www.morning.ybmp.cn.gov.cn.ybmp.cn