当前位置: 首页 > news >正文

网站空间 windows linux玉器企业网站源码

网站空间 windows linux,玉器企业网站源码,wordpress页面固定链接修改,做网站从哪里找货源DataStream API编程模型 1.【Flink-Scala】DataStream编程模型之 数据源、数据转换、数据输出 2.【Flink-scala】DataStream编程模型之 窗口的划分-时间概念-窗口计算程序 3.【Flink-scala】DataStream编程模型之 窗口计算-触发器-驱逐器 文章目录 DataStream API编程模型前言…DataStream API编程模型 1.【Flink-Scala】DataStream编程模型之 数据源、数据转换、数据输出 2.【Flink-scala】DataStream编程模型之 窗口的划分-时间概念-窗口计算程序 3.【Flink-scala】DataStream编程模型之 窗口计算-触发器-驱逐器 文章目录 DataStream API编程模型前言一、水位线1.1 水位线1.1.1 概念1.1.2 水位线如何发挥作用呢1.1.3 水位线原理1.1.3.1 消息正常到达系统的时间1.1.3.2消息延迟到达系统时的情况1.1.3.3 采用事件时间时的情况1.1.3.4 引入水位线机制的情况 1.1.4 水位线的设置方法1.1.4.1水位线生成策略--固定延迟生成水位线1.1.4.2 水位线生成策略-单调递增生成水位线1.1.4.3 自动义生成水位线策略 总结 前言 本小节学习水位线和延迟数据处理再学习状态编程水位线和延迟数据处理关联性强一点如果篇幅太长我就再开一小节写。 开始吧 水位线这和实际生活中河流/水库到达哪个水位线就要有什么问题一样就是达到水位线后做什么处理。 我这样想河流里或者是水龙头的水是水流把水换成数据就是datastream[数据流]。水位线这个概念也能扯上点关系 一、水位线 1.1 水位线 1.1.1 概念 水位线是一种衡量事件时间进展的机制它是数据本身的一个隐藏属性本质上就是一个时间戳。 水位线是配合事件时间来使用的通常基于事件时间的数据自身都包含一个水位线用于处理乱序事件。 使用处理时间来处理事件时不会有延迟因此也不需要水位线所以水位线只出现在事件时间窗口。 正确地处理乱序事件通常是结合窗口和水位线这两种机制来实现的。 1.1.2 水位线如何发挥作用呢 在流处理过程中从事件产生到流经数据源再到流经算子中间是有一个过程和时间的。 虽然大部分情况下流到算子的数据都是按照事件产生的时间顺序到达的但是也不排除由于网络、系统等原因导致乱序的产生和迟到数据。 但是对于迟到数据而言我们又不能无限期地等下去必须要有个机制来保证在经过一个特定的时间后必须触发窗口去进行计算。 在进行窗口计算时使用接入时间或处理时间的消息都是以系统的墙上时间比如现在是8:50我写的博客那么这个事件就是850就是生成时间为准因此事件都是按序到达的。但在实际应用中由于网络或者系统等外部因素影响事件数据往往不能及时到达Flink系统从而造成数据乱序到达或者延迟到达等问题。针对这两个问题Flink主要采用以水位线为核心的机制来应对。 此时就是水位线发挥作用了它表示当达到水位线后在水位线之前的数据已经全部到达即使后面还有延迟的数据系统可以触发相应的窗口计算。 只有水位线越过窗口对应的结束时间窗口才会关闭和进行计算。 一般而言只有以下两个条件同时成立才会触发窗口计算 1条件T1水位线时间 窗口结束时间 2条件T2在[窗口开始时间,窗口结束时间)中有数据存在。 理想情况下水位线应该与处理时间一致并且处理时间与事件时间只相差常数时间甚至为零。 当水位线与处理时间完全重合时就意味着消息产生后马上被处理不存在消息迟到的情况。 然而由于网络拥塞或系统原因消息常常存在迟到的情况 因此在设置水位线时总是考虑一定的延时从而给予迟到的数据一些机会。具体的延迟大小根据水位线实现方式的不同而也有所差别 1.1.3 水位线原理 1.1.3.1 消息正常到达系统的时间 window1[5s-15s] window2 [10s-20s] window3[15s-25s] 现在假设有一个单词数据流需要采用基于处理时间的滑动窗口进行实时的词频统计滑动窗口大小为10s滑动步长为5s。 假设数据源分别在第12秒、第12秒和第17秒的时候生成3条内容为单词“a”的消息这些消息将进入窗口中。 时间15s, 2条数据时间15s1条数据5-15s,3条数据每个窗口提交后最后统计值分别是 (a, 2)(a, 3) 和 (a, 1) 1.1.3.2消息延迟到达系统时的情况 正常是12-17s来了3条数据现在开始有迟到数据。 假设在12s时候出现一条迟到6s的数据18sde 数据这条延迟的消息会落入 window2 [10s-20s] 和 window3[15s-25s]。 窗口提交后最后统计值将分别是 (a, 1)(a, 3) 和 (a, 2)。正常应该是(a, 2)(a, 3) 和 (a, 1) 可看出这条延迟的消息没有对window2 [10s-20s]的计算结果造成影响但却影响了window1[5s-15s]和 window3[15s-25s]的计算结果导致二者计算结果出错。 因为当这条消息在第18秒到达时window1[5s-15s]计算已结束这条消息不会被统计到window1[5s-15s]中而会落入window3[15s-25s]导致被统计window3[15s-25s] 1.1.3.3 采用事件时间时的情况 采用事件时间,则当系统时间行进到第18秒时这条迟到了6秒的消息会落入 window2 [10s-20s] 因为这条消息的事件生成时间是第12秒所以就应该属于window1[5s-15s]和window2 [10s-20s] 但是在第18秒时window1[5s-15s]已经关闭所以这条延迟的消息只会落入 window2 [10s-20s]。 最终三个窗口的计算结果是(a,1)(a, 3) 和 (a, 1)也就是说window2[10s-20s]和 window3[15s-25s]提交了正确的结果但是 window1[5s-15s]的结果还是错误的 1.1.3.4 引入水位线机制的情况 就本例而言水位线本质上就是告诉Flink一条消息可以延迟多久 因此这里让水位线等于系统当前时间减去5秒。由于只有水位线越过窗口对应的结束时间窗口才会关闭和进行计算 因此第1个窗口window1[5s-15s]将会在第20秒的时候进行计算第2个窗口window2[10s-20s]将会在第25秒的时候进行计算第3个窗口window3[15s-25s]将会在第30秒的时候进行计算。 当系统时间行进到第18秒时这条迟到了6秒的消息会落入window1[5s-15s]和 window2 [10s-20s]因为这条消息的事件生成时间是第12秒所以就应该属于window1[5s-15s]和window2 [10s-20s]。 最终三个窗口提交正确结果即(a, 2)(a, 3) 和 (a, 1) 1.1.4 水位线的设置方法 水位线事关事件时间那么就需要知道事件时间戳。 就必须为数据流中的每个元素分配一个时间戳。 在Flink系统中分配时间戳和生成水位线这两个工作是同时进行的前者是由TimestampAssigner来实现的后者则是由WatermarkGenerator来实现的。 当我们构建了一个DataStream之后可以使用assignTimestampsAndWatermarks方法来分配时间戳和生成水位线调用该方法时需要传入一个WatermarkStrategy对象语法如下 DataStream.assignTimestampsAndWatermarks(WatermarkStrategyT)一般情况下Flink要求WatermarkStrategy对象中同时包含了TimestampAssigner对象和WatermarkGenerator对象。 WatermarkStrategy是一个接口提供了很多静态的方法对于一些常用的水位线生成策略我们不需要去实现这个接口可以直接调用静态方法来生成水位线。 或者我们也可以通过实现WatermarkStrategy接口中的createWatermarkGenerator方法和createTimestampAssigner方法来自定义水位线策略。 说到底就是两个分配时间戳生成水位线有的地方叫水印。 1.1.4.1水位线生成策略–固定延迟生成水位线 固定延迟生成水位线的语法如下 WatermarkStrategy.forBoundedOutOfOrderness(Duration maxOutOfOrderness)比如现在要实现一个延迟3秒的固定延迟水位线并从消息中获取时间戳具体语句如下 val dataStream ...... dataStream.assignTimestampsAndWatermarks( WatermarkStrategy .forBoundedOutOfOrderness[StockPrice](Duration.ofSeconds(3))//这里延迟3s .withTimestampAssigner(new SerializableTimestampAssigner[StockPrice] {override def extractTimestamp(element: StockPrice, recordTimestamp: Long): Long element.timeStamp//分配时间戳} ) )使用的是这个方法forBoundedOutOfOrderness 1.1.4.2 水位线生成策略-单调递增生成水位线 单调递增生成水位线是通过WatermarkStrategy接口的静态方法forMonotonousTimestamps提供的语法如下 WatermarkStrategy.forMonotonousTimestamps()学习单词 在程序中按照如下方式使用 val dataStream ...... dataStream.assignTimestampsAndWatermarks( WatermarkStrategy .forMonotonousTimestamps() .withTimestampAssigner(new SerializableTimestampAssigner[StockPrice] {override def extractTimestamp(element: StockPrice, recordTimestamp: Long): Long element.timeStamp} ) )1.1.4.3 自动义生成水位线策略 自定义肯定就是实现某个接口的什么方法啦之前就说过 水位线设置就两个分配时间戳生成水位线 这里我们只需要实现WatermarkStrategy接口中的createWatermarkGenerator方法和createTimestampAssigner方法就可以了。 水位线策略 createWatermarkGenerator方法需要返回一个WatermarkGenerator对象。 WatermarkGenerator是一个接口需要实现这个接口里面的onEvent方法和onPeriodicEmit方法 1onEvent数据流中的每个元素或事件到达以后都会调用这个方法如果我们想依赖每个元素生成一个水位线然后发射到下游就可以实现这个方法。 2onPeriodicEmit当数据量比较大的时候为每个元素都生成一个水位线会影响系统性能所以Flink还提供了一个周期性生成水位线的方法。这个水位线的生成周期的设置方法是env.getConfig.setAutoWatermarkInterval(5000L)其中5000L是间隔时间可以由用户自定义。 在自定义水位线生成策略时Flink提供了两种不同的方式 1.定期水位线在这种机制中系统会通过onEvent方法对系统中到达的事件进行监控然后在系统调用onPeriodicEmit方法时生成一个水位线。两个方法都使用 2.标点水位线在这种机制中系统会通过onEvent方法对系统中到达的事件进行监控并等待具有特定标记的事件到达一旦监测到特定事件到达就立即生成一个水位线。通常这种机制不会调用onPeriodicEmit方法来生成一个水位线。只使用一个方法 代码 import java.text.SimpleDateFormat import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, TimestampAssigner, TimestampAssignerSupplier, Watermark, WatermarkGenerator, WatermarkGeneratorSupplier, WatermarkOutput, WatermarkStrategy} import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows import org.apache.flink.streaming.api.windowing.time.Timecase class StockPrice(stockId:String,timeStamp:Long,price:Double) object WatermarkTest { def main(args: Array[String]): Unit {//设定执行环境 val env StreamExecutionEnvironment.getExecutionEnvironment//设定时间特性为事件时间 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)//设定程序并行度 env.setParallelism(1)//创建数据源 val source env.socketTextStream(localhost, 9999)//指定针对数据流的转换操作逻辑 val stockDataStream source.map(s s.split(,)).map(sStockPrice(s(0).toString,s(1).toLong,s(2).toDouble))//为数据流分配时间戳和水位线 val watermarkDataStream stockDataStream.assignTimestampsAndWatermarks(new MyWatermarkStrategy)//执行窗口计算 val sumStream watermarkDataStream.keyBy(stockId).window(TumblingEventTimeWindows.of(Time.seconds(3))).reduce((s1, s2) StockPrice(s1.stockId,s1.timeStamp, s1.price s2.price))//打印输出 sumStream.print(output)//指定名称并触发流计算env.execute(WatermarkTest)} //指定水位线生成策略class MyWatermarkStrategy extends WatermarkStrategy[StockPrice] {override def createTimestampAssigner(context:TimestampAssignerSupplier.Context):TimestampAssigner[StockPrice]{new SerializableTimestampAssigner[StockPrice] {override def extractTimestamp(element: StockPrice, recordTimestamp: Long): Long {element.timeStamp //从到达消息中提取时间戳}}}override def createWatermarkGenerator(context:WatermarkGeneratorSupplier.Context): WatermarkGenerator[StockPrice] {new WatermarkGenerator[StockPrice](){val maxOutOfOrderness 10000L //设定最大延迟为10秒var currentMaxTimestamp: Long 0Lvar a: Watermark nullval format new SimpleDateFormat(yyyy-MM-dd HH:mm:ss.SSS)override def onEvent(element: StockPrice, eventTimestamp: Long, output:WatermarkOutput): Unit { currentMaxTimestamp Math.max(eventTimestamp, currentMaxTimestamp)a new Watermark(currentMaxTimestamp - maxOutOfOrderness)output.emitWatermark(a)println(timestamp: element.stockId , element.timeStamp | format.format(element.timeStamp) , currentMaxTimestamp | format.format(currentMaxTimestamp) , a.toString)}override def onPeriodicEmit(output:WatermarkOutput): Unit {// 没有使用周期性发送水印因此这里没有执行任何操作}}}} } 输入 s1 stock_1,1602031567000,8.14 s2 stock_1,1602031571000,8.23 s3 stock_1,1602031577000,8.24 s4 stock_1,1602031578000,8.87 s5 stock_1,1602031579000,8.55 s6 stock_1,1602031581000,8.43 s7 stock_1,1602031582000,8.78 然后在日志终端内就可以看到如下输出信息 timestamp:stock_1,1602031567000|2020-10-07 08:46:07.000,1602031567000|2020-10-07 08:46:07.000,Watermark 1602031557000 (2020-10-07 08:45:57.000) timestamp:stock_1,1602031571000|2020-10-07 08:46:11.000,1602031571000|2020-10-07 08:46:11.000,Watermark 1602031561000 (2020-10-07 08:46:01.000) timestamp:stock_1,1602031577000|2020-10-07 08:46:17.000,1602031577000|2020-10-07 08:46:17.000,Watermark 1602031567000 (2020-10-07 08:46:07.000) timestamp:stock_1,1602031578000|2020-10-07 08:46:18.000,1602031578000|2020-10-07 08:46:18.000,Watermark 1602031568000 (2020-10-07 08:46:08.000) timestamp:stock_1,1602031579000|2020-10-07 08:46:19.000,1602031579000|2020-10-07 08:46:19.000,Watermark 1602031569000 (2020-10-07 08:46:09.000) output StockPrice(stock_1,1602031567000,8.14) timestamp:stock_1,1602031581000|2020-10-07 08:46:21.000,1602031581000|2020-10-07 08:46:21.000,Watermark 1602031571000 (2020-10-07 08:46:11.000) timestamp:stock_1,1602031582000|2020-10-07 08:46:22.000,1602031582000|2020-10-07 08:46:22.000,Watermark 1602031572000 (2020-10-07 08:46:12.000) output StockPrice(stock_1,1602031571000,8.23) 为了正确理解水位线的工作原理下面我们详细解释每个事件到达后水位线的变化情况、各个窗口中的事件分布情况以及窗口触发计算的情况。关于窗口计算这里要再次强调只有以下两个条件同时成立才会触发窗口计算 1条件T1水位线时间 窗口结束时间 2条件T2在[窗口开始时间,窗口结束时间)中有数据存在。 1.s1事件到达后 事件s1到达系统以后的水位线的变化情况可以看出当前的水位线已经到达了1602031557000(2020-10-07 08:45:57.000)。 s1到达后各个窗口包含事件的情况 水位线是在增长的在那么增长的呢 这是我截取上面 的部分代码。最大延迟10s就是本次到的事件最大时间戳-10s即为水位线。对应下代码a new Watermark(currentMaxTimestamp - maxOutOfOrderness) val maxOutOfOrderness 10000L //设定最大延迟为10秒var currentMaxTimestamp: Long 0Lvar a: Watermark nullval format new SimpleDateFormat(yyyy-MM-dd HH:mm:ss.SSS)override def onEvent(element: StockPrice, eventTimestamp: Long, output:WatermarkOutput): Unit { currentMaxTimestamp Math.max(eventTimestamp, currentMaxTimestamp)a new Watermark(currentMaxTimestamp - maxOutOfOrderness)水位线增长每次有新事件到达时都会检查并更新currentMaxTimestamp然后根据这个值减去maxOutOfOrderness来生成新的水位线 2.当事件s2到达以后 s2到达系统以后的水位线的变化情况可以看出当前的水位线已经到达了1602031561000(2020-10-07 08:46:01.000)。 s2到达以后各个窗口内包含的事件的情况。 3.当事件s3到达以后 事件s3到达系统以后的水位线的变化情况可以看出当前的水位线已经到达了1602031567000(2020-10-07 08:46:07.000)。 s3到达以后各个窗口内包含的事件的情况。 4.当事件s4到达以后 事件s4到达系统以后的水位线的变化情况可以看出当前的水位线已经到达了1602031568000(2020-10-07 08:46:08.000)。 s4到达以后各个窗口内包含的事件的情况。 回顾一下 触发窗口计算 1条件T1水位线时间 窗口结束时间 2条件T2在[窗口开始时间,窗口结束时间)中有数据存在。 看到水位线事件84608,窗口结束事件是09那么此时还没有大于等于。 继续 5.当事件s5到达以后 事件s5到达系统以后的水位线的变化情况可以看出当前的水位线已经到达了1602031569000(2020-10-07 08:46:09.000)。 当当当注意啦看看触发窗口计算条件。 s5到达以后各个窗口内包含的事件的情况。 8:46:09水位线已经大于等于w1窗口结束时间啦条件1满足且窗口有数据条件2满足w1开始计算 6.当事件s6到达以后 事件s6到达系统以后的水位线的变化情况可以看出当前的水位线已经到达了1602031571000(2020-10-07 08:46:11.000)。 s6到达以后各个窗口内包含的事件的情况 此时再看看条件满足否 7.当事件s7到达以后 事件s7到达系统以后的水位线的变化情况可以看出当前的水位线已经到达了1602031572000(2020-10-07 08:46:12.000)。 s7到达以后各个窗口内包含的事件的情况。 当当当又注意啦看看是否满足条件 满足条件触发计算窗口2 计算完成 总结 没有想到水位线写了这么多延迟数据处理还没有写本小节主要学习水位线的原理和设置方法。 其中自定义的水位线生成策略稍显麻烦代码需要着重分析。下一小节该写延迟数据处理了。
http://www.tj-hxxt.cn/news/137642.html

相关文章:

  • 广州制作外贸网站公司简介视频网站不赚钱为什么还做
  • 中铁建设集团有限公司官方网站300500启迪设计
  • 农产品网站开发定制v软件免费下载
  • 网站无法导入照片上海万户网络科技
  • 专注高密做网站哪家强h5在线设计平台
  • 德国网站的后缀名企业网站推广 知乎
  • 做新房什么网站好做电影网站前途
  • 坪地网站建设教程西安网站建设案例
  • 淘宝有WordPress网站搭建吗海口制作网站公司
  • 荆门建网站费用潍坊地区制作网站
  • 河南省建设工程造价协会网站教资注册网址
  • 行业网站开发运营方案百度地图手机网站代码
  • 可以做网站的公司wordpress调用个人中心
  • 南召微网站建设写代码建商城网站时间
  • 淘宝做链接有哪些网站可以做装修公司网站模板下载
  • 预登记网站开发 会议金寨县建设局网站
  • 常德网络建站wordpress发布外网访问
  • 万网网站后台登陆网页界面设计的原则有哪些
  • 淘客导购网站怎么做做棋牌网站建设哪家便宜
  • 找外国女朋友的网站建设上杭网页制作
  • 专业网络建站公司太原市城市建设规划局官方网站
  • 网站建设模板软件企业网站策划实训
  • 做个兼职网站设计手机兼职软件推荐app
  • 如何在本地发布自己做的网站对于网站运营应该如何做
  • 重庆营销网站制作徐州网站建设模板
  • 聊城做手机网站福州商城网站开发公司
  • 做网站首页布局设计注意事项眉山建设局网站
  • 简单手机网站模板四川建设招标网站首页
  • 企业网站的建设目的有什么网站制作怎么做语音搜索框
  • 网站建设费用估计Wordpress pay plugin