活动策划网站源码,wordpress界面菜单怎么弄,聚名网站,青岛做网站哪家公司好如何保证Flink双流Join准确性和及时性、除了窗口join还存在哪些实现方式、究竟如何回答才能完全打动面试官呢。。你将在文中找到答案。 1 引子
1.1 数据库SQL中的JOIN
我们先来看看数据库SQL中的JOIN操作。如下所示的订单查询SQL#xff0c;通过将订单表的id和订单详情表ord…如何保证Flink双流Join准确性和及时性、除了窗口join还存在哪些实现方式、究竟如何回答才能完全打动面试官呢。。你将在文中找到答案。 1 引子
1.1 数据库SQL中的JOIN
我们先来看看数据库SQL中的JOIN操作。如下所示的订单查询SQL通过将订单表的id和订单详情表order_id关联获取所有订单下的商品信息。
select a.id as 订单id,a.order_date as 下单时间,a.order_amount as 订单金额,b.order_detail_id as 订单详情id,b.goods_name as 商品名称,b.goods_price as 商品价格,b.order_id as 订单id
from dwd_order_info_pfd a
right join dwd_order_detail_pfd b
on a.id b.order_id这是一段很简单的SQL代码就不详细展开叙述了。此处主要引出SQL中的JOIN类型这里用到的是 right join , 即右连接。
left join: 保留左表全部数据和右表关联数据右表非关联数据置NULLright join: 保留右表全部数据和左表关联数据左表非关联数据置NULLinner join: 保留左表关联数据和右边关联数据cross join: 保留左表和右表数据笛卡尔积 基于关联键值逐行关联匹配过滤表数据并生成最终结果提供给下游数据分析使用。 就此打住关于数据库SQL中的JOIN原理不再多赘述感兴趣的话大家可自行研究下面我们将目光转移到大数据领域看看吧。
1.2 离线场景下的JOIN
假设存在这样一个场景: 已知Mysql数据库中订单表和订单明细表且满足一对多的关系统计T-1天所有订单的商品分布详情。 聪明的大家肯定已经给出了答案没错~就是上面的SQL:
select a.*, b.*
from dwd_order_info_pfd a
right join dwd_order_detail_pfd b
on a.id b.order_id现在修改下条件已知订单表和订单明细表均为亿级别数据求相同场景下的分析结果。
咋办此时关系型数据库貌似不大合适了~开始放大招使用大数据计算引擎来解决。
考虑到T-1统计场景对时效性要求很低可以使用Hive SQL来处理底层跑Mapreduce任务。如果想提高运行速度换成Flink或Spark计算引擎使用内存计算。
至于查询SQL和上面一样并将其封装成一个定时调度任务, 等系统调度运行。如果结果不正确的话由于数据源和数据静态不变大不了重跑看起来感觉皆大欢喜~
可是好景不长产品冤家此时又给了你一个无法拒绝的需求我要实时统计
2 实时场景下的JOIN
还是上面的场景此时数据源换成了实时订单流和实时订单明细流比如Kafka的两个topic要求实时统计每分钟内所有订单下的商品分布详情。 现在情况貌似变得复杂了起来,简单分析下:
数据源。实时数据流和静态流不同数据是实时流入的且动态变化需要计算程序支持实时处理机制。关联性。前面提到静态数据执行多次join操作左表和右表能关联的数据是很恒定的而实时数据流(左右表)如果进入时机不一致原本可以关联的数据会关联不上或者发生错误。延迟性。实时统计提供分钟甚至秒级别响应结果。
由于流数据join的特殊性在满足实时处理机制、低延迟、强关联性的前提下看来需要制定完善的数据方案才能实现真正的流数据JOIN。
2.1 方案思路
我们知道订单数据和订单明细数据是一对多的关系即一条订单数据对应着多条商品明细数据毕竟买一件商品也是那么多邮费不如打包团购。。而一条明细数据仅对应一条订单数据。
这样双流join策略可以考虑如下思路:
当数据流为订单数据时。无条件保留无论当前是否关联到明细数据均留作后续join使用。当数据流为明细数据时。在关联到其订单数据后就可以say goodbye了否则暂时保留等待下一次与订单数据的邂逅。完成所有处于同一时段内的订单数据和订单明细数据join, 清空存储状态 实际生产场景中需要考虑更多的复杂情况包括JOIN过程的数据丢失等异常情况的处理此处仅示意。 好了看起来我们已经有了一个马马虎虎的实时流JOIN方案雏形。
貌似可以准备动手大干一场了~ 别着急有人已经帮我们偷偷的实现了Apache Flink
3 Flink的双流JOIN Apache Flink 是一个框架和分布式处理引擎用于对无界和有界数据流进行有状态计算。Flink 被设计在所有常见的集群环境中运行以内存执行速度和任意规模来执行计算。 ——来自Flink官网定义 这里我们只需要知道Flink是一个实时计算引擎就行了主要关注其如何实现双流JOIN。
3.1 内部运行机制
内存计算Flink任务优先在内存中计算内存不够时保存到访问高效的磁盘上提供秒级延迟响应。状态强一致性Flink使用一致性快照保存状态并定期检查本地状态到持久存储来保证状态一致性。分布式执行Flink应用程序可以划分为无数个并行任务在集群中执行几乎无限量使用CPU、主内存、磁盘和网络IO。内置高级编程模型Flink编程模型抽象为SQL、Table、DataStream|DataSet API、Process四层并封装成丰富功能的算子其中就包含JOIN类型的算子。
仔细看看我们前面章节讨论的实时流JOIN方案的前提是否都满足了呢
实时处理机制: Flink天生即实时计算引擎低延迟: Flink内存计算秒级延迟强关联性: Flink状态一致性和join类算子
不由感叹, 这个Flink果然强啊~
保持好奇心我们去瞅瞅Flink双流join的真正奥义
3.2 JOIN实现机制
Flink双流JOIN主要分为两大类。一类是基于原生State的Connect算子操作另一类是基于窗口的JOIN操作。其中基于窗口的JOIN可细分为window join和interval join两种。
实现原理底层原理依赖Flink的State状态存储通过将数据存储到State中进行关联join, 最终输出结果。 恍然大悟, Flink原来是通过State状态来缓存等待join的实时流。
4 基于Window Join的双流JOIN实现机制
顾名思义此类方式利用Flink的窗口机制实现双流join。通俗理解将两条实时流中元素分配到同一个时间窗口内完成Join。
底层原理: 两条实时流数据缓存在Window State中当窗口触发计算时执行join操作。
4.1 join算子
先看看Window join实现方式之一的join算子。这里涉及到Flink中的窗口(window)概念因此Window Join按照窗口类型区分的话某种程度来说可以细分出3种
Tumbling Window Join (滚动窗口)Sliding Window Join (滑动窗口)Session Widnow Join(会话窗口) 两条流数据按照关联主键在滚动、滑动、会话窗口内进行inner join, 底层基于State存储并支持处理时间和事件时间两种时间特征看下源码: 源码核心总结windows窗口 state存储 双层for循环执行join() 现在让我们把时间轴往回拉一点点在实时场景JOIN那里我们收到了这样的需求统计每分钟内所有订单下的商品明细分布。
OK, 使用join算子小试牛刀一下。我们定义60秒的滚动窗口将订单流和订单明细流通过order_id关联得到如下的程序
val env ...
// kafka 订单流
val orderStream ...
// kafka 订单明细流
val orderDetailStream ...orderStream.join(orderDetailStream).where(r r._1) //订单id.equalTo(r r._2) //订单id.window(TumblingProcessTimeWindows.of(Time.seconds(60))).apply {(r1, r2) r1 : r2}.print()
整个代码其实很简单概要总结下:
定义两条输入实时流A、BA流调用join(b流)算子关联关系定义: where为A流关联键equalTo为B流关联键都是订单id定义window窗口(60s间隔)apply方法定义逻辑输出
这样只要程序稳定运行就能够持续不断的计算每分钟内订单分布详情貌似解决问题了奥~
还是别高兴太早别忘了此时的join类型是inner join。复习一下知识inner join指的是仅保留两条流关联上的数据。
这样双流中没关联上的数据岂不是都丢掉了别担心Flink还提供了另一个window join操作: coGroup算子。
4.2 coGroup算子
coGroup算子也是基于window窗口机制不过coGroup算子比Join算子更加灵活可以按照用户指定的逻辑匹配左流或右流数据并输出。
换句话说我们通过自己指定双流的输出来达到left join和right join的目的。
现在来看看在相同场景下coGroup算子是如何实现left join:
#这里看看java的写法
orderDetailStream.coGroup(orderStream).where(r - r.getOrderId()).equalTo(r - r.getOrderId()).window(TumblingProcessingTimeWindows.of(Time.seconds(60))).apply(new CoGroupFunctionOrderDetail, Order, Tuple2String, Long() {Overridepublic void coGroup(IterableOrderDetail orderDetailRecords, IterableOrder orderRecords, CollectorTuple2String, Long collector) {for (OrderDetail orderDetaill : orderDetailRecords) {boolean flag false;for (Order orderRecord : orderRecords) {// 右流中有对应的记录collector.collect(new Tuple2(orderDetailRecords.getGoods_name(), orderDetailRecords.getGoods_price()));flag true;}if (!flag) {// 右流中没有对应的记录collector.collect(new Tuple2(orderDetailRecords.getGoods_name(), null));}}}}).print();这里需要说明几点:
join算子替换为coGroup算子两条流依然需要在一个window中且定义好关联条件apply方法中自定义判断此处对右值进行判断如果有值则进行连接输出,否则右边置为NULL。
可以这么说现在我们已经彻底搞定了窗口双流JOIN。
只要你给我提供具体的窗口大小我就能通过join或coGroup算子鼓捣出各种花样join而且使用起来特别简单。
但是假如此时我们亲爱的产品又提出了一个小小条件 大促高峰期商品数据某时段会写入不及时时间可能比订单早也可能比订单晚同样计算每分钟内的订单商品分布详情没问题吧~ 当然有问题两条流如果步调不一致还用窗口来控制能join的上才怪了~ 很容易等不到join流窗口就自动关闭了。
还好我知道Flink提供了Interval join机制。
5 基于Interval Join的双流JOIN实现机制
Interval Join根据右流相对左流偏移的时间区间(interval)作为关联窗口在偏移区间窗口中完成join操作。
有点不好理解我画个图看下: stream2.time ∈ (stream1.time low, stream1.time high) 满足数据流stream2在数据流stream1的 interval(low, high)偏移区间内关联join。interval越大关联上的数据就越多超出interval的数据不再关联。
实现原理interval join也是利用Flink的state存储数据不过此时存在state失效机制ttl触发数据清理操作。
这里再引出一个问题: state的ttl机制需要怎么设置不合理的ttl设置会不会撑爆内存 下面简单看下interval join的代码实现过程:
val env ...
// kafka 订单流
val orderStream ...
// kafka 订单明细流
val orderDetailStream ...orderStream.keyBy(_.1)// 调用intervalJoin关联.intervalJoin(orderDetailStream._2)// 设定时间上限和下限.between(Time.milliseconds(-30), Time.milliseconds(30)) .process(new ProcessWindowFunction())class ProcessWindowFunction extends ProcessJoinFunction...{override def processElement(...) {collector.collect((r1, r2) r1 : r2)}
}订单流在流入程序后等候(low,high)时间间隔内的订单明细流数据进行join, 否则继续处理下一个流。
从代码中我们发现interval join需要在两个KeyedStream之上操作即keyBy()并在between()方法中指定偏移区间的上下界。
需要注意的是interval join实现的也是inner join且目前只支持事件时间。
6 基于Connect的双流JOIN实现机制
前面在使用Window join或者Interval Join来实现双流join的时候我发现了其中的共性 无论哪种实现方式Flink内部都将join过程透明化在算子中封装了所有的实现细节。 无论哪种实现方式Flink内部都将join过程透明化在算子中封装了所有的实现细节。
可是这样会引来一个问题如果程序报错或者数据异常如何快速进行调优排查直接看源码吗不大现实。。
这里介绍基于Connect算子实现的双流JOIN方法我们可自己控制双流JOIN处理逻辑同时保持过程时效性和准确性。
6.1 Connect算子原理
对两个DataStream执行connect操作将其转化为ConnectedStreams, 生成的Streams可以调用不同方法在两个实时流上执行且双流之间可以共享状态。 图上我们可以看到两个数据流被connect之后只是被放在了同一个流中内部依然保持各自的数据和形式两个流相互独立。 [DataStream1, DataStream2] - ConnectedStreams[1,2] 这样我们可以在Connect算子底层的ConnectedStreams基础上编写代码自行实现双流JOIN的逻辑处理。
6.2 技术实现
1.调用connect算子,根据orderid进行分组并使用process算子分别对两条流进行处理。
orderStream.connect(orderDetailStream).keyBy(orderId, orderId).process(new orderProcessFunc());2.process方法内部进行状态编程, 初始化订单、订单明细和定时器的ValueState状态。
private ValueStateOrderEvent orderState;
private ValueStateTxEvent orderDetailState;
private ValueStateLong timeState;// 初始化状态Value
orderState getRuntimeContext().getState(new ValueStateDescriptorOrder(order-state,Order.class));
····3.为每个进入的数据流保存state状态并创建定时器。在时间窗口内另一个流到达时进行join并输出完成后删除定时器。
Override
public void processElement1(Order value, Context ctx, CollectorTuple2Order, OrderDetail out){if (orderDetailState.value() null){//明细数据未到先把订单数据放入状态orderState.update(value);//建立定时器60秒后触发Long ts (value.getEventTime()60)*1000L;ctx.timerService().registerEventTimeTimer(ts);timeState.update(ts);}else{//明细数据已到直接输出到主流out.collect(new Tuple2(value,orderDetailState.value()));//删除定时器ctx.timerService().deleteEventTimeTimer(timeState.value());//清空状态注意清空的是订单明细状态orderDetailState.clear();timeState.clear();}
}
...
Override
public void processElement2(){...
}4.未及时到达的数据流触发定时器输出到侧输出流左流先到而右流未到则输出左流反之输出右连流。
Override
public void onTimer(long timestamp, OnTimerContext ctx, CollectorTuple2Order, OrderDetail out) {// 实现左连接if (orderState.value() ! null){ctx.output(new OutputTagString(left-jo in) {}, orderState.value().getOrderId());// 实现右连接}else{ctx.output(new OutputTagString(right-jo in) {}, orderDetailState.value().getOrderId());}orderState.clear();orderDetailState.clear();timeState.clear();
}总体思想基于数据时间实现订单数据及订单明细数据的关联超时或者缺失则由侧输出流输出。 在connect中针对订单流和订单明细流先创建定时器并保存state状态处于窗口内就进行join, 否则进入侧输出流。 7 双流JOIN的优化与总结 为什么我的双流join时间到了却不触发一直没有输出 检查一下watermark的设置是否合理数据时间是否远远大于watermark和窗口时间导致窗口数据经常为空 state数据保存多久会内存爆炸吗 state自带有ttl机制可以设置ttl过期策略触发Flink清理过期state数据。建议程序中的state数据结构用完后手动clear掉。 我的双流join倾斜怎么办 join倾斜三板斧: 过滤异常key、拆分表减少数据、打散key分布。当然可以的话我建议加内存加内存加内存 想实现多流join怎么办 目前无法一次实现可以考虑先union然后再二次处理或者先进行connnect操作再进行join操作仅建议~ join过程延迟、没关联上的数据会丢失吗 这个一般来说不会join过程可以使用侧输出流存储延迟流如果出现节点网络等异常Flink checkpoint也可以保证数据不丢失。