当前位置: 首页 > news >正文

网站设置的用途网页游戏排行榜开服表

网站设置的用途,网页游戏排行榜开服表,柯桥区网站建设,长春最新发布信息双流 Join 和两个流合并是不一样的 两个流合并#xff1a;两个流变为 1 个流 union connect 双流 join: 两个流 join#xff0c;其实这两个流还是原来的#xff0c;只是满足条件的数据会变为一个新的流。 可以结合 sql 语句中的 union 和 join 的区别。 在离线 Hive 中两个流变为 1 个流 union connect 双流 join: 两个流 join其实这两个流还是原来的只是满足条件的数据会变为一个新的流。 可以结合 sql 语句中的 union 和 join 的区别。 在离线 Hive 中我们经常会使用 Join 进行多表关联。那么在实时中我们应该如何实现两条流的 Join 呢Flink DataStream API 为我们提供了3个算子来实现双流 join分别是 join -- 类似于我们以前学过的内连接 inner joincoGroup -- 类似于我们以前学过的 内连接左连接右连接intervalJoin -- 一个流中的数据可以关联另一个流中一个时间段的所有数据 下面我们分别详细看一下这3个算子是如何实现双流 Join 的。 1. Join Joining | Apache Flink Join 算子提供的语义为 “Window join”即按照指定字段和滚动/滑动/会话窗口进行内连接(InnerJoin)。Join 将有相同 Key 并且位于同一窗口中的两条流的元素进行关联。 Join 可以支持处理时间processing time和事件时间(event time)两种时间特征。 Join 通用用法如下 stream.join(otherStream) .where(KeySelector) .equalTo(KeySelector) .window(WindowAssigner) .apply(JoinFunction) Join 语义类似与离线 Hive 的 InnnerJoin (内连接)这意味着如果一个流中的元素在另一个流中没有相对应的元素则不会输出该元素。 下面我们看一下 Join 算子在不同类型窗口上的具体表现。 1.1 滚动窗口Join 当在滚动窗口上进行 Join 时所有有相同 Key 并且位于同一滚动窗口中的两条流的元素两两组合进行关联并最终传递到 JoinFunction 或 FlatJoinFunction 进行处理。 如上图所示我们定义了一个大小为 2 秒的滚动窗口最终产生 [0,1][2,3]… 这种形式的数据。上图显示了每个窗口中橘色流和绿色流的所有元素成对组合。需要注意的是在滚动窗口 [6,7] 中由于绿色流中不存在要与橘色流中元素 6、7 相关联的元素因此该窗口不会输出任何内容。 下面我们一起看一下如何实现上图所示的滚动窗口 Join 可以通过两个socket流将数据合并为一个三元组key,value1,value2 package com.bigdata.day07;import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.JoinFunction; import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time;import java.text.ParseException; import java.text.SimpleDateFormat; import java.time.Duration; import java.util.Arrays; import java.util.Date;/*** 基本功能:* program:FlinkDemo* author: 闫哥* create:2023-11-27 09:31:57**/ public class _ShuangLiuJoinDemo {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 并行度不为1 ,效果很难出来因为本地的并行度是16只有16个并行度都触发才能看到效果env.setParallelism(1);//2. source-加载数据 key,0,2021-03-26 12:09:00DataStreamTuple3String, Integer, String greenStream env.socketTextStream(localhost, 8888).map(new MapFunctionString, Tuple3String, Integer, String() {Overridepublic Tuple3String, Integer, String map(String line) throws Exception {String[] arr line.split(,);System.out.println(绿色 Arrays.toString(arr));return Tuple3.of(arr[0], Integer.valueOf(arr[1]), arr[2]);}})// 因为用到了EventTime 所以势必用到水印否则报错.assignTimestampsAndWatermarks(WatermarkStrategy.Tuple3String, Integer, StringforBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(new SerializableTimestampAssignerTuple3String, Integer, String() {Overridepublic long extractTimestamp(Tuple3String, Integer, String element, long recordTimestamp) {Long timeStamp 0L;SimpleDateFormat simpleDateFormat new SimpleDateFormat(yyyy-MM-dd HH:mm:ss);Date date null;try {date simpleDateFormat.parse(element.f2);} catch (ParseException e) {throw new RuntimeException(e);}timeStamp date.getTime();System.out.println(绿色的时间timeStamp);System.out.println(element.f0);return timeStamp;}}));;// 以后这个9999少用因为kafka占用这个端口 key,0,2021-03-26 12:09:00DataStreamTuple3String, Integer, String orangeStream env.socketTextStream(localhost, 7777).map(new MapFunctionString, Tuple3String,Integer,String() {Overridepublic Tuple3String, Integer, String map(String line) throws Exception {String[] arr line.split(,);System.out.println(橘色 Arrays.toString(arr));return Tuple3.of(arr[0],Integer.valueOf(arr[1]),arr[2]);}})// 因为用到了EventTime 所以势必用到水印否则报错.assignTimestampsAndWatermarks(WatermarkStrategy.Tuple3String, Integer, StringforBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(new SerializableTimestampAssignerTuple3String, Integer, String() {Overridepublic long extractTimestamp(Tuple3String, Integer, String element, long recordTimestamp) {Long timeStamp 0L;SimpleDateFormat simpleDateFormat new SimpleDateFormat(yyyy-MM-dd HH:mm:ss);Date date null;try {date simpleDateFormat.parse(element.f2);} catch (ParseException e) {throw new RuntimeException(e);}timeStamp date.getTime();System.out.println(橘色的时间timeStamp);return timeStamp;}}));//3. transformation-数据处理转换DataStream resultStream greenStream.join(orangeStream).where(tup3 - tup3.f0).equalTo(tup3 - tup3.f0).window(TumblingEventTimeWindows.of(Time.seconds(5))).apply(new JoinFunctionTuple3String, Integer, String, Tuple3String, Integer, String, Tuple3String, Integer, Integer() {Overridepublic Tuple3String, Integer, Integer join(Tuple3String, Integer, String t1, Tuple3String, Integer, String t2) throws Exception {System.out.println(t1.f2);System.out.println(t2.f2);return Tuple3.of(t1.f0, t1.f1, t2.f1);}});//4. sink-数据输出resultStream.print();//5. execute-执行env.execute();} } 总结非常重要 1 要想测试这个效果需要将并行度设置为1 2窗口中数据的打印是需要触发的没有触发的数据窗口内是不会进行计算的所以记得输入触发的数据。 假如使用了EventTime 作为时间语义不管是窗口开始和结束时间还是触发的条件都跟系统时间没有关系而跟输入的数据有关系举例 假如你的第一条数据是key,0,2021-03-26 12:09:01 窗口的大小是5s水印是3秒 窗口的开始时间为 2021-03-26 12:09:00 结束时间是 2021-03-26 12:09:05 触发时间是2021-03-26 12:09:08 为什么呢 水印时间 结束时间 水印时间是2021-03-26 12:09:08 - 3 2021-03-26 12:09:05 2021-03-26 12:09:05 时间窗口如果是 00~05 ,05 这个时间点的数据是不包含在本窗口内的而是归于下一个窗口所谓的前包后不包。 如上代码所示为绿色流和橘色流指定 BoundedOutOfOrdernessWatermarks Watermark 策略设置100毫秒的最大可容忍的延迟时间同时也会为流分配事件时间戳。假设输入流为 格式两条流输入元素如下所示 绿色流 key,0,2021-03-26 12:09:00 key,1,2021-03-26 12:09:01 key,2,2021-03-26 12:09:02 key,4,2021-03-26 12:09:04 key,5,2021-03-26 12:09:05 key,8,2021-03-26 12:09:08 key,9,2021-03-26 12:09:09 key,11,2021-03-26 12:09:11橘色流 key,0,2021-03-26 12:09:00 key,1,2021-03-26 12:09:01 key,2,2021-03-26 12:09:02 key,3,2021-03-26 12:09:03 key,4,2021-03-26 12:09:04 key,6,2021-03-26 12:09:06 key,7,2021-03-26 12:09:07 key,11,2021-03-26 12:09:11 1.2 滑动窗口Join [解释一下即可不用深究 ] 当在滑动窗口上进行 Join 时所有有相同 Key 并且位于同一滑动窗口中的两条流的元素两两组合进行关联并最终传递到 JoinFunction 进行处理。 如上图所示我们定义了一个窗口大小为 2 秒、滑动步长为 1 秒的滑动窗口。需要注意的是一个元素可能会落在不同的窗口中因此会在不同窗口中发生关联例如绿色流中的0元素。当滑动窗口中一个流的元素在另一个流中没有相对应的元素则不会输出该元素。 下面我们一起看一下如何实现上图所示的滑动窗口 Join package com.bigdata.day07;import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.JoinFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time;import java.text.ParseException; import java.text.SimpleDateFormat; import java.time.Duration; import java.util.Date;/*** 基本功能: 演示join的滑动窗口* program:FlinkDemo* author: 闫哥* create:2024-05-20 09:11:13**/ public class Demo02Join {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 将并行度设置为1,否则很难看到现象env.setParallelism(1);// 创建一个绿色的流DataStreamSourceString greenSource env.socketTextStream(localhost, 8899);// key,0,2021-03-26 12:09:00 将它变为三元组SingleOutputStreamOperatorTuple3String, Integer, String greenDataStream greenSource.map(new MapFunctionString, Tuple3String, Integer, String() {Overridepublic Tuple3String, Integer, String map(String value) throws Exception {String[] arr value.split(,);return new Tuple3(arr[0], Integer.valueOf(arr[1]), arr[2]);}}).assignTimestampsAndWatermarks(WatermarkStrategy.Tuple3String, Integer, StringforBoundedOutOfOrderness(Duration.ofSeconds(3))// 为什么这个地方的代码比之前要长原因是以前获取的数据都是long类型并且都是毫秒值.withTimestampAssigner(new SerializableTimestampAssignerTuple3String, Integer, String() {Overridepublic long extractTimestamp(Tuple3String, Integer, String element, long recordTimestamp) {// 指定你的数据中哪一个是时间戳并且时间戳必须是long类型必须是毫秒为单位的。String time element.f2; //2021-03-26 12:09:00SimpleDateFormat sdf new SimpleDateFormat(yyyy-MM-dd HH:mm:ss);try {Date date sdf.parse(time);return date.getTime();} catch (ParseException e) {throw new RuntimeException(e);}}}));// 创建一个橘色的流DataStreamSourceString orangeSource env.socketTextStream(localhost, 9988);// key,0,2021-03-26 12:09:00 将它变为三元组SingleOutputStreamOperatorTuple3String, Integer, String orangeDataStream orangeSource.map(new MapFunctionString, Tuple3String, Integer, String() {Overridepublic Tuple3String, Integer, String map(String value) throws Exception {String[] arr value.split(,);return new Tuple3(arr[0], Integer.valueOf(arr[1]), arr[2]);}}).assignTimestampsAndWatermarks(WatermarkStrategy.Tuple3String, Integer, StringforBoundedOutOfOrderness(Duration.ofSeconds(3))// 为什么这个地方的代码比之前要长原因是以前获取的数据都是long类型并且都是毫秒值.withTimestampAssigner(new SerializableTimestampAssignerTuple3String, Integer, String() {Overridepublic long extractTimestamp(Tuple3String, Integer, String element, long recordTimestamp) {// 指定你的数据中哪一个是时间戳并且时间戳必须是long类型必须是毫秒为单位的。String time element.f2; //2021-03-26 12:09:00SimpleDateFormat sdf new SimpleDateFormat(yyyy-MM-dd HH:mm:ss);try {Date date sdf.parse(time);return date.getTime();} catch (ParseException e) {throw new RuntimeException(e);}}}));//2. source-加载数据//3. transformation-数据处理转换DataStreamTuple3String, Integer, Integer resultStream greenDataStream.join(orangeDataStream).where(tuple3 - tuple3.f0).equalTo(tuple3 - tuple3.f0).window(SlidingEventTimeWindows.of(Time.seconds(5),Time.seconds(1))).apply(new JoinFunctionTuple3String, Integer, String, Tuple3String, Integer, String, Tuple3String, Integer, Integer() {Overridepublic Tuple3String, Integer, Integer join(Tuple3String, Integer, String first, Tuple3String, Integer, String second) throws Exception {return Tuple3.of(first.f0, first.f1, second.f1);}});//4. sink-数据输出greenDataStream.print(绿色的流);orangeDataStream.print(橘色的流);resultStream.print(最终的结果);//5. execute-执行env.execute();} } 假设输入流为 格式两条流输入元素如下所示 绿色流 key,0,2021-03-26 12:09:00 key,3,2021-03-26 12:09:03 key,4,2021-03-26 12:09:04 key,9,2021-03-26 12:09:09橘色流 key,0,2021-03-26 12:09:00 key,1,2021-03-26 12:09:01 key,2,2021-03-26 12:09:02 key,3,2021-03-26 12:09:03 key,4,2021-03-26 12:09:04 key,9,2021-03-26 12:09:09 2. CoGroup CoGroup 算子是将两条数据流按照 Key 进行分组然后将相同 Key 的数据进行处理。要实现 CoGroup 功能需要为两个输入流分别指定 KeySelector 和 WindowAssigner。它的调用方式类似于 Join 算子但是 CoGroupFunction 比 JoinFunction 更加灵活可以按照用户指定的逻辑匹配左流或者右流的数据基于此我们可以实现内连接(InnerJoin)、左连接(LeftJoin)以及右连接(RightJoin)。 目前这些分组中的数据是在内存中保存的因此需要确保保存的数据量不能太大否则JVM 可能会崩溃。 CoGroup 通用用法如下 stream.coGroup(otherStream).where(KeySelector).equalTo(KeySelector).window(WindowAssigner).apply(CoGroupFunction); 下面我们看一下如何使用 CoGroup 算子实现内连接(InnerJoin)、左连接(LeftJoin)以及右连接(RightJoin)。 最大的优势是可以实现内连接左连接右连接但是缺点是内存压力大而上面的join只能实现内连接。 CoGroup 从写法上是coGroup 和 join的区别而且apply 里面的函数也是不一样的一定要注意观察。 2.1 InnerJoin 下面我们看一下如何使用 CoGroup 实现内连接 如上图所示我们定义了一个大小为 2 秒的滚动窗口。InnerJoin 只有在两个流对应窗口中都存在元素时才会输出。 我们以滚动窗口为例来实现 InnerJoin package com.bigdata.day07;import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.CoGroupFunction; import org.apache.flink.api.common.functions.JoinFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.streaming.api.datastream.CoGroupedStreams; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.util.Collector;import java.text.ParseException; import java.text.SimpleDateFormat; import java.time.Duration; import java.util.Arrays; import java.util.Date;/*** 基本功能:* program:FlinkDemo* author: 闫哥* create:2023-11-27 09:31:57**/ public class _ShuangLiuCoGroupDemo {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);env.setParallelism(1);//2. source-加载数据 key,0,2021-03-26 12:09:00DataStreamTuple3String, Integer, String greenStream env.socketTextStream(localhost, 8888).map(new MapFunctionString, Tuple3String, Integer, String() {Overridepublic Tuple3String, Integer, String map(String line) throws Exception {String[] arr line.split(,);System.out.println(绿色 Arrays.toString(arr));return Tuple3.of(arr[0], Integer.valueOf(arr[1]), arr[2]);}})// 因为用到了EventTime 所以势必用到水印否则报错.assignTimestampsAndWatermarks(WatermarkStrategy.Tuple3String, Integer, StringforBoundedOutOfOrderness(Duration.ofSeconds(1)).withTimestampAssigner(new SerializableTimestampAssignerTuple3String, Integer, String() {Overridepublic long extractTimestamp(Tuple3String, Integer, String element, long recordTimestamp) {Long timeStamp 0L;SimpleDateFormat simpleDateFormat new SimpleDateFormat(yyyy-MM-dd HH:mm:ss);Date date null;try {date simpleDateFormat.parse(element.f2);} catch (ParseException e) {throw new RuntimeException(e);}timeStamp date.getTime();System.out.println(绿色的时间timeStamp);System.out.println(element.f0);return timeStamp;}}));;// 以后这个9999少用因为kafka占用这个端口 key,0,2021-03-26 12:09:00DataStreamTuple3String, Integer, String orangeStream env.socketTextStream(localhost, 7777).map(new MapFunctionString, Tuple3String,Integer,String() {Overridepublic Tuple3String, Integer, String map(String line) throws Exception {String[] arr line.split(,);System.out.println(橘色 Arrays.toString(arr));return Tuple3.of(arr[0],Integer.valueOf(arr[1]),arr[2]);}})// 因为用到了EventTime 所以势必用到水印否则报错.assignTimestampsAndWatermarks(WatermarkStrategy.Tuple3String, Integer, StringforBoundedOutOfOrderness(Duration.ofSeconds(1)).withTimestampAssigner(new SerializableTimestampAssignerTuple3String, Integer, String() {Overridepublic long extractTimestamp(Tuple3String, Integer, String element, long recordTimestamp) {Long timeStamp 0L;SimpleDateFormat simpleDateFormat new SimpleDateFormat(yyyy-MM-dd HH:mm:ss);Date date null;try {date simpleDateFormat.parse(element.f2);} catch (ParseException e) {throw new RuntimeException(e);}timeStamp date.getTime();System.out.println(橘色的时间timeStamp);return timeStamp;}}));//3. transformation-数据处理转换CoGroupedStreamsTuple3String, Integer, String, Tuple3String, Integer, String coGroup greenStream.coGroup(orangeStream);coGroup.where(tup3 - tup3.f0).equalTo(tup3 - tup3.f0).window(TumblingEventTimeWindows.of(Time.seconds(5))).apply(new CoGroupFunctionTuple3String, Integer, String, Tuple3String, Integer, String, String() {Overridepublic void coGroup(IterableTuple3String, Integer, String i1, IterableTuple3String, Integer, String i2, CollectorString collector) throws Exception {// 凭借这两个迭代器实现内连接左右连接// 内连接 外面这个循环和里面的循环必须都有数据才会进行输出典型的内连接for (Tuple3String, Integer, String t1 : i1) {for (Tuple3String, Integer, String t2 : i2) {collector.collect(keyt1.f0,t1.valuet1.f1,t2.valuet2.f1);}}}}).print();//5. execute-执行env.execute();}} 如上代码所示我们实现了 CoGroupFunction 接口重写 coGroup 方法。一个流中有相同 Key 并且位于同一窗口的元素都会保存在同一个迭代器(Iterable)本示例中绿色流为 greenIterable橘色流为 orangeIterable如果要实现 InnerJoin 只需要两个迭代器中的元素两两组合即可。两条流输入元素如下所示 绿色流 key,0,2021-03-26 12:09:00 key,1,2021-03-26 12:09:01 key,2,2021-03-26 12:09:02 key,4,2021-03-26 12:09:04 key,5,2021-03-26 12:09:05 key,8,2021-03-26 12:09:08 key,9,2021-03-26 12:09:09 key,11,2021-03-26 12:09:11橘色流 key,0,2021-03-26 12:09:00 key,1,2021-03-26 12:09:01 key,2,2021-03-26 12:09:02 key,3,2021-03-26 12:09:03 key,4,2021-03-26 12:09:04 key,6,2021-03-26 12:09:06 key,7,2021-03-26 12:09:07 key,11,2021-03-26 12:09:11 2.2 LeftJoin 下面我们看一下如何使用 CoGroup 实现左连接 如上图所示我们定义了一个大小为 2 秒的滚动窗口。LeftJoin 只要绿色流窗口中有元素时就会输出。即使在橘色流对应窗口中没有相对应的元素。 我们以滚动窗口为例来实现 LeftJoin package com.bigdata.day07;import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.CoGroupFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.streaming.api.datastream.CoGroupedStreams; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.util.Collector;import java.text.ParseException; import java.text.SimpleDateFormat; import java.time.Duration; import java.util.Arrays; import java.util.Date;/*** 基本功能:* program:FlinkDemo* author: 闫哥* create:2023-11-27 09:31:57**/ public class _ShuangLiuCoGroupLeftDemo {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);env.setParallelism(1);//2. source-加载数据 key,0,2021-03-26 12:09:00DataStreamTuple3String, Integer, String greenStream env.socketTextStream(localhost, 8888).map(new MapFunctionString, Tuple3String, Integer, String() {Overridepublic Tuple3String, Integer, String map(String line) throws Exception {String[] arr line.split(,);System.out.println(绿色 Arrays.toString(arr));return Tuple3.of(arr[0], Integer.valueOf(arr[1]), arr[2]);}})// 因为用到了EventTime 所以势必用到水印否则报错.assignTimestampsAndWatermarks(WatermarkStrategy.Tuple3String, Integer, StringforBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(new SerializableTimestampAssignerTuple3String, Integer, String() {Overridepublic long extractTimestamp(Tuple3String, Integer, String element, long recordTimestamp) {Long timeStamp 0L;SimpleDateFormat simpleDateFormat new SimpleDateFormat(yyyy-MM-dd HH:mm:ss);Date date null;try {date simpleDateFormat.parse(element.f2);} catch (ParseException e) {throw new RuntimeException(e);}timeStamp date.getTime();System.out.println(绿色的时间timeStamp);System.out.println(element.f0);return timeStamp;}}));;// 以后这个9999少用因为kafka占用这个端口 key,0,2021-03-26 12:09:00DataStreamTuple3String, Integer, String orangeStream env.socketTextStream(localhost, 7777).map(new MapFunctionString, Tuple3String,Integer,String() {Overridepublic Tuple3String, Integer, String map(String line) throws Exception {String[] arr line.split(,);System.out.println(橘色 Arrays.toString(arr));return Tuple3.of(arr[0],Integer.valueOf(arr[1]),arr[2]);}})// 因为用到了EventTime 所以势必用到水印否则报错.assignTimestampsAndWatermarks(WatermarkStrategy.Tuple3String, Integer, StringforBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(new SerializableTimestampAssignerTuple3String, Integer, String() {Overridepublic long extractTimestamp(Tuple3String, Integer, String element, long recordTimestamp) {Long timeStamp 0L;SimpleDateFormat simpleDateFormat new SimpleDateFormat(yyyy-MM-dd HH:mm:ss);Date date null;try {date simpleDateFormat.parse(element.f2);} catch (ParseException e) {throw new RuntimeException(e);}timeStamp date.getTime();System.out.println(橘色的时间timeStamp);return timeStamp;}}));//3. transformation-数据处理转换CoGroupedStreamsTuple3String, Integer, String, Tuple3String, Integer, String coGroup greenStream.coGroup(orangeStream);coGroup.where(tup3 - tup3.f0).equalTo(tup3 - tup3.f0).window(TumblingEventTimeWindows.of(Time.seconds(5))).apply(new CoGroupFunctionTuple3String, Integer, String, Tuple3String, Integer, String, String() {Overridepublic void coGroup(IterableTuple3String, Integer, String i1, IterableTuple3String, Integer, String i2, CollectorString collector) throws Exception {// 凭借这两个迭代器实现内连接左右连接// 内连接for (Tuple3String, Integer, String t1 : i1) {boolean noEelement true;for (Tuple3String, Integer, String t2 : i2) {noEelement false;collector.collect(keyt1.f0,t1.valuet1.f1,t2.valuet2.f1);}if(noEelement){collector.collect(keyt1.f0,t1.valuet1.f1,t2.valuenull);}}}}).print();//5. execute-执行env.execute();}} 如上代码所示我们实现了 CoGroupFunction 接口重写 coGroup 方法。一个流中有相同 Key 并且位于同一窗口的元素都会保存在同一个迭代器(Iterable)本示例中绿色流为 green Iterable橘色流为 orange Iterable如果要实现 LeftJoin 需要保证 orange Iterable 中没有元素green Iterable 中的元素也能输出。因此我们定义了一个 noElements 变量来判断 orange Iterable 是否有元素如果 orange Iterable 中没有元素单独输出 greenIterable 中的元素即可。Join 效果如下所示 2.3 RightJoin 下面我们看一下如何使用 CoGroup 实现右连接 如上图所示我们定义了一个大小为 2 秒的滚动窗口。LeftJoin 只要橘色流窗口中有元素时就会输出。即使在绿色流对应窗口中没有相对应的元素。 我们以滚动窗口为例来实现 RightJoin // Join流 CoGroupedStreams coGroupStream greenStream.coGroup(orangeStream); DataStreamString result coGroupStream // 绿色流 .where(new KeySelectorTuple3String, String, String, String() {Overridepublic String getKey(Tuple3String, String, String tuple3) throws Exception {return tuple3.f0;} }) // 橘色流 .equalTo(new KeySelectorTuple3String, String, String, String() {Overridepublic String getKey(Tuple3String, String, String tuple3) throws Exception {return tuple3.f0;} }) // 滚动窗口 .window(TumblingEventTimeWindows.of(Time.seconds(2))) .apply(new RightJoinFunction());// 右连接 private static class RightJoinFunction implements CoGroupFunctionTuple3String, String, String, Tuple3String, String, String, String {Overridepublic void coGroup(IterableTuple3String, String, String greenIterable, IterableTuple3String, String, String orangeIterable, CollectorString collector) throws Exception {for (Tuple3String, String, String orangeTuple : orangeIterable) {boolean noElements true;for (Tuple3String, String, String greenTuple : greenIterable) {noElements false;LOG.info([Join流] Key : {}, Value: {}, EventTime: {},greenTuple.f0, greenTuple.f1 , orangeTuple.f1, greenTuple.f2 , orangeTuple.f2);collector.collect(greenTuple.f1 , orangeTuple.f1);}if (noElements) {LOG.info([Join流] Key : {}, Value: {}, EventTime: {},orangeTuple.f0, null, orangeTuple.f1, null, orangeTuple.f2);collector.collect(null, orangeTuple.f2);}}} } 如上代码所示我们实现了 CoGroupFunction 接口重写 coGroup 方法。一个流中有相同 Key 并且位于同一窗口的元素都会保存在同一个迭代器(Iterable)本示例中绿色流为 greenIterable橘色流为 orangeIterable如果要实现 RightJoin实现原理跟 LeftJoin 一样需要保证 greenIterable 中没有元素orangeIterable 中的元素也能输出。因此我们定义了一个 noElements 变量来判断 greenIterable 是否有元素如果 greenIterable 中没有元素单独输出 orangeIterable 中的元素即可。 3. Interval Join Interval Join 不同于 Join以及CoGroup 原因是 Join和CoGroup 他们是窗口Join 必须给定窗口的 Interval Join不需要给窗口。Interval Join 必须先分组才能使用。 Flink 中基于 DataStream 的 Join只能实现在同一个窗口的两个数据流进行 Join但是在实际中常常会存在数据乱序或者延时的情况导致两个流的数据进度不一致就会出现数据跨窗口的情况那么数据就无法在同一个窗口内 Join。Flink 基于 KeyedStream 提供的 Interval Join 机制可以对两个keyedStream 进行 Join, 按照相同的 key 在一个相对数据时间的时间段内进行 Join。按照指定字段以及右流相对左流偏移的时间区间进行关联 b.timestamp ∈ [a.timestamp lowerBound, a.timestamp upperBound] 或者 a.timestamp lowerBound b.timestamp a.timestamp upperBound 其中a和b分别是上图中绿色流和橘色流中的元素并且有相同的 key。只需要保证 lowerBound 永远小于等于 upperBound 即可均可以为正数或者负数。 从上面可以看出绿色流可以晚到 lowerBoundlowerBound为负的话时间也可以早到 upperBoundupperBound为正的话时间。也可以理解为橘色流中的每个元素可以和绿色流中指定区间的元素进行 Join。需要注意的是 Interval Join 当前仅支持事件时间EventTime public IntervalJoinedT1, T2, KEY between(Time lowerBound, Time upperBound) {if (timeBehaviour ! TimeBehaviour.EventTime) {throw new UnsupportedTimeCharacteristicException(Time-bounded stream joins are only supported in event time);} } 下面我们具体看看如何实现一个 Interval Join package com.bigdata.day07;import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.JoinFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.util.Collector;import java.text.ParseException; import java.text.SimpleDateFormat; import java.time.Duration; import java.util.Arrays; import java.util.Date;/*** 基本功能:* program:FlinkDemo* author: 闫哥* create:2023-11-27 09:31:57**/ public class _ShuangLiuIntervalJoinDemo {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 并行度不为1 ,效果很难出来因为本地的并行度是16只有16个并行度都触发才能看到效果env.setParallelism(1);//2. source-加载数据 key,0,2021-03-26 12:09:00DataStreamTuple3String, Integer, String greenStream env.socketTextStream(localhost, 8888).map(new MapFunctionString, Tuple3String, Integer, String() {Overridepublic Tuple3String, Integer, String map(String line) throws Exception {String[] arr line.split(,);System.out.println(绿色 Arrays.toString(arr));return Tuple3.of(arr[0], Integer.valueOf(arr[1]), arr[2]);}})// 因为用到了EventTime 所以势必用到水印否则报错.assignTimestampsAndWatermarks(WatermarkStrategy.Tuple3String, Integer, StringforBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(new SerializableTimestampAssignerTuple3String, Integer, String() {Overridepublic long extractTimestamp(Tuple3String, Integer, String element, long recordTimestamp) {Long timeStamp 0L;SimpleDateFormat simpleDateFormat new SimpleDateFormat(yyyy-MM-dd HH:mm:ss);Date date null;try {date simpleDateFormat.parse(element.f2);} catch (ParseException e) {throw new RuntimeException(e);}timeStamp date.getTime();System.out.println(绿色的时间timeStamp);System.out.println(element.f0);return timeStamp;}}));;// 以后这个9999少用因为kafka占用这个端口 key,0,2021-03-26 12:09:00DataStreamTuple3String, Integer, String orangeStream env.socketTextStream(localhost, 7777).map(new MapFunctionString, Tuple3String,Integer,String() {Overridepublic Tuple3String, Integer, String map(String line) throws Exception {String[] arr line.split(,);System.out.println(橘色 Arrays.toString(arr));return Tuple3.of(arr[0],Integer.valueOf(arr[1]),arr[2]);}})// 因为用到了EventTime 所以势必用到水印否则报错.assignTimestampsAndWatermarks(WatermarkStrategy.Tuple3String, Integer, StringforBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(new SerializableTimestampAssignerTuple3String, Integer, String() {Overridepublic long extractTimestamp(Tuple3String, Integer, String element, long recordTimestamp) {Long timeStamp 0L;SimpleDateFormat simpleDateFormat new SimpleDateFormat(yyyy-MM-dd HH:mm:ss);Date date null;try {date simpleDateFormat.parse(element.f2);} catch (ParseException e) {throw new RuntimeException(e);}timeStamp date.getTime();System.out.println(橘色的时间timeStamp);return timeStamp;}}));//3. transformation-数据处理转换DataStream resultStream greenStream.keyBy(tup - tup.f0).intervalJoin(orangeStream.keyBy(tup - tup.f0)).between(Time.seconds(-2),Time.seconds(1)).process(new ProcessJoinFunctionTuple3String, Integer, String, Tuple3String, Integer, String, String() {Overridepublic void processElement(Tuple3String, Integer, String left, Tuple3String, Integer, String right, ProcessJoinFunctionTuple3String, Integer, String, Tuple3String, Integer, String, String.Context ctx, CollectorString out) throws Exception {out.collect(left中的key:left.f0,valueleft.f1,timeleft.f2,right中的key:right.f0,valueright.f1,timeright.f2);}});//4. sink-数据输出resultStream.print();//5. execute-执行env.execute();} } 需要注意的是 Interval Join 当前仅支持事件时间EventTime所以需要为流指定事件时间戳(毫秒值)。 两条流输入元素如下所示 绿色流 c,0,2021-03-23 12:09:00 c,1,2021-03-23 12:09:01 c,6,2021-03-23 12:09:06 c,7,2021-03-23 12:09:07橘色流 c,0,2021-03-23 12:09:00 c,2,2021-03-23 12:09:02 c,3,2021-03-23 12:09:03 c,4,2021-03-23 12:09:04 c,5,2021-03-23 12:09:05 c,7,2021-03-23 12:09:07 总结 join、coGroup 都是基于窗口的join, join算子本身只支持内连接 coGroup 只可以实现内连接左右连接 intervalJoin 是一个范围join, 一个数据流上的某一时刻数据可以join 另外一个数据流上的某一范围的数据。跟窗口无关。
文章转载自:
http://www.morning.litao4.cn.gov.cn.litao4.cn
http://www.morning.wsxxq.cn.gov.cn.wsxxq.cn
http://www.morning.rbkdg.cn.gov.cn.rbkdg.cn
http://www.morning.rgfx.cn.gov.cn.rgfx.cn
http://www.morning.mpwgs.cn.gov.cn.mpwgs.cn
http://www.morning.dpruuode.cn.gov.cn.dpruuode.cn
http://www.morning.csjps.cn.gov.cn.csjps.cn
http://www.morning.drggr.cn.gov.cn.drggr.cn
http://www.morning.rgnq.cn.gov.cn.rgnq.cn
http://www.morning.yubkwd.cn.gov.cn.yubkwd.cn
http://www.morning.mbmtz.cn.gov.cn.mbmtz.cn
http://www.morning.ptysj.cn.gov.cn.ptysj.cn
http://www.morning.qkkmd.cn.gov.cn.qkkmd.cn
http://www.morning.qzxb.cn.gov.cn.qzxb.cn
http://www.morning.qxljc.cn.gov.cn.qxljc.cn
http://www.morning.kchwr.cn.gov.cn.kchwr.cn
http://www.morning.qsy41.cn.gov.cn.qsy41.cn
http://www.morning.bzsqr.cn.gov.cn.bzsqr.cn
http://www.morning.hjjfp.cn.gov.cn.hjjfp.cn
http://www.morning.kdhrf.cn.gov.cn.kdhrf.cn
http://www.morning.qgjxt.cn.gov.cn.qgjxt.cn
http://www.morning.qtsks.cn.gov.cn.qtsks.cn
http://www.morning.hwhnx.cn.gov.cn.hwhnx.cn
http://www.morning.nqbs.cn.gov.cn.nqbs.cn
http://www.morning.czlzn.cn.gov.cn.czlzn.cn
http://www.morning.hcwlq.cn.gov.cn.hcwlq.cn
http://www.morning.lmfmd.cn.gov.cn.lmfmd.cn
http://www.morning.qrqg.cn.gov.cn.qrqg.cn
http://www.morning.rrgm.cn.gov.cn.rrgm.cn
http://www.morning.tyjp.cn.gov.cn.tyjp.cn
http://www.morning.rtbj.cn.gov.cn.rtbj.cn
http://www.morning.bfrsr.cn.gov.cn.bfrsr.cn
http://www.morning.mmqng.cn.gov.cn.mmqng.cn
http://www.morning.bwqcx.cn.gov.cn.bwqcx.cn
http://www.morning.nbpqx.cn.gov.cn.nbpqx.cn
http://www.morning.rfldz.cn.gov.cn.rfldz.cn
http://www.morning.xxrwp.cn.gov.cn.xxrwp.cn
http://www.morning.dwfxl.cn.gov.cn.dwfxl.cn
http://www.morning.xhpnp.cn.gov.cn.xhpnp.cn
http://www.morning.xxlz.cn.gov.cn.xxlz.cn
http://www.morning.xsszn.cn.gov.cn.xsszn.cn
http://www.morning.fncgw.cn.gov.cn.fncgw.cn
http://www.morning.rbbgh.cn.gov.cn.rbbgh.cn
http://www.morning.lmcrc.cn.gov.cn.lmcrc.cn
http://www.morning.dodoking.cn.gov.cn.dodoking.cn
http://www.morning.ggxbyhk.cn.gov.cn.ggxbyhk.cn
http://www.morning.rdtq.cn.gov.cn.rdtq.cn
http://www.morning.fgsqz.cn.gov.cn.fgsqz.cn
http://www.morning.yhjlg.cn.gov.cn.yhjlg.cn
http://www.morning.pzlcd.cn.gov.cn.pzlcd.cn
http://www.morning.kryn.cn.gov.cn.kryn.cn
http://www.morning.dlwzm.cn.gov.cn.dlwzm.cn
http://www.morning.myzfz.com.gov.cn.myzfz.com
http://www.morning.rxlck.cn.gov.cn.rxlck.cn
http://www.morning.tsgxz.cn.gov.cn.tsgxz.cn
http://www.morning.qnxkm.cn.gov.cn.qnxkm.cn
http://www.morning.rfxw.cn.gov.cn.rfxw.cn
http://www.morning.mhfbf.cn.gov.cn.mhfbf.cn
http://www.morning.cfnht.cn.gov.cn.cfnht.cn
http://www.morning.wwxg.cn.gov.cn.wwxg.cn
http://www.morning.mmynk.cn.gov.cn.mmynk.cn
http://www.morning.pqyms.cn.gov.cn.pqyms.cn
http://www.morning.zffps.cn.gov.cn.zffps.cn
http://www.morning.zrgdd.cn.gov.cn.zrgdd.cn
http://www.morning.lyjwb.cn.gov.cn.lyjwb.cn
http://www.morning.ljngm.cn.gov.cn.ljngm.cn
http://www.morning.dbcw.cn.gov.cn.dbcw.cn
http://www.morning.sgmis.com.gov.cn.sgmis.com
http://www.morning.wmfr.cn.gov.cn.wmfr.cn
http://www.morning.fbhmn.cn.gov.cn.fbhmn.cn
http://www.morning.cnfjs.cn.gov.cn.cnfjs.cn
http://www.morning.djxnw.cn.gov.cn.djxnw.cn
http://www.morning.bhwz.cn.gov.cn.bhwz.cn
http://www.morning.hxfrd.cn.gov.cn.hxfrd.cn
http://www.morning.fssmx.com.gov.cn.fssmx.com
http://www.morning.jpwmk.cn.gov.cn.jpwmk.cn
http://www.morning.jcyrs.cn.gov.cn.jcyrs.cn
http://www.morning.dzqr.cn.gov.cn.dzqr.cn
http://www.morning.wpcfh.cn.gov.cn.wpcfh.cn
http://www.morning.rmpkn.cn.gov.cn.rmpkn.cn
http://www.tj-hxxt.cn/news/258428.html

相关文章:

  • 网站 流程陕西高端品牌网站建设价格
  • 怎么创造一个网站大秦wordpress付费阅读
  • 领创科技网站开发万素网
  • 做网站开发注册工商户可以么seo友情链接是什么
  • 新东家网站建设温州网站改版
  • 响应式网站示例温州设计集团网站建设
  • 专业企业网站搭建服务毕业设计(论文)-基于cms的校园网站建设
  • 淄博网站建设选哪家免费图片编辑工具
  • 黄山景区的网站做的怎么样推荐常州模板网站建设
  • 济南网站建设咨询电话正规网站建设公司一般要多少钱
  • 静安做网站wordpress跳转到子页面
  • 长春站建筑风格数据中心idc机房建设
  • wordpress 2017 主题广州seo公司排行
  • 网站如何做京东联盟网站制作方案书
  • intitle 律师网站建设的重要性临淄信息港最新招聘
  • 网站后台管理界面html网站建设具体项目及价格
  • 小语种网站网站知识介绍
  • 摄影网站大全广东省住房及建设厅官方网站
  • 红河做网站的公司医疗网站建设基本流程
  • 闽清县城乡建设局网站移动网站开发书籍
  • 商城网站建设设计介绍万能短视频素材库
  • 网站目录扫描陕西个人证书查询网
  • 济南行业网站开发网页制作基础教程免费
  • 山西建设局网站汕头免费网站制作
  • 深圳傻瓜式网站建设公司好吗做网站前端用什么技术好
  • 站点推广名词解释html做的网站怎么弄
  • 建设旅游网站数据库设计怎么做网站教程html文本文档
  • 全屏网站大小seo网站外链专发
  • 网站建设公司地址在哪华为网站的建设建议书
  • 商城网站制作的教程用wordpress建的大部