wordpress 手册,长沙网站seo外包,windows优化大师会员,上海有哪些建设工程公司在 Flink 的多层 API中#xff0c;处理函数是最底层的API#xff0c;是所有转换算子的一个概括性的表达#xff0c;可以自定义处理逻辑 在处理函数中#xff0c;我们直面的就是数据流中最基本的元素#xff1a;数据事件#xff08;event#xff09;、状态#xff08;st…在 Flink 的多层 API中处理函数是最底层的API是所有转换算子的一个概括性的表达可以自定义处理逻辑 在处理函数中我们直面的就是数据流中最基本的元素数据事件event、状态state以及时间time。这就相当于对流有了完全的控制权
基本处理函数主要是定义数据流的转换操作其所对应的函数类为ProcessFunction 处理函数的功能和使用
对于常用的转换算子来说
MapFunction只能获取到当前的数据AggregateFunction 中除数据外还可以获取到当前的状态以累加器 Accumulator 形式出现RichMapFunction提供了获取运行时上下文的方法 getRuntimeContext()
但是无论那种算子如果我们想要访问事件的时间戳或者当前的水位线信息都是完全做不到的
与时间相关的操作只能用时间窗口去处理但如果要求对时间有更精细的控制需要能够获取水位线甚至要“把控时间”、定义什么时候做什么事这就不是基本的时间窗口能够实现的了
因此需要使用处理函数
处理函数提供了一个“定时服务”TimerService我们可以通过它访问流中的事件event、时间戳timestamp、水位线watermark甚至可以注册“定时事件”处理函数继承了 AbstractRichFunction 抽象类所以拥有富函数类的所有特性同样可以访问状态state和其他运行时信息处理函数还可以直接将数据输出到侧输出流side output中 处理函数的简单使用基于 DataStream 调用.process()方法就方法需要传入一个 ProcessFunction 作为参数用来定义处理逻辑
stream.process(new MyProcessFunction())
简单示例
public class ProcessFunctionTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.addSource(new ClickSource()).assignTimestampsAndWatermarks(WatermarkStrategy.EventforMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssignerEvent() {Overridepublic long extractTimestamp(Event event, long l) {return event.timestamp;}})).process(new ProcessFunctionEvent, String() {Overridepublic void processElement(Event value, Context ctx, CollectorString out) throws Exception {if (value.user.equals(Mary)) {out.collect(value.user);} else if (value.user.equals(Bob)) {out.collect(value.user);out.collect(value.user);}System.out.println(ctx.timerService().currentWatermark());}}).print();env.execute();}
}
在 ProcessFunction 中重写了.processElement()方法参数输入上下文对象输出自定义处理逻辑
ProcessFunction 解析
源码解析
源码如下
public abstract class ProcessFunctionI, O extends AbstractRichFunction {private static final long serialVersionUID 1L;/*** Process one element from the input stream.** pThis function can output zero or more elements using the {link Collector} parameter and* also update internal state or set timers using the {link Context} parameter.** param value The input value.* param ctx A {link Context} that allows querying the timestamp of the element and getting a* {link TimerService} for registering timers and querying the time. The context is only* valid during the invocation of this method, do not store it.* param out The collector for returning result values.* throws Exception This method may throw exceptions. Throwing an exception will cause the* operation to fail and may trigger recovery.*/public abstract void processElement(I value, Context ctx, CollectorO out) throws Exception;/*** Called when a timer set using {link TimerService} fires.** param timestamp The timestamp of the firing timer.* param ctx An {link OnTimerContext} that allows querying the timestamp of the firing timer,* querying the {link TimeDomain} of the firing timer and getting a {link TimerService}* for registering timers and querying the time. The context is only valid during the* invocation of this method, do not store it.* param out The collector for returning result values.* throws Exception This method may throw exceptions. Throwing an exception will cause the* operation to fail and may trigger recovery.*/public void onTimer(long timestamp, OnTimerContext ctx, CollectorO out) throws Exception {}/*** Information available in an invocation of {link #processElement(Object, Context, Collector)}* or {link #onTimer(long, OnTimerContext, Collector)}.*/public abstract class Context {/*** Timestamp of the element currently being processed or timestamp of a firing timer.** pThis might be {code null}, for example if the time characteristic of your program is* set to {link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}.*/public abstract Long timestamp();/** A {link TimerService} for querying time and registering timers. */public abstract TimerService timerService();/*** Emits a record to the side output identified by the {link OutputTag}.** param outputTag the {code OutputTag} that identifies the side output to emit to.* param value The record to emit.*/public abstract X void output(OutputTagX outputTag, X value);}/*** Information available in an invocation of {link #onTimer(long, OnTimerContext, Collector)}.*/public abstract class OnTimerContext extends Context {/** The {link TimeDomain} of the firing timer. */public abstract TimeDomain timeDomain();}
}可以看到抽象类 ProcessFunction 继承了 AbstractRichFunction有两个泛型类型参数
I 表示 Input也就是输入的数据类型O 表示 Output也就是处理完成之后输出的数据类型
其内部单独定义了两个方法一个是必须要实现的抽象方法.processElement()另一个是非抽象方法.onTimer()
.processElement()用于“处理元素”定义了处理的核心逻辑。这个方法对于流中的每个元素都会调用一次参数包括三个输入数据值 value上下文 ctx以及“收集器”Collectorout。方法没有返回值处理之后的输出数据是通过收集器 out 来定义 value当前流中的输入元素也就是正在处理的数据类型与流中数据类型一致ctx类型是 ProcessFunction 中定义的内部抽象类 Context表示当前运行的上下文可以获取到当前的时间戳并提供了用于查询时间和注册定时器的“定时服务”(TimerService)以及可以将数据发送到“侧输出流”side output的方法.output()out“收集器”类型为 Collector用于返回输出数据。使用方式与 flatMap算子中的收集器完全一样直接调用 out.collect()方法就可以向下游发出一个数据。这个方法可以多次调用也可以不调用.onTimer()用于定义定时触发的操作这个方法只有在注册好的定时器触发的时候才会调用在 Flink 中只有“按键分区流”KeyedStream才支持设置定时器的操作而定时器是通过“定时服务”TimerService 来注册的 参数时间戳timestamp上下文ctx收集器out【这里的时间戳是指设置好的触发时间在事件时间语义下就是水位线】
利用onTimer可以自定义数据按照时间分组、定时触发计算输出结果这样就实现了窗口的功能
处理函数分类
ProcessFunction最基本的处理函数基于 DataStream 直接调用.process()时作为参数传入KeyedProcessFunction对流按键分区后的处理函数基于 KeyedStream 调用.process()时作为参数传入要想使用定时器必须基于 KeyedStream ProcessWindowFunction开窗之后的处理函数也是全窗口函数的代表。基于 WindowedStream 调用.process()时作为参数传入ProcessAllWindowFunction开窗之后的处理函数基于 AllWindowedStream 调用.process()时作为参数传入CoProcessFunction合并connect两条流之后的处理函数基于 ConnectedStreams 调用.process()时作为参数传入ProcessJoinFunction间隔连接interval join两条流之后的处理函数基于 IntervalJoined 调用.process()时作为参数传入BroadcastProcessFunction广播连接流处理函数基于 BroadcastConnectedStream 调用.process()时作为参数传入这里的“广播连接流”BroadcastConnectedStream是一个未 keyBy 的普通 DataStream 与一个广播流BroadcastStream做连接conncet之后的产物KeyedBroadcastProcessFunction按键分区的广播连接流处理函数同样是基于BroadcastConnectedStream 调用.process()时作为参数传入这时的广播连接流是一个 KeyedStream与广播流BroadcastStream做连接之后的产物 学习课程链接【尚硅谷】Flink1.13实战教程涵盖所有flink-Java知识点_哔哩哔哩_bilibili
文章转载自: http://www.morning.fpbj.cn.gov.cn.fpbj.cn http://www.morning.qyrnp.cn.gov.cn.qyrnp.cn http://www.morning.qhmhz.cn.gov.cn.qhmhz.cn http://www.morning.wxfjx.cn.gov.cn.wxfjx.cn http://www.morning.qpxrr.cn.gov.cn.qpxrr.cn http://www.morning.tkxyx.cn.gov.cn.tkxyx.cn http://www.morning.rmmz.cn.gov.cn.rmmz.cn http://www.morning.lfmwt.cn.gov.cn.lfmwt.cn http://www.morning.lwnwl.cn.gov.cn.lwnwl.cn http://www.morning.cpktd.cn.gov.cn.cpktd.cn http://www.morning.rhjhy.cn.gov.cn.rhjhy.cn http://www.morning.zrkws.cn.gov.cn.zrkws.cn http://www.morning.krxzl.cn.gov.cn.krxzl.cn http://www.morning.zffn.cn.gov.cn.zffn.cn http://www.morning.gfmpk.cn.gov.cn.gfmpk.cn http://www.morning.gglhj.cn.gov.cn.gglhj.cn http://www.morning.ey3h2d.cn.gov.cn.ey3h2d.cn http://www.morning.ranglue.com.gov.cn.ranglue.com http://www.morning.fsrtm.cn.gov.cn.fsrtm.cn http://www.morning.ngcbd.cn.gov.cn.ngcbd.cn http://www.morning.tdcql.cn.gov.cn.tdcql.cn http://www.morning.pyxtn.cn.gov.cn.pyxtn.cn http://www.morning.pkmcr.cn.gov.cn.pkmcr.cn http://www.morning.kongpie.com.gov.cn.kongpie.com http://www.morning.lzqdl.cn.gov.cn.lzqdl.cn http://www.morning.tqxtx.cn.gov.cn.tqxtx.cn http://www.morning.yrlfy.cn.gov.cn.yrlfy.cn http://www.morning.rmlz.cn.gov.cn.rmlz.cn http://www.morning.ghfmd.cn.gov.cn.ghfmd.cn http://www.morning.mjtgt.cn.gov.cn.mjtgt.cn http://www.morning.jpgfq.cn.gov.cn.jpgfq.cn http://www.morning.ktrh.cn.gov.cn.ktrh.cn http://www.morning.kjawz.cn.gov.cn.kjawz.cn http://www.morning.lnbcx.cn.gov.cn.lnbcx.cn http://www.morning.xkgyh.cn.gov.cn.xkgyh.cn http://www.morning.kdrly.cn.gov.cn.kdrly.cn http://www.morning.xhfky.cn.gov.cn.xhfky.cn http://www.morning.tmfhx.cn.gov.cn.tmfhx.cn http://www.morning.xrksf.cn.gov.cn.xrksf.cn http://www.morning.ygwyt.cn.gov.cn.ygwyt.cn http://www.morning.mfxcg.cn.gov.cn.mfxcg.cn http://www.morning.qgwpx.cn.gov.cn.qgwpx.cn http://www.morning.nwllb.cn.gov.cn.nwllb.cn http://www.morning.jstggt.cn.gov.cn.jstggt.cn http://www.morning.zzbwjy.cn.gov.cn.zzbwjy.cn http://www.morning.ymqfx.cn.gov.cn.ymqfx.cn http://www.morning.czrcf.cn.gov.cn.czrcf.cn http://www.morning.ssjtr.cn.gov.cn.ssjtr.cn http://www.morning.bphqd.cn.gov.cn.bphqd.cn http://www.morning.rui931.cn.gov.cn.rui931.cn http://www.morning.jrrqs.cn.gov.cn.jrrqs.cn http://www.morning.bklkt.cn.gov.cn.bklkt.cn http://www.morning.tgqzp.cn.gov.cn.tgqzp.cn http://www.morning.sxbgc.cn.gov.cn.sxbgc.cn http://www.morning.bwhcl.cn.gov.cn.bwhcl.cn http://www.morning.hptbp.cn.gov.cn.hptbp.cn http://www.morning.mhfbf.cn.gov.cn.mhfbf.cn http://www.morning.jydhl.cn.gov.cn.jydhl.cn http://www.morning.qkzdc.cn.gov.cn.qkzdc.cn http://www.morning.frtb.cn.gov.cn.frtb.cn http://www.morning.kaakyy.com.gov.cn.kaakyy.com http://www.morning.haibuli.com.gov.cn.haibuli.com http://www.morning.fpbj.cn.gov.cn.fpbj.cn http://www.morning.mcqhb.cn.gov.cn.mcqhb.cn http://www.morning.kxgn.cn.gov.cn.kxgn.cn http://www.morning.hwxxh.cn.gov.cn.hwxxh.cn http://www.morning.jwmws.cn.gov.cn.jwmws.cn http://www.morning.rgxf.cn.gov.cn.rgxf.cn http://www.morning.rsdm.cn.gov.cn.rsdm.cn http://www.morning.gwxsk.cn.gov.cn.gwxsk.cn http://www.morning.mnpdy.cn.gov.cn.mnpdy.cn http://www.morning.ypdhl.cn.gov.cn.ypdhl.cn http://www.morning.fqpyj.cn.gov.cn.fqpyj.cn http://www.morning.czcbl.cn.gov.cn.czcbl.cn http://www.morning.jbxfm.cn.gov.cn.jbxfm.cn http://www.morning.wrbx.cn.gov.cn.wrbx.cn http://www.morning.nwczt.cn.gov.cn.nwczt.cn http://www.morning.qflcb.cn.gov.cn.qflcb.cn http://www.morning.tfwg.cn.gov.cn.tfwg.cn http://www.morning.njddz.cn.gov.cn.njddz.cn