如何建立网站数据库,青岛网站美工,网站做外链什么意思,滴滴出行app开发公司2Spark Core 1.RDD 详解1) 为什么要有 RDD?2) RDD 是什么?3) RDD 主要属性 2.RDD-API1) RDD 的创建方式2) RDD 的算子分类3) Transformation 转换算子4) Action 动作算子 3. RDD 的持久化/缓存4. RDD 容错机制 Checkpoint5. RDD 依赖关系1) 宽窄依赖2) 为什么要设计宽窄依赖 … 2Spark Core 1.RDD 详解1) 为什么要有 RDD?2) RDD 是什么?3) RDD 主要属性 2.RDD-API1) RDD 的创建方式2) RDD 的算子分类3) Transformation 转换算子4) Action 动作算子 3. RDD 的持久化/缓存4. RDD 容错机制 Checkpoint5. RDD 依赖关系1) 宽窄依赖2) 为什么要设计宽窄依赖 6. DAG 的生成和划分 Stage7. RDD 累加器和广播变量1) 累加器2) 广播变量  1.RDD 详解 
1) 为什么要有 RDD? 
在许多迭代式算法(比如机器学习、图算法等)和交互式数据挖掘中不同计算阶段之间会重用中间结果即一个阶段的输出结果会作为下一个阶段的输入。但是之前的 MapReduce 框架采用非循环式的数据流模型把中间结果写入到 HDFS 中带来了大量的数据复制、磁盘 IO 和序列化开销。且这些框架只能支持一些特定的计算模式(map/reduce)并没有提供一种通用的数据抽象。 AMP 实验室发表的一篇关于 RDD 的论文:《Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing》就是为了解决这些问题的。 RDD 提供了一个抽象的数据模型让我们不必担心底层数据的分布式特性只需将具体的应用逻辑表达为一系列转换操作(函数)不同 RDD 之间的转换操作之间还可以形成依赖关系进而实现管道化从而避免了中间结果的存储大大降低了数据复制、磁盘 IO 和序列化开销并且还提供了更多的 API(map/reduec/filter/groupBy…)。 
2) RDD 是什么? 
RDD(Resilient Distributed Dataset)叫做弹性分布式数据集是 Spark 中最基本的数据抽象代表一个不可变、可分区、里面的元素可并行计算的集合。 单词拆解 -Resilient 它是弹性的RDD 里面的中的数据可以保存在内存中或者磁盘里面 -Distributed 它里面的元素是分布式存储的可以用于分布式计算 -Dataset: 它是一个集合可以存放很多元素。 
3) RDD 主要属性 
进入 RDD 的源码中看下  RDD源码 在源码中可以看到有对 RDD 介绍的注释我们来翻译下 1.A list of partitions  一组分片(Partition)/一个分区(Partition)列表即数据集的基本组成单位。 对于 RDD 来说每个分片都会被一个计算任务处理分片数决定并行度。 用户可以在创建 RDD 时指定 RDD 的分片个数如果没有指定那么就会采用默认值。 2.A function for computing each split  一个函数会被作用在每一个分区。 Spark 中 RDD 的计算是以分片为单位的compute 函数会被作用到每个分区上。 3.A list of dependencies on other RDDs  一个 RDD 会依赖于其他多个 RDD。 RDD 的每次转换都会生成一个新的 RDD所以 RDD 之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时Spark 可以通过这个依赖关系重新计算丢失的分区数据而不是对 RDD 的所有分区进行重新计算。(Spark 的容错机制) 4.Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned) 可选项对于 KV 类型的 RDD 会有一个 Partitioner即 RDD 的分区函数默认为 HashPartitioner。 5.Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file) 可选项,一个列表存储存取每个 Partition 的优先位置(preferred location)。 对于一个 HDFS 文件来说这个列表保存的就是每个 Partition 所在的块的位置。按照移动数据不如移动计算的理念Spark 在进行任务调度的时候会尽可能选择那些存有数据的 worker 节点来进行任务计算。 
总结 RDD 是一个数据集的表示不仅表示了数据集还表示了这个数据集从哪来如何计算主要属性包括 1.分区列表 2.计算函数 3.依赖关系 4.分区函数(默认是 hash) 5.最佳位置 分区列表、分区函数、最佳位置这三个属性其实说的就是数据集在哪在哪计算更合适如何分区 计算函数、依赖关系这两个属性其实说的是数据集怎么来的。 
2.RDD-API 
1) RDD 的创建方式 
1.由外部存储系统的数据集创建包括本地的文件系统还有所有 Hadoop 支持的数据集比如 HDFS、Cassandra、HBase 等 val rdd1  sc.textFile(“hdfs://node1:8020/wordcount/input/words.txt”) 2.通过已有的 RDD 经过算子转换生成新的 RDD val rdd2rdd1.flatMap(_.split( )) 3.由一个已经存在的 Scala 集合创建 val rdd3  sc.parallelize(Array(1,2,3,4,5,6,7,8)) 或者 val rdd4  sc.makeRDD(List(1,2,3,4,5,6,7,8)) makeRDD 方法底层调用了 parallelize 方法  
2) RDD 的算子分类 
RDD 的算子分为两类: 1.Transformation转换操作:返回一个新的 RDD 2.Action动作操作:返回值不是 RDD(无返回值或返回其他的) ❣️ 注意: 1、RDD 不实际存储真正要计算的数据而是记录了数据的位置在哪里数据的转换关系(调用了什么方法传入什么函数)。 2、RDD 中的所有转换都是惰性求值/延迟执行的也就是说并不会直接计算。只有当发生一个要求返回结果给 Driver 的 Action 动作时这些转换才会真正运行。 3、之所以使用惰性求值/延迟执行是因为这样可以在 Action 时对 RDD 操作形成 DAG 有向无环图进行 Stage 的划分和并行优化这种设计让 Spark 更加有效率地运行。 
3) Transformation 转换算子 4) Action 动作算子 统计操作  
-需求 给定一个键值对 RDD 
val rdd  sc.parallelize(Array((spark,2),(hadoop,6),(hadoop,4),(spark,6)))key 表示图书名称value 表示某天图书销量 请计算每个键对应的平均值也就是计算每种图书的每天平均销量。 最终结果:(“spark”,4),(“hadoop”,5)。 
-答案 1 
val rdd  sc.parallelize(Array((spark,2),(hadoop,6),(hadoop,4),(spark,6)))
val rdd2  rdd.groupByKey()
rdd2.collect
//Array[(String, Iterable[Int])]  Array((spark,CompactBuffer(2, 6)), (hadoop,CompactBuffer(6, 4)))
rdd2.mapValues(vv.sum/v.size).collect
Array[(String, Int)]  Array((spark,4), (hadoop,5))-答案 2 
val rdd  sc.parallelize(Array((spark,2),(hadoop,6),(hadoop,4),(spark,6)))
val rdd2  rdd.groupByKey()
rdd2.collect
//Array[(String, Iterable[Int])]  Array((spark,CompactBuffer(2, 6)), (hadoop,CompactBuffer(6, 4)))val rdd3  rdd2.map(t(t._1,t._2.sum /t._2.size))
rdd3.collect
//Array[(String, Int)]  Array((spark,4), (hadoop,5))3. RDD 的持久化/缓存 
在实际开发中某些 RDD 的计算或转换可能会比较耗费时间如果这些 RDD 后续还会频繁的被使用到那么可以将这些 RDD 进行持久化/缓存这样下次再使用到的时候就不用再重新计算了提高了程序运行的效率。 
val rdd1  sc.textFile(hdfs://node01:8020/words.txt)
val rdd2  rdd1.flatMap(xx.split( )).map((_,1)).reduceByKey(__)
rdd2.cache //缓存/持久化
rdd2.sortBy(_._2,false).collect//触发action,会去读取HDFS的文件,rdd2会真正执行持久化
rdd2.sortBy(_._2,false).collect//触发action,会去读缓存中的数据,执行速度会比之前快,因为rdd2已经持久化到内存中了持久化/缓存 API 详解 -ersist 方法和 cache 方法 RDD 通过 persist 或 cache 方法可以将前面的计算结果缓存但是并不是这两个方法被调用时立即缓存而是触发后面的 action 时该 RDD 将会被缓存在计算节点的内存中并供后面重用。 通过查看 RDD 的源码发现 cache 最终也是调用了 persist 无参方法(默认存储只存在内存中)  RDD源码 
存储级别 默认的存储级别都是仅在内存存储一份Spark 的存储级别还有好多种存储级别在 object StorageLevel 中定义的。  总结 1.RDD 持久化/缓存的目的是为了提高后续操作的速度 2.缓存的级别有很多默认只存在内存中,开发中使用 memory_and_disk 3.只有执行 action 操作的时候才会真正将 RDD 数据进行持久化/缓存 4.实际开发中如果某一个 RDD 后续会被频繁的使用可以将该 RDD 进行持久化/缓存 
4. RDD 容错机制 Checkpoint 
-持久化的局限 持久化/缓存可以把数据放在内存中虽然是快速的但是也是最不可靠的也可以把数据放在磁盘上也不是完全可靠的例如磁盘会损坏等。 -问题解决 Checkpoint 的产生就是为了更加可靠的数据持久化在 Checkpoint 的时候一般把数据放在在 HDFS 上这就天然的借助了 HDFS 天生的高容错、高可靠来实现数据最大程度上的安全实现了 RDD 的容错和高可用。 用法 
SparkContext.setCheckpointDir(目录) //HDFS的目录RDD.checkpoint-总结 -开发中如何保证数据的安全性性及读取效率 可以对频繁使用且重要的数据先做缓存/持久化再做 checkpint 操作。 -持久化和 Checkpoint 的区别 1.位置 Persist 和 Cache 只能保存在本地的磁盘和内存中(或者堆外内存–实验中) Checkpoint 可以保存数据到 HDFS 这类可靠的存储上。 2.生命周期 Cache 和 Persist 的 RDD 会在程序结束后会被清除或者手动调用 unpersist 方法 Checkpoint 的 RDD 在程序结束后依然存在不会被删除。 
5. RDD 依赖关系 
1) 宽窄依赖 
-两种依赖关系类型 RDD 和它依赖的父 RDD 的关系有两种不同的类型即 宽依赖(wide dependency/shuffle dependency) 窄依赖(narrow dependency)  图解  宽窄依赖 -如何区分宽窄依赖 窄依赖:父 RDD 的一个分区只会被子 RDD 的一个分区依赖 宽依赖:父 RDD 的一个分区会被子 RDD 的多个分区依赖(涉及到 shuffle)。 
2) 为什么要设计宽窄依赖 
1.对于窄依赖 窄依赖的多个分区可以并行计算 窄依赖的一个分区的数据如果丢失只需要重新计算对应的分区的数据就可以了。 2.对于宽依赖 划分 Stage(阶段)的依据:对于宽依赖,必须等到上一阶段计算完成才能计算下一阶段。 
6. DAG 的生成和划分 Stage 
DAG 介绍 -DAG 是什么 DAG(Directed Acyclic Graph 有向无环图)指的是数据转换执行的过程有方向无闭环(其实就是 RDD 执行的流程) 原始的 RDD 通过一系列的转换操作就形成了 DAG 有向无环图任务执行时可以按照 DAG 的描述执行真正的计算(数据被操作的一个过程)。 -DAG 的边界 开始:通过 SparkContext 创建的 RDD 结束:触发 Action一旦触发 Action 就形成了一个完整的 DAG。DAG 划分 Stage  DAG划分Stage 一个 Spark 程序可以有多个 DAG(有几个 Action就有几个 DAG上图最后只有一个 Action图中未表现,那么就是一个 DAG)。 一个 DAG 可以有多个 Stage(根据宽依赖/shuffle 进行划分)。 同一个 Stage 可以有多个 Task 并行执行(task 数分区数如上图Stage1 中有三个分区 P1、P2、P3对应的也有三个 Task)。 可以看到这个 DAG 中只 reduceByKey 操作是一个宽依赖Spark 内核会以此为边界将其前后划分成不同的 Stage。 同时我们可以注意到在图中 Stage1 中从 textFile 到 flatMap 到 map 都是窄依赖这几步操作可以形成一个流水线操作通过 flatMap 操作生成的 partition 可以不用等待整个 RDD 计算结束而是继续进行 map 操作这样大大提高了计算的效率。 
-为什么要划分 Stage? --并行计算 一个复杂的业务逻辑如果有 shuffle那么就意味着前面阶段产生结果后才能执行下一个阶段即下一个阶段的计算要依赖上一个阶段的数据。那么我们按照 shuffle 进行划分(也就是按照宽依赖就行划分)就可以将一个 DAG 划分成多个 Stage/阶段在同一个 Stage 中会有多个算子操作可以形成一个 pipeline 流水线流水线内的多个平行的分区可以并行执行。 
-如何划分 DAG 的 stage 对于窄依赖partition 的转换处理在 stage 中完成计算不划分(将窄依赖尽量放在在同一个 stage 中可以实现流水线计算)。 对于宽依赖由于有 shuffle 的存在只能在父 RDD 处理完成后才能开始接下来的计算也就是说需要要划分 stage。 
总结 Spark 会根据 shuffle/宽依赖使用回溯算法来对 DAG 进行 Stage 划分从后往前遇到宽依赖就断开遇到窄依赖就把当前的 RDD 加入到当前的 stage/阶段中 具体的划分算法请参见 AMP 实验室发表的论文 《Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing》 http://xueshu.baidu.com/usercenter/paper/show?paperidb33564e60f0a7e7a1889a9da10963461sitexueshu_se 
7. RDD 累加器和广播变量 
在默认情况下当 Spark 在集群的多个不同节点的多个任务上并行运行一个函数时它会把函数中涉及到的每个变量在每个任务上都生成一个副本。但是有时候需要在多个任务之间共享变量或者在任务(Task)和任务控制节点(Driver Program)之间共享变量。 为了满足这种需求Spark 提供了两种类型的变量 1.累加器 accumulators累加器支持在所有不同节点之间进行累加计算(比如计数或者求和)。 2.广播变量 broadcast variables广播变量用来把变量在所有节点的内存之间进行共享在每个机器上缓存一个只读的变量而不是为机器上的每个任务都生成一个副本。 
1) 累加器 
不使用累加器 
var counter  0
val data  Seq(1, 2, 3)
data.foreach(x  counter  x)
println(Counter value:  counter)运行结果 
Counter value: 6如果我们将 data 转换成 RDD再来重新计算 
var counter  0
val data  Seq(1, 2, 3)
var rdd  sc.parallelize(data)
rdd.foreach(x  counter  x)
println(Counter value:  counter)运行结果 
Counter value: 0使用累加器 通常在向 Spark 传递函数时比如使用 map() 函数或者用 filter() 传条件时可以使用驱动器程序中定义的变量但是集群中运行的每个任务都会得到这些变量的一份新的副本更新这些副本的值也不会影响驱动器中的对应变量。这时使用累加器就可以实现我们想要的效果: val xx: Accumulator[Int]  sc.accumulator(0)代码示例 
import org.apache.spark.rdd.RDD
import org.apache.spark.{Accumulator, SparkConf, SparkContext}object AccumulatorTest {def main(args: Array[String]): Unit  {val conf: SparkConf  new SparkConf().setAppName(wc).setMaster(local[*])val sc: SparkContext  new SparkContext(conf)sc.setLogLevel(WARN)//使用scala集合完成累加var counter1: Int  0;var data  Seq(1,2,3)data.foreach(x  counter1  x )println(counter1)//6println()//使用RDD进行累加var counter2: Int  0;val dataRDD: RDD[Int]  sc.parallelize(data) //分布式集合的[1,2,3]dataRDD.foreach(x  counter2  x)println(counter2)//0//注意上面的RDD操作运行结果是0//因为foreach中的函数是传递给Worker中的Executor执行,用到了counter2变量//而counter2变量在Driver端定义的,在传递给Executor的时候,各个Executor都有了一份counter2//最后各个Executor将各自个x加到自己的counter2上面了,和Driver端的counter2没有关系//那这个问题得解决啊!不能因为使用了Spark连累加都做不了了啊!//如果解决?---使用累加器val counter3: Accumulator[Int]  sc.accumulator(0)dataRDD.foreach(x  counter3  x)println(counter3)//6}
} 
2) 广播变量 
不使用广播变量使用广播变量代码示例 关键词sc.broadcast() 
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object BroadcastVariablesTest {def main(args: Array[String]): Unit  {val conf: SparkConf  new SparkConf().setAppName(wc).setMaster(local[*])val sc: SparkContext  new SparkContext(conf)sc.setLogLevel(WARN)//不使用广播变量val kvFruit: RDD[(Int, String)]  sc.parallelize(List((1,apple),(2,orange),(3,banana),(4,grape)))val fruitMap: collection.Map[Int, String] kvFruit.collectAsMap//scala.collection.Map[Int,String]  Map(2 - orange, 4 - grape, 1 - apple, 3 - banana)val fruitIds: RDD[Int]  sc.parallelize(List(2,4,1,3))//根据水果编号取水果名称val fruitNames: RDD[String]  fruitIds.map(xfruitMap(x))fruitNames.foreach(println)//注意:以上代码看似一点问题没有,但是考虑到数据量如果较大,且Task数较多,//那么会导致,被各个Task共用到的fruitMap会被多次传输//应该要减少fruitMap的传输,一台机器上一个,被该台机器中的Task共用即可//如何做到?---使用广播变量//注意:广播变量的值不能被修改,如需修改可以将数据存到外部数据源,如MySQL、Redisprintln()val BroadcastFruitMap: Broadcast[collection.Map[Int, String]]  sc.broadcast(fruitMap)val fruitNames2: RDD[String]  fruitIds.map(xBroadcastFruitMap.value(x))fruitNames2.foreach(println)}
}
 文章转载自: http://www.morning.wgbmj.cn.gov.cn.wgbmj.cn http://www.morning.khpgd.cn.gov.cn.khpgd.cn http://www.morning.qwmpn.cn.gov.cn.qwmpn.cn http://www.morning.tzlfc.cn.gov.cn.tzlfc.cn http://www.morning.ntzfj.cn.gov.cn.ntzfj.cn http://www.morning.lqws.cn.gov.cn.lqws.cn http://www.morning.nrqnj.cn.gov.cn.nrqnj.cn http://www.morning.pdghl.cn.gov.cn.pdghl.cn http://www.morning.kxxld.cn.gov.cn.kxxld.cn http://www.morning.mzcrs.cn.gov.cn.mzcrs.cn http://www.morning.jkszt.cn.gov.cn.jkszt.cn http://www.morning.zfwjh.cn.gov.cn.zfwjh.cn http://www.morning.krjyq.cn.gov.cn.krjyq.cn http://www.morning.lhyhx.cn.gov.cn.lhyhx.cn http://www.morning.pxdgy.cn.gov.cn.pxdgy.cn http://www.morning.cqrenli.com.gov.cn.cqrenli.com http://www.morning.fengnue.com.gov.cn.fengnue.com http://www.morning.tkyry.cn.gov.cn.tkyry.cn http://www.morning.hxpsp.cn.gov.cn.hxpsp.cn http://www.morning.routalr.cn.gov.cn.routalr.cn http://www.morning.fbjqq.cn.gov.cn.fbjqq.cn http://www.morning.qzpkr.cn.gov.cn.qzpkr.cn http://www.morning.mgtmm.cn.gov.cn.mgtmm.cn http://www.morning.fhcwm.cn.gov.cn.fhcwm.cn http://www.morning.c7624.cn.gov.cn.c7624.cn http://www.morning.rkrcd.cn.gov.cn.rkrcd.cn http://www.morning.kwz6232.cn.gov.cn.kwz6232.cn http://www.morning.wflpj.cn.gov.cn.wflpj.cn http://www.morning.gjlml.cn.gov.cn.gjlml.cn http://www.morning.rbjp.cn.gov.cn.rbjp.cn http://www.morning.trqsm.cn.gov.cn.trqsm.cn http://www.morning.rgwrl.cn.gov.cn.rgwrl.cn http://www.morning.slkqd.cn.gov.cn.slkqd.cn http://www.morning.mfct.cn.gov.cn.mfct.cn http://www.morning.rqlqd.cn.gov.cn.rqlqd.cn http://www.morning.mtrz.cn.gov.cn.mtrz.cn http://www.morning.rmqlf.cn.gov.cn.rmqlf.cn http://www.morning.kwcnf.cn.gov.cn.kwcnf.cn http://www.morning.ctfwl.cn.gov.cn.ctfwl.cn http://www.morning.jyznn.cn.gov.cn.jyznn.cn http://www.morning.mhnb.cn.gov.cn.mhnb.cn http://www.morning.lqlfj.cn.gov.cn.lqlfj.cn http://www.morning.jwbfj.cn.gov.cn.jwbfj.cn http://www.morning.mhnxs.cn.gov.cn.mhnxs.cn http://www.morning.sjqml.cn.gov.cn.sjqml.cn http://www.morning.mftdq.cn.gov.cn.mftdq.cn http://www.morning.qhrdx.cn.gov.cn.qhrdx.cn http://www.morning.nffwl.cn.gov.cn.nffwl.cn http://www.morning.sglcg.cn.gov.cn.sglcg.cn http://www.morning.srbbh.cn.gov.cn.srbbh.cn http://www.morning.knqzd.cn.gov.cn.knqzd.cn http://www.morning.jmdpp.cn.gov.cn.jmdpp.cn http://www.morning.xscpq.cn.gov.cn.xscpq.cn http://www.morning.yqgny.cn.gov.cn.yqgny.cn http://www.morning.rxtxf.cn.gov.cn.rxtxf.cn http://www.morning.lbrrn.cn.gov.cn.lbrrn.cn http://www.morning.trqsm.cn.gov.cn.trqsm.cn http://www.morning.fkgct.cn.gov.cn.fkgct.cn http://www.morning.syznh.cn.gov.cn.syznh.cn http://www.morning.sqmlw.cn.gov.cn.sqmlw.cn http://www.morning.jtkfm.cn.gov.cn.jtkfm.cn http://www.morning.sthp.cn.gov.cn.sthp.cn http://www.morning.jcfdk.cn.gov.cn.jcfdk.cn http://www.morning.nldsd.cn.gov.cn.nldsd.cn http://www.morning.rwpjq.cn.gov.cn.rwpjq.cn http://www.morning.nrqtk.cn.gov.cn.nrqtk.cn http://www.morning.pmnn.cn.gov.cn.pmnn.cn http://www.morning.cpwmj.cn.gov.cn.cpwmj.cn http://www.morning.kehejia.com.gov.cn.kehejia.com http://www.morning.tngdn.cn.gov.cn.tngdn.cn http://www.morning.wcqkp.cn.gov.cn.wcqkp.cn http://www.morning.srnhk.cn.gov.cn.srnhk.cn http://www.morning.rdkqt.cn.gov.cn.rdkqt.cn http://www.morning.hyhzt.cn.gov.cn.hyhzt.cn http://www.morning.rbffj.cn.gov.cn.rbffj.cn http://www.morning.thzwj.cn.gov.cn.thzwj.cn http://www.morning.jokesm.com.gov.cn.jokesm.com http://www.morning.trwkz.cn.gov.cn.trwkz.cn http://www.morning.snxbf.cn.gov.cn.snxbf.cn http://www.morning.qtzqk.cn.gov.cn.qtzqk.cn