西安 网站设计,wordpress 在线报名,网站 谁建设 谁负责,中国新兴建设招聘网站文章目录 1、水位线的生成原则2、有序流内置水位线3、乱序流内置水位线4、自定义周期性水位线生成器5、自定义断点式水位线生成器6、从数据源中发送水位线 1、水位线的生成原则
水位线出现#xff0c;即代表这个时间之前的数据已经全部到齐#xff0c;之后不会再出现之前的数… 文章目录 1、水位线的生成原则2、有序流内置水位线3、乱序流内置水位线4、自定义周期性水位线生成器5、自定义断点式水位线生成器6、从数据源中发送水位线 1、水位线的生成原则
水位线出现即代表这个时间之前的数据已经全部到齐之后不会再出现之前的数据了。参考前面的乱序流可以得出
想要保证数据绝对正确就得加足够大的延迟但实时性就没保障了想要实时性强就得把延迟设置小但此时迟到数据可能遗漏准确性降低
水位线的定义是对低延迟和结果准确性的一个权衡。Flink生成水位线的方法是.assignTimestampsAndWatermarks()它主要用来为流中的数据分配时间戳并生成水位线来指示事件时间
DataStreamEvent stream env.addSource(xxx);DataStream withTimestampsAndWatermarks stream.assignTimestampsAndWatermarks(WatermarkStrategy对象);
WatermarkStrategy是一个接口包含了一个时间戳分配器TimestampAssigner和一个水位线生成WatermarkGenerator
public interface WatermarkStrategyT extends TimestampAssignerSupplierT,WatermarkGeneratorSupplierT{// 负责从流中数据元素的某个字段中提取时间戳并分配给元素。时间戳的分配是生成水位线的基础。OverrideTimestampAssignerT createTimestampAssigner(TimestampAssignerSupplier.Context context);// 主要负责按照既定的方式基于时间戳生成水位线OverrideWatermarkGeneratorT createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
}
2、有序流内置水位线
有序流的时间戳全部单调递增没有迟到数据直接WatermarkStrategy.forMonotonousTimestamps()就可以拿到WatermarkStrategy对象
public class WatermarkMonoDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(node01, 9527).map(new WaterSensorMapFunction());// TODO 1.定义Watermark策略WatermarkStrategyWaterSensor watermarkStrategy WatermarkStrategy// 1.1 指定watermark生成升序的watermark没有等待时间.WaterSensorforMonotonousTimestamps()// 1.2 指定 时间戳分配器从数据中提取.withTimestampAssigner(new SerializableTimestampAssignerWaterSensor() {Overridepublic long extractTimestamp(WaterSensor element, long recordTimestamp) {System.out.println(数据 element ,recordTs recordTimestamp);// 返回的时间戳要毫秒这里拿自定义对象的ts属性做为时间戳return element.getTs() * 1000L;}});// TODO 2. 指定 watermark策略SingleOutputStreamOperatorWaterSensor sensorDSwithWatermark sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);sensorDSwithWatermark.keyBy(sensor - sensor.getId())// TODO 3.使用事件时间语义的窗口别再用处理时间TumblingProcessTime.window(TumblingEventTimeWindows.of(Time.seconds(10))).process(new ProcessWindowFunctionWaterSensor, String, String, TimeWindow() {Overridepublic void process(String s, Context context, IterableWaterSensor elements, CollectorString out) throws Exception {long startTs context.window().getStart();long endTs context.window().getEnd();String windowStart DateFormatUtils.format(startTs, yyyy-MM-dd HH:mm:ss.SSS);String windowEnd DateFormatUtils.format(endTs, yyyy-MM-dd HH:mm:ss.SSS);long count elements.spliterator().estimateSize();out.collect(key s 的窗口[ windowStart , windowEnd )包含 count 条数据 elements.toString());}}).print();env.execute();}
}
执行下输入10时逻辑时钟被推到了10s到达区间触发窗口执行全窗口函数的process输出当前窗口的数据 3、乱序流内置水位线
调用WatermarkStrategy. forBoundedOutOfOrderness()传入延迟时间
public class WatermarkOutOfOrdernessDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(node01, 9527).map(new WaterSensorMapFunction());// TODO 1.定义Watermark策略WatermarkStrategyWaterSensor watermarkStrategy WatermarkStrategy// 1.1 指定watermark生成乱序的等待3s.WaterSensorforBoundedOutOfOrderness(Duration.ofSeconds(3))// 1.2 指定 时间戳分配器从数据中提取.withTimestampAssigner((element, recordTimestamp) - {// 返回的时间戳要 毫秒System.out.println(数据 element ,recordTs recordTimestamp);return element.getTs() * 1000L;});// TODO 2. 指定 watermark策略SingleOutputStreamOperatorWaterSensor sensorDSwithWatermark sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);sensorDSwithWatermark.keyBy(sensor - sensor.getId())// TODO 3.使用 事件时间语义 的窗口.window(TumblingEventTimeWindows.of(Time.seconds(10))).process(new ProcessWindowFunctionWaterSensor, String, String, TimeWindow() {Overridepublic void process(String s, Context context, IterableWaterSensor elements, CollectorString out) throws Exception {long startTs context.window().getStart();long endTs context.window().getEnd();String windowStart DateFormatUtils.format(startTs, yyyy-MM-dd HH:mm:ss.SSS);String windowEnd DateFormatUtils.format(endTs, yyyy-MM-dd HH:mm:ss.SSS);long count elements.spliterator().estimateSize();out.collect(key s 的窗口[ windowStart , windowEnd )包含 count 条数据 elements.toString());}}).print();env.execute();}
}
执行 简单分析下结果
第一条数据s111进来创建窗口水位线为1s-3s延迟3s)s11010进来水位线为10-3 7s还未到达10窗口不触发若是有序流无等待下此时窗口已被触发了此时进来一条乱序数据比如s1666-33s水位线保持上面的7不变watermark不会推进且6这条数据也会被统计在[0,10)的区间内s11111进来11-38也不会触发但这条数据是属于[10,20)区间的那个桶的s11313进来达到10窗口触发
4、自定义周期性水位线生成器
上面只是定义了时间戳的提取逻辑水位线的生成采用的默认内置策略。接下来自定义水位线生成器周期性水位生成器。 周期性生成器是通过onEvent()观察判断输入的事件而在onPeriodicEmit()里发射生成的水位线 // 自定义水位线的产生
public class CustomPeriodicWatermarkExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(node01, 9527).map(new WaterSensorMapFunction());// 定义Watermark策略WatermarkStrategyWaterSensor watermarkStrategy WatermarkStrategy// 1.1 指定watermark生成器.WaterSensorforGenerator(context - MyPeriodWatermarkGenerator(3000L))// 1.2 指定时间戳分配器从数据中提取.withTimestampAssigner((element, recordTimestamp) - {// 返回的时间戳要 毫秒System.out.println(数据 element ,recordTs recordTimestamp);return element.getTs() * 1000L;});// TODO 2. 指定 watermark策略SingleOutputStreamOperatorWaterSensor sensorDSwithWatermark sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);sensorDSwithWatermark.keyBy(sensor - sensor.getId())// TODO 3.使用 事件时间语义 的窗口.window(TumblingEventTimeWindows.of(Time.seconds(10))).process(new ProcessWindowFunctionWaterSensor, String, String, TimeWindow() {Overridepublic void process(String s, Context context, IterableWaterSensor elements, CollectorString out) throws Exception {long startTs context.window().getStart();long endTs context.window().getEnd();String windowStart DateFormatUtils.format(startTs, yyyy-MM-dd HH:mm:ss.SSS);String windowEnd DateFormatUtils.format(endTs, yyyy-MM-dd HH:mm:ss.SSS);long count elements.spliterator().estimateSize();out.collect(key s 的窗口[ windowStart , windowEnd )包含 count 条数据 elements.toString());}}).print();env.execute();}}
模仿前面的内置生成器定义自己的水位线生成器
public class MyPeroidWatermarkGenerator implements WatermarkGeneratorEvent {private Long delayTime 5000L; // 延迟时间private Long maxTs -Long.MAX_VALUE delayTime 1L; // 观察到的最大时间戳//构造方法传入延迟时间构造水位线生成器对象public MyPeroidWatermarkGenerator(long delayTime){this.delayTime delayTime;this.maxTs Long.MIN_VALUE this.delayTime 1;}/*** 每条数据进来都调用一次用来提取最大的事件事件*/Overridepublic void onEvent(Event eventlong eventTimestampWatermarkOutput output) {// 每来一条数据就调用一次maxTs Math.max(event.timestampmaxTs); // 更新最大时间戳System.out.println(调用了onEvent方法获取目前为止最大的时间戳 maxTimestamp);}/*** 周期性调用默认20ms*/Overridepublic void onPeriodicEmit(WatermarkOutput output) {// 发射水位线默认200ms调用一次output.emitWatermark(new Watermark(maxTs - delayTime - 1L));System,out,println(调用了onPeriodicEmit方法,生成watermark (maxTimestamp - delayTs - 1) );}}核心部分指定水位线生成器的Lamdba表达式展开就是 运行
数据没进来前每200ms调用一次发射水位线的方法此时的水位线是构造方法里Long.MIN_VALUE那个进来一条数据调用onEvent最大时间戳被更新到周期后再发射水位线maxTs-delayTs-1继续周期性调用onPeriodicEmit方法 onPeriodicEmit()里调用output.emitWatermark()就可以发出水位线了这个方法由系统框架周期性地调用默认200ms一次 修改默认的周期比如改为400ms
env.getConfig().setAutoWatermarkInterval(400L);5、自定义断点式水位线生成器
断点式生成器会不停地检测onEvent()中的事件发现带有水位线信息的当事件时就立即发出水位线。改下代码定义水位线生成器
public class PointWatermarkGenerator implements WatermarkGeneratorEvent {private Long delayTime 5000L; // 延迟时间private Long maxTs -Long.MAX_VALUE delayTime 1L; // 观察到的最大时间戳//构造方法传入延迟时间构造水位线生成器对象public MyPeroidWatermarkGenerator(long delayTime){this.delayTime delayTime;this.maxTs Long.MIN_VALUE this.delayTime 1;}/*** 每条数据进来都调用一次用来提取最大的事件事件*/Overridepublic void onEvent(Event eventlong eventTimestampWatermarkOutput output) {// 每来一条数据就调用一次maxTs Math.max(event.timestampmaxTs); // 更新最大时间戳// 发射水位线output.emitWatermark(new Watermark(maxTs - delayTime - 1L));System.out.println(调用了onEvent方法获取目前为止最大的时间戳 maxTimestamp ,生成watermark (maxTimestamp - delayTs - 1));}/*** 周期性调用默认20ms*/Overridepublic void onPeriodicEmit(WatermarkOutput output) {}}周期性代码改为
//...// 定义Watermark策略WatermarkStrategyWaterSensor watermarkStrategy WatermarkStrategy// 1.1 指定watermark生成器.WaterSensorforGenerator(context - PointWatermarkGenerator(3000L))// 1.2 指定时间戳分配器从数据中提取.withTimestampAssigner((element, recordTimestamp) - {// 返回的时间戳要 毫秒return element.getTs() * 1000L;});
运行此时不再周期性的发射水位线 6、从数据源中发送水位线
在自定义的数据源中抽取事件时间然后发送水位线
env.fromSource(
kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), kafkasource
)//注意fromSorce方法的第二个传参之前用的WatermarkStrategy.noWatermark()
注意此时不用再assignTimestampsAndWatermarks了在自定义数据源中生成水位线和在程序中使用assignTimestampsAndWatermarks方法生成水位线二者只能取其一