当前位置: 首页 > news >正文

网页建站怎么做赣州建设信息网

网页建站怎么做,赣州建设信息网,咸阳学校网站建设联系电话,厦门网站建设 模板建站系列文章目录 Flink1.17实战教程#xff08;第一篇#xff1a;概念、部署、架构#xff09; Flink1.17实战教程#xff08;第二篇#xff1a;DataStream API#xff09; Flink1.17实战教程#xff08;第三篇#xff1a;时间和窗口#xff09; Flink1.17实战教程…系列文章目录 Flink1.17实战教程第一篇概念、部署、架构 Flink1.17实战教程第二篇DataStream API Flink1.17实战教程第三篇时间和窗口 Flink1.17实战教程第四篇处理函数 Flink1.17实战教程第五篇状态管理 Flink1.17实战教程第六篇容错机制 Flink1.17实战教程第七篇Flink SQL 文章目录 系列文章目录1. 基本处理函数ProcessFunction1.1 处理函数的功能和使用1.2 ProcessFunction解析1.3 处理函数的分类 2. 按键分区处理函数KeyedProcessFunction2.1 定时器Timer和定时服务TimerService2.2 KeyedProcessFunction案例 3. 窗口处理函数3.1 窗口处理函数的使用3.2 ProcessWindowFunction解析 4. 应用案例——Top N4.1 使用ProcessAllWindowFunction4.2 使用KeyedProcessFunction 5. 侧输出流Side Output 1. 基本处理函数ProcessFunction 之前所介绍的流处理API无论是基本的转换、聚合还是更为复杂的窗口操作其实都是基于DataStream进行转换的所以可以统称为DataStream API。 在Flink更底层我们可以不定义任何具体的算子比如mapfilter或者window而只是提炼出一个统一的“处理”process操作——它是所有转换算子的一个概括性的表达可以自定义处理逻辑所以这一层接口就被叫作“处理函数”process function。 1.1 处理函数的功能和使用 我们之前学习的转换算子一般只是针对某种具体操作来定义的能够拿到的信息比较有限。如果我们想要访问事件的时间戳或者当前的水位线信息都是完全做不到的。跟时间相关的操作目前我们只会用窗口来处理。而在很多应用需求中要求我们对时间有更精细的控制需要能够获取水位线甚至要“把控时间”、定义什么时候做什么事这就不是基本的时间窗口能够实现的了。 这时就需要使用底层的处理函数。处理函数提供了一个“定时服务”TimerService我们可以通过它访问流中的事件event、时间戳timestamp、水位线watermark甚至可以注册“定时事件”。而且处理函数继承了AbstractRichFunction抽象类所以拥有富函数类的所有特性同样可以访问状态state和其他运行时信息。此外处理函数还可以直接将数据输出到侧输出流side output中。所以处理函数是最为灵活的处理方法可以实现各种自定义的业务逻辑。 处理函数的使用与基本的转换操作类似只需要直接基于DataStream调用.process()方法就可以了。方法需要传入一个ProcessFunction作为参数用来定义处理逻辑。 stream.process(new MyProcessFunction())这里ProcessFunction不是接口而是一个抽象类继承了AbstractRichFunctionMyProcessFunction是它的一个具体实现。所以所有的处理函数都是富函数RichFunction富函数可以调用的东西这里同样都可以调用。 1.2 ProcessFunction解析 在源码中我们可以看到抽象类ProcessFunction继承了AbstractRichFunction有两个泛型类型参数I表示Input也就是输入的数据类型O表示Output也就是处理完成之后输出的数据类型。 内部单独定义了两个方法一个是必须要实现的抽象方法.processElement()另一个是非抽象方法.onTimer()。 public abstract class ProcessFunctionI, O extends AbstractRichFunction {...public abstract void processElement(I value, Context ctx, CollectorO out) throws Exception;public void onTimer(long timestamp, OnTimerContext ctx, CollectorO out) throws Exception {}...}1抽象方法.processElement() 用于“处理元素”定义了处理的核心逻辑。这个方法对于流中的每个元素都会调用一次参数包括三个输入数据值value上下文ctx以及“收集器”Collectorout。方法没有返回值处理之后的输出数据是通过收集器out来定义的。 value当前流中的输入元素也就是正在处理的数据类型与流中数据类型一致。ctx类型是ProcessFunction中定义的内部抽象类Context表示当前运行的上下文可以获取到当前的时间戳并提供了用于查询时间和注册定时器的“定时服务”TimerService以及可以将数据发送到“侧输出流”side output的方法.output()。out“收集器”类型为Collector用于返回输出数据。使用方式与flatMap算子中的收集器完全一样直接调用out.collect()方法就可以向下游发出一个数据。这个方法可以多次调用也可以不调用。 通过几个参数的分析不难发现ProcessFunction可以轻松实现flatMap、map、filter这样的基本转换功能而通过富函数提供的获取上下文方法.getRuntimeContext()也可以自定义状态state进行处理这也就能实现聚合操作的功能了。 2非抽象方法.onTimer() 这个方法只有在注册好的定时器触发的时候才会调用而定时器是通过“定时服务”TimerService来注册的。打个比方注册定时器timer就是设了一个闹钟到了设定时间就会响而.onTimer()中定义的就是闹钟响的时候要做的事。所以它本质上是一个基于时间的“回调”callback方法通过时间的进展来触发在事件时间语义下就是由水位线watermark来触发了。 定时方法.onTimer()也有三个参数时间戳timestamp上下文ctx以及收集器out。这里的timestamp是指设定好的触发时间事件时间语义下当然就是水位线了。另外这里同样有上下文和收集器所以也可以调用定时服务TimerService以及任意输出处理之后的数据。 既然有.onTimer()方法做定时触发我们用ProcessFunction也可以自定义数据按照时间分组、定时触发计算输出结果这其实就实现了窗口window的功能。所以说ProcessFunction其实可以实现一切功能。 注意在Flink中只有“按键分区流”KeyedStream才支持设置定时器的操作。 1.3 处理函数的分类 我们知道DataStream在调用一些转换方法之后有可能生成新的流类型例如调用.keyBy()之后得到KeyedStream进而再调用.window()之后得到WindowedStream。对于不同类型的流其实都可以直接调用.process()方法进行自定义处理这时传入的参数就都叫作处理函数。当然它们尽管本质相同都是可以访问状态和时间信息的底层API可彼此之间也会有所差异。 Flink提供了8个不同的处理函数 1ProcessFunction 最基本的处理函数基于DataStream直接调用.process()时作为参数传入。 2KeyedProcessFunction 对流按键分区后的处理函数基于KeyedStream调用.process()时作为参数传入。要想使用定时器比如基于KeyedStream。 3ProcessWindowFunction 开窗之后的处理函数也是全窗口函数的代表。基于WindowedStream调用.process()时作为参数传入。 4ProcessAllWindowFunction 同样是开窗之后的处理函数基于AllWindowedStream调用.process()时作为参数传入。 5CoProcessFunction 合并connect两条流之后的处理函数基于ConnectedStreams调用.process()时作为参数传入。关于流的连接合并操作我们会在后续章节详细介绍。 6ProcessJoinFunction 间隔连接interval join两条流之后的处理函数基于IntervalJoined调用.process()时作为参数传入。 7BroadcastProcessFunction 广播连接流处理函数基于BroadcastConnectedStream调用.process()时作为参数传入。这里的“广播连接流”BroadcastConnectedStream是一个未keyBy的普通DataStream与一个广播流BroadcastStream做连接conncet之后的产物。关于广播流的相关操作我们会在后续章节详细介绍。 8KeyedBroadcastProcessFunction 按键分区的广播连接流处理函数同样是基于BroadcastConnectedStream调用.process()时作为参数传入。与BroadcastProcessFunction不同的是这时的广播连接流是一个KeyedStream与广播流BroadcastStream做连接之后的产物。 2. 按键分区处理函数KeyedProcessFunction 在上节中提到只有在KeyedStream中才支持使用TimerService设置定时器的操作。所以一般情况下我们都是先做了keyBy分区之后再去定义处理操作代码中更加常见的处理函数是KeyedProcessFunction。 2.1 定时器Timer和定时服务TimerService 在.onTimer()方法中可以实现定时处理的逻辑而它能触发的前提就是之前曾经注册过定时器、并且现在已经到了触发时间。注册定时器的功能是通过上下文中提供的“定时服务”来实现的。 定时服务与当前运行的环境有关。前面已经介绍过ProcessFunction的上下文Context中提供了.timerService()方法可以直接返回一个TimerService对象。TimerService是Flink关于时间和定时器的基础服务接口包含以下六个方法 // 获取当前的处理时间 long currentProcessingTime();// 获取当前的水位线事件时间 long currentWatermark();// 注册处理时间定时器当处理时间超过time时触发 void registerProcessingTimeTimer(long time);// 注册事件时间定时器当水位线超过time时触发 void registerEventTimeTimer(long time);// 删除触发时间为time的处理时间定时器 void deleteProcessingTimeTimer(long time);// 删除触发时间为time的处理时间定时器 void deleteEventTimeTimer(long time);六个方法可以分成两大类基于处理时间和基于事件时间。而对应的操作主要有三个获取当前时间注册定时器以及删除定时器。需要注意尽管处理函数中都可以直接访问TimerService不过只有基于KeyedStream的处理函数才能去调用注册和删除定时器的方法未作按键分区的DataStream不支持定时器操作只能获取当前时间。 TimerService会以键key和时间戳为标准对定时器进行去重也就是说对于每个key和时间戳最多只有一个定时器如果注册了多次onTimer()方法也将只被调用一次。 2.2 KeyedProcessFunction案例 基于keyBy之后的KeyedStream直接调用.process()方法这时需要传入的参数就是KeyedProcessFunction的实现类。 stream.keyBy( t - t.f0 ).process(new MyKeyedProcessFunction())类似地KeyedProcessFunction也是继承自AbstractRichFunction的一个抽象类与ProcessFunction的定义几乎完全一样区别只是在于类型参数多了一个K这是当前按键分区的key的类型。同样地我们必须实现一个.processElement()抽象方法用来处理流中的每一个数据另外还有一个非抽象方法.onTimer()用来定义定时器触发时的回调操作。 代码如下 public class KeyedProcessTimerDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(hadoop102, 7777).map(new WaterSensorMapFunction()).assignTimestampsAndWatermarks(WatermarkStrategy.WaterSensorforBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((element, ts) - element.getTs() * 1000L));KeyedStreamWaterSensor, String sensorKS sensorDS.keyBy(sensor - sensor.getId());// TODO Process:keyedSingleOutputStreamOperatorString process sensorKS.process(new KeyedProcessFunctionString, WaterSensor, String() {/*** 来一条数据调用一次* param value* param ctx* param out* throws Exception*/Overridepublic void processElement(WaterSensor value, Context ctx, CollectorString out) throws Exception {//获取当前数据的keyString currentKey ctx.getCurrentKey();// TODO 1.定时器注册TimerService timerService ctx.timerService();// 1、事件时间的案例Long currentEventTime ctx.timestamp(); // 数据中提取出来的事件时间timerService.registerEventTimeTimer(5000L);System.out.println(当前key currentKey ,当前时间 currentEventTime ,注册了一个5s的定时器);// 2、处理时间的案例 // long currentTs timerService.currentProcessingTime(); // timerService.registerProcessingTimeTimer(currentTs 5000L); // System.out.println(当前key currentKey ,当前时间 currentTs ,注册了一个5s后的定时器);// 3、获取 process的 当前watermark // long currentWatermark timerService.currentWatermark(); // System.out.println(当前数据 value ,当前watermark currentWatermark);// 注册定时器 处理时间、事件时间 // timerService.registerProcessingTimeTimer(); // timerService.registerEventTimeTimer();// 删除定时器 处理时间、事件时间 // timerService.deleteEventTimeTimer(); // timerService.deleteProcessingTimeTimer();// 获取当前时间进展 处理时间-当前系统时间 事件时间-当前watermark // long currentTs timerService.currentProcessingTime(); // long wm timerService.currentWatermark();}/*** TODO 2.时间进展到定时器注册的时间调用该方法* param timestamp 当前时间进展就是定时器被触发时的时间* param ctx 上下文* param out 采集器* throws Exception*/Overridepublic void onTimer(long timestamp, OnTimerContext ctx, CollectorString out) throws Exception {super.onTimer(timestamp, ctx, out);String currentKey ctx.getCurrentKey();System.out.println(key currentKey 现在时间是 timestamp 定时器触发);}});process.print();env.execute();} }3. 窗口处理函数 除了KeyedProcessFunction另外一大类常用的处理函数就是基于窗口的ProcessWindowFunction和ProcessAllWindowFunction了。在第六章窗口函数的介绍中我们之前已经简单地使用过窗口处理函数了。 3.1 窗口处理函数的使用 进行窗口计算我们可以直接调用现成的简单聚合方法sum/max/min也可以通过调用.reduce()或.aggregate()来自定义一般的增量聚合函数ReduceFunction/AggregateFucntion而对于更加复杂、需要窗口信息和额外状态的一些场景我们还可以直接使用全窗口函数、把数据全部收集保存在窗口内等到触发窗口计算时再统一处理。窗口处理函数就是一种典型的全窗口函数。 窗口处理函数ProcessWindowFunction的使用与其他窗口函数类似也是基于WindowedStream直接调用方法就可以只不过这时调用的是.process()。 stream.keyBy( t - t.f0 ).window( TumblingEventTimeWindows.of(Time.seconds(10)) ).process(new MyProcessWindowFunction())3.2 ProcessWindowFunction解析 ProcessWindowFunction既是处理函数又是全窗口函数。从名字上也可以推测出它的本质似乎更倾向于“窗口函数”一些。事实上它的用法也确实跟其他处理函数有很大不同。我们可以从源码中的定义看到这一点 public abstract class ProcessWindowFunctionIN, OUT, KEY, W extends Window extends AbstractRichFunction {...public abstract void process(KEY key, Context context, IterableIN elements, CollectorOUT out) throws Exception;public void clear(Context context) throws Exception {}public abstract class Context implements java.io.Serializable {...} }ProcessWindowFunction依然是一个继承了AbstractRichFunction的抽象类它有四个类型参数 INinput数据流中窗口任务的输入数据类型。OUToutput窗口任务进行计算之后的输出数据类型。KEY数据中键key的类型。W窗口的类型是Window的子类型。一般情况下我们定义时间窗口W就是TimeWindow。 ProcessWindowFunction里面处理数据的核心方法.process()。方法包含四个参数。 key窗口做统计计算基于的键也就是之前keyBy用来分区的字段。context当前窗口进行计算的上下文它的类型就是ProcessWindowFunction内部定义的抽象类Context。elements窗口收集到用来计算的所有数据这是一个可迭代的集合类型。out用来发送数据输出计算结果的收集器类型为Collector。 可以明显看出这里的参数不再是一个输入数据而是窗口中所有数据的集合。而上下文context所包含的内容也跟其他处理函数有所差别 public abstract class Context implements java.io.Serializable {public abstract W window();public abstract long currentProcessingTime();public abstract long currentWatermark();public abstract KeyedStateStore windowState();public abstract KeyedStateStore globalState();public abstract X void output(OutputTagX outputTag, X value);}除了可以通过.output()方法定义侧输出流不变外其他部分都有所变化。这里不再持有TimerService对象只能通过currentProcessingTime()和currentWatermark()来获取当前时间所以失去了设置定时器的功能另外由于当前不是只处理一个数据所以也不再提供.timestamp()方法。与此同时也增加了一些获取其他信息的方法比如可以通过.window()直接获取到当前的窗口对象也可以通过.windowState()和.globalState()获取到当前自定义的窗口状态和全局状态。注意这里的“窗口状态”是自定义的不包括窗口本身已经有的状态针对当前key、当前窗口有效而“全局状态”同样是自定义的状态针对当前key的所有窗口有效。 所以我们会发现ProcessWindowFunction中除了.process()方法外并没有.onTimer()方法而是多出了一个.clear()方法。从名字就可以看出这主要是方便我们进行窗口的清理工作。如果我们自定义了窗口状态那么必须在.clear()方法中进行显式地清除避免内存溢出。 至于另一种窗口处理函数ProcessAllWindowFunction它的用法非常类似。区别在于它基于的是AllWindowedStream相当于对没有keyBy的数据流直接开窗并调用.process()方法 stream.windowAll( TumblingEventTimeWindows.of(Time.seconds(10)) ).process(new MyProcessAllWindowFunction())4. 应用案例——Top N 案例需求实时统计一段时间内的出现次数最多的水位。例如统计最近10秒钟内出现次数最多的两个水位并且每5秒钟更新一次。我们知道这可以用一个滑动窗口来实现。于是就需要开滑动窗口收集传感器的数据按照不同的水位进行统计而后汇总排序并最终输出前两名。这其实就是著名的“Top N”问题。 4.1 使用ProcessAllWindowFunction 思路一一种最简单的想法是我们干脆不区分不同水位而是将所有访问数据都收集起来统一进行统计计算。所以可以不做keyBy直接基于DataStream开窗然后使用全窗口函数ProcessAllWindowFunction来进行处理。 在窗口中可以用一个HashMap来保存每个水位的出现次数只要遍历窗口中的所有数据自然就能得到所有水位的出现次数。最后把HashMap转成一个列表ArrayList然后进行排序、取出前两名输出就可以了。 代码具体实现如下 public class ProcessAllWindowTopNDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(hadoop102, 7777).map(new WaterSensorMapFunction()).assignTimestampsAndWatermarks(WatermarkStrategy.WaterSensorforBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((element, ts) - element.getTs() * 1000L));// 最近10秒 窗口长度 每5秒输出 滑动步长// TODO 思路一 所有数据到一起 用hashmap存 keyvcvaluecount值sensorDS.windowAll(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))).process(new MyTopNPAWF()).print();env.execute();}public static class MyTopNPAWF extends ProcessAllWindowFunctionWaterSensor, String, TimeWindow {Overridepublic void process(Context context, IterableWaterSensor elements, CollectorString out) throws Exception {// 定义一个hashmap用来存keyvcvaluecount值MapInteger, Integer vcCountMap new HashMap();// 1.遍历数据, 统计 各个vc出现的次数for (WaterSensor element : elements) {Integer vc element.getVc();if (vcCountMap.containsKey(vc)) {// 1.1 key存在不是这个key的第一条数据直接累加vcCountMap.put(vc, vcCountMap.get(vc) 1);} else {// 1.2 key不存在初始化vcCountMap.put(vc, 1);}}// 2.对 count值进行排序: 利用List来实现排序ListTuple2Integer, Integer datas new ArrayList();for (Integer vc : vcCountMap.keySet()) {datas.add(Tuple2.of(vc, vcCountMap.get(vc)));}// 对List进行排序根据count值 降序datas.sort(new ComparatorTuple2Integer, Integer() {Overridepublic int compare(Tuple2Integer, Integer o1, Tuple2Integer, Integer o2) {// 降序 后 减 前return o2.f1 - o1.f1;}});// 3.取出 count最大的2个 vcStringBuilder outStr new StringBuilder();outStr.append(\n);// 遍历 排序后的 List取出前2个 考虑可能List不够2个的情况 》 List中元素的个数 和 2 取最小值for (int i 0; i Math.min(2, datas.size()); i) {Tuple2Integer, Integer vcCount datas.get(i);outStr.append(Top (i 1) \n);outStr.append(vc vcCount.f0 \n);outStr.append(count vcCount.f1 \n);outStr.append(窗口结束时间 DateFormatUtils.format(context.window().getEnd(), yyyy-MM-dd HH:mm:ss.SSS) \n);outStr.append(\n);}out.collect(outStr.toString());}} }4.2 使用KeyedProcessFunction 思路二在上一小节的实现过程中我们没有进行按键分区直接将所有数据放在一个分区上进行了开窗操作。这相当于将并行度强行设置为1在实际应用中是要尽量避免的所以Flink官方也并不推荐使用AllWindowedStream进行处理。另外我们在全窗口函数中定义了HashMap来统计vc的出现次数计算过程是要先收集齐所有数据、然后再逐一遍历更新HashMap这显然不够高效。 基于这样的想法我们可以从两个方面去做优化一是对数据进行按键分区分别统计vc的出现次数二是进行增量聚合得到结果最后再做排序输出。所以我们可以使用增量聚合函数AggregateFunction进行浏览量的统计然后结合ProcessWindowFunction排序输出来实现Top N的需求。 具体实现可以分成两步先对每个vc统计出现次数然后再将统计结果收集起来排序输出最终结果。由于最后的排序还是基于每个时间窗口的输出的统计结果中要包含窗口信息我们可以输出包含了vc、出现次数count以及窗口结束时间的Tuple3。之后先按窗口结束时间分区然后用KeyedProcessFunction来实现。 用KeyedProcessFunction来收集数据做排序这时面对的是窗口聚合之后的数据流而窗口已经不存在了我们需要确保能够收集齐所有数据所以应该在窗口结束时间基础上再“多等一会儿”。具体实现上可以采用一个延迟触发的事件时间定时器。基于窗口的结束时间来设定延迟其实并不需要等太久——因为我们是靠水位线的推进来触发定时器而水位线的含义就是“之前的数据都到齐了”。所以我们只需要设置1毫秒的延迟就一定可以保证这一点。 而在等待过程中之前已经到达的数据应该缓存起来我们这里用一个自定义的HashMap来进行存储key为窗口的标记value为List。之后每来一条数据就把它添加到当前的HashMap中并注册一个触发时间为窗口结束时间加1毫秒windowEnd 1的定时器。待到水位线到达这个时间定时器触发我们可以保证当前窗口所有vc的统计结果Tuple3都到齐了于是从HashMap中取出进行排序输出。 具体代码实现如下 public class KeyedProcessFunctionTopNDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(hadoop102, 7777).map(new WaterSensorMapFunction()).assignTimestampsAndWatermarks(WatermarkStrategy.WaterSensorforBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((element, ts) - element.getTs() * 1000L));// 最近10秒 窗口长度 每5秒输出 滑动步长/*** TODO 思路二 使用 KeyedProcessFunction实现* 1、按照vc做keyby开窗分别count* 》 增量聚合计算 count* 》 全窗口对计算结果 count值封装 带上 窗口结束时间的 标签* 》 为了让同一个窗口时间范围的计算结果到一起去** 2、对同一个窗口范围的count值进行处理 排序、取前N个* 》 按照 windowEnd做keyby* 》 使用process 来一条调用一次需要先存分开存用HashMap,keywindowEnd,valueList* 》 使用定时器对 存起来的结果 进行 排序、取前N个*/// 1. 按照 vc 分组、开窗、聚合增量计算全量打标签// 开窗聚合后就是普通的流没有了窗口信息需要自己打上窗口的标记 windowEndSingleOutputStreamOperatorTuple3Integer, Integer, Long windowAgg sensorDS.keyBy(sensor - sensor.getVc()).window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))).aggregate(new VcCountAgg(),new WindowResult());// 2. 按照窗口标签窗口结束时间keyby保证同一个窗口时间范围的结果到一起去。排序、取TopNwindowAgg.keyBy(r - r.f2).process(new TopN(2)).print();env.execute();}public static class VcCountAgg implements AggregateFunctionWaterSensor, Integer, Integer {Overridepublic Integer createAccumulator() {return 0;}Overridepublic Integer add(WaterSensor value, Integer accumulator) {return accumulator 1;}Overridepublic Integer getResult(Integer accumulator) {return accumulator;}Overridepublic Integer merge(Integer a, Integer b) {return null;}}/*** 泛型如下* 第一个输入类型 增量函数的输出 count值Integer* 第二个输出类型 Tuple3(vccountwindowEnd) ,带上 窗口结束时间 的标签* 第三个key类型 vcInteger* 第四个窗口类型*/public static class WindowResult extends ProcessWindowFunctionInteger, Tuple3Integer, Integer, Long, Integer, TimeWindow {Overridepublic void process(Integer key, Context context, IterableInteger elements, CollectorTuple3Integer, Integer, Long out) throws Exception {// 迭代器里面只有一条数据next一次即可Integer count elements.iterator().next();long windowEnd context.window().getEnd();out.collect(Tuple3.of(key, count, windowEnd));}}public static class TopN extends KeyedProcessFunctionLong, Tuple3Integer, Integer, Long, String {// 存不同窗口的 统计结果keywindowEndvaluelist数据private MapLong, ListTuple3Integer, Integer, Long dataListMap;// 要取的Top数量private int threshold;public TopN(int threshold) {this.threshold threshold;dataListMap new HashMap();}Overridepublic void processElement(Tuple3Integer, Integer, Long value, Context ctx, CollectorString out) throws Exception {// 进入这个方法只是一条数据要排序得到齐才行 》 存起来不同窗口分开存// 1. 存到HashMap中Long windowEnd value.f2;if (dataListMap.containsKey(windowEnd)) {// 1.1 包含vc不是该vc的第一条直接添加到List中ListTuple3Integer, Integer, Long dataList dataListMap.get(windowEnd);dataList.add(value);} else {// 1.1 不包含vc是该vc的第一条需要初始化listListTuple3Integer, Integer, Long dataList new ArrayList();dataList.add(value);dataListMap.put(windowEnd, dataList);}// 2. 注册一个定时器 windowEnd1ms即可// 同一个窗口范围应该同时输出只不过是一条一条调用processElement方法只需要延迟1ms即可ctx.timerService().registerEventTimeTimer(windowEnd 1);}Overridepublic void onTimer(long timestamp, OnTimerContext ctx, CollectorString out) throws Exception {super.onTimer(timestamp, ctx, out);// 定时器触发同一个窗口范围的计算结果攒齐了开始 排序、取TopNLong windowEnd ctx.getCurrentKey();// 1. 排序ListTuple3Integer, Integer, Long dataList dataListMap.get(windowEnd);dataList.sort(new ComparatorTuple3Integer, Integer, Long() {Overridepublic int compare(Tuple3Integer, Integer, Long o1, Tuple3Integer, Integer, Long o2) {// 降序 后 减 前return o2.f1 - o1.f1;}});// 2. 取TopNStringBuilder outStr new StringBuilder();outStr.append(\n);// 遍历 排序后的 List取出前 threshold 个 考虑可能List不够2个的情况 》 List中元素的个数 和 2 取最小值for (int i 0; i Math.min(threshold, dataList.size()); i) {Tuple3Integer, Integer, Long vcCount dataList.get(i);outStr.append(Top (i 1) \n);outStr.append(vc vcCount.f0 \n);outStr.append(count vcCount.f1 \n);outStr.append(窗口结束时间 vcCount.f2 \n);outStr.append(\n);}// 用完的List及时清理节省资源dataList.clear();out.collect(outStr.toString());}} }5. 侧输出流Side Output 处理函数还有另外一个特有功能就是将自定义的数据放入“侧输出流”side output输出。这个概念我们并不陌生之前在讲到窗口处理迟到数据时最后一招就是输出到侧输出流。而这种处理方式的本质其实就是处理函数的侧输出流功能。 我们之前讲到的绝大多数转换算子输出的都是单一流流里的数据类型只能有一种。而侧输出流可以认为是“主流”上分叉出的“支流”所以可以由一条流产生出多条流而且这些流中的数据类型还可以不一样。利用这个功能可以很容易地实现“分流”操作。 具体应用时只要在处理函数的.processElement()或者.onTimer()方法中调用上下文的.output()方法就可以了。 DataStreamInteger stream env.fromSource(...);OutputTagString outputTag new OutputTagString(side-output) {};SingleOutputStreamOperatorLong longStream stream.process(new ProcessFunctionInteger, Long() {Overridepublic void processElement( Integer value, Context ctx, CollectorInteger out) throws Exception {// 转换成Long输出到主流中out.collect(Long.valueOf(value));// 转换成String输出到侧输出流中ctx.output(outputTag, side-output: String.valueOf(value));} });这里output()方法需要传入两个参数第一个是一个“输出标签”OutputTag用来标识侧输出流一般会在外部统一声明第二个就是要输出的数据。 我们可以在外部先将OutputTag声明出来 OutputTagString outputTag new OutputTagString(side-output) {};如果想要获取这个侧输出流可以基于处理之后的DataStream直接调用.getSideOutput()方法传入对应的OutputTag这个方式与窗口API中获取侧输出流是完全一样的。 DataStreamString stringStream longStream.getSideOutput(outputTag);案例需求对每个传感器水位超过10的输出告警信息 代码如下 public class SideOutputDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(hadoop102, 7777).map(new WaterSensorMapFunction()).assignTimestampsAndWatermarks(WatermarkStrategy.WaterSensorforBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((element, ts) - element.getTs() * 1000L));OutputTagString warnTag new OutputTag(warn, Types.STRING);SingleOutputStreamOperatorWaterSensor process sensorDS.keyBy(sensor - sensor.getId()).process(new KeyedProcessFunctionString, WaterSensor, WaterSensor() {Overridepublic void processElement(WaterSensor value, Context ctx, CollectorWaterSensor out) throws Exception {// 使用侧输出流告警if (value.getVc() 10) {ctx.output(warnTag, 当前水位 value.getVc() ,大于阈值10);}// 主流正常 发送数据out.collect(value);}});process.print(主流);process.getSideOutput(warnTag).printToErr(warn);env.execute();} }
http://www.tj-hxxt.cn/news/136486.html

相关文章:

  • 网站建设行业怎么样wordpress支付下载
  • 网站开发 英文文章网站悬挂备案号
  • 蓝色科技网站建设网络营销首先要做什么
  • 做二手货的网站做网站公司宣传语
  • 泰兴网站优化下载浏览器并安装
  • 佛山制作网站设计报价360地图怎么添加商户
  • 杭州市建设工程造价管理协会网站深圳品牌vi设计
  • 网站如何提交关键词wordpress熊掌号推送
  • 企业网站备案系统品牌网站建设代理
  • 东莞网站推广宣传iis网站无法启动
  • 为古汉字老人做网站网络运营与维护
  • 绵阳的网站制作公司哪家好wordpress 边框大小
  • 一般网站建设公司好抖音代运营合作协议书范本
  • 定制化网站开发报价百度seo网络营销书
  • 网站开发主流程序wordpress应用案例
  • 单页网站建设平台哪个好400电话网络推广商城网站
  • 网站建设方案书阿里云ps做网站广告logo
  • 朝阳淘宝网站建设微网站成功案例
  • 好的网站设计网站搜索网站不显示图片
  • 网站部分版块显示正在建设对于网站建设的意见和建议
  • 搭建淘宝客网站源码帮公司制作网页多少钱
  • 怎么看网站是用什么系统做的app制作需要哪些技术
  • 河北省住房和城乡建设厅的网站高米店网站建设公司
  • 电子商务网站推广策划方案云主机 怎么做网站
  • 白山住房与城乡建设局网站学广告设计需要什么学历
  • 外包做一个网站一般费用wordpress邀请码吧
  • 旅游景点网站设计论文贵州网站开发哪家便宜
  • 网站图片如何居中做360手机网站优化快
  • 手机网站建设价格是多少做网站需要模板吗
  • 一个网站需要多少空间付费阅读wordpress主题