北京网站建设最大的公司排名,兴宁市住房和城乡建设部网站,知名的家居行业网站开发,做雕塑网站找哪家好一、kafka架构 Kafka基础知识 
Kafka是最初由Linkedin公司开发#xff0c;是一个分布式、分区的、多副本的、多生产者、多订阅者#xff0c;基于zookeeper协 调的分布式日志系统(也可以当做MQ系统)#xff0c;常见可以用于webynginx日志、访问日志#xff0c;消息服务等等是一个分布式、分区的、多副本的、多生产者、多订阅者基于zookeeper协 调的分布式日志系统(也可以当做MQ系统)常见可以用于webynginx日志、访问日志消息服务等等Linkedin于 2010年贡献给了Apache基金会并成为顶级开源项目。主要应用场景是:日志收集系统和消息系统。 Kafka主要设计目标如下: 以时间复杂度为O(1)的方式提供消息持久化能力即使对TB级以上数据也能保证常数时间的访问性能。 高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒100K条消息的传输。支持KafkaServer间的消息分区及分布式消费同时保证每个partition内的消息顺序传输。同时支持离线数据处理和实时数据处理。支持在线水平扩展 kafka是一种发布-订阅模式, 对于消息中间件消息分推拉两种模式。Kafka只有消息的拉取没有推送可以通过轮询实现消息的推送。1.Kafka在一个或多个可以跨越多个数据中心的服务器上作为集群运行。 2.Kafka集群中按照主题分类管理一个主题可以有多个分区一个分区可以有多个副本分区。 3.每个记录由一个键一个值和一个时间戳组成。 Kafka具有四个核心API: 1.ProducerAPI:允许应用程序将记录流发布到一个或多个Kafka主题。2.ConsumerAPI:允许应用程序订阅一个或多个主题并处理为其生成的记录流。3.StreamsAPI:允许应用程序充当流处理器使用一个或多个主题的输入流并生成一个或多个输出主题的 输出流从而有效地将输入流转换为输出流。4.ConnectorAPI:允许构建和运行将Kafka主题连接到现有应用程序或数据系统的可重用生产者或使用者。例如关系数据库的连接器可能会捕获对表的所有更改。 Kafka优势 1.高吞吐量:单机每秒处理几十上百万的消息量。即使存储了许多TB的消息它也保持稳定的性能。2.高性能:单节点支持上千个客户端并保证零停机和零数据丢失。3.持久化数据存储:将消息持久化到磁盘。通过将数据持久化到硬盘以及replication防止数据丢失。 4.分布式系统无需停机就可扩展机器。 5.可靠性-kafka是分布式分区复制和容错的。 6.客户端状态维护消息被处理的状态是在Consumer端维护而不是由server端维护。当失败时能自动平衡。 7.支持online和offline的场景。 8.支持多种客户端语言。Kafka支持Java、.NET、PHP、Python等多种语言。 Kafka应用场景 日志收集一个公司可以用Kafka可以收集各种服务的Log通过Kafka以统一接口服务的方式开放给各种Consumer。 消息系统解耦生产者和消费者、缓存消息等。 用户活动跟踪用来记录web用户或者APP用户的各种活动如网页搜索、搜索、点击用户数据收集然后进行用户行为分析。 运营指标Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据生产各种操作的集中反馈比 如报警和报告 流式处理比如Spark Streaming和Storm。 Kafka基本架构 
消息和批次 Kafka的数据单元称为消息。可以把消息看成是数据库里的一个“数据行”或一条“记录”消息由字节数组组成。批次就是一组消息这些消息属于同一个主题和分区。 模式 消息模式schema有许多可用的选项以便于理解。如JSON和XML但是它们缺乏强类型处理能力。Kafka的 许多开发者喜欢使用Apache Avro。Avro提供了一种紧凑的序列化格式模式和消息体分开。当模式发生变化时不需要重新生成代码它还支持强类型和模式进化其版本既向前兼容也向后兼容。 主题和分区 Kafka的消息通过主题进行分类。主题可比是数据库的表或者文件系统里的文件夹。主题可以被分为若干分区一 个主题通过分区分布于Kafka集群中提供了横向扩展的能力。 生产者和消费者 生产者创建消息。消费者消费消息。消息被发布到一个特定的主题上。、 Borker和集群 一个独立的Kafka服务器称为broker。broker接收来自生产者的消息为消息设置偏移量并提交消息到磁盘保 存。broker为消费者提供服务对读取分区的请求做出响应返回已经提交到磁盘上的消息。单个broker可以轻松处理数千个分区以及每秒百万级的消息量。每个集群都有一个broker是集群控制器。 Kafka核心概念 
Producer 生产者创建消息。 Consumer 消费者读取消息 Broker 一个独立的Kafka 服务器被称为broker是集群的组成部分。 Topic 每条发布到Kafka集群的消息都有一个类别这个类别被称为Topic。 Partition 1. 主题可以被分为若干个分区一个分区就是一个提交日志。 2. 消息以追加的方式写入分区然后以先入先出的顺序读取。 3. 无法在整个主题范围内保证消息的顺序但可以保证消息在单个分区内的顺序。 4. Kafka 通过分区来实现数据冗余和伸缩性。 5. 在需要严格保证消息的消费顺序的场景下需要将partition数目设为1。 Replicas kafka 使用主题来组织数据每个主题被分为若干个分区每个分区有多个副本。 Offset 生产者Offset消息写入的时候每一个分区都有一个offset这个offset就是生产者的offset同时也是这个分区的最新最大的offset。 消费者Offset某个分区的offset情况生产者写入的offset是最新最大的值是12而当Consumer A进行消费时从0开始消费一直消费到了9消费者的offset就记录在9Consumer B就纪录在了11。 副本 Kafka通过副本保证高可用。副本分为首领副本(Leader)和跟随者副本(Follower)。 AR 分区中的所有副本统称为ARAssigned RepllicasARISROSR。 ISR 所有与leader副本保持一定程度同步的副本包括Leader组成ISRIn-Sync ReplicasISR集合是AR集合中 的一个子集。 OSR 与leader副本同步滞后过多的副本不包括leader副本组成OSR(Out-Sync Relipcas)。 Kafka的安装和配置 
第一步jdk安装上传jdk-8u261-linux-x64.rpm到服务器并安装。 rpm -ivh jdk-8u261-linux-x64.rpm 第二步配置java环境变量 vim /etc/profile # 生效 source /etc/profile # 验证 java -version 第三步上传zookeeper安装包并解压。 tar -zxf zookeeper-3.4.14.tar.gz cd /zookeeper-3.4.14/conf # 复制zoo_sample.cfg命名为zoo.cfg cp zoo_sample.cfg zoo.cfg # 编辑zoo.cfg文件 vim zoo.cfg dataDir/usr/local/zookeeper/zookeeper-3.4.14/data 第四步配置zookeeper环境变量 vim /etc/profile 启动命令zkServer.sh start 查看状态命令zkServer.sh status 第五步上传kafka_2.12-1.0.2.tgz到服务器并解压 tar -zxf kafka_2.12-1.0.2.tgz 第六步配置kafka环境变量 vim /etc/profile 第七步修改kafka配置文件连接zookeeper # 进入配置文件夹修改server.properties文件 cd config/ vim server.properties 第七步启动kafka kafka-server-start.sh -daemon ../config/server.properties 消费和主题 # 列出现有的主题 [rootnode1 ~]# kafka-topics.sh --list --zookeeper localhost:2181/myKafka # 创建主题该主题包含一个分区该分区为Leader分区它没有Follower分区副本。 [rootnode1 ~]# kafka-topics.sh --zookeeper localhost:2181/myKafka --create --topic topic_1 --partitions 1 --replication-factor 1 # 查看分区信息 [rootnode1 ~]# kafka-topics.sh --zookeeper localhost:2181/myKafka --list # 查看指定主题的详细信息 [rootnode1 ~]# kafka-topics.sh --zookeeper localhost:2181/myKafka --describe --topic topic_1 # 删除指定主题 [rootnode1 ~]# kafka-topics.sh --zookeeper localhost:2181/myKafka --delete --topic topic_1 # 开启生产者 [rootnode1 ~]# kafka-console-producer.sh --topic topic_1 --broker-list localhost:9020 # 开启消费者 [rootnode1 ~]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic_1 # 开启消费者方式二从头消费不按照偏移量消费 [rootnode1 ~]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic_1 --from-beginning Kafka消息接收和发送 
生产者主要的对象有KafkaProducer ProducerRecord 。 其中KafkaProducer 是用于发送消息的类ProducerRecord 类用于封装Kafka的消息。 KafkaProducer 的创建需要指定的参数和含义 二、Kafka高级特性 
生产者 
消息发送 流程图 1. Producer创建时会创建一个Sender线程并设置为守护线程。 2. 生产消息时内部其实是异步流程生产的消息先经过拦截器-序列化器-分区器然后将消息缓存在缓冲 区该缓冲区也是在Producer创建时创建。 3. 批次发送的条件为缓冲区数据大小达到batch.size或者linger.ms达到上限哪个先达到就算哪个。 4. 批次发送后发往指定分区然后落盘到broker如果生产者配置了retrires参数大于0并且失败原因允许重 试那么客户端内部会对该消息进行重试。 5. 落盘到broker成功返回生产元数据给生产者。 6. 元数据返回有两种方式一种是通过阻塞直接返回另一种是通过回调返回。 配置参数 序列化器 Kafka中的数据都是字节数组将消息发送到Kafka之前需要先将数据序列化为字节数组序列化器的作用就是用于序列化要发送的消息。 分区器 默认分区计算 1. 如果record提供了分区号则使用record提供的分区号 2. 如果record没有提供分区号则使用key的序列化后的值的hash值对分区数量取模 3. 如果record没有提供分区号也没有提供key则使用轮询的方式分配分区号。 拦截器 Producer拦截器interceptor和Consumer端Interceptor是在Kafka 0.10版本被引入的主要用于实现Client 端的定制化控制逻辑。 Intercetpor的实现接口是org.apache.kafka.clients.producer.ProducerInterceptor其定义的方法包括 onSend(ProducerRecord)该方法封装进KafkaProducer.send方法中即运行在用户主线程中。Producer 确保在消息被序列化以计算分区前调用该方法。用户可以在该方法中对消息做任何操作但最好保证不要修 改消息所属的topic和分区否则会影响目标分区的计算。 onAcknowledgement(RecordMetadata, Exception)该方法会在消息被应答之前或消息发送失败时调用 并且通常都是在Producer回调逻辑触发之前。onAcknowledgement运行在Producer的IO线程中因此不 要在该方法中放入很重的逻辑否则会拖慢Producer的消息发送效率。 close关闭Interceptor主要用于执行一些资源清理工作。 原理 主线程负责消息创建拦截器序列化器分区器等操作并将消息追加到消息收集器。 Sender线程 该线程从消息收集器获取缓存的消息将其处理为 Node, ListProducerBatch 的形式表示集群的broker节点。 进一步将Node, ListProducerBatch转化为Node, Request形式此时才可以向服务端发送数据。在发送之前Sender线程将消息以 MapNodeId, DequeRequest 的形式保存到InFlightRequests 中进行缓存可以通过其获取 leastLoadedNode ,即当前Node中负载压力最小的一个以实现消息的尽快发出。 消费者 
消费组 消费者从订阅的主题消费消息消费消息的偏移量保存在Kafka的名字是 __consumer_offsets 的主题中消费者还可以将自己的偏移量存储到Zookeeper需要设置offset.storagezookeeper。推荐使用Kafka存储消费者的偏移量。因为Zookeeper不适合高并发。多个从同一个主题消费的消费者可以加入到一个消费组中。消费组中的消费者共享group_id。 消费者四种情况 1. 消费组均衡地给消费者分配分区每个分区只由消费组中一个消费者消费。 2. 消费组均衡地给消费者分配分区每个分区只由消费组中一个消费者消费。 3. 如果在消费组中添加一个消费者2则每个消费者分别从两个分区接收消息。 4. 如果消费组有四个消费者则每个消费者可以分配到一个分区。 5. 如果向消费组中添加更多的消费者超过主题分区数量则有一部分消费者就会闲置不会接收任何消息。 心跳机制 消费者宕机退出消费组触发再平衡重新给消费组中的消费者分配分区。 Kafka 的心跳是 Kafka Consumer 和 Broker 之间的健康检查只有当 Broker Coordinator 正常时Consumer 才会发送心跳。broker 处理心跳的逻辑在 GroupCoordinator 类中如果心跳超期 broker coordinator 会把消费者从 group中移除并触发 rebalance。 订阅 TopicKafka用于分类管理消息的逻辑单元类似与MySQL的数据库。 Partition是Kafka下数据存储的基本单元这个是物理上的概念。同一个topic的数据会被分散的存储到多个partition中这些partition可以在同一台机器上也可以是在多台机器上。优势在于有利于水平扩展避免单台机器在磁盘空间和性能上的限制同时可以通过复制来增加数据冗余性提高容灾能力。为了做到均匀分布通常partition的数量通常是Broker Server数量的整数倍。 Consumer Group同样是逻辑上的概念是Kafka实现单播和广播两种消息模型的手段。保证一个消费组获取到特定主题的全部的消息。在消费组内部若干个消费者消费主题分区的消息消费组可以保证一个主题的每个分区只被消费组中的一个消费者消费。 consumer 采用 pull 模式从 broker 中读取数据。 采用 pull 模式consumer 可自主控制消费消息的速率可以自己控制消费方式批量消费/逐条消费)还可以选择不同的提交方式从而实现不同的传输语义。 反序列化 Kafka的broker中所有的消息都是字节数组消费者获取到消息之后需要先对消息进行反序列化处理然后才能 交给用户程序消费处理。消费者的反序列化器包括key的和value的反序列化器。 key.deserializer value.deserializer IntegerDeserializer StringDeserializer 需要实现 org.apache.kafka.common.serialization.DeserializerT 接口。 位移提交 1. Consumer需要向Kafka记录自己的位移数据这个汇报过程称为提交位移(Committing Offsets) 2. Consumer 需要为分配给它的每个分区提交各自的位移数据 3.位移提交的由Consumer端负责的Kafka只负责保管。 4.位移提交分为自动提交和手动提交 5.位移提交分为同步提交和异步提交 消费位移管理 Kafka中消费者根据消息的位移顺序消费消息。消费者的位移由消费者管理可以存储于zookeeper中也可以存储于Kafka主题__consumer_offsets中。Kafka提供了消费者API让消费者可以管理自己的位移。 再均衡 重平衡其实就是一个协议它规定了如何让消费者组下的所有消费者来分配topic中的每一个分区。比如一个topic 有100个分区一个消费者组内有20个消费者在协调者的控制下让组内每一个消费者分配到5个分区这个分配的过程就是重平衡是kafka为人诟病最多的一个点。 重平衡的触发条件主要有三个 1. 消费者组内成员发生变更这个变更包括了增加和减少消费者比如消费者宕机退出消费组。 2. 主题的分区数发生变更kafka目前只支持增加分区当增加的时候就会触发重平衡。 3. 订阅的主题发生变化当消费者组使用正则表达式订阅主题而恰好又新建了对应的主题就会触发重平衡。 重平衡过程中消费者无法从kafka消费消息这对kafka的TPS影响极大而如果kafka集内节点较多比如数百个那重平衡可能会耗时极多。数分钟到数小时都有可能而这段时间kafka基本处于不可用状态。所以在实际环境中应该尽量避免重平衡发生。避免重平衡是不可能因为你无法完全保证消费者不会故障。而消费者故障是最常见引发重平衡的地方需要尽力避免消费者故障比如合理利用心跳来维持。控制发送心跳的频率频率越高越不容易被误判。 消费者管理 消费组consumer group是kafka提供的可扩展且具有容错性的消费者机制。 三个特性 1. 消费组有一个或多个消费者消费者可以是一个进程也可以是一个线程 2. group.id是一个字符串唯一标识一个消费组 3. 消费组订阅的主题每个分区只能分配给消费组一个消费者。 消费者位移消费者在消费的过程中记录已消费的数据即消费位移offset信息。 kafka提供了5个协议来处理与消费组协调相关的问题 Heartbeat请求consumer需要定期给组协调器发送心跳来表明自己还活着 LeaveGroup请求主动告诉组协调器我要离开消费组 SyncGroup请求消费组Leader把分配方案告诉组内所有成员 JoinGroup请求成员请求加入组 DescribeGroup请求显示组的所有信息包括成员信息协议名称分配方案订阅信息等。通常该请求 是给管理员使用 组协调器在再均衡的时候主要用到了前面4种请求。 消费组组协调器根据状态机对消费组做不同处理 1. Dead组内已经没有任何成员的最终状态组的元数据也已经被组协调器移除了。这种状态响应各种请求都 是一个responseUNKNOWN_MEMBER_ID 2. Empty组内无成员但是位移信息还没有过期。这种状态只能响应JoinGroup请求 3. PreparingRebalance组准备开启新的rebalance等待成员加入 4. AwaitingSync正在等待leader consumer将分配方案传给各个成员 5. Stable再均衡完成可以开始消费。 文章转载自: http://www.morning.xqkjp.cn.gov.cn.xqkjp.cn http://www.morning.fhxrb.cn.gov.cn.fhxrb.cn http://www.morning.bfmq.cn.gov.cn.bfmq.cn http://www.morning.rxxdk.cn.gov.cn.rxxdk.cn http://www.morning.mpngp.cn.gov.cn.mpngp.cn http://www.morning.qmwzr.cn.gov.cn.qmwzr.cn http://www.morning.lmxrt.cn.gov.cn.lmxrt.cn http://www.morning.tznlz.cn.gov.cn.tznlz.cn http://www.morning.hhqtq.cn.gov.cn.hhqtq.cn http://www.morning.mdpkf.cn.gov.cn.mdpkf.cn http://www.morning.bgxgq.cn.gov.cn.bgxgq.cn http://www.morning.bgrsr.cn.gov.cn.bgrsr.cn http://www.morning.pwgzh.cn.gov.cn.pwgzh.cn http://www.morning.piekr.com.gov.cn.piekr.com http://www.morning.qbtkg.cn.gov.cn.qbtkg.cn http://www.morning.kltmt.cn.gov.cn.kltmt.cn http://www.morning.pflry.cn.gov.cn.pflry.cn http://www.morning.mbaiwan.com.gov.cn.mbaiwan.com http://www.morning.routalr.cn.gov.cn.routalr.cn http://www.morning.zxrtt.cn.gov.cn.zxrtt.cn http://www.morning.qxmnf.cn.gov.cn.qxmnf.cn http://www.morning.jjhng.cn.gov.cn.jjhng.cn http://www.morning.rxxdk.cn.gov.cn.rxxdk.cn http://www.morning.nrbcx.cn.gov.cn.nrbcx.cn http://www.morning.ynstj.cn.gov.cn.ynstj.cn http://www.morning.gbgdm.cn.gov.cn.gbgdm.cn http://www.morning.pmptm.cn.gov.cn.pmptm.cn http://www.morning.ydnxm.cn.gov.cn.ydnxm.cn http://www.morning.nwqyq.cn.gov.cn.nwqyq.cn http://www.morning.rbyz.cn.gov.cn.rbyz.cn http://www.morning.rnxs.cn.gov.cn.rnxs.cn http://www.morning.ryfpx.cn.gov.cn.ryfpx.cn http://www.morning.jmlgk.cn.gov.cn.jmlgk.cn http://www.morning.cwjsz.cn.gov.cn.cwjsz.cn http://www.morning.ntgjm.cn.gov.cn.ntgjm.cn http://www.morning.llgpk.cn.gov.cn.llgpk.cn http://www.morning.jhrqn.cn.gov.cn.jhrqn.cn http://www.morning.hsrpr.cn.gov.cn.hsrpr.cn http://www.morning.fkwp.cn.gov.cn.fkwp.cn http://www.morning.gl-group.cn.gov.cn.gl-group.cn http://www.morning.fhrt.cn.gov.cn.fhrt.cn http://www.morning.c7623.cn.gov.cn.c7623.cn http://www.morning.smpb.cn.gov.cn.smpb.cn http://www.morning.rqsnl.cn.gov.cn.rqsnl.cn http://www.morning.xhrws.cn.gov.cn.xhrws.cn http://www.morning.qtqk.cn.gov.cn.qtqk.cn http://www.morning.tgmwy.cn.gov.cn.tgmwy.cn http://www.morning.kpygy.cn.gov.cn.kpygy.cn http://www.morning.rhsg.cn.gov.cn.rhsg.cn http://www.morning.glncb.cn.gov.cn.glncb.cn http://www.morning.rscrj.cn.gov.cn.rscrj.cn http://www.morning.rccpl.cn.gov.cn.rccpl.cn http://www.morning.xtyyg.cn.gov.cn.xtyyg.cn http://www.morning.bpmtg.cn.gov.cn.bpmtg.cn http://www.morning.sjjq.cn.gov.cn.sjjq.cn http://www.morning.yzfrh.cn.gov.cn.yzfrh.cn http://www.morning.hqwcd.cn.gov.cn.hqwcd.cn http://www.morning.hhpkb.cn.gov.cn.hhpkb.cn http://www.morning.osshjj.cn.gov.cn.osshjj.cn http://www.morning.qggcc.cn.gov.cn.qggcc.cn http://www.morning.jbblf.cn.gov.cn.jbblf.cn http://www.morning.mjjty.cn.gov.cn.mjjty.cn http://www.morning.hrzymy.com.gov.cn.hrzymy.com http://www.morning.mcpdn.cn.gov.cn.mcpdn.cn http://www.morning.kwwkm.cn.gov.cn.kwwkm.cn http://www.morning.lmqw.cn.gov.cn.lmqw.cn http://www.morning.bmmhs.cn.gov.cn.bmmhs.cn http://www.morning.rzcfg.cn.gov.cn.rzcfg.cn http://www.morning.rnmyw.cn.gov.cn.rnmyw.cn http://www.morning.zwdrz.cn.gov.cn.zwdrz.cn http://www.morning.bfnbn.cn.gov.cn.bfnbn.cn http://www.morning.ryfq.cn.gov.cn.ryfq.cn http://www.morning.mfmrg.cn.gov.cn.mfmrg.cn http://www.morning.tsnmt.cn.gov.cn.tsnmt.cn http://www.morning.rfrxt.cn.gov.cn.rfrxt.cn http://www.morning.bby45.cn.gov.cn.bby45.cn http://www.morning.sjzsjsm.com.gov.cn.sjzsjsm.com http://www.morning.nldsd.cn.gov.cn.nldsd.cn http://www.morning.hdzty.cn.gov.cn.hdzty.cn http://www.morning.nlbhj.cn.gov.cn.nlbhj.cn