方正集团网站是谁做的,企业网站策划建设方案百度,wordpress front profile,传统设计公司网站zookeeper kafka 消息队列
一、消息队列简介
1、什么是消息队列
消息队列#xff08;Message Queue#xff09;是一种用于跨进程或分布式系统中传递消息的通信机制。消息队列在异步通信、系统解耦、负载均衡和容错方面具有重要作用。
#xff08;1#xff09;特性 异步…zookeeper kafka 消息队列
一、消息队列简介
1、什么是消息队列
消息队列Message Queue是一种用于跨进程或分布式系统中传递消息的通信机制。消息队列在异步通信、系统解耦、负载均衡和容错方面具有重要作用。
1特性 异步通信发送方将消息发送到队列中后不需要等待接收方处理完毕即刻返回继续执行。接收方可以在需要时从队列中读取并处理消息。 解耦消息队列在发送方和接收方之间充当中介允许它们独立运行。这样即使其中一个部分暂时不可用系统的整体功能依然可以保持正常。 负载均衡通过消息队列多个消费者可以分担消息处理的工作量提高系统的吞吐量。 可靠性消息队列可以确保消息在传递过程中不丢失即使在系统出现故障时也能保证消息被妥善处理。 扩展性可以根据需要增加消息生产者或消费者从而轻松扩展系统。 缓冲和流量控制在高并发的场景下系统的不同部分可能无法以相同的速度处理请求。消息队列可以作为缓冲区调节生产者和消费者之间的速度差异防止系统过载或崩溃。
2用途
任务队列在需要异步处理任务的情况下消息队列非常有用。例如Web 应用在接收到用户请求后将耗时操作如视频处理、文件转换放入队列中立即返回响应后续处理由工作线程完成。事件通知在事件驱动系统中组件之间可以通过消息队列进行通信和通知。例如当用户在网站上完成购买操作后消息队列可以通知库存管理系统更新库存。日志收集可以将系统中产生的日志消息通过消息队列发送到集中式日志处理系统中以便进行实时分析和监控。数据流处理用于处理数据流的系统如实时分析平台可以通过消息队列来传递和处理数据。
2、常见的消息队列软件
1RabbitMQ
特点
基于AMQPAdvanced Message Queuing Protocol协议具有高度灵活性。支持多种消息传递协议包括 STOMP、MQTT 和 HTTP。提供可靠的消息传递机制包括消息确认、持久化、发布确认等。支持复杂的路由机制如使用交换机Exchanges和绑定Bindings来实现消息路由。易于管理和监控提供了丰富的管理工具和插件。
用途
企业级应用程序集成。任务调度系统。实时消息通知系统。
优点
功能强大支持多种协议和复杂的消息路由。稳定性高适用于企业级应用。良好的文档和社区支持。
缺点
对资源要求较高配置和维护相对复杂。吞吐量相对较低不适合高吞吐量的实时数据流处理。
2Apache Kafka
特点
分布式流处理平台提供高吞吐量、低延迟的消息传递。支持持久化所有消息都被持久化到磁盘上可以通过配置保留时间。基于发布-订阅模型消费者可以独立地读取和处理消息。支持分区Partition和副本Replica保证高可用性和容错性。强大的扩展性能够轻松扩展以处理大规模数据。
用途
大数据实时处理和分析。日志收集和处理。流式数据处理如物联网数据、金融交易数据。
优点
高吞吐量适合处理大量实时数据。低延迟消息传递和处理速度快。可扩展性强适合大规模分布式系统。
缺点
学习曲线较陡配置和管理相对复杂。消费者需要处理消息的顺序性和幂等性增加了应用程序的复杂性。
3ActiveMQ
特点
基于JMSJava Message Service规范的开源消息中间件。支持多种消息传递协议包括 OpenWire、STOMP、MQTT、AMQP、REST、WebSocket 等。提供丰富的特性如消息持久化、事务、消息优先级等。易于嵌入 Java 应用程序支持多语言客户端如 C、.NET、Python、Perl、PHP。提供了管理控制台和监控工具便于管理和监控消息系统。
用途
企业应用集成。任务调度和自动化流程。消息驱动的微服务架构。
优点
功能全面支持多种协议和语言。与 Java 应用程序集成良好。社区活跃文档丰富。
缺点
性能相对较低不适合高吞吐量的场景。在处理大规模分布式系统时扩展性有限。
4Amazon SQS
特点
AWS 提供的完全托管的消息队列服务无需管理服务器和基础设施。提供标准队列和 FIFO先入先出队列满足不同的消息传递需求。支持自动扩展能够处理任意数量的消息。与 AWS 生态系统无缝集成易于与其他 AWS 服务如 Lambda、S3、SNS集成。提供消息可见性超时、消息延迟、消息批处理等功能。
用途
分布式系统中的异步任务处理。微服务架构中的消息传递。日志和事件驱动的工作流。
优点
托管服务无需运维简单易用。高可用性和可靠性由 AWS 提供保障。可与 AWS 生态系统中的其他服务轻松集成。
缺点
成本可能较高特别是在高消息吞吐量的场景中。对于非常高性能和定制化需求的场景灵活性可能不足。
二、kafka
Apache Kafka 是一个分布式流处理平台最初由LinkedIn开发并于2011年成为Apache开源项目。Kafka主要用于构建实时数据管道和流处理应用程序它具有高吞吐量、低延迟、可扩展性和容错性。
1、kafka 架构
Kafka 的架构主要包括以下几个核心组件
1. Producer生产者
生产者是负责发布消息到 Kafka 主题的客户端。生产者可以选择将消息发送到特定的分区也可以通过键进行分区路由。
2. Consumer消费者
消费者是负责从 Kafka 主题中读取消息的客户端。消费者可以订阅一个或多个主题并以流的方式处理数据。消费者通常会组成一个消费者组Consumer Group每个消费者组可以同时读取和处理消息实现负载均衡。
3. Broker代理
Kafka 集群中的每个服务器称为一个代理Broker。每个代理负责接收、存储和转发消息。一个 Kafka 集群通常由多个代理组成以实现高可用性和容错性。
4. Topic主题
主题是 Kafka 中数据的分类容器。每个主题可以有多个生产者和消费者。主题可以分为多个分区每个分区内的消息是有序的但跨分区的消息顺序不保证。在 Kafka 中一个代理Broker可以承载多个主题Topic。具体有多少个主题取决于 Kafka 集群的配置和使用场景。
1分区内的顺序性
在一个分区内消息是严格有序的这意味着消息的生产顺序与消费顺序一致。消费者在消费消息时按照偏移量顺序读取消息。
2跨分区的无序性
虽然分区内的消息是有序的但不同分区之间的消息顺序不保证。跨分区的消息无序性是由于分区是独立的并行处理单元生产者可以同时向多个分区发送消息导致整体顺序无法保证。
3特征 多生产者和多消费者一个主题可以有多个生产者发布消息也可以有多个消费者订阅和消费消息。这种特性使得 Kafka 可以轻松实现多对多的数据传递。 逻辑分组主题用于将相同类别的数据进行逻辑分组便于管理和处理。例如可以创建一个主题来存储网站的访问日志另一个主题来存储订单信息。
5. Partition分区
分区是 Kafka 中并行处理的基本单元。每个分区在磁盘上是一个日志文件消息以追加的方式写入。分区提供了高吞吐量和并行处理能力。
1特性
顺序性在一个分区内消息是严格有序的。每条消息都有一个唯一的偏移量Offset表示消息在分区中的位置。消费者在消费消息时会按照偏移量顺序逐条读取消息。并行处理主题的分区允许 Kafka 在多个服务器上并行处理消息。生产者可以将消息分布到不同的分区消费者也可以从多个分区并行读取消息从而实现负载均衡和高吞吐量。副本机制每个分区可以有多个副本Replica其中一个为领导者Leader其余为追随者Follower。所有的读写操作都由领导者处理追随者从领导者复制数据。副本机制保证了数据的高可用性和容错性。
6. Zookeeper
Zookeeper 负责存储集群的元数据如代理信息、分区状态等。它还负责选举 Kafka 集群的控制器管理分区的副本分配和故障恢复。
2、partition的路由规则与默认策略
1 Partition 的数据路由规则 数据路由规则决定了生产者如何将消息发送到 Kafka Topic 的各个 Partition。Kafka 提供了几种常见的 Partitioning 策略具体如下 Round-robin轮询这是最简单的策略生产者轮流将消息发送到不同的 Partition。如果所有 Partition 都有相似的负载和数据量这种策略可以实现基本的负载均衡。但是它不能保证消息的相关性或有序性。 Hash-based基于哈希生产者使用消息的某个属性如 key计算哈希值然后根据哈希值将消息路由到对应的 Partition。这种策略可以保证具有相同 key 的消息总是被发送到同一个 Partition从而保证了这些消息的顺序性。 Custom Partitioner自定义分区器开发者可以根据自己的需求实现自定义的 Partitioner 接口来控制消息的分区逻辑。这种方式灵活性最高可以根据业务需求定义非常复杂的分区逻辑。
2 默认的 Partitioning 策略 在 Kafka 中默认的 Partitioning 策略是基于消息的 key 进行哈希分区。具体步骤如下 如果消息有 key则使用 key 进行哈希计算然后将哈希值与 Topic 的 Partition 数取模以确定消息发送到哪个 Partition。 如果消息没有 key则使用轮询策略即将消息依次发送到每个 Partition实现简单的负载均衡。
3、消费者组详细介绍
1介绍 消费者组成员 消费者组由多个消费者实例Consumer Instance组成。每个消费者实例通常运行在不同的进程或者不同的计算机上。 主题分区分配 消费者组的每个实例会订阅一个或多个主题。每个主题被分为多个分区Partitions每个分区只能由消费者组中的一个实例进行消费。Kafka 通过分区的分配策略将分区均匀分配给消费者组中的各个实例确保每个分区只被一个消费者实例消费但一个消费者实例可以消费多个分区。 消费者组协调器 Kafka 集群中有一个特殊的 Broker 作为消费者组的协调器Consumer Group Coordinator。每个消费者组有一个唯一的消费者组 ID由协调器管理和分配。
2消费者组的工作机制 消费者协调与分配 当消费者组中的一个新消费者实例加入或离开时或者分区发生重新分配时消费者组的协调器负责重新分配分区给消费者实例。分区的分配基于消费者实例的订阅和分配策略Partition Assignment Strategy如范围分配、轮询分配等。 消息处理并行性 每个分区内的消息是有序的而不同分区之间的消息顺序不保证。消费者组中的每个消费者实例在处理分区内的消息时是单线程的但是多个实例可以并行处理不同分区的消息从而提高整体的消费吞吐量。 消费者偏移提交 消费者组中的每个消费者实例会周期性地提交消费偏移Offset到 Kafka 集群以记录其对消息的消费进度。Kafka 提供了自动和手动两种提交偏移的方式确保消息处理的可靠性和一致性。
3消费者组的应用场景 消费者组在 Kafka 中有多种应用场景包括但不限于 并行处理多个消费者实例并行处理同一主题的消息提高消费吞吐量和效率。 水平扩展通过增加消费者实例可以水平扩展消费者组的处理能力适应大规模数据流的需求。 容错和高可用性当一个消费者实例故障或下线时协调器会重新分配其负责的分区给其他实例确保消息的连续性和可用性。
4、偏移量详细介绍
1偏移量概念 偏移量是一个64位的整数用来唯一标识消费者在一个特定分区中已经消费过的消息位置。每个消费者都会为每个分区维护一个偏移量。偏移量的作用包括 消费位置的记录偏移量表示消费者已经处理并成功提交的消息位置。消费者会定期地更新偏移量以记录自己的消费进度。 消息处理的顺序性Kafka 保证每个分区内的消息顺序消费者通过记录偏移量来确保消息的有序消费避免重复消费或消息丢失。 消费者的恢复如果消费者实例停止或重启它可以利用存储的偏移量来恢复消费位置从上次离开的地方继续消费而不会丢失消息。
2偏移量的管理方式
在 Kafka 中偏移量的管理可以通过以下几种方式实现
自动提交偏移量 消费者可以选择开启自动提交偏移量的功能Kafka 将定期根据配置的间隔自动将消费者的偏移量提交到 Kafka 集群中。这种方式简单方便但可能会因为提交频率不合适而导致消息的重复消费或丢失。 手动提交偏移量 消费者可以显式地在适当的时候手动提交当前的偏移量。手动提交偏移量可以精确地控制偏移量的提交时机避免自动提交可能出现的问题如重复消费或消息丢失。 偏移量存储 消费者通常会将偏移量存储在外部系统中如数据库或文件系统。这样做的好处是即使消费者实例重新启动或扩展也能够方便地恢复之前的消费进度。
3偏移量的生命周期
偏移量的生命周期包括
消费者组内偏移量管理 每个消费者组内的每个消费者实例会独立地管理自己负责的分区的偏移量。每次消费者处理完消息后会更新并提交偏移量。这个过程可以是自动的由 Kafka 管理或者手动的由应用程序控制。 偏移量的保留和删除 Kafka 默认会保留消费者组的偏移量信息一段时间以便在消费者重新加入或恢复时使用。可以通过 Kafka 配置来调整偏移量的保留时间retention period或存储方式。 偏移量的提交 偏移量提交到 Kafka 集群后会被持久化存储确保在 Kafka 集群故障或消费者重启时偏移量信息不会丢失。
5、kafka中的副本同步
在 Apache Kafka 中副本Replica的同步是通过一种基于日志的复制机制来实现的具体过程如下
1 副本同步基本原理
领导者和追随者 每个分区Partition都有一个领导者Leader和若干个追随者Follower。领导者负责处理所有的读写请求并维护分区的写入顺序。追随者从领导者复制数据并保持与领导者的数据同步。
2 副本同步过程
副本同步的过程主要分为两个阶段首先是领导者将数据写入本地日志Leader Log然后追随者从领导者的日志中复制数据。
2.1 领导者写入本地日志Leader Log 生产者发送消息 当生产者向 Kafka 发送消息时消息首先写入分区的领导者副本Leader Replica的本地日志中。 消息确认 消息发送确认类型 Kafka 提供了三种消息发送确认类型分别是 acks0生产者发送消息后不等待服务端的任何确认消息被认为已发送成功。acks1生产者发送消息后等待服务端的领导者副本确认接收到消息后即认为消息发送成功。acksall或 acks-1生产者发送消息后等待服务端的所有 ISR同步副本确认接收到消息后即认为消息发送成功。 消息持久性保证 根据不同的确认类型Kafka 提供了不同级别的消息持久性保证 acks0消息可能会丢失适用于对数据实时性要求高但可以容忍少量消息丢失的场景。acks1消息被确认写入领导者副本后即认为成功数据不会丢失但可能会存在一定程度的重复发送。acksall消息被所有 ISR 中的副本确认写入后才认为成功提供了最高级别的数据持久性保证但延迟较高。 生产者确认机制 当生产者发送消息后可以通过设置 acks 参数来选择确认级别。生产者可以通过配置来平衡消息传递的可靠性和延迟。 消息确认机制的工作流程 生产者发送消息到 Kafka 集群的指定主题和分区。根据生产者配置的 acks 参数生产者可能会等待领导者副本或所有 ISR 中的副本确认消息的接收。如果确认级别设为 acksall生产者会等待所有 ISR 中的副本都确认接收消息然后才会收到确认。如果确认级别设为 acks1生产者会等待领导者副本确认接收消息并且不会等待其他 ISR 中的副本确认。
2.2 追随者从领导者复制数据 追随者复制数据 追随者定期从领导者获取数据块batch并复制到本地的日志中。追随者使用 Kafka 协议从领导者拉取数据确保领导者和追随者之间的数据一致性。 同步方式 Kafka 支持两种类型的复制同步方式 同步复制 在 Kafka 中同步复制是指追随者副本在接收到数据后必须向领导者发送确认领导者才会继续处理新的写入请求。这种方式确保了每条消息在所有的 ISR 中的副本都得到了写入确认保证了数据的可靠性和一致性但会增加延迟。 异步复制 异步复制是指追随者副本在接收到数据后不会向领导者发送确认领导者可以立即继续处理新的写入请求。追随者会在后台异步地复制数据这样可以降低写入操作的延迟但可能会造成一段时间内领导者和追随者之间的数据不一致。
3 同步机制细节 数据批次 Kafka 使用数据批次batches来减少网络开销和提高效率。领导者会将多个消息组合成一个批次然后一次性发送给追随者。 保序性 Kafka 保证同一分区的消息在所有的副本之间的顺序一致性。即使跨越多个副本消息也会按照写入的顺序进行复制和提交。 ISR 机制 ISR 是指与领导者保持同步的副本集合。只有在 ISR 中的副本才能参与到消息的读写操作中确保了高可用性和一致性。
4 故障处理 领导者故障 如果领导者宕机Kafka 会从 ISR 中选择一个新的领导者。新的领导者会继续从上一个领导者复制未提交的数据并负责后续的写入操作。 追随者故障 如果追随者宕机领导者会继续向其他追随者发送数据直到该追随者重新加入 ISR 并恢复复制。
6、ISR 的定义和作用
1ISR 的定义
ISR 是指与分区的领导者副本保持同步的一组副本。领导者会将消息写入 ISR 中的所有副本确保数据的一致性和可用性。ISR 中的副本在领导者视角下是“同步的”即它们已经接收并复制了领导者的所有写入操作。
2ISR 的作用
确保高可用性当领导者失效时Kafka 可以从 ISR 中选择一个新的领导者而无需等待其他副本不在 ISR 中的副本复制完整份数据。提高性能只有 ISR 中的副本才参与到读写请求的处理中这样可以减少网络延迟和提高性能。
7、kafka文件存储机制概述
1日志分段Log Segments
概念 Kafka 将每个主题的每个分区的消息以日志分段Log Segments的形式进行持久化存储。每个分区都有一个或多个日志分段每个日志分段对应一个日志段文件。这种存储模型包括日志文件Log Segments和索引文件Index Files其存储在同一个文件夹下的不同文件中。 日志分段文件 每个日志分段文件都是一个独立的文件用于存储一定时间范围内或达到一定大小的消息数据。Kafka 使用预先配置的参数例如 log.segment.bytes 控制默认大小默认为 1GB来确定何时创建新的日志分段文件。 日志分段文件的特点 每个日志分段文件都有一个唯一的起始偏移量Offset和一个范围记录了该段文件内包含的消息的偏移量范围。新消息会追加到当前活跃的日志分段文件中当文件大小达到预设阈值时Kafka 将关闭该文件并创建一个新的日志分段文件。
2 索引文件Index Files
概念 为了快速查找消息Kafka 使用索引文件Index Files来存储消息偏移量和物理位置之间的映射关系。索引文件按照预设大小默认为 4KB划分成索引条目每个条目存储一段消息的偏移量和物理位置。 索引文件的作用 当消费者需要读取消息时它可以通过索引文件快速定位到对应消息的物理位置而无需扫描整个日志文件。索引文件大大提高了消息的读取效率特别是在大型分区和高吞吐量的情况下。
3 清理策略Retention Policy
概念 Kafka 支持基于时间和大小的消息保留策略通过配置参数来控制消息在日志分段中的保留时长和大小。主要参数包括 log.retention.hours指定消息在分段中保留的时间默认为 7 天。log.retention.bytes指定分段文件的最大大小默认为 -1无限制。 清理过程 根据配置的保留策略Kafka 定期扫描日志分段文件并删除过期或超出大小限制的分段文件。清理策略确保了存储空间的有效利用同时保证了数据的有效性和可靠性。
4 日志压缩Log Compaction
概念 对于一些需要保留最新状态的数据Kafka 提供了日志压缩Log Compaction功能。日志压缩会定期检查主题的消息保留每个键的最新消息而删除过时的或重复的消息从而节省存储空间。 应用场景 日志压缩特别适合用于存储键值对的场景如状态存储或事件日志确保最新状态的数据不会因为历史数据的堆积而被覆盖。
5具体配置 默认情况下Kafka 的日志文件和索引文件存储在一个或多个目录中这些目录由 log.dirs 参数指定。 log.dirs 参数可以配置为一个或多个目录路径多个路径之间用逗号分隔。这样做的目的是为了提供数据的冗余备份和提高性能。 当 Kafka 创建新的日志分段文件或索引文件时它会依次选择配置的目录路径之一来存储文件。 配置示例
在 Kafka 的配置文件通常是 server.properties中可以配置 log.dirs 参数例如
log.dirs/path/to/kafka/logs如果需要配置多个存储路径可以用逗号分隔
log.dirs/path/to/kafka/logs1,/path/to/kafka/logs28、LEOLog End Offset和 HWHigh Watermark
1概念
LEOLog End Offset指的是每个副本包括 leader 和 follower当前最大的消息偏移量offset。即当前副本中最新消息的 offset。HWHigh Watermark指的是消费者能够见到的最大的 offset也就是所有副本中最小的 LEO。HW 之前的所有消息被认为是已经提交和可靠的。
2处理机制
1. Follower 故障处理
故障情况 如果一个 follower 发生故障它会暂时被踢出 ISRIn-Sync Replicas同步副本集合。当 follower 恢复后它会从本地磁盘读取上次的 HW即从已经被确认的最高偏移量开始。follower 将本地 log 文件中高于 HW 的部分截掉然后从 HW 开始向 leader 进行数据同步。一旦 follower 的 LEO 大于等于该 Partition 的 HW即 follower 追上 leader就可以重新加入 ISR参与到消息的复制和同步中。
2. Leader 故障处理
故障情况 如果 leader 发生故障Kafka 会从 ISR 中选出一个新的 leader。新的 leader 被选出后其余的 follower 会进行以下操作 首先它们会将自己本地 log 文件中高于 HW 的部分截掉保留 HW 之前的数据。然后它们开始从新的 leader 处获取数据进行同步确保数据的一致性和可靠性。
9、kafka常用命令
1Kafka 根目录结构
Kafka 的安装目录结构通常如下所示
kafka/
├── bin/ # 包含所有 Kafka 命令行工具的目录
├── config/ # 存放 Kafka 配置文件的目录
├── libs/ # 存放 Kafka 所需的库文件
├── logs/ # 存放 Kafka 日志文件的目录
└── ...2 常用 Kafka 命令及使用示例
2.1创建和管理主题Topics 创建主题 ./bin/kafka-topics.sh --create --topic topic_name --partitions num_partitions --replication-factor replication_factor --bootstrap-server broker_list--replication-factor replication_factor指定每个分区的副本数。
--bootstrap-server broker_list指定连接的 Kafka Broker 列表。示例 ./bin/kafka-topics.sh --create --topic myTopic --partitions 3 --replication-factor 1 --bootstrap-server localhost:9092说明在 Kafka 安装目录下的 bin/ 目录中执行命令。 查看主题列表 ./bin/kafka-topics.sh --list --bootstrap-server broker_list示例./bin/kafka-topics.sh --list --bootstrap-server localhost:9092查看主题详细信息 ./bin/kafka-topics.sh --describe --topic topic_name --bootstrap-server broker_list示例./bin/kafka-topics.sh --describe --topic myTopic --bootstrap-server localhost:90922.2生产者和消费者操作 生产者发送消息 ./bin/kafka-console-producer.sh --topic topic_name --bootstrap-server broker_list示例 ./bin/kafka-console-producer.sh --topic myTopic --bootstrap-server localhost:9092消费者消费消息 ./bin/kafka-console-consumer.sh --topic topic_name --from-beginning --bootstrap-server broker_list示例 ./bin/kafka-console-consumer.sh --topic myTopic --from-beginning --bootstrap-server localhost:90922.3 消费者组管理 查看消费者组列表 ./bin/kafka-consumer-groups.sh --list --bootstrap-server broker_list示例 ./bin/kafka-consumer-groups.sh --list --bootstrap-server localhost:9092查看消费者组消费情况 ./bin/kafka-consumer-groups.sh --describe --group group_name --bootstrap-server broker_list示例 ./bin/kafka-consumer-groups.sh --describe --group myConsumerGroup --bootstrap-server localhost:90922.4 其他管理和调试命令 查看集群信息 ./bin/kafka-topics.sh --describe --topic topic_name --bootstrap-server broker_list示例 ./bin/kafka-topics.sh --describe --topic myTopic --bootstrap-server localhost:9092查看 Broker 日志 tail -f logs/server.log示例在 Kafka 根目录下执行查看当前 Kafka Broker 的日志。 tail -f logs/server.log三、zookeeper
ZooKeeper动物园管理员是一个开源的分布式协调服务旨在提供一个高度可靠的协作基础用于分布式应用程序中的数据管理、配置管理、命名服务等任务。
1、zookeeper的工作机制
ZooKeeper 的工作机制主要围绕其设计的分布式一致性服务和协作基础展开它通过一些关键的设计和算法来实现高可靠性和可扩展性。以下是 ZooKeeper 的工作机制的详细解释。
1集群模式 ZooKeeper 集群ZooKeeper 以集群的方式运行每个集群节点称为 ZooKeeper 服务器。集群中的节点数量通常为奇数例如 3、5、7 等以实现多数派选举和容错性。 客户端连接客户端通过连接到任意一个 ZooKeeper 服务器来与整个集群进行交互这些服务器相互之间通过 TCP/IP 进行通信。
2数据模型 ZooKeeper 的数据模型ZooKeeper 提供了类似文件系统的数据模型称为 ZooKeeper 数据树ZooKeeper Data Tree。 /
├── apps
│ ├── app1
│ └── app2
├── config
│ ├── global
│ │ ├── setting1
│ │ └── setting2
│ └── local
│ └── setting3
└── services├── service1│ ├── instances│ │ ├── instance1│ │ └── instance2│ └── status└── service2 根节点 / 是 ZooKeeper 数据树的起始点。 apps、config 和 services 是一级子节点。 apps 下有 app1 和 app2 两个子节点。 config 下有 global 和 local 两个子节点global 下包含 setting1 和 setting2。 services 下有 service1 和 service2service1 下有 instances 和 status 两个子节点instances 下有 instance1 和 instance2。 节点ZNodeZooKeeper 数据树中的每个节点称为 ZNode类似于文件系统中的目录或文件。每个 ZNode 可以存储一小段数据并且可以关联 ACL访问控制列表来限制访问权限。
3 原子广播协议ZAB ZAB 协议ZooKeeper 使用 ZABZooKeeper Atomic Broadcast协议来实现数据的一致性和可靠性。 基本概念 Leader 选举ZAB 协议确保集群中只有一个 Leader 负责处理客户端请求和更新。原子广播当 Leader 接收到客户端的写请求时它将请求广播给所有的 Follower并等待大多数 Follower 的确认然后才将操作应用到自身状态。事务 ID每个事务都有一个全局唯一的事务 ID用于标识和排序。
4一致性和持久性 一致性保证ZooKeeper 保证了在分布式环境下的数据一致性所有的更新操作都是原子性的并且按照客户端的顺序执行。 持久性存储ZooKeeper 将数据存储在磁盘上并使用写前日志Write-Ahead LogWAL来保证即使在节点故障后也不会丢失数据。
5事件监听和通知 Watch 机制ZooKeeper 允许客户端注册 Watcher 监听某个 ZNode 的状态变化。 事件通知一旦 ZNode 的状态发生变化如数据更新、ZNode 删除等ZooKeeper 将通知所有注册了 Watcher 的客户端使得客户端可以及时响应状态变化。
2、应用场景
ZooKeeper 的工作机制使其在以下场景中广泛应用 分布式锁通过创建临时顺序节点实现分布式锁确保在分布式环境下的资源竞争问题。 选举机制通过 ZAB 协议实现 Leader 选举确保分布式系统中只有一个主节点处理请求。 配置管理存储和管理配置信息各个节点通过 Watcher 实时感知配置的变化。 服务注册与发现将服务节点作为 ZNode 注册到 ZooKeeper 中客户端通过 Watcher 发现服务的上线和下线状态。
3、特点
分布式协调
ZooKeeper 提供了一个分布式环境下的一致性服务帮助应用程序协调多个节点之间的操作和状态。
数据管理
应用程序可以使用 ZooKeeper 存储和管理关键的元数据、配置信息以及其他动态数据。这些数据可以被多个节点共享和访问。
配置管理
ZooKeeper 提供了一个轻量级的分布式配置管理系统可以集中管理和动态更新应用程序的配置。
命名服务
作为一个命名服务ZooKeeper 允许应用程序注册和发现服务、节点或者资源的信息从而简化了分布式系统中的服务发现和通信。
分布式锁
ZooKeeper 提供了高效的分布式锁机制允许多个进程或者线程在分布式环境中安全地竞争资源访问。
事件通知
客户端可以注册监听器以接收关于数据变更和状态变化的通知这使得应用程序可以实时响应系统状态的变化。
4、zookeeper 选举机制
1角色介绍
Leader负责处理所有的客户端写请求并进行数据更新操作。Leader 也负责协调和同步所有的 Follower 节点。Follower复制 Leader 的状态并处理客户端的读请求。如果 Leader 失效Follower 可以参与选举新的 Leader。Observer观察者节点接收并复制 Leader 的状态但不参与投票和选举过程。用于扩展集群的读取能力减少对 Leader 的读取压力。
2zookeeper 节点状态 LOOKING选举状态 节点正在寻找新的 Leader即处于选举过程中。 发起选举请求尝试成为新的 Leader。 FOLLOWING跟随状态 Follower 节点的状态。 节点已经确定当前的 Leader并跟随 Leader 处理请求。 LEADING领导状态 Leader 节点的状态。 节点负责处理和协调整个集群的写请求和事务处理。 OBSERVING观察状态 Observer 节点的状态。 类似于 Follower但专门用于扩展集群的读取能力。 不参与 Leader 的选举过程仅处理读请求提升系统的性能和吞吐量。
3第一次启动选举机制流程 启动阶段 当一个新的 ZooKeeper 节点首次启动时它会进入初始状态并尝试加入到已经存在的集群中。如果此时集群中还没有 Leader即第一次启动或者之前的 Leader 已经失效新节点会启动选举过程。 节点状态 新节点将自己设置为 LOOKING 状态表示它正在寻找新的 Leader。 投票 新节点会向集群中的其他节点发送选举请求请求其他节点投票支持自己成为 Leader。每个节点会为自己投票并将投票结果广播给整个集群。 多数派原则 根据 ZooKeeper 的多数派原则如果新节点能够获得超过半数节点的投票支持N/2 1它将成为新的 Leader。这确保了选出的 Leader 能够获得集群的大部分节点的认可和支持。 Leader 确定 一旦新节点获得了多数节点的投票支持它将成为新的 Leader。其他节点将根据新的 Leader 的通知和广播更新自己的状态标记新的 Leader。 系统启动 一旦选举完成ZooKeeper 系统将正式启动新的 Leader 节点开始处理客户端的写请求并协调整个集群的状态。
4注意事项 节点数和半数原则 ZooKeeper 集群中至少需要有奇数个节点这样才能确保在选举中始终有一半以上的节点支持新的 Leader 的选举。 超时机制 在选举过程中每个节点会设置一个超时时间。如果在超时时间内没有达成半数投票节点将重新发起新一轮的选举。 持久化 ZooKeeper 通过持久化存储和写前日志WAL来保证选举操作和数据更新的持久性确保即使在节点故障或重启后系统状态可以恢复到之前的正确状态。
5非第一次选举机制
非第一次选举机制是指在已经运行的 ZooKeeper 集群中当现有的 Leader 出现故障或失效时集群如何重新选举新的 Leader 的过程。这种选举机制的执行确保了集群在面对节点故障或网络分区等情况时能够快速恢复并保持高可用性。 Leader 失效检测 集群中的各个节点包括 Followers 和 Observers会定期检测 Leader 节点的健康状态。如果节点发现 Leader 失效例如无法与 Leader 通信或未能及时响应则认为当前 Leader 失效。 触发选举 一旦有足够多的节点检测到 Leader 失效它们会自动启动选举过程。触发选举的节点会向集群中广播选举请求希望其他节点投票支持自己成为新的 Leader。 投票过程 每个节点收到选举请求后会为自己投票并将投票信息广播给集群中的其他节点。节点根据收到的投票数量来判断是否获得了多数派的支持超过半数节点的投票支持。 选举规则 EPOCH选举周期优先 每个服务器都有一个称为EPOCH的选举周期标识。EPOCH较大的服务器将优先成为新的Leader。这确保了如果有多个服务器同时尝试成为Leader具有最新选举周期的服务器将胜出。 事务IDZXID优先 如果多个服务器的EPOCH相同ZooKeeper将比较它们最近的事务IDZXID。具有较大ZXID的服务器将优先成为Leader。ZXID表示ZooKeeper事务的全局唯一标识符通常较大的ZXID表示服务器上存储的最新状态。 服务器IDSID优先 如果EPOCH和ZXID都相同ZooKeeper将比较服务器的IDSID。具有较大SID的服务器将成为新的Leader。服务器ID是在集群配置时分配的唯一标识符较大的SID通常意味着服务器在集群中具有更高的优先级。 多数派原则 根据 ZooKeeper 的多数派原则只有获得多数节点的投票支持的节点才能成为新的 Leader。这确保了选出的新 Leader 能够获得集群中大部分节点的认可和支持从而确保系统的一致性和可用性。 新 Leader 确定 一旦某个节点获得了足够的投票支持它将成为新的 Leader。其他节点将根据新 Leader 的通知和广播更新自己的状态标记新的 Leader。 系统恢复 新的 Leader 被选举出来后它将接管集群的所有写请求和事务处理确保集群状态的一致性和可用性。集群恢复正常运行继续处理客户端的请求和事务操作。
6注意事项
超时机制 在选举过程中每个节点会设置一个超时时间。如果在超时时间内没有达成半数投票节点将重新发起新一轮的选举确保集群能够迅速恢复和响应。 节点状态变更 节点在从 FOLLOWING 或 OBSERVING 状态切换到 LOOKING 状态后开始参与选举过程。选举完成后节点可能会切换到 FOLLOWING 或 OBSERVING 状态根据其在集群中的角色来处理读取请求或观察集群状态。 持久化和日志 ZooKeeper 通过持久化存储和写前日志WAL来保证选举操作和数据更新的持久性即使在节点故障或重启后系统状态也能够快速恢复到之前正确的状态。
5、zookeeper常用命令
1启动和管理 ZooKeeper 服务 启动 ZooKeeper 服务 bin/zkServer.sh start停止 ZooKeeper 服务 bin/zkServer.sh stop重启 ZooKeeper 服务 bin/zkServer.sh restart查看 ZooKeeper 服务状态 bin/zkServer.sh status2使用 ZooKeeper 客户端 启动 ZooKeeper 客户端 bin/zkCli.sh连接到特定的 ZooKeeper 服务器 bin/zkCli.sh -server 127.0.0.1:21813ZooKeeper 客户端命令
在启动了 ZooKeeper 客户端后可以使用以下命令 创建 znode create /my-node some data获取 znode 数据 get /my-node设置 znode 数据 set /my-node new data删除 znode delete /my-node列出 znode ls /查看 znode 状态 stat /my-node4高级管理命令 设置集群模式 在 conf/zoo.cfg 中配置多台服务器: server.1server1:2888:3888
server.2server2:2888:3888
server.3server3:2888:3888管理 ACL访问控制列表 setAcl /my-node world:anyone:r5系统服务管理
将 ZooKeeper 设置为系统服务可以简化管理 创建 Systemd 服务文件 在 /etc/systemd/system 目录下创建 zookeeper.service 文件: [Unit]
DescriptionZooKeeper
Afternetwork.target[Service]
Typeforking
ExecStart/path/to/zookeeper/bin/zkServer.sh start
ExecStop/path/to/zookeeper/bin/zkServer.sh stop
ExecReload/path/to/zookeeper/bin/zkServer.sh restart
Userzookeeper
Groupzookeeper
Restarton-failure[Install]
WantedBymulti-user.target启动和启用 ZooKeeper 服务 systemctl daemon-reload
systemctl start zookeeper
systemctl enable zookeeper四、zookeeper kafka集群搭建
1、项目需求
服务器部署192.168.20.140zookeeper、kafka192.168.20.141zookeeper、kafka192.168.20.142zookeeper、kafka
2、zookeeper集群搭建
1关闭防火墙
systemctl stop firewalld
setenforce 02安装jdk和zookeeper 3移动zookeeper到/usr/local下 mv zookeeper /usr/local/zookeeper4配置zookeeper
# 复制模板配置文件cp zoo_sample.cfg zoo.cfg#通信心跳时间Zookeeper服务器与客户端心跳时间单位毫秒
tickTime2000
#Leader和Follower初始连接时能容忍的最多心跳数(tickTime的数量)这里表示为10*2s
initLimit10
#Leader和Follower之间同步通信的超时时间这里表示如果超过5*2sLeader认 为Follwer死掉并从服务器列表中删除Follwer
syncLimit5
#修改指定保存Zookeeper中的数据的目录目录需要单独创建
dataDir/tmp/zookeeper
#添加 指定存放日志的目录目录需要单独创建
dataLogDir/tmp/logs
#客户端连接端口
clientPort2181
#添加集群信息
server.1192.168.20.140:3188:3288
server.2192.168.20.141:3188:3288
server.3192.168.20.142:3188:3288server.AB:C:D
●A是一个数字表示这个是第几号服务器。集群模式下需要在zoo.cfg中dataDir指定的目录下创建一个文件
myid这个文件里面有一个数据就是A的值Zookeeper启动时读取此文件拿到里面的数据与zoo.cfg里面
的配置信息比较从而判断到底是哪个server
●c是这个服务器Follower与集群中的Leader服务器交换信息的端口
●D是万一集群中的Leader服务器挂了需要一个端口来重新进行选举选出一个新的Leader而这个端口就
是用来执行选举时服务器相互通信的端口
如果指定节点不参加选举在末尾加observer
server.3192.168.19.102:3188:3288:observer#在每个节点上创建数据目录和日志目录
mkdir /tmp/zookeeper
mkdir /tmp/logs
#在每个节点的dataDir指定的目录下创建一个myid的文件
echo 1 /tmp/zookeeper/myid
echo 2 /tmp/zookeeper/myid
echo 3 /tmp/zookeeper/myid5将zookeeper加入到系统服务管理
cd /etc/systemd/system
vim zookeeper.service[Unit]
DescriptionZooKeeper
Afternetwork.target[Service]
Typeforking
ExecStart/usr/local/zookeeper/bin/zkServer.sh start
ExecStop/usr/local/zookeeper/bin/zkServer.sh stop
ExecReload/usr/local/zookeeper/bin/zkServer.sh restartRestarton-failure[Install]
WantedBymulti-user.target# 写入后加载配置
systemctl daemon-reload3、kafka集群搭建
1解压kafka压缩包将其移动到/usr/local目录下 2配置kafka的配置文件
cd /kafka/config
# 备份配置文件
cp server.properties{,.bak}vim server.properties
broker.id0
#21行broker的全局唯一编号每个broker不能重复因此要在其他机器上配置
broker.id1、broker.id2
listenersPLAINTEXT://192.168.19.100:9092
broker的IP需区分开来也可保持默认配置不用修改
num.network.threads3
#42行broker 处理网络请求的线程数量一般情况下不需要去修改
num.io.threads8
#45行用来处理磁盘IO的线程数量数值应该大于硬盘数
socket.send.buffer.bytes102400
#48行发送套接字的缓冲区大小
socket.receive.buffer.bytes102400
socket.request.max.bytes104857600
log.dirs/usr/local/kafka/logs
#51行接收套接字的缓冲区大小
#54行请求套接字的缓冲区大小
#31行指定监听的IP和端口可以修改每个
#60行kafka运行日志存放的路径也是数据存放的路径
num.partitions1 #65行topic在当前broker上的默认分区个数会被topic创建时的指定参数覆盖
num.recovery.threads.per.data.dir1
log.retention.hours168
#69行用来恢复和清理data下数据的线程数量
#103行segment文件数据文件保留的最长时间单位为小时默
认为7天超时将被删除
log.segment.bytes1073741824
#110行一个segment文件最大的大小默认为 1G超出将新建
一个新的segment文件
zookeeper.connect192.168.20.140:2181,192.168.20.141:2181,192.168.20.142:2181
#123行配置连接Zookeeper集群地址
# 如果设备延迟高可以将zookeeper的连接超时时间改高一些
zookeeper.connection.timeout.ms300003配置kafka启动脚本 #修改环境变量
vim /etc/profile
export KAFKA_HOME/usr/local/kafka
export PATH$PATH:$KAFKA_HOME/bin
source /etc/profile
#配置 Kafka 启动脚本
#!/bin/bash
#chkconfig:2345 22 88
#description:Kafka Service Control Script
KAFKA_HOME/usr/local/kafkacase $1 in
start)echo ---------- Kafka 启动 ------------${KAFKA_HOME}/bin/kafka-server-start.sh -daemon ${KAFKA_HOME}/config/server.properties;;
stop)echo ---------- Kafka 停止 ------------${KAFKA_HOME}/bin/kafka-server-stop.sh;;
restart)$0 stop$0 start;;
status)echo ---------- Kafka 状态 ------------count$(ps -ef | grep kafka | egrep -cv grep|$$)if [ $count -eq 0 ]; thenecho kafka is not runningelseecho kafka is runningfi;;
*)echo Usage: $0 {start|stop|restart|status}exit 1;;
esac# 添加权限
chmod x /etc/init.d/kafka4启动kafka创建topic
service kafka start# 创建topic
./kafka-topics.sh --create --zookeeper 192.168.20.140:2181,192.168.20.141:2181,192.168.20.142:2181 --replication-factor 2 --partitions 3 --topic test