当前位置: 首页 > news >正文

山东城市建设招生网站营销型公司和销售型公司

山东城市建设招生网站,营销型公司和销售型公司,kunkka wordpress,万网手机网站Spark 应用调优人数统计优化摇号次数分布优化Shuffle 常规优化数据分区合并加 Cache优化中签率的变化趋势中签率局部洞察优化倍率分析优化表信息 : apply : 申请者 : 事实表lucky : 中签者表 : 维度表两张表的 Schema ( batchNum#xff0c;carNum ) : ( 摇号批次#xff0c… Spark 应用调优人数统计优化摇号次数分布优化Shuffle 常规优化数据分区合并加 Cache优化中签率的变化趋势中签率局部洞察优化倍率分析优化表信息 : apply : 申请者 : 事实表lucky : 中签者表 : 维度表两张表的 Schema ( batchNumcarNum ) : ( 摇号批次申请编号 )分区键都是 batchNum 运行环境 : 配置项设置 : 优化点 : 人数统计 统计至今参与摇号的总人次和幸运的中签者人数 val rootPath: String _// 申请者数据因为倍率的原因每一期同一个人可能有多个号码 val hdfs_path_apply s${rootPath}/apply val applyNumbersDF spark.read.parquet(hdfs_path_apply) applyNumbersDF.count// 中签者数据 val hdfs_path_lucky s${rootPath}/lucky val luckyDogsDF spark.read.parquet(hdfs_path_lucky) luckyDogsDF.countSQL 实现 : selectcount(*) from applyNumbersDFselectcount(*) from luckyDogsDF去重计数得到实际摇号数 : val applyDistinctDF applyNumbersDF.select(batchNum, carNum).distinctapplyDistinctDF.countSQL 实现 : selectcount(distinct batchNum ,carNum) from applyDistinctDF优化 分析 : 共有 3 个 Actions会触发 3 个 Spark Jobs 用 Cache 原则 RDD/DataFrame/Dataset 引用次数为 1坚决不用 Cache当引用次数大于 1且运行成本占比超过 30%考虑用 Cache 优化 : 利用 Cache 机制来提升执行性能 val rootPath: String _// 申请者数据因为倍率的原因每一期同一个人可能有多个号码 val hdfs_path_apply s${rootPath}/apply val applyNumbersDF spark.read.parquet(hdfs_path_apply) // 缓存 applyNumbersDF.cacheapplyNumbersDF.countval applyDistinctDF applyNumbersDF.select(batchNum, carNum).distinct applyDistinctDF.count摇号次数分布 不同人群摇号次数的分布 : 统计所有申请者累计参与了多少次摇号所有中签者摇了多少次号才能幸运地摇中签 统计所有申请者的分布情况 val result02_01 applyDistinctDF.groupBy(col(carNum)).agg(count(lit(1)).alias(x_axis)).groupBy(col(x_axis)).agg(count(lit(1)).alias(y_axis)).orderBy(x_axis)result02_01.write.format(csv).save(_)SQL 实现 : with t1 as (selectcarNum,count(1) as x_axisfrom applyDistinctDFgroup by carNum ) selectx_axis,count(1) as y_axis from t1 group by x_axis order by x_axis优化 分析 : 共两次 Shuffle。以 carNum 做分组计数 以 x_axis 列再次做分组计数 Shuffle 的本质 : 数据的重新分发凡是有 Shuffle 地方就要关注数据分布 对过小的数据分片要对进行合并 Shuffle 常规优化 优化点 : 减少 Shuffle 过程中磁盘与网络的请求次数 Shuffle 的常规优化 By Pass 排序操作 : 条件计算逻辑不涉及聚合或排序Reduce 的并行度 spark.shuffle.sort.bypassMergeThreshold调整读写缓冲区 : 条件 : Execution Memory 大 对读写缓冲区做调优 : spark.shuffle.file.buffer : Map 写入缓冲区大小spark.reducer.maxSizeInFlight : Reduce 读缓冲区大小 读写缓冲区是以 Task 为粒度进行设置所以调整这些参数时, 扩大 50% 默认调优spark.shuffle.file.buffer 32KBspark.shuffle.file.buffer 48 KB (32KB * 1.5)spark.reducer.maxSizeInFlight 48 MBspark.reducer.maxSizeInFlight 72MB ( 48MB * 1.5) 性能对比 : 数据分区合并 优化点 : 提升 Reduce 阶段的 CPU 利用率 该数据集在内存的精确大小 : def sizeNew(func: DataFrame, spark: SparkSession): String {val result funcval lp result.queryExecution.logicalval size spark.sessionState.executePlan(lp).optimizedPlan.stats.sizeInByteEstimated size: size/1024 KB }把 applyDistinctDF 作实参调用 sizeNew 函数返回大小 2.6 GB 将数据集尺寸/并行度(spark.sql.shuffle.partitions 200) Reduce 每个数据分片的存储大小 ( 2.6 GB / 200 13 MB数据分片大小在 200 MB 左右为宜13 MB 太小 优化设置 : 计算集群配置 Executors core 3 * 2 6其 minPartitionNum 为 6 # 开启 AQE spark.sql.adaptive.enabled true# 自动分区合并 spark.sql.adaptive.coalescePartitions.enabled true # 合并后的大小 spark.sql.adaptive.advisoryPartitionSizeInBytes 160MB/200MB/210MB/400MB # 分区合并后的最小分区数 spark.sqladaptive.coalescePartitions.minPartitionNum 6总结 : 并行度过高、数据分片过小CPU 调度开销会变大执行性能也变差检验值 : 分片粒度为 200 MB 左右时执行性能是最优的并行度过低、数据分片过大CPU 数据处理开销也会过大执行性能会锐减 性能对比 : 加 Cache Cache : 避免数据集在磁盘中的重复扫描与重复计算 applyDistinctDF.cache applyDistinctDF.countval result02_01 applyDistinctDF.groupBy(col(carNum)).agg(count(lit(1)).alias(x_axis)).groupBy(col(x_axis)).agg(count(lit(1)).alias(y_axis)).orderBy(x_axis)result02_01.write.format(csv).save(_)性能对比 : 得到中签者的摇号次数 val result02_02 applyDistinctDF.join(luckyDogsDF.select(carNum), Seq(carNum), inner).groupBy(col(carNum)).agg(count(lit(1)).alias(x_axis)).groupBy(col(x_axis)).agg(count(lit(1)).alias(y_axis)).orderBy(x_axis)result02_02.write.format(csv).save(_)SQL 实现 : with t3 as (selectcarNum,count(1) as x_axisfrom applyDistinctDF t1 join luckyDogsDF t2on t1.carNum t2.carNumgroup by carNum ) selectx_axis,count(1) as y_axis from t3 group by x_axis order by x_axis优化 分析 : 计算中有一次数据关联两次分组、聚合排序 applyDistinctDF 有 1.35 亿条记录luckyDogsDF 有 115 w条记录大表 Join 小表最先想用广播变量 用广播变量来优化大小表关联计算 : 估算小表在内存中的存储大小设置广播阈值 spark.sql.autoBroadcastJoinThreshold 用 sizeNew 计算 luckyDogsDF 得到大小 18.5MB 设置广播阈值要大于 18.5MB 即 : 设置为 20MB : spark.sql.autoBroadcastJoinThreshold 20MB性能对比 : 中签率的变化趋势 计算中签率分别统计每个摇号批次中的申请者和中签者人数 // 统计每批次申请者的人数 val apply_denominator applyDistinctDF.groupBy(col(batchNum)).agg(count(lit(1)).alias(denominator))// 统计每批次中签者的人数 val lucky_molecule luckyDogsDF.groupBy(col(batchNum)).agg(count(lit(1)).alias(molecule))val result03 apply_denominator.join(lucky_molecule.select, Seq(batchNum), inner).withColumn(ratio, round(col(molecule)/ col(denominator), 5)).orderBy(batchNum)result03.write.format(csv).save(_)SQL 实现 : with t1 as (selectbatchNum,count(1) as denominatorfrom applyDistinctDFgroup by batchNum ), t2 as (selectbatchNum,count(1) as moleculefrom luckyDogsDFgroup by batchNum ) selectbatchNum,round(molecule/denominator, 5) as ratio from t1 join t2 on t1.batchNum t2.batchNum order by batchNum中签率局部洞察 统计 2018 年的中签率 // 筛选出2018年的中签数据并按照批次统计中签人数 val lucky_molecule_2018 luckyDogsDF.filter(col(batchNum).like(2018%)).groupBy(col(batchNum)).agg(count(lit(1)).alias(molecule))// 通过与筛选出的中签数据按照批次做关联计算每期的中签率 val result04 apply_denominator.join(lucky_molecule_2018, Seq(batchNum), inner).withColumn(ratio, round(col(molecule)/ col(denominator), 5)).orderBy(batchNum)result04.write.format(csv).save(_)SQL 实现 : with t1 as (selectbatchNum,count(1) as moleculefrom luckyDogsDFwhere batchNum like 2018%group by batchNum ) selectbatchNum,round(molecule/denominator, 5) from apply_denominator t2 on t1.batchNum t2.batchNum order by batchNum优化 DPP 的条件 : 事实表必须是分区表且分区字段可以是多个必须包含 Join KeyDPP 仅支持等值 Joins不支持大于、小于这种不等值关联关系维表过滤后的数据集要小于广播阈值调整 spark.sql.autoBroadcastJoinThreshold DPP 优化 : 降低事实表 applyDistinctDF 的磁盘扫描量 applyDistinctDF.select(batchNum, carNum).distinctapplyDistinct.count性能对比 : 倍率分析 倍率的分布情况 : 不同倍率下的中签人数不同倍率下的中签比例 2016 年后的不同倍率下的中签人数 : val result05_01 applyNumbersDF.join(luckyDogsDF.filter(col(batchNum) 201601).select(carNum), Seq(carNum), inner).groupBy(col(batchNum), col(carNum)).agg(count(lit(1)).alias(multiplier)).groupBy(carNum).agg(max(multiplier).alias(multiplier)).groupBy(multiplier).agg(count(lit(1)).alias(cnt)).orderBy(multiplier)result05_01.write.format(csv).save(_)with t3 as (selectbatchNum,carNum,count(1) as multiplierfrom applyNumbersDF t1 join luckyDogsDF t2 on t1.carNum t2.carNumwhere t2.batchNum 201601group by batchNum, carNum ), t4 as (selectcarNum,max(multiplier) as multiplierfrom t3group by carNum ) selectmultiplier,count(1) as cnt from t4 group by multiplier order by multiplier;优化 关联中的 Join Key 是 carNum (非分区键)所以无法用 DPP 机制优化 将大表 Join 小表 , SMJ 转 BHJ : 计算 luckyDogsDF 的内存大小确保 广播阈值利用 Spark SQL 的静态优化机制将 SMJ 转为 BHJ确保过滤后 luckyDogsDF 广播阈值利用 Spark SQL 的 AQE 机制动态将 SMJ 转为 BHJ # 静态BHJ spark.sql.autoBroadcastJoinThreshold 20MB# AQE 动态BHJ spark.sql.autoBroadcastJoinThreshold 10MB性能对比 : 计算不同倍率人群的中签比例 // Step01: 过滤出2016-2019申请者数据统计出每个申请者在每期内的倍率并在所有批次中选取 val apply_multiplier_2016_2019 applyNumbersDF.filter(col(batchNum) 201601).groupBy(col(batchNum), col(carNum)).agg(count(lit(1)).alias(multiplier)).groupBy(carNum).agg(max(multiplier).alias(multiplier)).groupBy(multiplier).agg(count(lit(1)).alias(apply_cnt))// Step02: 将各个倍率下的申请人数与各个倍率下的中签人数左关联并求出各个倍率下的中签率 val result05_02 apply_multiplier_2016_2019.join(result05_01.withColumnRenamed(cnt, lucy_cnt), Seq(multiplier), left).na.fill(0).withColumn(ratio, round(col(lucy_cnt)/ col(apply_cnt), 5)).orderBy(multiplier)result05_02.write.format(csv).save(_)SQL 实现 : with t5 as (selectbatchNum,carNumcount(1) as multiplierfrom applyNumbersDF where batchNum 201601group by batchNum, carNum ), t6 as (selectcarNum,max(multiplier) as multiplierfrom t1group by carNum ), t7 as (selectmultiplier,count(1) as apply_cntfrom t2 group by multiplier ) select multiplier,round(coalesce(lucy_cnt, 0)/ apply_cnt, 5) as ratio from t7 left left join t5 on t5.multiplier t7.multiplier order by multiplier;
http://www.tj-hxxt.cn/news/137823.html

相关文章:

  • 科技大学全国排名建网站seo
  • 深圳网站建设方案外包国外网站404错误页
  • 服饰网站建设模板荣耀手机正品官网查询
  • 常州市建设局网站电话网站管理后台制作
  • 禁止粘贴的网站wordpress文章内图片不显示不出来
  • 网站的背景图怎么做的seo搜索引擎优化技术
  • 新县住房和城乡规划建设网站专业北京网站建设公司哪家好
  • 快速建设一个网站网页设计图片加载不出
  • 网站建设ppt简介西班牙语网站建设
  • 建设一个网站怎么赚钱南宁建站模板厂家
  • 建设厅网站上的信息采集表附近哪里需要招人
  • 网站建设专业输入法合肥网站制作联系方式
  • 旅游网站首页制作wordpress 密码爆破
  • 网站营销方法有哪些内容wordpress数据库删除
  • 同一域名可以做相同网站吗网站标题 关键词 描述之间的关系
  • 90后小姑娘做网站wordpress外网访问网页异常
  • 免费做app页面的网站linux 网站开发
  • 手工做皮具国外的网站盐城seo排名
  • 连锁网站开发建设网站的目的和功能定位
  • dz做网站js不起作用o2o生鲜电商平台有哪些
  • 做化妆品注册和注册的网站吗易语言做网站图片下载
  • 邢台网站建设邢台淘宝客网站做百度竞价
  • 校园网站建设管理制度网站快速收录平台
  • 一个网站的建设需要哪些流程生活分类信息网站源码
  • 北京好一点的微网站开发公司互联网保险有哪些
  • 哪个企业做网站做跨境网站注意事项
  • 网页怎么优化网站做优化有什么用吗
  • 建设官方网站的好处和坏处wordpress设置网站导航
  • 商城建设网站策划怎么创建自己公司的网站
  • 网站建设朋友圈通江县住房和城乡建设局网站