训做网站的心得体会范文,wordpress快,网站建设参考,网站中的表单怎么做目录
窗口分类
1.按照驱动类型分类
1. 时间窗口#xff08;Time window#xff09;
2.计数窗口#xff08;Count window#xff09;
2.按照窗口分配数据的规则分类
窗口API分类
API调用
窗口分配器器#xff1a;
窗口函数
增量聚合函数#xff1a;
全窗口函数…目录
窗口分类
1.按照驱动类型分类
1. 时间窗口Time window
2.计数窗口Count window
2.按照窗口分配数据的规则分类
窗口API分类
API调用
窗口分配器器
窗口函数
增量聚合函数
全窗口函数
flink sql 窗口函数 窗口 | Apache Flink
窗口分类
1.按照驱动类型分类
1. 时间窗口Time window 时间窗口以时间点定义窗口的开始和结束因此截取出就是某一段时间的数据。当到达结束时间时窗口不在接受数据触发计算输出结果并关闭销毁窗口。
flink有一个专门的类用来表示时间窗口TimeWindow,这个类只有两个私有属性窗口的方法获取最大时间戳为end-1,因此窗口[start,end) 左开右闭
PublicEvolving
public class TimeWindow extends Window {private final long start;private final long end;public TimeWindow(long start, long end) {this.start start;this.end end;}Overridepublic long maxTimestamp() {return end - 1;}
2.计数窗口Count window
计数窗口是基于元素个数截取,在到达固定个数是就触发计算并关闭窗口。
3.全局窗口Global Windows
是计数窗口的底层实现窗口分配器由GlobalWindows类提供需要自定义触发器实现窗口的计算 stream.keyBy(data - true).window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(2)))
// .max().aggregate(new AvgPv()).print();查看源代码windou函数后见windowStrream时获取默认的触发器
PublicEvolvingpublic WindowedStream(KeyedStreamT, K input, WindowAssigner? super T, W windowAssigner) {this.input input;this.builder new WindowOperatorBuilder(windowAssigner,windowAssigner.getDefaultTrigger(input.getExecutionEnvironment()), //湖区触发器input.getExecutionConfig(),input.getType(),input.getKeySelector(),input.getKeyType());}// 计数窗口底层采用全局窗口加计数器来实现public WindowedStreamT, KEY, GlobalWindow countWindow(long size) {return window(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(size)));}public WindowedStreamT, KEY, GlobalWindow countWindow(long size, long slide) {return window(GlobalWindows.create()).evictor(CountEvictor.of(size)).trigger(CountTrigger.of(slide));}
2.按照窗口分配数据的规则分类
滚动窗口Tumbling Window窗口大小固定窗口没有重叠 滑动窗口 Sliding Window滑动窗口有重叠也可以没有重叠如果窗口size和滑动size相等等于滚动窗口 会话窗口 Session Window基于会话对窗口进行分组与其他两个不同的是会话窗口是借用会话窗口的超时失效机触发窗口计算当数据到来后会开启一个窗口如果在超时时间内有数据陆续到来窗口不会关闭,反之会关闭极端情况如果数据总能在窗口超时时间到达前远远不断的到来该窗口会一直开启不会关闭 全局窗口 Global Window比较通用的窗口该窗口会把数据分配到一个窗口中窗口为全局有效会把相同key的数据分配到同一个窗口中默认不会触发计算跟没有窗口一样需要自定义触发器才能使用
窗口API分类
窗口大的分类可以分为按键分区和非按键分区两种按键分需要经过keyby操作会把数据进行分发实现负载均分可以并行处理更大的数据量。而非按键分区窗口相当于并行度为1使用上直接调用windowall因此一般并不推荐使用
stream
.keyby(...) //流按键分区
.window(...) //定义窗口分配器
[.trigger()] //设置出发器
[.evictor()] //设置移除器
[.allowedLateness()] // 设置延迟时间
[.sideOutputLateData()] //设置侧输出流
.reduce/aggregate/fold/apply() //处理函数
[.getSideOutput()] //获取侧输出流stream
.windowAll(...) //定义窗口分配器
[.trigger()] //设置出发器
[.evictor()] //设置移除器
[.allowedLateness()] // 设置延迟时间
[.sideOutputLateData()] //设置侧输出流
.reduce/aggregate/fold/apply() //处理函数
[.getSideOutput()] //获取侧输出流 API调用
窗口操作包含两个重要的概念窗口分配器(window Assigners和窗口函数(window function两部分
窗口分配器用于构建窗口确定窗口类型确定数据划分哪一个窗口窗口函数制定数据的计算规则
窗口分配器器
作用窗口分配器用来划分窗口属于哪一个窗口
窗口按照时间可以划分为滚动、滑动和session三种类型窗口
窗口计数划分滚动和滑动两种类型 eventStream.keyBy(data - data.url).window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))).aggregate();
窗口函数
窗口函数按照计算特点可以分为增量计算和全量计算
增量聚合函数数据到达后立即计算窗口只保存中间结果。效率高性能好但不够灵活。
全量聚合函数缓存窗口的所有元素触发后统一计算效率低但计算灵活。
增量聚合函数
数据进入窗口会参与计算窗口结束前只需要保留一个聚合后的状态值内存压力小。
1.规约函数ReduceFunction数据保存留一个状态输入类型和输出类型必须一致来一条数据会处理将数据合并到状态中 stream.keyBy(r - r.f0)// 设置滚动事件时间窗口.window(TumblingEventTimeWindows.of(Time.seconds(5))).reduce(new ReduceFunctionTuple2String, Long() {Overridepublic Tuple2String, Long reduce(Tuple2String, Long value1, Tuple2String, Long value2) throws Exception {// 定义累加规则窗口闭合时向下游发送累加结果return Tuple2.of(value1.f0, value1.f1 value2.f1);}}).print(); sum、max、min等底层都是通过同名AggregateFunction实现非下面的聚合函数本质还是实现ReduceFunction结构重写了reduce方法
2.聚合函数AggrateFunction:在规约函数基础上进行完善。解决输出和输入类型必须一致的限制问题。实现应用更灵活 // 所有数据设置相同的key发送到同一个分区统计PV和UV再相除stream.keyBy(data - true).window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(2))).aggregate(new AvgPv()).print();public static class AvgPv implements AggregateFunctionEvent, Tuple2HashSetString, Long, Double {Overridepublic Tuple2HashSetString, Long createAccumulator() {// 创建累加器return Tuple2.of(new HashSetString(), 0L);}Overridepublic Tuple2HashSetString, Long add(Event value, Tuple2HashSetString, Long accumulator) {// 属于本窗口的数据来一条累加一次并返回累加器accumulator.f0.add(value.user);return Tuple2.of(accumulator.f0, accumulator.f1 1L);}Overridepublic Double getResult(Tuple2HashSetString, Long accumulator) {// 窗口闭合时增量聚合结束将计算结果发送到下游return (double) accumulator.f1 / accumulator.f0.size();}Overridepublic Tuple2HashSetString, Long merge(Tuple2HashSetString, Long a, Tuple2HashSetString, Long b) {return null;}}
全窗口函数
全窗口函数会将进入窗口的数据先进行缓存然后在窗口关闭时一起计算缓存数据会占用内存资源如果一个窗口数据量太大时可能出现内存溢出的问题
全窗口函数可以划分窗口函数(windowFunction)和处理窗口函数processWindowFunction两种
窗口函数(windowFunction)老版本通用窗口接口window()后调用apply(),传入实现windowFunction接口; 缺点是不能获取上下文信息也没有更高级的功能。因为在功能上可以被processWindowFunction全覆盖因此主键被弃用
DataStreamTuple2String, Long input ...;input.keyBy(key selector).window(window assigner).apply(new MyWindowFunction());Public
public interface WindowFunctionIN, OUT, KEY, W extends Window extends Function, Serializable {/*** Evaluates the window and outputs none or several elements.** param key The key for which this window is evaluated.* param window The window that is being evaluated.* param input The elements in the window being evaluated.* param out A collector for emitting elements.* throws Exception The function may throw exceptions to fail the program and trigger recovery.*/void apply(KEY key, W window, IterableIN input, CollectorOUT out) throws Exception;
}
处理窗口函数processWindowFunction是窗口API中最底层通用的窗口函数接口可以获取到上问对象(context,实现为调用process方法传入自定义继承ProcessWindowFunction类;
input.keyBy(t - t.f0).window(TumblingEventTimeWindows.of(Time.minutes(5))).process(new MyProcessWindowFunction());/* ... */public class MyProcessWindowFunction extends ProcessWindowFunctionTuple2String, Long, String, String, TimeWindow {Overridepublic void process(String key, Context context, IterableTuple2String, Long input, CollectorString out) {long count 0;for (Tuple2String, Long in: input) {count;}out.collect(Window: context.window() count: count);}
}
注意一般增量窗口函数和全量窗口函数可以一起使用window.aggregate()方法可以传入两个函数第一个采用增量聚合函数第二个传入全量函数这样数据在进入窗口会触发增量计算窗口不会缓存数据。当窗口关闭触发计算时结果数据穿度到全量计算参数Iterable中一般只有一个数据
aggregate(acct1,acct2) flink sql 窗口函数
flink sql 窗口也包含常见的滚动窗口、滑动窗口、session窗口但还有一种累计窗口。
在flink1.13版本后flinksql支持累计窗口CUMULATE可以实现没5分钟触发一次计算输出当天的累计数据使用样例
SELECT cast(PROCTIME() as timestamp_ltz) as window_end_time,manufacturer_name,event_id,case when state is null then -1 else state end ,cast(sum(agg)as string ) as agg
FROM TABLE(CUMULATE(TABLE dm_cumulate, DESCRIPTOR(ts1), INTERVAL 5 MINUTES, INTERVAL 1 DAY(9)))
GROUP BYwindow_end,window_start,manufacturer_name,event_id,case when state is null then -1 else state end