网站邮箱接口怎么设置,长春网络推广公司哪个好,做海报那个网站好,如何给企业做网站1. shuffle前言
对spark任务划分阶段#xff0c;遇到宽依赖会断开#xff0c;所以在stage 与 stage 之间会产生shuffle#xff0c;大多数Spark作业的性能主要就是消耗在了shuffle环节#xff0c;因为该环节包含了大量的磁盘IO、序列化、网络数据传输等操作。
负责shuffle…1. shuffle前言
对spark任务划分阶段遇到宽依赖会断开所以在stage 与 stage 之间会产生shuffle大多数Spark作业的性能主要就是消耗在了shuffle环节因为该环节包含了大量的磁盘IO、序列化、网络数据传输等操作。
负责shuffle过程的执行、计算和处理的组件主要就是ShuffleManager也即shuffle管理器。而随着Spark的版本的发展ShuffleManager也在不断迭代。
ShuffleManager 大概有两个 HashShuffleManager 和 SortShuffleManager。
历史
在spark 1.2以前默认的shuffle计算引擎是HashShuffleManager
在spark 1.2以后的版本中默认的ShuffleManager改成了SortShuffleManager
在spark 2.0以后抛弃了 HashShuffleManager。
2. HashShuffleManager 上游 stage 有 2个 Executor每个Executor 有 2 个 task。
下游 stage 有 3个task。
shuffle write阶段
将相当于mapreduce的shuffle write 上游的mapTask任务的数据按照key的hash 分桶写出中间文件个数为下游reduceTask的任务即下游RDD分区的个数。
写出中间文件个数 maptask的个数 * reducetask的个数。
shuffle read 阶段
就相当于mapreduce 的 shuffle read 每个reducetask 拉取自己的数据。
由于shuffle write的过程中task给下游stage的每个task都创建了一个磁盘文件因此shuffle read的过程中每个reducetask只要从上游stage的所有maptask所在节点上拉取属于自己的那一个磁盘文件即可。
弊端
shuffle write阶段占用大量的内存空间会导致频繁的GC容易导致OOM(out of memory)也会产生大量的小文件写入过程中会产生大量的磁盘IO性能受到影响。适合小数据集的处理。
3. HashShuffleManager 优化 开启consolidate机制。
设置参数spark.shuffle.consolidateFiles。该参数默认值为false将其设置为true即可开启优化机制。
shuffle write阶段
我们知道如果executor的个数为5个一个executor上的核心是1个有10个分区的数据要处理即一个核心要处理2个任务。
开启consolidate机制后上游的每个mapTask任务的数据仍然按照key的hashCode值分桶但每个任务并不会形成很多个中间小文件而是对于每个executor的每个核来说只会产生下游reduceTask个数的文件。优化后HashShuffleManager允许上游的交给由一个executor的一个core处理的多个maptask任务的数据以追加形式写入文件组这样就可以有效将多个task的磁盘文件进行一定程度上的合并从而大幅度减少磁盘文件的数量进而提升shuffle write的性能。
写出中间文件个数 上游的CPU核数 * 下游task的个数
shuffle read 阶段
就相当于mapreduce 的 shuffle read 每个reducetask 拉取自己的数据。
由每个reducetask只要从上游stage的所在节点上拉取属于自己的那一个磁盘文件即可。
弊端
优化后的HashShuffleManager虽然比优化前减少了很多小文件但在处理大量数据时还是会产生很多的小文件。
4. SortShuffleManager
Spark在引入Sort-Based Shuffle以前比较适用于中小规模的大数据处理。为了让Spark在更大规模的集群上更高性能处理更大规模的数据于是就引入了SortShuffleManager。 shuffle write阶段
shuffle操作之前数据会被划分为多个分区。每个分区被发往不同的executor进行计算。在map阶段每个executor会根据key的hashCode值将数据进行分桶产生小文件。每个桶对应的一个下游分区。在每个桶中数据会被按照key进行局部排序这个操作不是必须的排序后这些小文件会写入到内存中的一个大的shuffle文件。在写入shuffle文件的同时会生成一个index索引文件。索引文件可以快速定位和读取所需要的键值对数据而不需要扫描整个文件。
SortShuffleManager不会为每个Reducer中的Task生成一个单独的文件相反会把上游中每个mapTask所有的输出数据Data只写到一个文件中。并使用了Index文件存储具体 mapTask 输出数据在该文件的位置。
因此 上游 中的每一个mapTask中产生两个文件Data文件 和 Index 文件其中Data文件是存储当前Task的Shuffle输出的而Index文件中存储了data文件中的数据通过partitioner的分类索引。
写出文件数 maptask的个数 * 2 index 和 data
可见SortShuffle 的产生的中间文件的多少与 上个stage 的 maptask 数量有关。
shuffle read 阶段
下游的Stage中的Task就是根据这个Index文件获取自己所要抓取的上游Stage中产生的数据。
在sortShuffleManager中我们可以启动byPass机制不排序的机制。开关的值默认是mapTask的个数是200. 触发bypass机制的条件
shuffle map task的数量小于spark.shuffle.sort.bypassMergeThreshold参数的值默认200或者不是聚合类的shuffle算子比如groupByKey
5. 总结
回顾整个Shuffle的历史Shuffle产生的临时文件的数量的变化以此为
Hash ShuffleM*R
Consolidate 方式的Hash ShuffleC*R
Sort Shuffle2*M
其中M上游stage的task数量R下游stage的task数量C上游stage运行task的CPU核数