当前位置: 首页 > news >正文

确定网站推广目标wordpress底部自豪

确定网站推广目标,wordpress底部自豪,超级优化残剑,大名企业做网站推广大数据高级面试题 Kafka的producer如何实现幂等性? Producer 幂等性 Producer 的幂等性指的是当发送同一条消息时#xff0c;数据在 Server 端只会被持久化一次#xff0c;数据不丟不重#xff0c;但是这里的幂等性是有条件的#xff1a; 只能保证 Producer 在单个会话内…大数据高级面试题 Kafka的producer如何实现幂等性? Producer 幂等性 Producer 的幂等性指的是当发送同一条消息时数据在 Server 端只会被持久化一次数据不丟不重但是这里的幂等性是有条件的 只能保证 Producer 在单个会话内不丟不重如果 Producer 出现意外挂掉再重启是无法保证的幂等性情况下是无法获取之前的状态信息因此是无法做到跨会话级别的不丢不重;幂等性不能跨多个 Topic-Partition只能保证单个 partition 内的幂等性当涉及多个 Topic-Partition 时这中间的状态并没有同步。 幂等性要解决的问题 在 0.11.0 之前Kafka 通过 Producer 端和 Server 端的相关配置可以做到数据不丢也就是 at least once但是在一些情况下可能会导致数据重复比如网络请求延迟等导致的重试操作在发送请求重试时 Server 端并不知道这条请求是否已经处理没有记录之前的状态信息所以就会有可能导致数据请求的重复发送这是 Kafka 自身的机制异常时请求重试机制导致的数据重复。 对于大多数应用而言数据保证不丢是可以满足其需求的但是对于一些其他的应用场景比如支付数据等它们是要求精确计数的这时候如果上游数据有重复下游应用只能在消费数据时进行相应的去重操作应用在去重时最常用的手段就是根据唯一 id 键做 check 去重。 在这种场景下因为上游生产导致的数据重复问题会导致所有有精确计数需求的下游应用都需要做这种复杂的、重复的去重处理。试想一下如果在发送时系统就能保证 exactly once这对下游将是多么大的解脱。这就是幂等性要解决的问题主要是解决数据重复的问题正如前面所述数据重复问题通用的解决方案就是加唯一 id然后根据 id 判断数据是否重复Producer 的幂等性也是这样实现的这一小节就让我们看下 Kafka 的 Producer 如何保证数据的 exactly once 的。 幂等性的实现原理at least once 幂等 exactly once Kafka Producer 在实现时有以下两个重要机制 PIDProducer ID用来标识每个 producer clientsequence numbersclient 发送的每条消息都会带相应的 sequence numberServer 端就是根据这个值来判断数据是否重复。 每个 Producer 在初始化时都会被分配一个唯一的 PIDbroker端会为producer每个Partition维护一个sequence number映射。sequence number时从0开始单调递增的。Producer 在发送数据时将会给每条 msg 标识一个 sequence numberServer 也就是通过这个来验证数据是否重复。这里的 PID 是全局唯一的Producer 故障后重新启动后会被分配一个新的 PID这也是幂等性无法做到跨会话的一个原因。 新消息的sequence number - broker端维护的sequence number大1说broker会接受处理这条消息。 新消息的sequence number比broker端维护的sequence number要小说明时重复消息broker可以将其直接丢弃 新消息的sequence number比broker端维护的sequence number要大过1说明中间存在了丢数据的情况那么会响应该情况对应的Producer会抛出OutOfOrderSequenceException。 Producer如何开启幂等性 Properties.put(“enable.idempotence”, ture) Properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG true)。 Kafka的ISR和OSR的作用分别是什么? 在Kafka中生产者和消费者只与leader 副本进行交互而 follow 副本只负责消息的同步很多时候 follower 副本中的消息相对 leader 副本而言会有一定的滞后。根据同步情况的不同kafka将副本分为了以下几种集合 AR ( Assigned Replicas 分区中的所有副本统称为 ARISROn-Sync Replicas所有与 leader 副本保持一定程度同步的副本包括 leader 副本在内〕组成OSR (Out-of-Sync Replicas 与 leader 副本同步滞后过多的副本不包 leader 副本组成 ISR 与 OSR并不是固定的 leader 副本负责维护和跟踪 ISR 集合中所有 follower 的滞后状态 follower 副本落后太多或失效时 leader 副本会把它从 ISR 集合中剔除 如果 OSR 集合中有 follower 副本 ”追上“ leader 副本那么 leader 副本它从 OSR 集合转移至 ISR 集合 Kafka生产者如何实现幂等性写入和事务? Kafka事务 Transactions Kafka 的事务处理主要是允许应用可以把消费和生产的 batch 处理涉及多个 Partition在一个原子单元内完成操作要么全部完成、要么全部失败。为了实现这种机制我们需要应用能提供一个唯一 id即使故障恢复后也不会改变这个 id 就是 TransactionnalId也叫 txn.id后面会详细讲述txn.id 可以跟内部的 PID 1:1 分配它们不同的是 txn.id 是用户提供的而 PID 是 Producer 内部自动生成的并且故障恢复后这个 PID 会变化有了 txn.id 这个机制就可以实现多 partition、跨会话的 EOS 语义。 当用户使用 Kafka 的事务性时Kafka 可以做到的保证 跨会话的幂等性写入即使中间故障恢复后依然可以保持幂等性 跨会话的事务恢复如果一个应用实例挂了启动的下一个实例依然可以保证上一个事务完成commit 或者 abort 跨多个 Topic-Partition 的幂等性写入Kafka 可以保证跨多个 Topic-Partition 的数据要么全部写入成功要么全部失败不会出现中间状态。 Kafka 事务的主要特点如下 原子性Kafka 事务中的所有操作要么全部成功完成要么全部回滚。这确保了消息的一致性。事务性消息Kafka 事务允许将多个消息作为一个事务进行发送。只有当事务中的所有消息都成功写入到 Kafka 集群中时事务才会被提交。跨分区和跨主题事务Kafka 事务支持在多个分区和主题之间进行事务性操作。这使得可以在不同的分区和主题之间保持一致性。事务协调器Kafka 使用一个事务协调器来管理和协调事务的执行。事务协调器负责分配事务 ID、管理事务的状态和元数据并确保事务的原子性。 Kafka消费者位置提交方式有哪些?分别什么场景下使用? 自动提交Auto Commit消费者通过设置enable.auto.commit为true来启用自动提交消费位置的功能。在使用自动提交时消费者会定期将当前消费的最新 offset 自动提交到 Kafka。该方式适用于简单的消费场景不需要太精确的消费位置控制且可以容忍一定程度的数据重复或丢失。手动提交Manual Commit消费者通过显式调用commitSync()或commitAsync()方法来手动提交消费位置。手动提交可以根据业务逻辑控制何时提交消费位置并且可以指定具体的消费位置进行提交。这种方式适用于需要更精确的消费位置控制以及对数据的准确性要求较高的场景。 使用自动提交和手动提交的场景如下 自动提交适用场景 简单消费逻辑对于简单的消费逻辑不需要过多地控制消费位置也不需要精确控制数据的准确性。 较低的可靠性要求可以容忍一定程度的数据重复或丢失如日志收集等场景。 手动提交适用场景 精确消费位置控制需要精确控制消费位置例如按照一定的条件进行批量消费、跳过某些消息等。 较高的可靠性要求对数据的准确性要求较高不能容忍数据重复或丢失。 批量提交消费位置通过手动提交可以灵活控制提交消费位置的时机可以根据实际情况进行批量提交。 需要注意的是手动提交消费位置时需要注意提交的顺序和频率以免引入不必要的延迟或增加系统负载。同时手动提交也需要处理可能出现的提交失败或错误的情况并进行相应的重试和异常处理。 Kafka消息丢失场景有哪些?如何避免? Kafka 可能发生消息丢失的场景有以下几种 Producer 端丢失 目前 Kafka Producer 是异步发送消息的如果你的 Producer 客户端使用了 producer.send(msg) 方法来发送消息方法会立即返回但此时并不能代表消息已经发送成功了。如果消息再发送的过程中发生了网络抖动那么消息可能没有传递到 Broker那么消息可能会丢失。如果发送的消息本身不符合如大小超过了 Broker 的承受能力等。 Broker 端丢失 Leader Broker 宕机了触发选举过程集群选举了一个落后 Leader 太多的 Broker 作为 Leader那么落后的那些消息就会丢失了。Kafka 为了提升性能使用页缓存(Page Cache)机制将消息写入页缓存而非直接持久化至磁盘采用了异步批量刷盘机制也就是说按照一定的消息量和时间间隔去刷盘刷盘的动作由操作系统来调度的如果刷盘之前Broker 宕机了重启后在页缓存的这部分消息则会丢失。 Consumer 端丢失 Consumer 没有正确消费消息就把位移提交了导致 Kafka 认为该消息已经被消费了从而导致消息丢失。场景1获取到消息后直接提交位移了然后再处理消息。这样在提交位移后处理完消息前如果程序挂掉这部分消息就算是丢失了。多线程并发消费消息且开启了自动提交导致消费完成之前程序就自动提交了位移如果程序挂掉也会出现消息丢失。 避免 Producer 端丢失 不要使用 producer.send(msg)而要使用 producer.send(msg, callback)。记住一定要使用带有回调通知的 send 方法。设置 retries 为一个较大的值。这里的 retries 同样是 Producer 的参数对应前面提到的 Producer 自动重试。当出现网络的瞬时抖动时消息发送可能会失败此时配置了 retries 0 的 Producer 能够自动重试消息发送避免消息丢失。 避免 Broker 端丢失 设置 acks all。acks 是 Producer 的一个参数代表了你对“已提交”消息的定义。如果设置成 all则表明所有副本 Broker 都要接收到消息该消息才算是“已提交”。这是最高等级的“已提交”定义。设置 unclean.leader.election.enable false。这是 Broker 端的参数它控制的是哪些 Broker 有资格竞选分区的 Leader。如果一个 Broker 落后原先的 Leader 太多那么它一旦成为新的 Leader必然会造成消息的丢失。故一般都要将该参数设置成 false即不允许这种情况的发生。设置 replication.factor 3。这也是 Broker 端的参数。其实这里想表述的是最好将消息多保存几份毕竟目前防止消息丢失的主要机制就是冗余。设置 min.insync.replicas 1。这依然是 Broker 端参数控制的是消息至少要被写入到多少个副本才算是“已提交”。设置成大于 1 可以提升消息持久性。在实际环境中千万不要使用默认值 1。确保 replication.factor min.insync.replicas。如果两者相等那么只要有一个副本挂机整个分区就无法正常工作了。我们不仅要改善消息的持久性防止数据丢失还要在不降低可用性的基础上完成。推荐设置成 replication.factor min.insync.replicas 1。 避免 Consumer 端丢失 确保消息消费完成再提交。Consumer 端有个参数 enable.auto.commit最好把它设置成 false并采用手动提交位移的方式。就像前面说的这对于单 Consumer 多线程处理的场景而言是至关重要的。 Kafka参数调优的注意事项 Broker 参数调优 num.network.threads 和 num.io.threads调整网络和I/O线程的数量以适应集群的负载。socket.send.buffer.bytes 和 socket.receive.buffer.bytes调整TCP Socket的发送和接收缓冲区大小以提高网络吞吐量。log.segment.bytes设置每个日志段的大小以平衡磁盘空间和读写性能。log.roll.hours 和 log.retention.hours控制日志段的滚动和保留策略以适应数据的存储和保留需求。 Consumer 参数调优 max.poll.records 和 fetch.max.bytes调整消费者每次拉取的消息数量和总字节数以提高拉取性能。fetch.min.bytes 和 fetch.max.wait.ms控制消费者的拉取策略以平衡吞吐量和延迟。enable.auto.commit 和 auto.commit.interval.ms配置消费者的自动提交偏移量的策略以确保消费的消息不会重复或丢失。 Producer 参数调优 acks设置生产者的消息确认机制0、1或all以平衡消息的可靠性和性能。batch.size 和 linger.ms控制生产者的消息批量发送行为以提高发送的吞吐量。compression.type设置消息的压缩算法以减小网络传输和磁盘存储的开销。 JVM 参数调优 Xmx 和 Xms调整Kafka Broker、Consumer和Producer的Java堆内存大小以适应不同的负载和数据量。XX:UseG1GC选择适合的垃圾回收器以提高JVM性能和内存管理效率。 网络和硬件调优 网络带宽和延迟确保Kafka集群的网络带宽和延迟满足实际需求。磁盘和存储选择高速和可靠的磁盘存储以确保数据的持久性和读写性能。 在调优 Kafka 参数时需要根据实际情况进行测试和性能分析以找到最优的参数配置。此外还可以使用Kafka提供的监控工具和指标来实时监测集群的性能和健康状态进一步进行调优和优化。 Kafka消费组重新平衡流程 重平衡的作用让消费者分组内消费者消费哪些主题分区达成一致。重平衡需要借助Kafka Broker端的Coordinator组件在Coordinator的帮助下完成消费者组的分区重新分配。触发重平衡的三个条件组内成员数量发生变化消费者组订阅主题的数量发生变化订阅主题的分区数量发生变化。重平衡的流程简约版 加入组JoinGroup当消费者心跳包响应 REBALANCE_IN_PROGRESS 时说明消费组正在重平衡此时消费者会停止消费并且发送请求加入消费组同步更新分配方案当 Coordinator 收到所有组内成员的加入组请求后会选出一个consumer Leader然后让consumer Leader进行分配分配完后会将分配方案放入SyncGroup请求中发送会CoordinatorCoordinator根据分配方案发送给每个消费者。 加入组 当组内成员加入组时它会向 coordinator 发送JoinGroup请求。在该请求中每个成员都要将自己订阅的主题上报 这样协调者就能收集到所有成员的订阅信息。一旦收集了全部成员的JoinGroup请求后 Coordinator 会从这些成员中选择一个担任这个消费者组的领导者。通常情况下第一个发送JoinGroup请求的成员自动成为领导者。领导者消费者的任务是收集所有成员的订阅信息 然后根据这些信息制定具体的分区消费分配方案。选出领导者之后 Coordinator 会把消费者组订阅信息封装进JoinGroup请求的 响应体中 然后发给领导者由领导者统一做出分配方案后 进入到下一步。 等待领导者消费者 领导者消费者Leader Consumer分配方案。领导者向 Coordinator 发送SyncGroup请求 将刚刚做出的分配方案发给协调者。值得注意的是其他成员也会向 Coordinator 发送SyncGroup请求 只不过请求体中并没有实际的内容。这一步的主要目的是让 Coordinator 接收分配方案 然后统一以 SyncGroup 响应的方式分发给所有成员 这样组内所有成员就都知道自己该消费哪些分区了。 重平衡的影响Rebalance过程的表现有些类似JVM FGC的情况期间整个应用都会阻塞所有Consumer实例都会停止消费等待Rebalance完成。Rebalance过程中所有Consumer实例都会参与重新分配。会导致TCP重新建立连接是一个比较慢的操作浪费资源。Rebalance的耗时取决于Consumer Group下的实例数量一旦实例数过多耗时极长会造成大量消费延迟。如何避免重平衡高峰期尽量避免对于kafka消费者组进行分区扩容操作以免触发Rebalance流程。消费者里面的业务逻辑尽量轻量化避免一些重的业务逻辑操作触发消费者heartbeat超时造成消费者下线从而触发Rebalance流程。合理调整consumer端的相关参数 a. session.timeout.msConsumer Group内实例的心跳超时时间默认值是 10s。b. heartbeat.interval.ms即心跳请求频率频繁发送心跳请求会额外消耗带宽资源但是能够更及时的触发Rebalance默认值为 3s。c. max.poll.interval.ms调用poll方法的时间间隔默认值为 5min。期间没消费完poll回的消息Coordinator会开启新一轮Rebalance。 Kafka消费者分区分配策略 Range分配策略 Range分配策略是面向每个主题的首先会对同一个主题里面的分区按照序号进行排序并把消费者线程按照字母顺序进行排序。然后用分区数除以消费者线程数量来判断每个消费者线程消费几个分区。如果除不尽那么前面几个消费者线程将会多消费一个分区。 RoundRobin分配策略 将消费组内所有消费者以及消费者所订阅的所有topic的partition按照字典序排序然后通过轮询算法逐个将分区以此分配给每个消费者。 如果同一消费组内所有的消费者订阅的消息都是相同的那么 RoundRobin 策略的分区分配会是均匀的。如果同一消费者组内所订阅的消息是不相同的那么在执行分区分配的时候就不是完全的轮询分配有可能会导致分区分配的不均匀。如果某个消费者没有订阅消费组内的某个 topic那么在分配分区的时候此消费者将不会分配到这个 topic 的任何分区。 Sticky分配策略这种分配策略是在kafka的0.11.X版本才开始引入的是目前最复杂也是最优秀的分配策略。分配策略实现了两个目的 分区的分配要尽可能的均匀分区的分配尽可能的与上次分配的保持相同。 ClickHouse中的ReplicatedMergeTree是什么?有什么优点? ReplicatedMergeTree引擎是MergeTree的派生引擎它在MergeTree的基础上加入了分布式协同的能力只有使用了ReplicatedMergeTree复制表系列引擎才能应用副本的能力。或者用一种更为直接的方式理解即使用ReplicatedMergeTree的数据表就是副本。 ReplicatedMergeTree的核心逻辑中大量运用了ZooKeeper的能力以实现多个ReplicatedMergeTree副本实例之间的协同包括主副本选举、副本状态感知、操作日志分发、任务队列和BlockID去重判断等。在执行INSERT数据写入、MERGE分区和MUTATION操作的时候都会涉及与ZooKeeper的通信。 优点 高可靠性ReplicatedMergeTree使用多个副本Replica存储数据确保数据的冗余备份。当某个节点发生故障或不可用时系统可以从其他副本中获取数据保证数据的可靠性和可用性。数据冗余和容错性每个副本都是完整的数据副本即使某些副本不可用数据仍然可从其他副本中读取。该冗余机制提供了容错性防止数据丢失和服务中断。强一致性ReplicatedMergeTree引擎采用分布式一致性协议确保所有副本中的数据保持一致。这意味着数据更新和变更操作在所有副本上都会同步进行保证了数据的强一致性。数据合并和压缩ReplicatedMergeTree通过合并相邻的数据块来减少磁盘空间使用。此外它还支持数据压缩算法可以进一步减小数据的存储空间提高存储效率。可扩展性ReplicatedMergeTree引擎支持水平扩展即向集群中添加更多的节点或副本以增加数据处理能力和存储容量。这使得系统可以满足不断增长的数据量和负载需求。实时性能ReplicatedMergeTree引擎在处理大规模数据集时表现出色能够快速执行聚合、过滤和排序等操作。它适用于需要实时分析和查询的场景如日志分析和实时报表生成 ClickHouse的分布式查询流程是什么样的? ClickHouse的分布式查询流程如下 查询解析 用户提交一个SQL查询请求该请求被ClickHouse接收并解析。查询包括选择要从数据库中检索的数据、筛选条件、排序方式等等。 查询计划生成 一旦查询被解析ClickHouse会生成一个查询计划。这个计划决定了如何在分布式环境中执行查询包括选择使用哪些分布式表、如何分割数据和并行执行查询。 分布式查询优化 ClickHouse在生成查询计划后会进行一系列优化操作以提高查询性能。这可能包括重新排序操作、剪枝、推测执行等。 查询分发 生成的查询计划告诉ClickHouse如何将查询分发到各个分布式节点上。这些节点通常是集群中的多个物理或虚拟机器它们存储着数据的分片。 分片扫描 一旦查询到达各个分布式节点这些节点会扫描它们所负责的数据分片以满足查询的条件。 局部计算 分布式节点在本地执行计算这可以包括过滤、聚合和计算表达式。每个节点只需要处理自己负责的数据分片。 局部结果汇总 每个分布式节点生成部分查询结果。这些局部结果会被收集并汇总以生成最终的查询结果。这通常涉及将各个节点的结果合并、合计、排序等操作。 查询结果返回 最终的查询结果会被返回给用户用户可以在客户端应用程序中处理这些结果。 整个过程中ClickHouse利用其列式存储引擎、数据压缩技术和并行处理能力以高效处理大规模数据集。该数据库系统还提供了许多高级功能如分区表、合并树引擎等以优化查询性能和数据管理。这使得ClickHouse在数据分析场景中非常受欢迎。 ClickHouse存储结构及优势 ClickHouse的存储结构和其优势是为了支持高性能大数据分析而设计的。以下是ClickHouse的存储结构和相关优势 存储结构 列式存储ClickHouse采用列式存储即将表中的每个列单独存储而不是按行存储。这种存储方式使得数据压缩和检索效率更高。 分区表ClickHouse支持分区表允许您将数据按时间或其他列的值划分为不同的分区。这有助于加速查询和数据管理特别是在大数据集上。 MergeTree引擎ClickHouse的核心存储引擎是MergeTree它支持数据合并和剪枝操作使其非常适合时间序列数据的存储和分析。 数据压缩ClickHouse使用多种数据压缩算法减小存储空间占用并加速数据的读取。数据在存储时经过压缩但在查询时仍然可以进行快速解压缩。 分布式存储ClickHouse支持分布式存储允许数据分布在多个节点上。这提高了可伸缩性和容错性。 合并树ClickHouse使用合并树技术来加速查询通过将数据在不同层次进行合并减少了需要扫描的数据量。 优势 高性能查询ClickHouse的列式存储和压缩技术使其能够执行高性能的查询特别适合于聚合、过滤和分析操作。合并树技术进一步提高了查询性能。 快速数据加载ClickHouse可以高效地加载大量数据尤其是在数据采用列式存储和分区表的情况下数据加载速度很快。 可伸缩性ClickHouse支持分布式架构允许在需要时添加更多节点以应对不断增长的数据量和查询负载。 低存储成本由于数据压缩和列式存储ClickHouse通常需要较少的存储空间从而降低了存储成本。 容错性ClickHouse的分布式架构具有高可用性和容错性可以自动处理节点故障并保持数据一致性。 支持复杂查询ClickHouse支持SQL查询语言可以处理复杂的分析查询包括聚合、连接、筛选等。 总的来说ClickHouse是一种强大的列式数据库系统特别适用于大规模数据分析。其存储结构和优势使其成为处理大数据集的理想选择提供高性能、可伸缩性和容错性。 ClickHouse各种索引的区别和使用场景 ClickHouse支持不同类型的索引每种索引都有其自身的优点和适用场景。以下是ClickHouse中一些常见的索引类型、其区别以及适用场景 Primary Key主键索引 区别主键索引是最基本的索引类型用于唯一标识表中的每一行。它是一个列或列的组合确保表中没有重复的主键值。 使用场景主键索引适用于需要快速查找特定行或执行合并操作的场景。它还用于确保表中数据的唯一性。 MergeTree索引 区别MergeTree索引是ClickHouse的默认存储引擎主要用于时间序列数据。它将数据按时间分区并支持数据合并操作以加速查询。 使用场景适用于时间序列数据如日志、指标数据等。它支持按时间范围快速筛选数据。 Bitmap索引 区别Bitmap索引是一种位图索引它使用位图来表示行是否包含特定值。它通常用于高基数列唯一值较多的列。 使用场景Bitmap索引适用于高基数列如性别、国家等其中值的唯一性较高。它可以快速筛选包含特定值的行。 Range索引 区别Range索引允许您指定列的范围并且只存储该范围内的数据。它有助于减小索引的大小提高查询性能。 使用场景适用于需要在特定范围内查询数据的情况如按日期范围查询或按数值范围查询。 Set索引 区别Set索引用于处理具有有限值集合的列它将列中的不同值映射到一个有限的编号集合中以节省存储空间。 使用场景适用于列中只有有限的不同值如枚举值或特定状态的标志。 Distributed索引 区别Distributed索引是ClickHouse中用于分布式查询的索引它指导查询在分布式环境中执行。 使用场景适用于分布式查询确保查询可以在集群中的多个节点上并行执行。 不同类型的索引可以根据具体的数据和查询需求进行选择。ClickHouse允许在表创建时选择合适的索引类型以便更好地支持不同的查询和分析操作。索引的选择应该根据数据量、查询模式、数据分布和性能需求等因素来决定。 一级索引ClickHouse 的表使用主键索引才能让数据查询有更好的性能这是因为数据和索引会按主键进行排序存储用主键索引查询数据可以很快地处理数据并返回结果。二级索引/跳数索引min_max存储每个块的索引表达式的最小值和最大值set可以理解为列出字段内所有出现的枚举值可以设置取多少条Bloom Filter允许对集合成员进行高效的是否存在测试但代价是有轻微的误报 ClickHouse查询性能优化 单表查询RBO、CBO prewhere替代where prewhere 和 where 语句的作用相同用来过滤数据。不同之处在于 prewhere 只支持*MergeTree族系列引擎的表首先会读取指定的列数据来判断数据过滤等待数据过滤之后再读取 Select 声明的列字段来补全其余属性。 当查询列明显多于筛选列时使用 prewhere 可十倍提升查询性能prewhere 会自动优化执行过滤阶段的数据读取方式降低 IO 操作。 数据采样 通过采样运算可极大提升数据分析的性能采样修饰符SAMPLE只有在 MergeTree engine 表中才有效且在创建表时需要指定采样策略。 列裁剪与分区裁剪 数据量太大时应避免使用select *操作查询的性能会与查询的字段大小和数量成反比字段越少消耗的 IO 资源越少性能就会越高。分区裁剪就是只读取需要的分区在过滤条件中指定。 order by结合where、limit 千万以上数据集进行 order by 查询时需要搭配 where 条件和 limit 语句一起使用。 避免构建虚拟列 如非必须不要在结果集上构建虚拟列虚拟列非常消耗资源浪费性能可以考虑在前端进行处理或者在表中构造实际字段进行额外存储。 uniqCombined替代distinct uniqCombined 相比 distinct 性能可提升 10 倍以上uniqCombined 底层采用类似 HyperLogLog 算法实现能接收 2% 左右的数据误差可直接使用这种去重方式提升查询性能。Count(distinct) 会使用 uniqExact 精确去重。 不建议在千万级不同数据上执行 distinct 去重查询改为近似去重 uniqCombined。 多表优化 用IN代替JOIN 大小表JOIN 多表 join 时要满足小表在右的原则右表关联时被加载到内存中与左表进行比较ClickHouse 中无论是 Left join 、Right join 还是 Inner join 永远都是拿着右表中的每一条记录到左表中查找该记录是否存在所以右表必须是小表。 注意谓词下推 其他优化 1关闭虚拟内存物理内存和虚拟内存的数据交换会导致查询变慢 2为每一个账户添加join_use_nulls配置左表中的一条记录在右表中不存在右表的相应字段会返回该字段相应数据类型的默认值而不是标准SQL中的Null值 3对 ClickHouse 数据的增删改操作都会产生新的临时分区会给 MergeTree 带来额外的合并任务。因此数据变更操作不宜太频繁这样会产生非常多的临时分区。一次操作的数据也不能太快。临时分区写入过快会导致 Merge 速度跟不上而报错。  官方一般建议一秒钟发起一次左右的写入操作每次操作写入的数据量保持在 2W~5W 之间具体根据服务器性能而定。 4分布式表使用GLOBAL两张分布式表上的 IN 和 JOIN 之前必须加上GLOBAL关键字右表只会在接收查询请求的那个节点查询一次并将其分发到其他节点上。如果不加 GLOBAL 关键字的话每个节点都会单独发起一次对右表的查询而右表又是分布式表就导致右表一共会被查询N^2次N是该分布式表的分片数量这就是查询放大会带来很大开销。 EXPLAIN命令查询执行计划进行优化 查找执行计划中的性能瓶颈如全表扫描、文件排序等。 根据瓶颈调整查询语句、创建或修改索引、优化表结构等。 重新执行查询并比较执行计划和性能。 优化表连接和子查询 尽量避免笛卡尔积连接使用JOIN条件过滤无关记录。优先使用INNER JOIN避免使用OUTER JOIN。将子查询替换为JOIN或EXISTS子句提高性能。 合理使用聚合函数和窗口函数 避免在大表上使用聚合函数如COUNT()、SUM()等。使用窗口函数进行分组和排序操作提高查询性能。 避免全表扫描和降低数据读取量 尽量使用索引进行查询避免全表扫描。使用WHERE子句过滤无关记录减少数据读取量。 优化数据过滤和排序操作 使用索引进行过滤和排序操作。避免在ORDER BY子句中使用函数和表达式。 使用分区和索引进行查询优化 调整并发设置和内存限制: 根据系统资源和查询需求调整ClickHouse的并发设置如max_threads参数。调整内存限制参数如max_memory_usage以保证查询能在限定的资源下高效运行。 处理大数据量和复杂查询场景: 对于大数据量查询可以使用LIMIT子句分批查询降低内存消耗。 对于复杂查询可以将查询拆分为多个简单查询使用临时表或物化视图存储中间结果降低查询复杂度。 SQL查询优化的最佳实践和常见问题解决方案: 使用EXPLAIN命令查看查询执行计划找到性能瓶颈。合理设计表结构、索引和分区以提高查询性能。避免使用不必要的聚合函数、窗口函数和JOIN操作。避免全表扫描尽量使用索引进行查询。使用WHERE子句过滤无关记录降低数据读取量。调整ClickHouse的并发设置和内存限制提高查询性能。对于大数据量和复杂查询场景采用分批查询、拆分查询和使用临时表等策略降低查询复杂度。 Flink窗口机制有哪些?应用场景分别是什么? 滚动窗口Tumbling Windows 滚动窗口的 assigner 分发元素到指定大小的窗口。滚动窗口的大小是固定的且各自范围之间不重叠。 比如说如果你指定了滚动窗口的大小为 5 分钟那么每 5 分钟就会有一个窗口被计算且一个新的窗口被创建。**适用场景**适合做每个时间段的聚合计算BI分析。例如统计某页面每分钟点击的pv。场景1我们需要统计每一分钟中用户购买的商品的总数需要将用户的行为事件按每一分钟进行切分这种切分被称为翻滚时间窗口Tumbling Time Window。 滑动窗口Sliding Windows 与滚动窗口类似滑动窗口的 assigner 分发元素到指定大小的窗口窗口大小通过 window size 参数设置。 滑动窗口需要一个额外的滑动距离window slide参数来控制生成新窗口的频率。 因此如果 slide 小于窗口大小滑动窗口可以允许窗口重叠。这种情况下一个元素可能会被分发到多个窗口。**适用场景**对最近一段时间段内进行统计(如某接口近几分钟的失败调用率)场景比如每隔3秒计算最近5秒内每个基站的日志数量、每30秒计算一次最近一分钟用户购买的商品总数。 会话窗口Session Windows 会话窗口的 assigner 会把数据按活跃的会话分组。 与滚动窗口和滑动窗口不同会话窗口不会相互重叠且没有固定的开始或结束时间。 会话窗口在一段时间没有收到数据之后会关闭即在一段不活跃的间隔之后。 会话窗口的 assigner 可以设置固定的会话间隔session gap或 用 session gap extractor 函数来动态地定义多长时间算作不活跃。 当超出了不活跃的时间段当前的会话就会关闭并且将接下来的数据分发到新的会话窗口。会话窗口并没有固定的开始或结束时间所以它的计算方法与滑动窗口和滚动窗口不同。在 Flink 内部会话窗口的算子会为每一条数据创建一个窗口 然后将距离不超过预设间隔的窗口合并。 想要让窗口可以被合并会话窗口需要拥有支持合并的 Trigger 和 Window Function 比如说 ReduceFunction、AggregateFunction 或 ProcessWindowFunction。**适用场景**在这种用户交互事件流中我们首先想到的是将事件聚合到会话窗口中一段用户持续活跃的周期由非活跃的间隙分隔开。场景一如上图所示就是需要计算每个用户在活跃期间总共购买的商品数量如果用户30秒没有活动则视为会话断开假设raw data stream是单个用户的购买行为流。场景二3秒内如果没有数据进入则计算每个基站的日志数量场景三比如音乐 app 听歌的场景我们想统计一个用户在一个独立的 session 中听了多久的歌曲如果超过15分钟没听歌那么就是一个新的 session 了 全局窗口Global Windows 全局窗口的 assigner 将拥有相同 key 的所有数据分发到一个全局窗口。 这样的窗口模式仅在你指定了自定义的 trigger 时有用。 否则计算不会发生因为全局窗口没有天然的终点去触发其中积累的数据。 Flink窗口函数的应用 ReduceFunction 和AggregateFunction 应用示例用于聚合窗口内的数据例如计算窗口内元素的总和、平均值、最大值、最小值等。 WindowFunction 应用示例用于执行自定义操作如在窗口内执行复杂计算或将窗口内的数据写入外部存储系统。 ProcessWindowFunction 应用示例适用于需要访问窗口内所有元素的场景可以实现更灵活的操作如窗口内的元素排序、分组、过滤等。 FoldFunction 应用示例用于在窗口内构建累积的结果可以实现自定义的累积逻辑 ApplyFunction 应用示例用于将窗口内的数据传递给外部系统如调用外部服务或发送数据到外部数据存储。 CoProcessFunction (双流处理) 应用示例适用于处理多个输入流的情况可以实现各种复杂的联合操作如事件匹配数据关联等。 WindowProcessFunction 应用示例类似于 ProcessWindowFunction,但提供更丰富的上下文信息可用于更复杂的处理逻辑如窗口触发条件的动态调整 KeyedProcessFunction 应用示例适用于需要在键控窗口上执行自定义操作的场景如处理超时事件、实现复杂的状态机等。 SideOutput 应用示例可用于将窗口内的部分数据输出到不同的侧输出流用于实现分流和筛选操作。 Flink中数据倾斜原因和解决方案 数据倾斜的定位定位反压 Flink Web UI 自带的反压监控Flink Task Metrics通过监控反压的信息可以获取到数据处理瓶颈的 Subtask 确定数据倾斜 Flink Web UI 自带Subtask 接收和发送的数据量 倾斜原因Flink 任务出现数据倾斜的直观表现是任务节点频繁出现反压但是增加并行度后并不能解决问题部分节点出现 OOM 异常是因为大量的数据集中在某个节点上导致该节点内存被爆任务失败重启。场景业务上有严重的数据热点、技术上大量使用了 KeyBy、GroupBy 等操作错误的使用了分组 Key人为产生数据热点 key 分布不均匀的无统计场景。例如上游数据分布不均匀使用keyBy来打散数据。解决思路 通过添加随机前缀打散 key 的分布使得数据不会集中在几个 Subtask。key 分布不均匀的统计场景解决思路聚合统计前先进行预聚合例如两阶段聚合加盐局部聚合去盐全局聚合。 问题1 flink实时程序在线上环境上运行遇到一个很诡异的问题flink使用eventtime读取kafka数据发现无法触发计算。—事件时间倾斜 问题描述 watermark的传递机制当并行执行的情况下每次接受的水印发送的水印都是最小的木桶效应。但是当某个分区始终无数据的时候就不会更新该分区的watermark值那么窗口就一直不会被触发计算。这种现象在某些hash极端导致数据倾斜很普遍。 场景Flink消费Kafka上下游并行度不一致导致的数据倾斜 解决方案我们 Flink 消费 Kafka 的数据时是推荐上下游并行度保持一致即 Kafka 的分区数等于 Flink Consumer 的并行度。但是会有一种情况为了加快数据的处理速度来设置 Flink 消费者的并行度大于 Kafka 的分区数。如果你不做任何的设置则会导致部分 Flink Consumer 线程永远消费不到数据。需要设置 Flink 的 Redistributing也就是数据重分配。Rebalance 分区策略数据会以 round-robin 的方式对数据进行再次分区可以全局负载均衡。Rescale 分区策略基于上下游的并行度会将数据以循环的方式输出到下游的每个实例中。 方案二 flink 1.11新增了支持watermark空闲检测WatermarkStrategy.withIdleness()方法允许用户在配置的时间内即超时时间内没有记录到达时将一个流标记为空闲从而进一步支持 Flink 正确处理多个并发之间的事件时间倾斜的问题并且避免了空闲的并发延迟整个系统的事件时间。通过将 Kafka 连接器迁移至新的接口FLINK-17669用户可以受益于针对单个并发的空闲检测。 Flink Watermark机制 Apache Flink中的Watermark机制是用于处理事件时间event time数据的一种重要机制用于解决乱序事件和迟到事件等与时间相关的问题。Watermark是一种特殊类型的记录它包含了一个时间戳用于表示事件时间流中的时间进展。Watermark机制有助于Flink处理有序性和时序性的数据流。 以下是关于Flink Watermark机制的一些关键概念和作用 Watermark生成 Watermark是由数据源如Kafka、Kinesis等生成的或者可以在Flink程序中手动创建。Watermark通常与事件时间字段的时间戳相关联表示事件时间的进度。Watermark会周期性地或基于数据的实际时间戳生成并随着事件时间的增长而递增。 Watermark传播 Watermark在数据流中随事件一起传播。Flink的操作符会接收Watermark并将其传递到下游操作符。在传递过程中Watermark的时间戳通常是逐步递增的。 Watermark的作用 Watermark用于告知Flink何时认为特定时间戳之前的数据已经全部到达即事件时间窗口可以安全地关闭和触发。Watermark还用于处理迟到事件。当事件时间窗口已关闭时Flink可以继续接收后续迟到的事件并将它们分配到已关闭的窗口中。 Watermark与窗口操作 在窗口操作中Flink会根据Watermark的时间戳来触发窗口计算。当Watermark达到窗口结束时间时Flink将关闭窗口并进行计算。Watermark也用于处理乱序事件。当Watermark到达某个时间戳时Flink可以安全地关闭该时间戳之前的窗口。 Watermark的延迟 Watermark的延迟是指从数据生成到Watermark生成的时间间隔。这个延迟是为了处理迟到事件和乱序事件以确保数据流中的数据能够被正确处理。延迟的设置需要权衡较长的延迟可以处理更多的迟到事件但可能会导致延迟窗口关闭。较短的延迟可以更快地关闭窗口但可能错过迟到事件。 Flink的Watermark机制有助于处理事件时间数据确保数据在流处理中的正确处理和窗口操作的准确触发。它使Flink能够处理乱序事件、迟到事件以及具有时间维度的流处理需求。在Flink应用程序中合理设置Watermark机制是至关重要的以确保流处理的正确性和性能。 FlinkCEP复杂事件处理应用 概述FlinkCEP是在Flink上层实现的复杂事件处理库。 它可以让你在无限事件流中检测出特定的事件模型有机会掌握数据中重要的那部分。 模式API 单个模式 一个模式可以是一个单例或者循环模式。单例模式只接受一个事件循环模式可以接受多个事件。 组合模式 单个模式连接起来组成一个完整的模式序列 严格连续期望所有匹配的事件严格的一个接一个出现中间没有任何不匹配的事件。 next()指定严格连续 松散连续: 忽略匹配的事件之间的不匹配的事件。 followedBy()指定松散连续 不确定的松散连续: 更进一步的松散连续允许忽略掉一些匹配事件的附加匹配。 followedByAny()指定不确定的松散连续。 模式组 检测模式 在指定了要寻找的模式后该把它们应用到输入流上来发现可能的匹配了。为了在事件流上运行你的模式需要创建一个PatternStream。 给定一个输入流input一个模式pattern和一个可选的用来对使用事件时间时有同样时间戳或者同时到达的事件进行排序的比较器comparator 场景假设您有一个网络服务器的访问日志您想要检测一种情况在过去5分钟内某个IP地址或一组IP地址在不到1秒内发起了10次或更多的请求。您可以使用Flink CEP来实现此目标。 Flink SQL和Table API的区别是什么? Flink本身是批流统一的处理框架所以Table API和SQL就是批流统一的上层处理API。 Table API是一套内嵌在Java和Scala语言中的查询API它允许我们以非常直观的方式组合来自一些关系运算符的查询比如select、filter和join。而对于Flink SQL就是直接可以在代码中写SQL来实现一些查询Query操作。Flink的SQL支持基于实现了SQL标准的Apache CalciteApache开源SQL解析工具。 SQL风格的查询语言 Flink SQLFlink SQL是一种用于查询和处理数据的SQL语言类似于传统关系型数据库的SQL。您可以编写SQL查询来操作数据流或批处理作业。这使得它对熟悉SQL的用户来说更加直观和易于使用。Table APIFlink Table API是一种基于API的方式使用类似SQL的查询表达式来操作数据。它允许以编程方式构建查询而不是使用纯SQL。 编程模型 Flink SQLFlink SQL更适合那些更习惯SQL查询语言的数据分析师和SQL开发人员因为它更接近传统SQL。它通常用于简单的查询和数据分析任务。Table APIFlink Table API提供了更多的编程灵活性允许开发人员以编程方式构建查询执行更复杂的数据操作并进行更灵活的数据处理。 集成和扩展性 Flink SQLFlink SQL具有更强的生态系统集成可以轻松与外部系统进行连接如Kafka、Elasticsearch、Hive等。Table APITable API提供更多的编程扩展性可以用于自定义表函数和表源。 Flink Streaming如何处理迟到事件? 事件时间数据经常包含乱序事件或延迟到达的事件。处理迟到事件需要合理配置水印Watermark以及使用特定的窗口和触发策略。 配置Watermark生成 在Flink中Watermark用于表示事件时间的进度。Watermark会告知系统事件时间的最大进度以便触发窗口操作。在数据流中要定期生成Watermark并将其传递给Flink。水印的生成通常基于事件时间字段的时间戳。 窗口配置 选择合适的窗口类型例如滚动窗口、滑动窗口或会话窗口以根据业务需求划分事件时间。设置窗口的大小和滑动间隔这将影响窗口的划分。 窗口分配策略 使用allowedLateness方法来为窗口分配策略设置最大允许的迟到时间。这将定义在触发窗口之后允许事件时间的延迟。对于迟到的事件它们可以被分配给窗口只要它们的事件时间小于窗口结束时间加上allowedLateness。 迟到事件处理 当窗口触发时系统将根据窗口内的数据执行聚合操作。然而迟到事件也可能已经到达。使用sideOutputLateData来将迟到事件流输出到另一个数据流然后您可以对这些迟到事件执行自定义操作。侧输出迟到事件会通过sideOutputLateData方法输出到late-events数据流中以供后续处理。正常事件则由MyWindowFunction进行处理。 通过合理配置Watermark和窗口以及使用allowedLateness和sideOutputLateData您可以有效地处理迟到事件并保持事件时间处理的正确性。 Flink与Spark Streaming的技术选型对比 一、设计理念 Spark的技术理念是使用微批来模拟流的计算,基于Micro-batch,数据流以时间为单位被切分为一个个批次,通过分布式数据集RDD进行批量处理,是一种伪实时。 Flink是基于事件驱动的是面向流的处理框架, Flink基于每个事件一行一行地流式处理是真正的流式计算. 另外他也可以基于流来模拟批进行计算实现批处理。 二、运行架构 Spark Streaming 运行时的角色(standalone 模式)主要有 Master:主要负责整体集群资源的管理和应用程序调度Worker:负责单个节点的资源管理driver 和 executor 的启动等Driver:用户入口程序执行的地方即 SparkContext 执行的地方主要是 DGA 生成、stage 划分、task 生成及调度Executor:负责执行 task反馈执行状态和执行结果。 Flink 运行时的角色(standalone 模式)主要有: Jobmanager: 协调分布式执行他们调度任务、协调 checkpoints、协调故障恢复等。至少有一个 JobManager。高可用情况下可以启动多个 JobManager其中一个选举为 leader其余为 standbyTaskmanager: 负责执行具体的 tasks、缓存、交换数据流至少有一个 TaskManagerSlot: 每个 task slot 代表 TaskManager 的一个固定部分资源Slot 的个数代表着 taskmanager 可并行执行的 task 数。 三、任务调度 Spark Streaming 连续不断的生成微小的数据批次构建有向无环图DAG根据DAG中的action操作形成job每个job有根据窄宽依赖生成多个stage。流程构建 DGA 图——划分 stage——生成 taskset——调度 task Flink 根据用户提交的代码生成 StreamGraph经过优化生成 JobGraph然后提交给 JobManager进行处理JobManager 会根据 JobGraph 生成 ExecutionGraphExecutionGraph 是 Flink 调度最核心的数据结构JobManager 根据 ExecutionGraph 对 Job 进行调度。 四、时间机制 Spark Streaming 支持的时间机制有限只支持处理时间。使用processing time模拟event time必然会有误差 如果产生数据堆积的话误差则更明显。 flink支持三种时间机制事件时间注入时间处理时间同时支持 watermark 机制处理迟到的数据,说明Flink在处理乱序大实时数据的时候,更有优势。 Flink Savepoint和Checkpoint的区别 Apache Flink中的Savepoint和Checkpoint都是用于保证数据流处理的一致性和容错性的机制但它们有不同的目的和使用场景以下是它们的区别 Checkpoint检查点和Savepoint保存点 目的 Checkpoint的主要目的是实现容错性。它用于将应用程序的状态定期保存到持久性存储中以便在发生故障时能够从故障点之前的状态继续进行处理确保数据不会丢失。Savepoint的主要目的是应用程序的升级、降级、变更或迁移。它允许将应用程序的状态保存到持久性存储中然后在新的应用程序版本上重新加载该状态。 触发方式 Checkpoint可以根据时间间隔或数据条数等规则定期触发或者可以在代码中显式触发。Savepoint通常是由用户手动触发的而不是定期触发。用户可以根据需要创建Savepoint以备将来的用途。 数据存储 Checkpoint的状态数据通常存储在分布式文件系统如HDFS或分布式存储系统中以确保高可用性和可靠性。Savepoint的状态数据通常存储在分布式文件系统或分布式存储系统中。 恢复操作 当应用程序失败时Flink可以使用最近成功的检查点进行恢复从最近的一次检查点开始继续处理数据流。Savepoint通常用于将状态从一个应用程序版本迁移到另一个应用程序版本而不是简单的故障恢复。它允许在不同版本的应用程序之间共享状态。 开销 Checkpoint需要额外的存储和网络开销因为它涉及将状态数据写入外部存储。Savepoint也需要额外的存储和网络开销。但它通常不会在常规故障恢复中使用。 总的来说Checkpoint和Savepoint都有状态保存和恢复的作用但它们的使用场景和触发方式不同。Checkpoint用于实现容错性由Flink自动触发用于故障恢复。Savepoint由用户手动触发用于应用程序升级、变更或迁移它更多是一种版本迁移工具。在实际应用中它们通常会一起使用以保证数据的完整性和应用程序的可维护性。 Flink任务链和任务槽的作用是什么? 一、任务链 对于分布式执行Flink 会将算子的 Subtasks 链接成 Tasks。每个 Task 由一个线程执行。将算子链接成 Task 形成任务链(task chain) 是个有用的优化能减少线程之间的切换减少消息的序列化/反序列化减少数据在缓冲区的交换减少延迟的同时提高整体的吞吐量。 二、任务槽 是 Flink 集群资源分配和管理的机制。每个任务槽相当于一个独立的资源池可以分配给一个或多个任务执行。通过任务槽可以对 Flink 集群内的资源进行合理的分配和管理从而更好地利用硬件资源提高任务的执行效率和吞吐量。 具体来说一个 Flink 作业Job可以由多个 TaskManager 组成并通过网络连接相互通信。每个 TaskManager 可以包含多个任务槽每个任务槽可以执行一个或多个任务。任务链可以将多个算子连接在一起让它们在同一个任务槽中运行。这样就能够避免将数据从一个任务槽发送到另一个任务槽的开销从而提高处理效率。 总的来说任务链和任务槽都是 Flink 的优化措施可以帮助用户更好地利用硬件资源提高 Flink 作业的执行效率和吞吐量。 Flink内存模型与内存优化 官网版本 一、Flink JVM 进程的进程总内存Total Process Memory Flink 应用使用的内存Flink 总内存 Total Flink MemoryJVM 堆内存Heap Memory堆外内存Off-Heap Memory 直接内存Direct Memory本地内存Native Memory Flink 运行过程中的 JVM 使用的内存元数据区JVM MetaSpace日常消耗JVM Overhead 一、内存模型 Flink总体内存主要包含 JobManager 内存模型和 TaskManager 内存模型。 JobManager 内存模型 Flink 总内存 Total Flink Memory #JobManager总进程内存 jobmanager.memory.process.size:4096m JVM 堆内存Heap Memory # 作业管理器的 JVM 堆内存大小jobmanager.memory.heap.size2048m 堆外内存Off-Heap Memory #作业管理器的堆外内存大小。jobmanager.memory.off-heap.size1536m TaskManager 内存模型 总体内存**Total Process Memory**Flink Java 应用程序包括用户代码和 JVM 运行整个进程所消耗的总内存。 总进程内存(Total Process Memory) Flink 总内存 JVM 元空间 JVM 执行开销 **Total Flink Memory**仅 Flink Java 应用程序消耗的内存包括用户代码但不包括 JVM 为其运行而分配的内存。 Flink 总内存 Framework堆内外 task 堆内外 network managed Memory JVM Heap (JVM 堆上内存)**Framework Heap **框架堆内存Task Heap : 任务堆内存Off-Heap Mempry(JVM 堆外内存)Managed memory: 托管内存由 Flink 管理的原生托管内存保留用于排序、哈希表、中间结果缓存和 RocksDB 状态后端。**DirectMemory**JVM 直接内存 **Framework Off-Heap Memory**Flink 框架堆外内存。 TaskManager 本身所占用的对外内存不计入Slot资源。 **Task Off-Heap **Task 堆外内存。 专用于Flink 框架的堆外直接或本机内存。 **Network Memory**网络内存。 网络数据交换所使用的堆外内存大小如网络数据交换缓冲区。 **JVM metaspace**JVM 元空间。Flink JVM 进程的元空间大小,默认为256MB。**JVM Overhead **JVM执行开销。JVM 执行时自身所需要的内容包括线程堆栈、IO、 编译缓存等所使用的内存,这是一个上限分级成分的的总进程内存。 二、内存优化 为 Standalone 配置内存 建议为 Standalone 配置 Flink 总内存设置 JobManager 和 TaskManager 的 flink.size 大小,声明为 Flink 本身提供了多少内存。 为 Containers(容器) 配置内存 建议为容器化部署Kubernetes或Yarn配置总进程内存设置 process.size 大小它声明了总共应该分配多少内存给 Flink JVM 进程并对应于请求容器的大小。 为 state backends(状态后端)配置内存 为 state backends(状态后端)配置内存时这仅与TaskManager相关。在部署 Flink 流应用程序时所使用的状态后端类型将决定集群的最佳内存配置。HashMap 状态后端 运行无状态作业或使用 HashMapStateBackend 时将托管内存设置为零。这将确保为 JVM 上的用户代码分配最大数量的堆内存。 RocksDB 状态后端 该 EmbeddedRocksDBStateBackend 使用本机内存。默认情况下RocksDB 设置为将本机内存分配限制为托管内存的大小。因此为你的状态保留足够的托管内存非常重要。如果禁用默认的 RocksDB 内存控制RocksDB 分配的内存超过请求的容器大小总进程内存的限制则可以在容器化部署中终止 TaskManager 。 为 batch Job(批处理作业)配置内存 Flink 的批处理操作符利用托管内存来更高效地运行。这样做时可以直接对原始数据执行某些操作而无需反序列化为 Java 对象。这意味着托管内存配置对应用程序的性能有实际影响。Flink 将尝试分配和使用 为批处理作业配置的尽可能多的托管内存但不会超出其限制。这可以防止 OutOfMemoryError’s因为 Flink 准确地知道它必须利用多少内存。如果托管内存不足Flink 会优雅地溢出到磁盘。 Flink内存管理机制及其参数调优 Flink状态管理内部原理是什么? 一、Flink的State类型 托管状态Managed State由Flink管理的Flink帮忙存储、恢复和优化。Keyed State Keyed State是KeyedStream上的状态。假如输入流按照id为Key进行了keyBy分组形成一个KeyedStream数据流中所有id为1的数据共享一个状态可以访问和更新这个状态以此类推每个Key对应一个自己的状态。下图展示了Keyed State因为一个算子子任务可以处理一到多个Key算子子任务1处理了两种Key两种Key分别对应自己的状态。ValueStateT: 保存一个可以更新和检索的值。 这个值可以通过 update(T) 进行更新通过 T value() 进行检索。ListStateT: 保存一个元素的列表。可以往这个列表中追加数据并在当前的列表上进行检索。可以通过 add(T) 或者 addAll(ListT) 进行添加元素通过 IterableT get() 获得整个列表。还可以通过 update(ListT) 覆盖当前的列表。ReducingStateT: 保存一个单值表示添加到状态的所有值的聚合。接口与 ListState 类似但使用 add(T) 增加元素会使用提供的 ReduceFunction 进行聚合。AggregatingStateIN, OUT: 保留一个单值表示添加到状态的所有值的聚合。和 ReducingState 相反的是, 聚合类型可能与 添加到状态的元素的类型不同。 接口与 ListState 类似但使用 add(IN) 添加的元素会用指定的 AggregateFunction 进行聚合。MapStateUK, UV: 维护了一个映射列表。 你可以添加键值对到状态中也可以获得反映当前所有映射的迭代器。使用 put(UKUV) 或者 putAll(MapUKUV) 添加映射。 使用 get(UK) 检索特定 key。 使用 entries()keys() 和 values() 分别检索映射、键和值的可迭代视图。你还可以通过 isEmpty() 来判断是否包含任何键值对。 Operator State Operator State可以用在所有算子上每个算子子任务或者说每个算子实例共享一个状态流入这个算子子任务的数据可以访问和更新这个状态。下图展示了Operator State算子子任务1上的所有数据可以共享第一个Operator State以此类推每个算子子任务上的数据共享自己的状态。 广播状态 (Broadcast State) 广播状态是一种特殊的算子状态。引入它的目的在于支持一个流中的元素需要广播到所有下游任务的使用情形。在这些任务中广播状态用于保持所有子任务状态相同。 该状态接下来可在第二个处理记录的数据流中访问。可以设想包含了一系列用于处理其他流中元素规则的低吞吐量数据流这个例子自然而然地运用了广播状态。不同之处 它具有 map 格式它仅在一些特殊的算子中可用。这些算子的输入为一个广播数据流和非广播数据流这类算子可以拥有不同命名的多个广播状态 。 原生状态Raw State开发者自己管理的需要自己序列化。区别状态的数据结构 Managed State支持了一系列常见的数据结构如ValueState、ListState、MapState等。Raw State只支持字节任何上层数据结构需要序列化为字节数组。使用时需要用户自己序列化以非常底层的字节数组形式存储Flink并不知道存储的是什么样的数据结构。 具体的使用场景 绝大多数的算子都可以通过继承Rich函数类或其他提供好的接口类在里面使用Managed State。Raw State是在已有算子和Managed State不够用时用户自定义算子时使用。 二、状态算子的扩缩容 流应用的一个基本需求是根据输入速率的增加或减少而调整算子的并行性。有状态算子调整并行度比较难。因为我们需要把状态重新分组分配到与之前数量不等的并行任务上。Keyed State带有键值分区状态的算子可以通过将键重新划分来进行任务的扩缩容。但是为了提高效率Flink 不会以键为单位来进行划分。相反Flink 以键组作为单位来重新分配每个键组里面包含了多个键。Operate State带有算子列表状态的算子在扩缩容时会对列表中的条目进行重新分配。所有并行任务的列表项会被统一收集起来并再均匀重新分配。如果列表项的数量少于算子的新并行度一些任务将以空状态开始。带有算子联合列表状态的算子会在扩缩容时把状态列表中的全部条目广播到全部任务中。然后任务自己来选择使用哪些项和丢弃哪些项。带有算子广播状态的算子在扩缩容时会把状态拷贝到全部新任务上。这样做是因为广播状态要确保所有任务具有相同的状态。在缩容的情况下直接简单地停掉多余的任务即可。 三、状态后端 在 Flink 中状态的存储、访问以及维护都是由一个可插拔的组件决定的这个组件就叫作状态后端(State Backend)。状态后端主要负责管理本地状态的存储方式和位置。有状态的流计算是 Flink 的一大特点状态本质上是数据数据是需要维护的例如数据库就是维护数据的一种解决方案。后端分类Flink 1.13 以前 MemoryStateBackend 基于内存存储。将状态维护在 JVM 堆上的一个内部状态后端。建议在本地开发或调试时使用 MemoryStateBackend因为它具有有限的状态大小。这意味着它在处理大规模状态时可能会出现性能问题或内存限制。 FsStateBackend 基于文件存储。配置通过 URL(type, address, path) 等文件系统完成。FsStateBackend 非常适合处理大规模状态、长窗口或大型键值状态的 Apache Flink 有状态流处理作业。它专为高效地处理大量状态数据而设计。此外FsStateBackend 确实是高可用性设置的理想选择。它确保将状态数据可靠地存储在文件系统中以应对故障并提供容错性。同时在 JobManager 的内存或 ZooKeeper 中存储的最小元数据进一步增强了高可用性能力。因此当 Flink 应用程序需要强大的状态数据处理和保持高可用性时通常建议使用 FsStateBackend。 RocksDBStateBackend 基于 RocksDB 存储。配置通过 URL(type, address, path) 等文件系统完成RocksDBStateBackend 使用 RocksDB 数据库在本地磁盘上保存进行中的数据。 Flink 1.13 以及以后 HashMapStateBackendFsstatebackend和MemoryStatebackend整合系统默认 HashMapStateBackend 把状态存放在内存里。具体实现 哈希表状态后端在内部会直接把状态当作对象(Objects)保存在 Taskmanager 的 JVM 堆内存上。普通的状态以及窗口中收集的数据和触发器都会以键值对的形式存储起来所以底层是一个哈希表(HashMap)这种状态后端也因此得名。 EmbeddedRocksDBStateBackend内嵌 RocksDB 状态后端 RocksDB 是一种内嵌的 key-value 存储介质可以把数据持久化到本地硬盘。配置 EmbeddedRocksDBStateBackend后会将处理中的数据全部放入 RocksDB 数据库中RocksDB 默认存储在 TaskManager 的本地数据目录里。 区别 HashMap 和 RocksDB 两种状态后端最大的区别就在于本地状态存放在哪里。HashMapStateBackend 是内存计算读写速度非常快但是状态的大小会受到集群可用内存的限制如果应用的状态随着时间不停地增长就会耗尽内存资源。EmbeddedRocksDBStateBackend 是硬盘存储所以可以根据可用的磁盘空间进行扩展所以它非常适合于海量状态的存储。不过由于每个状态的读写都需要做序列化/反序列化而且可能需要直接从磁盘读取数据这就会导致性能的降低平均读写性能要比 HashMapStateBackend 慢一个数量级。 主要作用 Local State Management本地状态管理 State Management 的主要任务是确保状态的更新和访问。 Remote State Checkpointing远程状态备份。 Flink 程序是分布式运行的而 State 都是存储到各个节点上的一旦 TaskManager 节点出现问题就会导致 State 的丢失。State Backend 提供了 State Checkpointing 的功能可以将 TaskManager 本地的 State 的备份到远程的存储介质上可以是分布式的存储系统或者数据库。不同的 State Backends 备份的方式不同会有效率高低的区别。 状态后端的主要作用为在每一个 TaskManager 节点上存储和管理状态以及将状态进行远程备份两个部分。 四、状态持久化 Checkpoint 有状态流应用中的检查点(Checkpoint)其实就是所有任务的状态在某个时间点的一个快照一份拷贝。简单来讲就是一次“存档”将之前处理数据的进度进行保存。在一个流应用程序运行时Flink 会定期保存检查点在检查点中会记录每个算子的 ID 和状态。如果发生故障Flink会使用最近一次成功保存的检查点来恢复应用的状态并重新启动处理流程就如同“读档”一样。 Savepoint Checkpoint 的主要目的是为意外失败的作业提供恢复机制(如 TaskManager/JobManager 进程挂了)。Checkpoint 由Flink 管理即 Flink 创建、管理和删除。Checkpoint 无需用户交互。Savepoint 的应用场景为升级 Flink 版本调整用户逻辑改变并行度以及进行红蓝部署等。Savepoint 的用例是有计划的需要手动备份和恢复Savepoint 更多地关注可移植性。Savepoint 由用户管理即用户创建、管理和删除。 五、状态 TTL ​ 在实际应用中很多状态会随着时间的推移逐渐增长如果不加以限制最终就会导致存储空间的耗尽。一个优化的思路是直接在代码中调用 .clear() 方法去清除状态但是有时候我们的逻辑要求不能直接清除。这时就需要配置一个状态的“生存时间”(Time-To-Live, TTL)当状态在内存中存在的时间超出这个值时就将它清除。 Flink状态一致性和容错机制 一、容错机制 Checkpoint 有状态流应用中的检查点(Checkpoint)其实就是所有任务的状态在某个时间点的一个快照一份拷贝。简单来讲就是一次“存档”将之前处理数据的进度进行保存。 在一个流应用程序运行时Flink 会定期保存检查点在检查点中会记录每个算子的 ID 和状态。如果发生故障Flink会使用最近一次成功保存的检查点来恢复应用的状态并重新启动处理流程就如同“读档”一样。 保存时机 随时保存缺点耗费资源周期保存周期异步增量保存 实现方式 朴素算法 总结下来就是数据源暂停接收数据待数据流中的数据全部处理完毕再把数据源偏移键值状态算子状态等写入检查点路径中优点是实现简单缺点是处理太慢。 Chandy-Lamport 算法 Savepoint Savepoint 作为实时任务的全局镜像其在底层使用的代码和Checkpoint的代码是一样的触发方式 使用 flink savepoint 命令触发 Savepoint,其是在程序运行期间触发 savepoint。使用 flink cancel -s 命令取消作业时并触发 Savepoint。使用 Rest API 触发 Savepoint格式为/jobs/:jobid /savepoints 二、状态一致性 处理语义At-Most-Once特点最多计算一次。缺点有可能会有数据丢失 At-Most-Once 是最简单的恢复方式直接从失败处的下个数据开始恢复程序之前的失败数据处理就不管了。可以保证数据或事件最多由应用程序中的所有算子处理一次。这意味着如果数据在被流应用程序完全处理之前发生丢失则不会进行其他重试或者重新发送。 At-Least-Once特点至少计算一次。缺点有可能重复处理数据 应用程序中的所有算子都保证数据或事件至少被处理一次。这通常意味着如果事件在流应用程序完全处理之前丢失则将从源头重放或重新传输事件。然而由于事件是可以被重传的因此一个事件有时会被处理多次(至少一次)至于有没有重复数据不会关心所以这种场景需要人工干预自己处理重复数据。 Exactly-Once保证每一条消息只被流处理系统处理一次。) 通过这种机制流应用程序中每个算子的所有状态都会定期做 Checkpoint。如果是在系统中的任何地方发生失败每个算子的所有状态都回滚到最新的全局一致 Checkpoint 点。在回滚期间将暂停所有处理源也会重置为与最近 Checkpoint 相对应的正确偏移量。整个流应用程序基本上是回到最近一次的一致状态然后程序可以从该状态重新启动。 End-to-End Exactly-Once端到端的精确一次 Source可重设数据的读取位置如 KafkaTransformationCheckpoint 机制Asynchronous Barrier SnapshottingSink从故障恢复时数据不会重复写入外部系统 幂等写入如 HBase、Redis 这样的 KV 数据库按 K 覆盖 V。关系型数据库可以在插入数据时检测是否有重复键如果有重复键则执行更新操作。事务写入预写日志和二阶段提交。 Flink批流统一的意义及实现 Flink是一个分布式流处理框架具有批处理和流处理的能力并且支持将批处理和流处理统一起来这也是Flink的一个重要特点。 批流统一的意义在于消除了传统上批处理和流处理之间的差异使得开发人员可以使用相同的编程模型来处理批处理和流处理任务。这样可以带来以下好处 简化开发开发人员只需要学习和实现一种编程模型就可以同时处理批处理和流处理任务减少了学习成本和开发复杂度。 灵活性和实时性通过将批处理和流处理统一起来可以实现实时的流处理任务同时也可以处理离线的批处理任务。这使得数据处理更加灵活并且可以根据需求实时处理和分析数据。 资源利用率批流统一可以将批处理和流处理作业一起提交到同一个执行引擎中执行从而更好地利用计算资源。例如在低峰期可以使用空闲资源执行批处理作业而在高峰期可以动态分配资源进行流处理。 在Flink中实现批流统一主要依靠以下机制 时间语义Flink使用事件时间和处理时间来处理流数据。事件时间是根据事件真实发生的时间来处理数据适合于有序或无序事件流而处理时间是根据数据到达处理节点的时间来处理数据。通过时间语义Flink可以同时支持批处理和实时流处理任务。 状态管理Flink提供了可靠的状态管理机制用于在流处理和批处理任务中跟踪和维护状态。状态可以用于存储中间结果、缓存数据等在批处理和流处理任务中都可以使用。 数据分区和窗口Flink支持将流数据分为不同的分区并提供了窗口机制来对数据进行分组和聚合操作。窗口可以基于事件时间或处理时间进行定义从而支持批处理和流处理的需求。 总之Flink批流统一的实现主要基于时间语义、状态管理和窗口机制等核心特性使得开发人员可以以一种统一的方式处理批处理和流处理任务提高开发效率和数据处理的灵活性。 Kafka和Flink内存参数配置的关系和优化方法 Flink 和 Kafka 都是流处理领域的重要技术。Flink 作为一个流处理引擎可以和 Kafka 一起使用来实现各种数据处理和分析任务。 Flink 调优建议 并行度调整Flink 的并行度是决定性能瓶颈的因素之一。可以根据任务的需求和计算资源来调整并行度。 状态后端调整Flink 支持多种状态后端比如 RocksDB 和 MemoryStateBackend。不同的状态后端对性能影响很大可以根据任务的需求和计算资源来选择合适的状态后端。 内存管理调整Flink 在运行时会占用一定的内存空间需要合理分配内存资源以免导致内存溢出等问题。可以通过调整 JVM 的堆内存和堆外内存大小来优化内存管理。 算子链调整Flink 的算子链可以影响任务的性能。可以通过将一些算子合并成一个算子链来提高性能。 Kafka 调优建议 分区数量调整Kafka 的性能和分区数量相关可以根据消息的大小和数量来适当调整分区数量。 消费者数量调整消费者的数量也会影响 Kafka 的性能。可以根据消费者的数量和计算资源来适当调整消费者数量。 消息压缩设置Kafka 支持多种消息压缩方式可以根据消息的类型和大小来选择合适的消息压缩方式。 批量拉取和提交设置Kafka 支持批量拉取和提交消息可以通过调整批量大小来优化性能。 总的来说Flink 和 Kafka 的调优都需要根据具体情况进行调整。可以通过监控系统的性能指标如 CPU 使用率、内存使用率、网络带宽等指标来优化调整。 Flink kafka source的并行度需要和kafka topic的分区数一致。最大化利用kafka多分区topic的并行读取能力。 KafkaFlinkClickHouse在联合场景中的优势 Flink读取Kafka数据下沉到Clickhouse 整体流程 向kafka特定主题下导入json格式数据编写Flink Kafka Comsumer消费主题下的数据利用Flink算子对数据进行处理(etl)将处理后的数据下沉到Clickhouse数据库中 Why FlinkClickHouse 指标实现 sql 化描述分析师提出的指标基本都以 SQL 进行描述。指标的上下线互不影响一个 Flink 任务消费 Topic如果还需要其它指标可以保证指标的上下线互不影响。数据可回溯方便异常排查当日活下降需要回溯排查是哪些指标口径的逻辑问题比如是报的数据差异或是数据流 Kafka 掉了或者是因为用户没有上报某个指标导致日活下降而 Flink 则无法进行回溯。计算快一个周期内完成所有指标计算需要在五分钟内将成百上千的所有维度的指标全部计算完成。支持实时流分布式部署运维简单支持 Kafka 数据实时流。 Why ClickHouse so Fast ClickHouse 采用列式存储 LZ4、ZSTD 数据压缩。计算存储结合本地化向量化执行。ClickHouse 计算存储本地化是指每一台计算机器存在本地 SSD 盘只需要计算自己的数据再进行节点合并。LSM merge treeIndex。将数据写入 ClickHouse 之后会在后台开始一个线程将数据进行 merge做 Index 索引。如建常见的 DT 索引和小时级数据索引以提高查询性能。SIMDLLVM 优化。SIMD 是单指令多数据集。SQL 语法及 UDF 完善。 优化性能。 总的来说Flink 和 Kafka 的调优都需要根据具体情况进行调整。可以通过监控系统的性能指标如 CPU 使用率、内存使用率、网络带宽等指标来优化调整。 Flink kafka source的并行度需要和kafka topic的分区数一致。最大化利用kafka多分区topic的并行读取能力。 KafkaFlinkClickHouse在联合场景中的优势 Flink读取Kafka数据下沉到Clickhouse 整体流程 向kafka特定主题下导入json格式数据编写Flink Kafka Comsumer消费主题下的数据利用Flink算子对数据进行处理(etl)将处理后的数据下沉到Clickhouse数据库中 Why FlinkClickHouse 指标实现 sql 化描述分析师提出的指标基本都以 SQL 进行描述。指标的上下线互不影响一个 Flink 任务消费 Topic如果还需要其它指标可以保证指标的上下线互不影响。数据可回溯方便异常排查当日活下降需要回溯排查是哪些指标口径的逻辑问题比如是报的数据差异或是数据流 Kafka 掉了或者是因为用户没有上报某个指标导致日活下降而 Flink 则无法进行回溯。计算快一个周期内完成所有指标计算需要在五分钟内将成百上千的所有维度的指标全部计算完成。支持实时流分布式部署运维简单支持 Kafka 数据实时流。 Why ClickHouse so Fast ClickHouse 采用列式存储 LZ4、ZSTD 数据压缩。计算存储结合本地化向量化执行。ClickHouse 计算存储本地化是指每一台计算机器存在本地 SSD 盘只需要计算自己的数据再进行节点合并。LSM merge treeIndex。将数据写入 ClickHouse 之后会在后台开始一个线程将数据进行 merge做 Index 索引。如建常见的 DT 索引和小时级数据索引以提高查询性能。SIMDLLVM 优化。SIMD 是单指令多数据集。SQL 语法及 UDF 完善。 总结ClickHouse 对此有很大需求。在数据分析或者维度下拽时需要更高的特性如时间窗口的一部分功能点。
http://www.tj-hxxt.cn/news/136519.html

相关文章:

  • 网站建设与管理清考作业网站轮播广告
  • 网站源码推荐网站代码关键词标题
  • 唐山制作手机网站网站内容建设和管理
  • angular网站模板下载wordpress对话框模板
  • 张掖建设局网站做的比较好的二手交易网站有哪些
  • 广州建设监理协会网站乐陵人力资源中心
  • 天水市秦州区建设局网站免费proxy服务器地址
  • 中国空间站最新进展东莞债务优化公司
  • 沈阳建设工程质量检测中心网站天津装修公司哪家口碑好些
  • 设计制作个人网站thinkphp 做网站如何
  • 苏州保洁公司哪家好一点windows优化大师功能
  • net域名做网站怎么样青岛需要做网站的公司有哪些
  • 中国建设工程招标官方网站网页ui设计尺寸规范
  • 淘宝做网站哈尔滨网站建设方案外包
  • 泉州网站制作建设谷搜易外贸网站建设
  • 惠州网站建设(推荐乐云践新)软件二次开发
  • 门户手机网站开发烟台网站建设企汇互联见效付款
  • 温州地区做网站重庆网站建设微信开发
  • 网站制作wordpress网站建设的宿主选择
  • 南山网站设计公司ps 做儿童摄影网站首页
  • 西安网站建设方案维护网页设计与制作 教学效果
  • 县区网站集约化建设收费网站模板
  • 用视频做影视的网站百度知道网页版登录入口
  • 湘潭高新区建设局网站南京英文网站建设
  • 5g对网站建设的影响网站优化做些什么
  • 贵州建设官方网站it之家网站源码
  • 硬件开发一站式平台网页游戏网站链接
  • 宝应网站设计网站开发典型
  • 在国际网站做外贸需要条件天猫网店代运营
  • 菏泽建设网站邢台百姓网官网