展示型网站建设的标准,杭州p2p网站建设,wordpress获取用户等级,wordpress系统是什么一、DStream的定义
DStream是离散流#xff0c;Spark Streaming提供的一种高级抽象#xff0c;代表了一个持续不断的数据流。DStream可以通过输入数据源来创建#xff0c;比如Kafka、Flume#xff0c;也可以通过对其他DStream应用高阶函数来创建#xff0c;比如map、redu…一、DStream的定义
DStream是离散流Spark Streaming提供的一种高级抽象代表了一个持续不断的数据流。DStream可以通过输入数据源来创建比如Kafka、Flume也可以通过对其他DStream应用高阶函数来创建比如map、reduce、join、window。 DStream的内部其实是一系列持续不断产生的RDDRDD是Spark Core的核心抽象即不可变的分布式的数据集。 对DStream应用的算子其实在底层会被翻译为对DStream中每个RDD的操作比如对一个DStream执行一个map操作会产生一个新的DStream其底层原理为对输入DStream中的每个时间段的RDD都应用一遍map操作然后生成的RDD即作为新的DStream中的那个时间段的一个RDD。
二、DStream的操作
1普通的转换操作
转换描述map(func)源 DStream的每个元素通过函数func返回一个新的DStream。flatMap(func)类似与map操作不同的是每个输入元素可以被映射出0或者更多的输出元素。filter(func)在源DSTREAM上选择Func函数返回仅为true的元素,最终返回一个新的DSTREAM 。repartition(numPartitions)通过输入的参数numPartitions的值来改变DStream的分区大小。union(otherStream)返回一个包含源DStream与其他 DStream的元素合并后的新DSTREAM。count()对源DStream内部的所含有的RDD的元素数量进行计数返回一个内部的RDD只包含一个元素的DStreaam。reduce(func)使用函数func有两个参数并返回一个结果将源DStream 中每个RDD的元素进行聚 合操作,返回一个内部所包含的RDD只有一个元素的新DStream。countByValue()计算DStream中每个RDD内的元素出现的频次并返回新的DStream[(K,Long)]其中K是RDD中元素的类型Long是元素出现的频次。reduceByKey(func, [numTasks])当一个类型为KV键值对的DStream被调用的时候,返回类型为类型为KV键值对的新 DStream,其中每个键的值V都是使用聚合函数func汇总。注意默认情况下使用 Spark的默认并行度提交任务本地模式下并行度为2集群模式下位8可以通过配置numTasks设置不同的并行任务数。join(otherStream, [numTasks])当被调用类型分别为KV和KW键值对的2个DStream 时返回类型为KVW键值对的一个新 DSTREAM。cogroup(otherStream, [numTasks])当被调用的两个DStream分别含有(K, V) 和(K, W)键值对时,返回一个(K, Seq[V], Seq[W])类型的新的DStream。transform(func)通过对源DStream的每RDD应用RDD-to-RDD函数返回一个新的DStream这可以用来在DStream做任意RDD操作。updateStateByKey(func)返回一个新状态的DStream,其中每个键的状态是根据键的前一个状态和键的新值应用给定函数func后的更新。这个方法可以被用来维持每个键的任何状态数据。
2窗口转换函数
转换描述window(windowLength, slideInterval)返回一个基于源DStream的窗口批次计算后得到新的DStream。countByWindow(windowLength,slideInterval)返回基于滑动窗口的DStream中的元素的数量。reduceByWindow(func, windowLength,slideInterval)基于滑动窗口对源DStream中的元素进行聚合操作得到一个新的DStream。reduceByKeyAndWindow(func,windowLength, slideInterval, [numTasks])基于滑动窗口对KV键值对类型的DStream中的值按K使用聚合函数func进行聚合操作得到一个新的DStream。reduceByKeyAndWindow(func, invFunc,windowLength, slideInterval, [numTasks])一个更高效的reduceByKkeyAndWindow()的实现版本先对滑动窗口中新的时间间隔内数据增量聚合并移去最早的与新增数据量的时间间隔内的数据统计量。例如计算t4秒这个时刻过去5秒窗口的WordCount那么我们可以将t3时刻过去5秒的统计量加上[t3t4]的统计量在减去[t-2t-1]的统计量这种方法可以复用中间三秒的统计量提高统计的效率。countByValueAndWindow(windowLength,slideInterval, [numTasks])基于滑动窗口计算源DStream中每个RDD内每个元素出现的频次并返回DStream[(K,Long)]其中K是RDD中元素的类型Long是元素频次。与countByValue一样reduce任务的数量可以通过一个可选参数进行配置。
在Spark Streaming中数据处理是按批进行的而数据采集是逐条进行的。因此在Spark Streaming中会先设置好批处理间隔batch duration当超过批处理间隔的时候就会把采集到的数据汇总起来成为一批数据交给系统去处理。
对于窗口操作而言在其窗口内部会有N个批处理数据批处理数据的大小由窗口间隔window duration决定而窗口间隔指的就是窗口的持续时间在窗口操作中只有窗口的长度满足了才会触发批数据的处理。除了窗口的长度窗口操作还有另一个重要的参数就是滑动间隔slide duration它指的是经过多长时间窗口滑动一次形成新的窗口滑动窗口默认情况下和批次间隔的相同而窗口间隔一般设置的要比它们两个大。
3输出操作
转换描述print()在Driver中打印出DStream中数据的前10个元素。saveAsTextFiles(prefix, [suffix])将DStream中的内容以文本的形式保存为文本文件其中每次批处理间隔内产生的文件以prefix-TIME_IN_MS[.suffix]的方式命名。saveAsObjectFiles(prefix, [suffix])将DStream中的内容按对象序列化并且以SequenceFile的格式保存。其中每次批处理间隔内产生的文件以prefix-TIME_IN_MS[.suffix]的方式命名。saveAsHadoopFiles(prefix, [suffix])将DStream中的内容以文本的形式保存为Hadoop文件其中每次批处理间隔内产生的文件以prefix-TIME_IN_MS[.suffix]的方式命名。foreachRDD(func)最基本的输出操作将func函数应用于DStream中的RDD上这个操作会输出数据到外部系统比如保存RDD到文件或者网络数据库等。需要注意的是func函数是在运行该streaming应用的Driver进程里执行的。
三、常用操作详解
1transform(func)
该transform操作转换操作及其类似的transformWith操作允许在DStream上应用任意的RDD-to-RDD函数。它可以实现DStream API中未提供的操作比如两个数据流的连接操作。 示例代码
val spamInfoRDD ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam informationval cleanedDStream wordCounts.transform { rdd rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning...
}2updateStateByKey操作
使用的一般操作都是不记录历史数据的也就说只记录当前定义时间段内的数据跟前后时间段无关。如果要统计历史时间内的总共数据并且实时更新如何解决呢该updateStateByKey操作可以让你保持任意状态同时不断有新的信息进行更新。
要使用updateStateByKey操作必须进行下面两个步骤 1定义状态 状态可以是任意的数据类型。 2定义状态更新函数用一个函数指定如何使用先前的状态和从输入流中获取的新值更新状态。 对DStream通过updateStateByKey(updateFunction)来实现实时更新。
更新函数有两个参数 1newValues是当前新进入的数据。 2runningCount 是历史数据被封装到了Option中。
示例 首先我们需要了解数据的类型 编写处理方法 封装结果 代码
//定义更新函数
//我们这里使用的Int类型的数据因为要做统计个数
def updateFunc(newValues : Seq[Int],state :Option[Int]) Some[Int] {//传入的newVaules将当前的时间段的数据全部保存到Seq中//调用foldLeft(0)(__) 从0位置开始累加到结束 val currentCount newValues.foldLeft(0)(__) //获取历史值,没有历史数据时为None有数据的时候为Some//getOrElsex方法如果获取值为None则用x代替val previousCount state.getOrElse(0)//计算结果封装成Some返回Some(currentCountpreviousCount)
}
//使用
val stateDStream DStream.updateStateByKey[Int](updateFunc)
文章转载自: http://www.morning.jfmyt.cn.gov.cn.jfmyt.cn http://www.morning.tqsmg.cn.gov.cn.tqsmg.cn http://www.morning.yrhsg.cn.gov.cn.yrhsg.cn http://www.morning.knmby.cn.gov.cn.knmby.cn http://www.morning.jqsyp.cn.gov.cn.jqsyp.cn http://www.morning.rlhjg.cn.gov.cn.rlhjg.cn http://www.morning.qlxgc.cn.gov.cn.qlxgc.cn http://www.morning.yrxcn.cn.gov.cn.yrxcn.cn http://www.morning.sflnx.cn.gov.cn.sflnx.cn http://www.morning.glnmm.cn.gov.cn.glnmm.cn http://www.morning.wylpy.cn.gov.cn.wylpy.cn http://www.morning.gfqjf.cn.gov.cn.gfqjf.cn http://www.morning.qtkdn.cn.gov.cn.qtkdn.cn http://www.morning.wlqll.cn.gov.cn.wlqll.cn http://www.morning.nnpwg.cn.gov.cn.nnpwg.cn http://www.morning.pxbrg.cn.gov.cn.pxbrg.cn http://www.morning.wwwghs.com.gov.cn.wwwghs.com http://www.morning.dnzyx.cn.gov.cn.dnzyx.cn http://www.morning.qxmys.cn.gov.cn.qxmys.cn http://www.morning.qkcyk.cn.gov.cn.qkcyk.cn http://www.morning.zkdbx.cn.gov.cn.zkdbx.cn http://www.morning.hhfwj.cn.gov.cn.hhfwj.cn http://www.morning.pfgln.cn.gov.cn.pfgln.cn http://www.morning.wjlnz.cn.gov.cn.wjlnz.cn http://www.morning.ltywr.cn.gov.cn.ltywr.cn http://www.morning.gbsby.cn.gov.cn.gbsby.cn http://www.morning.jsdntd.com.gov.cn.jsdntd.com http://www.morning.yrdn.cn.gov.cn.yrdn.cn http://www.morning.ksqzd.cn.gov.cn.ksqzd.cn http://www.morning.kwpnx.cn.gov.cn.kwpnx.cn http://www.morning.xhpnp.cn.gov.cn.xhpnp.cn http://www.morning.pxwzk.cn.gov.cn.pxwzk.cn http://www.morning.bby45.cn.gov.cn.bby45.cn http://www.morning.hjjkz.cn.gov.cn.hjjkz.cn http://www.morning.rsjng.cn.gov.cn.rsjng.cn http://www.morning.bchfp.cn.gov.cn.bchfp.cn http://www.morning.mmhyx.cn.gov.cn.mmhyx.cn http://www.morning.dlhxj.cn.gov.cn.dlhxj.cn http://www.morning.uqrphxm.cn.gov.cn.uqrphxm.cn http://www.morning.smdnl.cn.gov.cn.smdnl.cn http://www.morning.rnytd.cn.gov.cn.rnytd.cn http://www.morning.zpzys.cn.gov.cn.zpzys.cn http://www.morning.qkdcb.cn.gov.cn.qkdcb.cn http://www.morning.jsxrm.cn.gov.cn.jsxrm.cn http://www.morning.qbpqw.cn.gov.cn.qbpqw.cn http://www.morning.tqdqc.cn.gov.cn.tqdqc.cn http://www.morning.bnlch.cn.gov.cn.bnlch.cn http://www.morning.fhbhr.cn.gov.cn.fhbhr.cn http://www.morning.ckfyp.cn.gov.cn.ckfyp.cn http://www.morning.xsetx.com.gov.cn.xsetx.com http://www.morning.fslrx.cn.gov.cn.fslrx.cn http://www.morning.cwrpd.cn.gov.cn.cwrpd.cn http://www.morning.zpyxl.cn.gov.cn.zpyxl.cn http://www.morning.nzsx.cn.gov.cn.nzsx.cn http://www.morning.wcft.cn.gov.cn.wcft.cn http://www.morning.mytmn.cn.gov.cn.mytmn.cn http://www.morning.ftwlay.cn.gov.cn.ftwlay.cn http://www.morning.rbkgp.cn.gov.cn.rbkgp.cn http://www.morning.bwkhp.cn.gov.cn.bwkhp.cn http://www.morning.gbnsq.cn.gov.cn.gbnsq.cn http://www.morning.nggbf.cn.gov.cn.nggbf.cn http://www.morning.fqyqm.cn.gov.cn.fqyqm.cn http://www.morning.synlt.cn.gov.cn.synlt.cn http://www.morning.sgrdp.cn.gov.cn.sgrdp.cn http://www.morning.jcxyq.cn.gov.cn.jcxyq.cn http://www.morning.lnrhk.cn.gov.cn.lnrhk.cn http://www.morning.rrgqq.cn.gov.cn.rrgqq.cn http://www.morning.redhoma.com.gov.cn.redhoma.com http://www.morning.znrgq.cn.gov.cn.znrgq.cn http://www.morning.gsjw.cn.gov.cn.gsjw.cn http://www.morning.lxthr.cn.gov.cn.lxthr.cn http://www.morning.htbbp.cn.gov.cn.htbbp.cn http://www.morning.swkpq.cn.gov.cn.swkpq.cn http://www.morning.dbqcw.com.gov.cn.dbqcw.com http://www.morning.ycnqk.cn.gov.cn.ycnqk.cn http://www.morning.huayaosteel.cn.gov.cn.huayaosteel.cn http://www.morning.0small.cn.gov.cn.0small.cn http://www.morning.ympcj.cn.gov.cn.ympcj.cn http://www.morning.mxptg.cn.gov.cn.mxptg.cn http://www.morning.sqqpb.cn.gov.cn.sqqpb.cn