网站站长统计怎么做,pv3d 优秀网站,做网站服装app,网络广告策划的流程顺序为1: Spark 整体架构
Spark 是新一代的大数据处理引擎#xff0c;支持批处理和流处理#xff0c;也还支持各种机器学习和图计算#xff0c;它就是一个Master-worker 架构#xff0c;所以整个的架构就如下所示#xff1a;
2: Spark 任务提交命令 一般我们使用shell 命令提…1: Spark 整体架构
Spark 是新一代的大数据处理引擎支持批处理和流处理也还支持各种机器学习和图计算它就是一个Master-worker 架构所以整个的架构就如下所示
2: Spark 任务提交命令 一般我们使用shell 命令提交命令如下
./bin/spark-submit \--master yarn \--deploy-mode cluster \--driver-cores 2 \--driver-memory 1g \--executor-cores 4 \--num-executors 10 \--executor-memory 8g \--class PackageName.ClassName XXXX.jar \--name Spark Job Name \driver 和executor 就是对应的spark的 driver 和executor 的配置然后再指定个部署模式master 就可以了。
3: spark提交任务的流程 如上所示我们在提交spark 任务的时候有个参数可供选择模式就是deploy-mode 的参数它的值有client 和cluster 之分client 说白了就是driver 在客户端任务的分配和调度都在客户端这种只适合用于测试毕竟一旦流量大了的话客户端是顶不住的啊而cluster 模式是spark对应的driver 和executor 都是在yarn集群上相对来说稳定我们这里只着重说yarn-cluster 的提交流程
3.1: 首先如上面的第二点所示我们先提交启动命令 3.2: 然后客户端这边先运行提交的jar包里面的main方法 3.3: 接着和RM 通信告诉RM 启动ApplicationMaster./bin/ApplicatioMaster 3.4: RM 随机选择一台NM 准备启动AM 3.5: 在AM 里面启动driver然后让driver 进行SparkContext 的初始化以及进行任务的切分 3.6AM 再向RM 申请资源 3.7: 申请资源后AM 就启动ExecutionBackend, 3.8: ExecutionBackend 启动 Executors 3.9Driver 把任务分配给Executors cluster 流程图如下所示 client 流程图如下所示
4: Spark 中的RDD 4.1: RDD 概念 还是老样子先理解一个东西之前我们看看它的学名是什么Resilient Distribute DataSet 就是弹性分布式数据集说白了它的本质还是数据的集合MR 和Flink的输入是一条条数据这Spark 说我乖点了输入我就先把它们整成一个个数据集哈哈哈符合预先聚合的思想。说白了RDD 就是三个层次的抽象Dataset, partition, 以及record对应到生活中的例子就是班级组以及单个同学
4.2RDD 的特点是啥从学名可以窥见一二弹性分布式 我们先说弹性说白了就是灵活可以容错它是怎么容错的呢就是RDD之间互相有依赖关系如果某个RDD的分区没有了可以从原始数据和依赖关系得来不用让其他分区重算 那么分布式呢就是一个RDD的分区数据可以分发到不同的节点上进行计算靠近数据的计算嘛 一个RDD的分区数可以自定义如果没有自定义则和hdfs的block数一样或者和cpu的核数成比例关系一核心大概可以处理2-4个分区。
4.3: RDD 之间的血缘关系 血缘血缘套到数据集这边就知道是上下游RDD的关系了这里的血缘指的就是子RDD和父RDD 之间是分区之间一对一还是多对一说白了如果一个父RDD的一个分区的数据给到了子RDD的多个分区这明摆着宽广嘛就是宽依赖否则就是摘依赖一对一的关系如下所示 4.4: RDD 之间的stage 划分 这里就有个问题了为什么要进行stage的划分呢? 大家可以想想这个spark的RDD 也是基于内存的计算如果一个任务一直这样计算下去比如算到90%的时候机器突然宕机了任务全部失败了是不是又得重新计算所以我们就要进行个中间数据的备份嘛比如算完了一个stage的数据先把它存储到外部系统这样就不怕任务突然中断了
所以大家是不是结合上面的宽窄依赖就想到了如果出现了宽依赖我们就截断划分一个stage毕竟可以先保存一份数据嘛可能一个子RDD有多个父RDD每个父RDD就自己先把数据保存起来做个备份嘛
所以我们stage 划分就是根据宽窄依赖去划分的碰到宽依赖就划分一个stagestage 里面的算子和数据自成一个天地。
4.5: spark的编程思想之RDD 所以兄弟们可以看到说白了我们进行spark的编程的时候就是基于RDD的然后和算子一起把RDD当作点这些算子当成边不就构建成了我们的这个有向无环图(DAG) 嘛没有相互依赖的RDD 进行一个并行计算有相互依赖的窄依赖的类型也可以进行并行计算当碰到宽依赖的时候就要进行数据打散了不就是shuffle嘛最后算好的数据落盘就行了。
5: Spark中groupByKeyreduceByKey, combineByKey, aggregrateByKey的区别
怎么说呢其实这些算子都是针对数据做聚合的操作groupByKey 和reduceByKey 没有定义初始值的结构groupByKey 默认用的hash 分区而reduceByKey 可以自定义分区也会提前进行combine这两个算子分区内的逻辑和分区间的逻辑都是一致的 而aggregrateByKey 和 combineByKey 和 则在reduceByKey的基础上做了些变化aggregrateByKey 是自定义分区内和分区间的逻辑而combineByKey 是在上述的基础上也增加一个初始值自定义。
6: spark中的hashShuffle,sortShuffle, 以及优化后的这两个变种
6.1: 普通的hashShuffle 说白了就是在shuffle的过程中使用hash对key 进行分组假如上游有100个类似的mapTask下游有100个reduce task每个mapTask 会针对下游生成100个文件总共就是10000个小文件了这个对于集群来说负担就太大了如下所示
6.2: 优化后的HashShuffle 上述的普通版产生的小文件个数太多了所以我们需要优化下优化的重点思路就是复用既然每个上游的task都要生成这么多文件可不可以一个Executor 里面的缓冲区复用呢答案是可以的spark 官方就根据cpu core 的数量 * 下一个stage的task个数来确定缓冲区数量以及文件个数相对来说少了挺多的原先要10000个现在可能只要5*100个就可以了。哈哈哈不过这个优化机制要通过spark.shuffle.consolidateFilestrue 开启
6.3: sortShuffle sortShuffle 是什么呢说白了就是在shuffle 之前先进行排序然后也会有溢写到磁盘可能会生成多个文件但是最终每个task会对所有的文件进行合并最终只生成一个文件和文件对应的索引让下游的task 根据索引文件去找数据拉取数据。如下所示 这个对应的文件更少了对下游的压力也更小了。
6.4: byPassSortShuffle 这个和上面的sortShuffle 如出一辙唯一的区别就是在写文件的时候不会进行排序省去了这部分的开销不过这个需要触发的点有两个 6.4.1: shuffle map task 数量小于spark.shuffle.sort.bypassMergeThreshold200参数的值。 6.4.2: 不是聚合类的 shuffle 算子
兄弟们最终是不是还想问问现阶段该怎么启用sortShufflespark2.x 已经把所有的shuffle都默认成了sortShuffle了。
7: spark中的cache,persist 和checkpoint 的区别 三者都是把数据持久化的前面两个是把数据缓存到内存而checkpoint 是吧数据存到hdfs 或者本地文件前者不会截断血缘关系后者会截断毕竟hdfs副本可以容错cache 实际上就是调用的persist
8: RDD , DataFrame, 以及DataSet 的区别
8.1: RDD 就是我们刚刚说的弹性分布式数据集嘛是面向对象的是需要序列化和反序列化的在网络IO中性能消耗大是不支持Spark Sql的但是它是编译类型安全的
8.2: DataFrame 怎么说呢就是相当于一个表类似于python 和R它只知道表头只知道下面的Name,age,Height不知道具体每个字段对应的数据类型它就相当于表里的v如下所示 所以它是编译时类型不安全的不过它有个优点它的存储是放在java的堆外内存不用进行GC了也不用进行序列化和反序列化带来的开销了
8.3: 这就是天生为sql 创建的数据集了它就相当于sql里面的表既知道表的字段头也知道每个表的字段的数据类型而且它还支持序列化的时候轻量的序列化它结合了DF 和 RDD的优点编译时也是类型安全的并且带来了一个全新的概念Encode它可以保证按需序列化数据了。
9: SparkShuffle 的优化 上一步我们讲了spark上游MapTask生成小文件的优化那么接下来我们也可以看看sparkReduce的这个Reduce 端的优化。 9.1: 既然要优化reducer端首先能不能让每次buffer存储的数据量多点就节省了reduce 拉取的个数这个值通过spark.shuffle.file.buffer 来控制一般32k可以是64k 9.2: 既然要拉取的数据量少点是不是可以提高每批次拉取的数据量呢那就是spark.reducer.maxSizeInFlight 的值控制的一般是48M可以到96M
10. sparkStreaming 和kafka的连接方式 说白了这个问题就是问的sparkStreaming 如何从kafka 拉取数据的 10.1Receiver 方式这个是Spark 1.5 之前的版本说白了就是Executor 里面有个receiver的组件它负责和kafka创建连接并且使用kafka的高阶api 去和kafka 通信从kafka 消费过来的数据基本上是先放在内存中然后供spark的executor 去做其他处理所以数据量激增的时候有可能导致内存炸掉 毕竟数据放在内存中很有可能丢失是吧所以它做了一个checkPoint会对每个批次的数据做个备份学名叫做WALWrite ahead log 机制就是会把数据备份到分布式文件系统比如hdfs上面 10.2Direct 方式直接每次轮询kafka的partition的数据有几个partition 这边也就轮询几次使用kafka的低阶的api自己保存每个partition的offset自己维护保证了可以和kafka的同步可以做到数据精确消费一次 生产环境一般都用DIrect 方式毕竟不用checkpoint不用降低数据处理速率而且可以精准消费一次不存在重复消费的可能。 说白了就是Receiver 是被动的可能会被kafka的数据给干爆Direct 是主动的自己掌握offset ,去主动消费
11sparkStreaming 窗口函数的原理 因为sparkStreaming 本质上是微批数据当作实时数据如下所示
val ssc new StreamingContext(sc,Seconds(5))//5表示5秒中对数据进行切分形成一个RDD 所以窗口数据也是微批的合集窗口一定要是微批的整数倍这样才能算是可以统计出来的有效窗口
12: spark 常见算子解释
12.1: mapToPair, 说白了就是把原来的数据转换成关系对的形式不管之前的数据怎么样转换之后的数据就是pair对的形式比如 tupleString,Long 12.2: transform, tranformToPair, 都是基于sparkStreaming 的流式数据结构上的算子transform可能就是返回一个流式的数据Dstream, 里面的数据单位是Object; 而tranformToPair 返回的是带有pair 对的流是数据。 12.3: spark 编程最关键的是你要知道你的输入是什么你要输出什么是tuple呢还是String呢还是其他数据呢RDD里面装载的基础数据结构要知道
12.3: 自定义排序类就是要实现Ordered 以及 Serialziable 接口定义好大于小于compreTo的逻辑 自定义的accumulator 说白了就是一个累加器继承实现AccumulatorParam的方法最关键的是要知道这个累加器的逻辑是什么对哪个值累加在addAccumulator 中实现这个累加的逻辑 自定义的聚合函数也就是udaf的function , 是要继承UserDefinedAggregateFunction的类最重要的是要做三件事初始化传进来的值在update方法里面对每个分区的值做业务判断处理; 在merge 方法里面对所有分区的值做个融合判断处理最终在buffer中更新
13:spark电商项目的核心
13.1: 首先从业务方面可以分为session访问指标计算top10 session页面转换率计算点击/下单/支付广告相关的指标计算(top10 广告省份广告)订单指标(省份销量高的产品属性销量高的产品) 的相关统计计算产品相关(各区域内产品点击次数) 的指标计算核心点是和产品经理核对好各个指标的计算方法 13.2: 大数据大数据最重要的关注点就是我们要关注性能因为好的性能和差的性能跑任务的时间可以相差到几个甚至十几个小时所以我们在编写代码的时候就要尽可能考虑到这些点从系统可用资源方面代码层面数据倾斜和组成方面进行考虑尽量达到最大的性能尽量考虑稳定性。
13.3: spark调优 13.3.1: 算子调优 13.3.2: shuffle 调优 13.3.3: 资源调优 13.3.4: jvm 调优降低cache 内存的比例
14: Spark 内存管理机制 14.1: 静态内存变为动态内存统一内存管理了分成了Storage Memory 和Execution Memory 分别是1:1 的关系存储数据存储shuffle过程中的数据