当前位置: 首页 > news >正文

重庆优化网站百度购物平台

重庆优化网站,百度购物平台,购买网站需要注意什么,eclipse怎么做网页目录 一、基本算子 1.map算子 2.flatMap算子 3.filter算子 4.foreach算子 5.saveAsTextFile算子 6.redueceByKey算子 二、常用Transformation算子 1.mapValues算子 2.groupBy算子 3.distinct算子 4.union算子 5.join算子 6.intersection算子 7.glom算子 8.groupByKey算…目录 一、基本算子 1.map算子 2.flatMap算子 3.filter算子 4.foreach算子 5.saveAsTextFile算子 6.redueceByKey算子 二、常用Transformation算子        1.mapValues算子 2.groupBy算子 3.distinct算子 4.union算子 5.join算子 6.intersection算子 7.glom算子 8.groupByKey算子 9.sortBy算子 10.sortByKey算子 三、常用Action算子 1.countByKey算子 2.collect算子 3.reduce算子 4.takeSample算子 5.takeOrdered算子 四、分区操作算子 1.mapPartitions算子 2.foreachPartition算子 3.partitionBy算子 4.repartition算子和coalesce算子 一、基本算子 RDD中map、filter、flatMap及foreach等函数为最基本算子都是都RDD中每个元素进行操作将元素传递到函数中进行转换。 1.map算子 map(f:TU): RDD[T]RDD[U],表示将RDD经由某一函数f后转变为另一个RDD。 功能map算子是将RDD的数据一条条处理处理的逻辑 基于map算子中接受的处理函数返回新的RDD。 #cording:utf-8 from pyspark import SparkConf,SparkContextif __name__ __main__:# 构建SparkContext对象conf SparkConf().setAppName(test).setMaster(local[*])sc SparkContext(confconf)rdd sc.parallelize([1,2,3,4,5,6],3)# 定义方法作为算子的传入函数体def add(data):return data * 10print(rdd.map(add).collect())# 更简单的方式 是定义lambda表达式来写匿名函数print(rdd.map(lambda data:data * 10).collect())对于算子的接受函数来说两种方法都可以lambda表达式 适用于 一行代码就搞定的函数体如果是多行需要定义独立的方法2.flatMap算子 flatMap(f:TSeq[U]): RDD[T]RDD[U])表示将RDD经由某一函数f后转变为一个新的 RDD但是与map 不同RDD中的每一个元素会被映射成新的0到多个元素(f 函数返回的是一个序列Seq。 功能对RDD执行map操作然后进行解除嵌套操作。 #cording:utf-8 from pyspark import SparkConf,SparkContextif __name__ __main__:# 构建SparkContext对象conf SparkConf().setAppName(test).setMaster(local[*])sc SparkContext(confconf)rdd sc.parallelize([hadoop hadoop spark,spark hadoop hadoop,hadoop flink spark])#得到所有的单词组成RDDrdd2 rdd.map(lambda line: line.split( ))rdd3 rdd.flatMap(lambda line: line.split( ))print(rdd2.collect())print(rdd3.collect()) 3.filter算子 filter(f.TBool): RDD[T]RDD[T]表示将 RDD经由某一函数f后只保留f返回True的数据组成新的RDD。 功能过滤想要的数据进行保留返回值是True的数据保留返回值是False的数据则会被丢弃。 #corfding:utf8 from pyspark import SparkConf,SparkContext if __name__ __main__:conf SparkConf().setAppName(test).setMaster(local[*])sc SparkContext(confconf)# 通过filter算子过滤奇数rdd sc.parallelize((1,2,3,4,5,6,7,8,9,10))result_rdd rdd.filter(lambda x: x % 2 1)print(result_rdd.collect()) 4.foreach算子 foreach(func)将函数 func应用在数据集的每一个元素上通常用于更新一个累加器或者和外部存储系统进行交互例如 Redis。 功能对RDD的每一个元素执行你提供的逻辑的操作和map一个意思但是这个方法没有返回值。 ps该算子是分区(Executor)直接执行的跳过Driver由分区所在的Executor直接执行。 #cording:utf8 from pyspark import SparkConf,SparkContext if __name__ __main__:conf SparkConf().setMaster(local[*]).setAppName(test)sc SparkContext(confconf)rdd sc.parallelize([1, 5, 4, 2, 3, 6])print(rdd.foreach(lambda x: 10 * x))print(----------------------------------)print(rdd.foreach(lambda x: print(10 * x)))5.saveAsTextFile算子 saveAsTextFile(path:String)数据集内部的元素会调用其 toString方法转换为字符串形式然后根据传入的路径保存成文本文件既可以是本地文件系统也可以是HDFS等。 ps该算子是分区(Executor)直接执行的跳过Driver由分区所在的Executor直接执行。 #cording:utf8 from pyspark import SparkConf,SparkContext if __name__ __main__:conf SparkConf().setMaster(local[*]).setAppName(test)sc SparkContext(confconf)rdd sc.parallelize([1, 5, 4, 2, 3, 6])rdd.saveAsTextFile(hdfs://pyspark01/output/out1) 6.redueceByKey算子 功能针对KV型RDD自动按照key分组然后根据你提供的聚合逻辑完成组内数据value的聚合操作。 #cording:utf-8 from pyspark import SparkConf,SparkContextif __name__ __main__:# 构建SparkContext对象conf SparkConf().setAppName(test).setMaster(local[*])sc SparkContext(confconf)rdd sc.parallelize([(a,1),(b,1),(a,1),(a,1),(b,1),(c,1),(a,1)])#使用reduceByKey函数进行聚合reduce_rdd rdd.reduceByKey(lambda a,b : a b).collect()print(聚合结果,reduce_rdd) 二、常用Transformation算子        1.mapValues算子 功能针对二元元组RDD对其内部的二元元组的Value执行map操作。 #cording:utf-8 from pyspark import SparkConf,SparkContextif __name__ __main__:# 构建SparkContext对象conf SparkConf().setAppName(test).setMaster(local[*])sc SparkContext(confconf)rdd sc.parallelize([(a,2),(b,11),(a,1)])#使用map函数map_rdd rdd.map(lambda x: (x[0],x[1]*10)).collect()print(结果,map_rdd)# 使用mapValue函数value_rdd rdd.mapValues(lambda value: value*10).collect()print(结果,value_rdd) 2.groupBy算子 功能将RDD数据进行分组。 #cording:utf8from pyspark import SparkConf,SparkContextif __name__ __main__:conf SparkConf().setMaster(local[*]).setAppName(test)sc SparkContext(confconf)# 创建数据test_rdd sc.parallelize([(a,1),(b,1),(a,2),(b,2),(b,3)])# 通过groupBy函数对数据进行分组# groupBy函数传入函数的意思是通过这个函数来确定按照谁来分组返回谁即可# 分组规则和SQL一致也就是相同的在同一个组Hash分组result_1 test_rdd.groupBy(lambda t: t[0])result_2 result_1.map(lambda t: (t[0],list(t[1])))print(result_1.collect())print(result_2.collect()) 3.distinct算子 功能对RDD数据进行去重复返回新的RDD。 #cording:utf8from pyspark import SparkConf,SparkContextif __name__ __main__:conf SparkConf().setAppName(test).setMaster(local[*])sc SparkContext(confconf)rdd_1 sc.parallelize((1,2,1,2,3,4,5,6))rdd_2 sc.parallelize([(a,1),(b,1),(a,1),(a,1),(b,1),(c,1),(a,1)])# 使用distinct算子进行去重print(数字:,rdd_1.distinct().collect())print(元组:,rdd_2.distinct().collect()) 4.union算子 功能将两个RDD合并成一个RDD返回。只合并不去重RDD的类型不同也是可以合并的。 #corfding:utf8 from pyspark import SparkConf,SparkContext if __name__ __main__:conf SparkConf().setAppName(test).setMaster(local[*])sc SparkContext(confconf)# 通过union算子合并RDDrdd_1 sc.parallelize((1,2,3,4,5))rdd_2 sc.parallelize((6,7,8,9,10))print(rdd_1.union(rdd_2).collect()) 5.join算子 功能对两个RDD执行join操作可实现SQL外/内连接join算子只能用于二元元组。 #corfding:utf8 from pyspark import SparkConf,SparkContext if __name__ __main__:conf SparkConf().setAppName(test).setMaster(local[*])sc SparkContext(confconf)rdd1 sc.parallelize([(1001,zhangsan),(1002,lisi),(1003,wangwu),(1004,zhaoliu)])rdd2 sc.parallelize([(1001,销售部),(1002,科技部)])# 通过join算子来进行rdd之间的关联# 对于join算子来说关联条件按照二元元组的key来进行关联print(rdd1.join(rdd2).collect())# 左外连接右外连接可以更换一下rdd的顺序或者调用rightOuterJoin即可print(rdd1.leftOuterJoin(rdd2).collect()) 6.intersection算子 功能:求两个RDD的交集返回一个新的RDD。 #corfding:utf8 from pyspark import SparkConf,SparkContext if __name__ __main__:conf SparkConf().setAppName(test).setMaster(local[*])sc SparkContext(confconf)rdd1 sc.parallelize([(a,1),(b,3)])rdd2 sc.parallelize([(a,1),(c,1)])# 通过intersection算子求出RDD的交集 取出并返回新的RDDprint(rdd1.intersection(rdd2).collect()) 7.glom算子 功能将RDD的数据加上嵌套这个嵌套按照分区来进行比如RDD数据[1,2,3,4,5]有两个分区那么glom后数据变成[[1,2,3],[4,5]]。 #corfding:utf8 from pyspark import SparkConf,SparkContext if __name__ __main__:conf SparkConf().setAppName(test).setMaster(local[*])sc SparkContext(confconf)rdd1 sc.parallelize([1,2,3,4,5,6,7,8,9,10])print(rdd1.glom().collect())# 解嵌套操作print(rdd1.glom().flatMap(lambda x: x).collect()) 8.groupByKey算子 功能针对KV型RDD自动按照key分组。 #cording:utf8from pyspark import SparkConf,SparkContextif __name__ __main__:conf SparkConf().setMaster(local[*]).setAppName(test)sc SparkContext(confconf)# 创建数据test_rdd sc.parallelize([(a,1),(b,1),(a,2),(b,2),(b,3)])# 使用groupByKey算子result_1 test_rdd.groupByKey()#查看结果result_2 result_1.map(lambda t: (t[0],list(t[1])))print(result_1.collect())print(result_2.collect()) 9.sortBy算子 功能对RDD数据进行排序基于你指定的排序依据。 #cording:utf8 from pyspark import SparkConf,SparkContext if __name__ __main__:conf SparkConf().setMaster(local[*]).setAppName(test)sc SparkContext(confconf)rdd sc.parallelize([(c,3),(f,1),(b,11),(c,3),(e,1),(n,9),(a,1)],3)# 使用sortBy对RDD执行排序# 按照value 数字进行排序# 参数1函数表示的是告知spark按照数据的哪个列进行排序# 参数2True表示升序 False表示降序# 参数3排序的分区数注意如果要全局有序排序分区数设置为1print(按照value排序:,rdd.sortBy(lambda x: x[1], ascendingTrue, numPartitions3).collect())# 按照key进行排序print(按照key排序:,rdd.sortBy(lambda x: x[0], ascendingTrue, numPartitions3).collect()) 10.sortByKey算子 功能针对KV型RDD按照Key进行排序 #cording:utf8 from pyspark import SparkConf,SparkContext if __name__ __main__:conf SparkConf().setMaster(local[*]).setAppName(test)sc SparkContext(confconf)rdd sc.parallelize([(a,1),(E,1),(C,1),(D,1),(b,1),(g,1),(h,1),( y ,1),(u,1),(i,1),(o,1),(p,1),( m,1),(n,1),(L,1),(k,1),(f,1)],3)# 根据字母的小写排序print(rdd.sortByKey(ascendingTrue, numPartitions1, keyfunclambda key: key.lower()).collect()) 三、常用Action算子 1.countByKey算子 功能统计key出现的次数一般适用于KV型RDD import json from pyspark import SparkConf,SparkContext if __name__ __main__:conf SparkConf().setMaster(local[*]).setAppName(test)sc SparkContext(confconf)rdd sc.textFile(../input/words.txt)rdd2 rdd.flatMap(lambda x: x.split( )).map(lambda x: (x,1))# 通过countByKey来对key进行计数这是一个Action算子result rdd2.countByKey()print(result)print(type(result)) 2.collect算子 功能将RDD各个分区的数据统一收集到Driver中形成一个list对象。这个算子,是将RDD各个分区数据都拉取到Driver注意的是,RDD是分布式对象,其数据量可以很大,所以用这个算子之前要心知肚明的了解结果数据集不会太大不然,会把Driver内存撑爆。 3.reduce算子 功能对RDD数据集按照你传入的逻辑进行聚合。 import json from pyspark import SparkConf,SparkContext if __name__ __main__:conf SparkConf().setMaster(local[*]).setAppName(test)sc SparkContext(confconf)rdd sc.parallelize([1,2,3,4,5,6])print(rdd.reduce(lambda a,b: ab)) 4.takeSample算子 功能随机抽样RDD数据随机数种子数字可以随便传如果传同一个数字那么取出的结果是一致的。一般参数三不传spark会自动给与一个随机的种子。 from pyspark import SparkConf,SparkContext if __name__ __main__:conf SparkConf().setMaster(local[*]).setAppName(test)sc SparkContext(confconf)rdd sc.parallelize([1,2,3,4,5,6,7,8,9,10,1,2])print(True:,rdd.takeSample(True,22))print(False:,rdd.takeSample(False,22))print(无随机种子1:,rdd.takeSample(True,5))print(无随机种子2:, rdd.takeSample(True, 5))print(有随机种子1:,rdd.takeSample(True,5,1))print(有随机种子2:, rdd.takeSample(True, 5, 1))5.takeOrdered算子 功能对RDD进行排序取前N个。 #cording:utf8 from pyspark import SparkConf,SparkContext if __name__ __main__:conf SparkConf().setMaster(local[*]).setAppName(test)sc SparkContext(confconf)rdd sc.parallelize([1,5,4,2,3,6])print(普通:,rdd.takeOrdered(3))# 函数操作只会对结果产生影响不会影响数据本身print(传入函数:,rdd.takeOrdered(3, lambda x: -x)) 四、分区操作算子 1.mapPartitions算子 功能与map功能相似但区别是mapPartition一次被传递的是一整个分区的数据是作为一个迭代器一次性list对象传入过来而map是一个一个数据的传递。 #cording:utf8 from pyspark import SparkConf,SparkContext if __name__ __main__:conf SparkConf().setMaster(local[*]).setAppName(test)sc SparkContext(confconf)rdd sc.parallelize([1, 5, 4, 2, 3, 6],3)def process(iter):result list()for it in iter:result.append(it * 10)return result# mapPartitions算子相比于map算子节省了大量打IO操作每一个分区只需要进行一次IO操作即可print(输出结果:,rdd.mapPartitions(process).collect()) 2.foreachPartition算子 功能和普通的foreach一致一次处理的是一整个分区的数据。 #cording:utf8 from pyspark import SparkConf,SparkContext if __name__ __main__:conf SparkConf().setMaster(local[*]).setAppName(test)sc SparkContext(confconf)rdd sc.parallelize([1, 5, 4, 2, 3, 6],3)def process(iter):result list()for it in iter:result.append(it * 10)print(result)rdd.foreachPartition(process)3.partitionBy算子 功能对RDD进行自定义分区操作。 #cording:utf8 from pyspark import SparkConf,SparkContext if __name__ __main__:conf SparkConf().setMaster(local[*]).setAppName(test)sc SparkContext(confconf)rdd sc.parallelize([(hadoop,1),(hadoop,1),(hello,1),(spark,1),(flink,1),(spark,1)])# 使用partitionBy自定义分区def process(x):if hadoop x or hello x:return 0if spark x:return 1return 2# 使用glom算子将每个分区的数据进行嵌套print(显示分区:,rdd.partitionBy(3, process).glom().collect())4.repartition算子和coalesce算子 功能对RDD的分区执行重新分区仅数量 ps对分区的数量进行操作,一定要慎重一般情况下,我们写Spark代码除了要求全局排序设置为1个分区外多数时候,所有API中关于分区相关的代码我们都不太理会。因为,如果你改分区了会影响并行计算(内存迭代的并行管道数量)后面学分区如果增加极大可能导致shuffle。 #cording:utf8 from pyspark import SparkConf,SparkContext if __name__ __main__:conf SparkConf().setMaster(local[*]).setAppName(test)sc SparkContext(confconf)rdd sc.parallelize([1, 5, 4, 2, 3, 6],3)# repartition 修改分区# 减少分区print(减少分区为1:,rdd.repartition(1).getNumPartitions())# 增加分区print(增加分区为5:, rdd.repartition(5).getNumPartitions())# coalesce 修改分区# 减少分区print(减少分区为1:,rdd.coalesce(1).getNumPartitions())# 增加分区 pscoalesce增加分区数量需要指定参数shuffle为True才能1成功修改print(减少分区为5:, rdd.coalesce(5).getNumPartitions())print(减少分区为5:,rdd.coalesce(5, shuffleTrue).getNumPartitions())
文章转载自:
http://www.morning.mlgsc.com.gov.cn.mlgsc.com
http://www.morning.pudejun.com.gov.cn.pudejun.com
http://www.morning.ygkq.cn.gov.cn.ygkq.cn
http://www.morning.kpbq.cn.gov.cn.kpbq.cn
http://www.morning.lsnnq.cn.gov.cn.lsnnq.cn
http://www.morning.kdldx.cn.gov.cn.kdldx.cn
http://www.morning.spghj.cn.gov.cn.spghj.cn
http://www.morning.ljcjc.cn.gov.cn.ljcjc.cn
http://www.morning.cyfsl.cn.gov.cn.cyfsl.cn
http://www.morning.drggr.cn.gov.cn.drggr.cn
http://www.morning.cypln.cn.gov.cn.cypln.cn
http://www.morning.ymwny.cn.gov.cn.ymwny.cn
http://www.morning.qsbcg.cn.gov.cn.qsbcg.cn
http://www.morning.mswkd.cn.gov.cn.mswkd.cn
http://www.morning.nrbcx.cn.gov.cn.nrbcx.cn
http://www.morning.skqfx.cn.gov.cn.skqfx.cn
http://www.morning.khtyz.cn.gov.cn.khtyz.cn
http://www.morning.dmsxd.cn.gov.cn.dmsxd.cn
http://www.morning.gpkjx.cn.gov.cn.gpkjx.cn
http://www.morning.rgpsq.cn.gov.cn.rgpsq.cn
http://www.morning.xdwcg.cn.gov.cn.xdwcg.cn
http://www.morning.wgbmj.cn.gov.cn.wgbmj.cn
http://www.morning.wqpr.cn.gov.cn.wqpr.cn
http://www.morning.mqfkd.cn.gov.cn.mqfkd.cn
http://www.morning.wmqxt.cn.gov.cn.wmqxt.cn
http://www.morning.mlwpr.cn.gov.cn.mlwpr.cn
http://www.morning.ohmyjiu.com.gov.cn.ohmyjiu.com
http://www.morning.wmhlz.cn.gov.cn.wmhlz.cn
http://www.morning.tkflb.cn.gov.cn.tkflb.cn
http://www.morning.pprxs.cn.gov.cn.pprxs.cn
http://www.morning.qphdp.cn.gov.cn.qphdp.cn
http://www.morning.rbkml.cn.gov.cn.rbkml.cn
http://www.morning.lbcbq.cn.gov.cn.lbcbq.cn
http://www.morning.jyjqh.cn.gov.cn.jyjqh.cn
http://www.morning.tslfz.cn.gov.cn.tslfz.cn
http://www.morning.gqmhq.cn.gov.cn.gqmhq.cn
http://www.morning.mhnb.cn.gov.cn.mhnb.cn
http://www.morning.ssfq.cn.gov.cn.ssfq.cn
http://www.morning.nmbbt.cn.gov.cn.nmbbt.cn
http://www.morning.tnwwl.cn.gov.cn.tnwwl.cn
http://www.morning.ypbdr.cn.gov.cn.ypbdr.cn
http://www.morning.kngx.cn.gov.cn.kngx.cn
http://www.morning.dbnrl.cn.gov.cn.dbnrl.cn
http://www.morning.lhytw.cn.gov.cn.lhytw.cn
http://www.morning.tlyms.cn.gov.cn.tlyms.cn
http://www.morning.lgsfb.cn.gov.cn.lgsfb.cn
http://www.morning.stprd.cn.gov.cn.stprd.cn
http://www.morning.trhlb.cn.gov.cn.trhlb.cn
http://www.morning.gftnx.cn.gov.cn.gftnx.cn
http://www.morning.knpmj.cn.gov.cn.knpmj.cn
http://www.morning.yltnl.cn.gov.cn.yltnl.cn
http://www.morning.qqhfc.cn.gov.cn.qqhfc.cn
http://www.morning.wjlnz.cn.gov.cn.wjlnz.cn
http://www.morning.tbjtp.cn.gov.cn.tbjtp.cn
http://www.morning.mngyb.cn.gov.cn.mngyb.cn
http://www.morning.srgyj.cn.gov.cn.srgyj.cn
http://www.morning.qbwbs.cn.gov.cn.qbwbs.cn
http://www.morning.yqpzl.cn.gov.cn.yqpzl.cn
http://www.morning.dfckx.cn.gov.cn.dfckx.cn
http://www.morning.rqdx.cn.gov.cn.rqdx.cn
http://www.morning.zsleyuan.cn.gov.cn.zsleyuan.cn
http://www.morning.zfhzx.cn.gov.cn.zfhzx.cn
http://www.morning.jrdbq.cn.gov.cn.jrdbq.cn
http://www.morning.bpwz.cn.gov.cn.bpwz.cn
http://www.morning.rzpkt.cn.gov.cn.rzpkt.cn
http://www.morning.tkyxl.cn.gov.cn.tkyxl.cn
http://www.morning.pjxlg.cn.gov.cn.pjxlg.cn
http://www.morning.mpngp.cn.gov.cn.mpngp.cn
http://www.morning.hxxwq.cn.gov.cn.hxxwq.cn
http://www.morning.gqfjb.cn.gov.cn.gqfjb.cn
http://www.morning.xsrnr.cn.gov.cn.xsrnr.cn
http://www.morning.dfojgo.cn.gov.cn.dfojgo.cn
http://www.morning.kmqjx.cn.gov.cn.kmqjx.cn
http://www.morning.pycpt.cn.gov.cn.pycpt.cn
http://www.morning.rrqbm.cn.gov.cn.rrqbm.cn
http://www.morning.qxnlc.cn.gov.cn.qxnlc.cn
http://www.morning.kkzwn.cn.gov.cn.kkzwn.cn
http://www.morning.shxmr.cn.gov.cn.shxmr.cn
http://www.morning.bygyd.cn.gov.cn.bygyd.cn
http://www.morning.dnbkz.cn.gov.cn.dnbkz.cn
http://www.tj-hxxt.cn/news/255845.html

相关文章:

  • 中国旅游网站的建设西宁做网站制作的公司哪家好
  • 重庆业务外包网站建设网站有个栏目不想被搜索引擎收录要怎么办
  • 代做吧机械网站wordpress同步至订阅号
  • 蛋糕设计网站越秀手机建网站
  • 创建网站需要注意什么网站建设预估费用
  • 网站开发团队名称韩国优秀电商网站
  • 深圳网站建设是什么化妆品网站开发
  • 如何查询网站历史快照重庆八大员证书查询网站
  • 南安网站建设怎么免费建公司网站
  • 淄博 建网站wordpress中常用插件安装
  • 快速构建网站网站建设的实施制作阶段包括
  • 获奖网站设计网络软文营销案例
  • 做网络网站需要三证么网络营销与网络推广的异同
  • 网站后台fpt免费生成二维码
  • 南昌网站建设方案维护服务器安装完面板怎么做网站
  • 广州定制网站建设ui设计的细分研究方向包含哪几项
  • 网站制作用的软件SQL如何建网站
  • ui设计较好的网站全球外贸网站制作教程
  • 网站 做百度推广有没有效果怎么样注册中文域名
  • 怎么自己做网站框架网站建设思维导图模版
  • 网站备案期间访问免费网站建设加盟
  • 上海市招工网新手seo要学多久
  • 做网站必要性ps怎么制作网页页面
  • 建筑企业网站设计网站页面设计技术参数
  • 成都企业网站建设介绍网站开发课程介绍
  • 利用网站制作网页分销平台门店端
  • 网站开发到上线需要多久免费静态网页
  • 企业门户网站建设报价一步安装wordpress
  • 提供服务的网站企业网络方案设计思路
  • wordpress备份整站网站建设的条件