网站功能建设流程图,wordpress下载模板,揭阳网页制作,wordpress 酒主题Flink笔记整理#xff08;五#xff09; 文章目录 Flink笔记整理#xff08;五#xff09;七、处理函数#xff08;最底层最常用最灵活#xff09;7.1基本处理函数#xff08;ProcessFunction#xff09;处理函数的功能和使用ProcessFunction解析 7.2按键分区处理函数五 文章目录 Flink笔记整理五七、处理函数最底层最常用最灵活7.1基本处理函数ProcessFunction处理函数的功能和使用ProcessFunction解析 7.2按键分区处理函数KeyedProcessFunction定时器Timer和定时服务TimeService 7.3 窗口处理函数窗口处理函数的使用ProcessWindowFunction解析 7.4 应用案例——Top N 总结 七、处理函数最底层最常用最灵活
之前所介绍的流处理API无论是基本的转换、聚合还是更为复杂的窗口操作其实都是基于DataStream进行转换的所以可以统称为DataStream API。
在Flink更底层我们可以不定义任何具体的算子比如mapfilter或者window而只是提炼出一个统一的“处理”process操作——它是所有转换算子的一个概括性的表达可以自定义处理逻辑所以这一层接口就被叫作“处理函数”process function。 7.1基本处理函数ProcessFunction
处理函数的功能和使用
之前学习的转换算子一般只是针对某种具体操作来定义的能够拿到的信息比较有限。如果我们想要访问事件的时间戳或者当前的水位线信息都是完全做不到的。跟时间相关的操作目前我们只会用窗口来处理。而在很多应用需求中要求我们对时间有更精细的控制需要能够获取水位线甚至要“把控时间”、定义什么时候做什么事这就不是基本的时间窗口能够实现的了。
这时就需要使用底层的处理函数。处理函数提供了一个“定时服务”TimerService我们可以通过它访问流中的事件event、时间戳timestamp、水位线watermark甚至可以注册“定时事件”。而且处理函数继承了AbstractRichFunction抽象类所以拥有富函数类的所有特性同样可以访问状态state和其他运行时信息。此外处理函数还可以直接将数据输出到侧输出流side output中。所以处理函数是最为灵活的处理方法可以实现各种自定义的业务逻辑。
处理函数的使用与基本的转换操作类似只需要直接基于DataStream调用.process()方法就可以了。方法需要传入一个ProcessFunction作为参数用来定义处理逻辑。
stream.process(new MyProcessFunction())这里ProcessFunction不是接口而是一个抽象类继承了AbstractRichFunctionMyProcessFunction是它的一个具体实现。所以所有的处理函数都是富函数RichFunction富函数可以调用的东西这里同样都可以调用。
ProcessFunction解析
在源码中我们可以看到抽象类ProcessFunction继承了AbstractRichFunction有两个泛型类型参数I表示Input也就是输入的数据类型O表示Output也就是处理完成之后输出的数据类型。内部单独定义了两个方法一个是必须要实现的抽象方法.processElement()另一个是非抽象方法.onTimer()。 public abstract class ProcessFunctionI, O extends AbstractRichFunction {...public abstract void processElement(I value, Context ctx, CollectorO out) throws Exception;public void onTimer(long timestamp, OnTimerContext ctx, CollectorO out) throws Exception {}...
}ProcessFunction解析
7.2按键分区处理函数KeyedProcessFunction
在上节中提到只有在KeyedStream中才支持使用TimerService设置定时器的操作。所以一般情况下我们都是先做了keyBy分区之后再去定义处理操作代码中更加常见的处理函数是KeyedProcessFunction。
ProcessFunction解析
定时器Timer和定时服务TimeService
定时器Timer和定时服务TimeService及例子
7.3 窗口处理函数
除了KeyedProcessFunction另外一大类常用的处理函数就是基于窗口的ProcessWindowFunction和ProcessAllWindowFunction了。在第六章窗口函数的介绍中我们之前已经简单地使用过窗口处理函数了。
窗口处理函数的使用
进行窗口计算我们可以直接调用现成的简单聚合方法sum/max/min也可以通过调用.reduce()或.aggregate()来自定义一般的增量聚合函数ReduceFunction/AggregateFucntion而对于更加复杂、需要窗口信息和额外状态的一些场景我们还可以直接使用全窗口函数、把数据全部收集保存在窗口内等到触发窗口计算时再统一处理。窗口处理函数就是一种典型的全窗口函数。
窗口处理函数ProcessWindowFunction的使用与其他窗口函数类似也是基于WindowedStream直接调用方法就可以只不过这时调用的是.process()。
stream.keyBy( t - t.f0 ).window( TumblingEventTimeWindows.of(Time.seconds(10)) ).process(new MyProcessWindowFunction())ProcessWindowFunction解析
ProcessWindowFunction既是处理函数又是全窗口函数。从名字上也可以推测出它的本质似乎更倾向于“窗口函数”一些。事实上它的用法也确实跟其他处理函数有很大不同。我们可以从源码中的定义看到这一点 ProcessWindowFunction解析
7.4 应用案例——Top N
案例需求实时统计一段时间内的出现次数最多的水位。例如统计最近10秒钟内出现次数最多的两个水位并且每5秒钟更新一次。我们知道这可以用一个滑动窗口来实现。于是就需要开滑动窗口收集传感器的数据按照不同的水位进行统计而后汇总排序并最终输出前两名。这其实就是著名的“Top N”问题。 案例实现代码 总结