阜阳建设工程质量监督局网站,wordpress个性主题,网页版微信登陆,汕头网站建设推广价格无论是基本的简单转换和聚合#xff0c;还是基于窗口的计算#xff0c;我们都是针对一条流上的数据进行处理的。而在实际应用中#xff0c;可能需要将不同来源的数据连接合并在一起处理#xff0c;也有可能需要将一条流拆分开#xff0c;所以经常会有对多条流进行处理的场… 无论是基本的简单转换和聚合还是基于窗口的计算我们都是针对一条流上的数据进行处理的。而在实际应用中可能需要将不同来源的数据连接合并在一起处理也有可能需要将一条流拆分开所以经常会有对多条流进行处理的场景 简单划分两大类
分流——把一条数据流拆分成完全独立的两条或多条一般通过侧输出流来实现合流——多条数据流合并为一条数据流如unionconnectjoincoGroup
9.1 分流
9.1.1 简单实现 其实根据条件筛选数据的需求本身非常容易实现只要针对同一条流多次独立调用.filter()方法进行筛选就可以得到拆分之后的流 例子根据用户进行划分
public class SplitStreamByFilter {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSourceEvent stream env.addSource(new ClickSource());SingleOutputStreamOperatorEvent maryStream stream.filter(new FilterFunctionEvent() {Overridepublic boolean filter(Event value) throws Exception {return value.getUser().equals(Mary);}});SingleOutputStreamOperatorEvent bobStream stream.filter(new FilterFunctionEvent() {Overridepublic boolean filter(Event value) throws Exception {return value.getUser().equals(Bob);}});SingleOutputStreamOperatorEvent otherStream stream.filter(new FilterFunctionEvent() {Overridepublic boolean filter(Event value) throws Exception {return !value.getUser().equals(Mary) !value.getUser().equals(Bob);}});maryStream.print(Mary);bobStream.print(Bob);otherStream.print(other);env.execute();}
}
9.1.2 使用侧输出流
public class SplitStreamByOutputTag {private static OutputTagTuple3String, String, Long MaryTag new OutputTagTuple3String, String, Long(Mary-pv) {};private static OutputTagTuple3String, String, Long BobTag new OutputTagTuple3String, String, Long(Bob-pv) {};public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSourceEvent stream env.addSource(new ClickSource());SingleOutputStreamOperatorEvent processStream stream.process(new ProcessFunctionEvent, Event() {Overridepublic void processElement(Event event, Context context, CollectorEvent collector) throws Exception {if (event.getUser().equals(Mary)) {context.output(MaryTag, new Tuple3(event.getUser(), event.getUrl(), event.getTimestamp()));} else if (event.getUser().equals(Bob)) {context.output(BobTag, new Tuple3(event.getUser(), event.getUrl(), event.getTimestamp()));} else {collector.collect(event);}}});processStream.getSideOutput(MaryTag).print(Mary);processStream.getSideOutput(BobTag).print(Bob);processStream.print(other);env.execute();}
}9.2 基本合流操作
9.2.1 联合(union) 将多条流合在一起 要求要联合的流的数据类型必须相同 返回的是DataStream
使用方法
stream1.union(stream2, stream3, ...)在事件时间语义下不同流的水位线快慢不同如果合并在一起水位线该以哪个为准呢? 答多流合并时的水位线以最小的那个为准和之前介绍的并行任务水位线的传递规则完全一致。 代码略9.2.2 连接connect 流的联合union虽然简单但是受限于数据类型不能改变。Flink提供了另一种合流操作——连接它允许不同数据类型的流进行合并 1、连接流 连接流是多条流形式上的“合并”虽然被放在了同一个流中但内部仍保持各自的数据形式不变彼此之间是相互独立的。 stream1.connect(stream2)返回的是ConnectedStreams
ConnectedStreams在调用.map()方法时传入的不再是一个简单的 MapFunction 而是一个 CoMapFunction
ConnectedStreams.map(new CoMapFunction())2、CoProcessFunction
ConnectedStreams在调用.process()时需要传入CoProcessFunction 对于连接流 ConnectedStreams 的处理操作,需要分别定义对两条流的处理转换 public abstract class CoProcessFunctionIN1, IN2, OUT extends AbstractRichFunction {...public abstract void processElement1(IN1 value, Context ctx, CollectorOUT out) public abstract void processElement2(IN2 value, Context ctx, CollectorOUT out) public void onTimer(long timestamp, OnTimerContext ctx, CollectorOUT out) public abstract class Context {...}...
}
例子略
代码略9.3 基于时间的合流——双流联结Join 对于两条流的合并很多情况我们并不是简单地将所有数据放在一起而是希望根据某个字段的值将它们联结起来“配对”去做处理。例如用传感器监控火情时我们需要将大量温度传感器和烟雾传感器采集到的信息按照传感器 ID 分组、再将两条流中数据合并起来如果同时超过设定阈值就要报警。 我们发现这种需求与关系型数据库中表的 join 操作非常相近。事实上Flink 中两条流的 connect 操作就可以通过 keyBy 指定键进行分组后合并实现了类似于 SQL 中的 join 操作另外 connect 支持处理函数可以使用自定义状态和 TimerService 灵活实现各种需求其实已经能够处理双流合并的大多数场景。 不过处理函数是底层接口所以尽管 connect 能做的事情多但在一些具体应用场景下还是显得太过抽象了。比如如果我们希望统计固定时间内两条流数据的匹配情况那就需要设置定时器、自定义触发逻辑来实现——其实这完全可以用窗口window来表示。为了更方便地实现基于时间的合流操作Flink 的 DataStrema API 提供了两种内置的 join 算子以及 coGroup 算子。本节我们就来做一个详细的讲解。 注SQL 中 join 一般会翻译为“连接”我们这里为了区分不同的算子一般的合流操作 connect 翻译为“连接”而把 join 翻译为“联结”。 9.3.1 窗口联结 如果我们希望将两条流的数据进行合并、且同样针对某段时间进行处理和统计又该怎么做呢 Flink 为这种场景专门提供了一个窗口联结window join算子可以定义时间窗口并将两条流中共享一个公共键key的数据放在窗口中进行配对处理 1、窗口联结的调用 窗口联结在代码中的实现首先需要调用 DataStream 的.join()方法来合并两条流得到一个 JoinedStreams 接着通过.where() 和.equalTo() 方法指定两条流中联结的 key 然后通过.window()开窗口并调用.apply()传入联结窗口函数进行处理计算 stream1.join(stream2).where(KeySelector).equalTo(KeySelector).window(WindowAssigner).apply(JoinFunction)上面代码中.where()的参数是键选择器KeySelector用来指定第一条流中的 key而.equalTo()传入的 KeySelector 则指定了第二条流中的 key。两者相同的元素如果在同一窗口中就可以匹配起来并通过一个“联结函数”JoinFunction进行处理了。 这里.window()传入的就是窗口分配器之前讲到的三种时间窗口都可以用在这里滚动窗口tumbling window、滑动窗口sliding window和会话窗口session window。 而后面调用.apply()可以看作实现了一个特殊的窗口函数。注意这里只能调用.apply()没有其他替代的方法。 传入的 JoinFunction 也是一个函数类接口使用时需要实现内部的.join()方法。这个方法有两个参数分别表示两条流中成对匹配的数据 public interface JoinFunctionIN1, IN2, OUT extends Function, Serializable { OUT join(IN1 first, IN2 second) throws Exception;
}这里需要注意JoinFunciton 并不是真正的“窗口函数”它只是定义了窗口函数在调用时对匹配数据的具体处理逻辑。 当然既然是窗口计算在.window()和.apply()之间也可以调用可选 API 去做一些自定义 比如用.trigger()定义触发器用.allowedLateness()定义允许延迟时间等等 2、窗口联结的处理流程 JoinFunction 中的两个参数分别代表了两条流中的匹配的数据。这里就会有一个问题 什么时候就会匹配好数据调用.join()方法呢接下来我们就来介绍一下窗口 join 的具体处理流程。 两条流的数据到来之后首先会按照 key 分组、进入对应的窗口中存储当到达窗口结束时间时算子会先统计出窗口内两条流的数据的所有组合也就是对两条流中的数据做一个笛卡尔积相当于表的交叉连接cross join然后进行遍历把每一对匹配的数据作为参数 (firstsecond)传入 JoinFunction 的.join()方法进行计算处理得到的结果直接输出如图 8-8 所示。所以窗口中每有一对数据成功联结匹配JoinFunction 的.join()方法就会被调用一次并输出一个结果。 3、窗口联结实例 在电商网站中往往需要统计用户不同行为之间的转化这就需要对不同的行为数据流 按照用户 ID 进行分组后再合并以分析它们之间的关联。如果这些是以固定时间周期比如1 小时来统计的那我们就可以使用窗口 join 来实现这样的需求 代码略9.3.2 间隔联结 在有些场景下我们要处理的时间间隔可能并不是固定的。比如在交易系统中需要实时地对每一笔交易进行核验保证两个账户转入转出数额相等也就是所谓的“实时对账”。两次转账的数据可能写入了不同的日志流它们的时间戳应该相差不大所以我们可以考虑只统计一段时间内是否有出账入账的数据匹配。这时显然不应该用滚动窗口或滑动窗口来处理——因为匹配的两个数据有可能刚好“卡在”窗口边缘两侧于是窗口内就都没有匹配了会话窗口虽然时间不固定但也明显不适合这个场景。 基于时间的窗口联结已经无能为力了。 为了应对这样的需求Flink 提供了一种叫作“间隔联结”interval join的合流操作。顾名思义间隔联结的思路就是针对一条流的每个数据开辟出其时间戳前后的一段时间间隔 看这期间是否有来自另一条流的数据匹配。 1、间隔联结的原理 间隔联结具体的定义方式是我们给定两个时间点分别叫作间隔的“上界”upperBound和“下界”lowerBound于是对于一条流不妨叫作 A中的任意一个数据元素 a就可以开辟一段时间间隔[a.timestamp lowerBound, a.timestamp upperBound],即以 a 的时间戳为中心下至下界点、上至上界点的一个闭区间我们就把这段时间作为可以匹配另一条流数据的“窗口”范围。所以对于另一条流不妨叫 B中的数据元素 b如果它的时间戳落在了这个区间范围内a 和 b 就可以成功配对进而进行计算输出结果。所以匹配的条件为 a.timestamp lowerBound b.timestamp a.timestamp upperBound 这里需要注意做间隔联结的两条流 A 和 B也必须基于相同的 key下界 lowerBound 应该小于等于上界 upperBound两者都可正可负间隔联结目前只支持事件时间语义。 下方的流 A 去间隔联结上方的流 B所以基于 A 的每个数据元素都可以开辟一个间隔区间。我们这里设置下界为-2 毫秒上界为 1 毫秒。于是对于时间戳为 2 的 A 中元素它的可匹配区间就是[0, 3],流 B 中有时间戳为 0、1 的两个元素落在这个范围内所以就可以得到匹配数据对(2, 0)和(2, 1)。同样地A 中时间戳为 3 的元素可匹配区间为[1, 4]B 中只有时间戳为 1 的一个数据可以匹配于是得到匹配数据对(3, 1)。 所以我们可以看到间隔联结同样是一种内连接inner join。与窗口联结不同的是interval join 做匹配的时间段是基于流中数据的所以并不确定而且流 B 中的数据可以不只在一个区间内被匹配。 2、间隔联结的调用
stream1
.keyBy(KeySelector)
.intervalJoin(stream2.keyBy(KeySelector))
.between(Time.milliseconds(-2), Time.milliseconds(1))
.process (new ProcessJoinFunction());3、间隔联结实例 在电商网站中某些用户行为往往会有短时间内的强关联。我们这里举一个例子我们有两条流一条是下订单的流一条是浏览数据的流。我们可以针对同一个用户来做这样一个联结。也就是使用一个用户的下订单的事件和这个用户的最近十分钟的浏览数据进行一个联结查询 代码略9.3.3 窗口同组联结 除窗口联结和间隔联结之外Flink 还提供了一个“窗口同组联结”window coGroup操作。它的用法跟 window join 非常类似也是将两条流合并之后开窗处理匹配的元素调用时只需要将.join()换为.coGroup()就可以了. 内部的.coGroup()方法有些类似于 FlatJoinFunction 中.join()的形式同样有三个参数 分别代表两条流中的数据以及用于输出的收集器Collector。不同的是这里的前两个参数不再是单独的每一组“配对”数据了而是传入了可遍历的数据集合。也就是说现在不会再去计算窗口中两条流数据集的笛卡尔积而是直接把收集到的所有数据一次性传入至于要怎样配对完全是自定义的。这样.coGroup()方法只会被调用一次而且即使一条流的数据没有任何另一条流的数据匹配也可以出现在集合中、当然也可以定义输出结果了。 所以能够看出coGroup 操作比窗口的 join 更加通用不仅可以实现类似 SQL 中的“内连接”inner join也可以实现左外连接left outer join、右外连接right outer join和全外连接full outer join。事实上窗口 join 的底层也是通过 coGroup 来实现的 代码略