在线生成app免费,网站seo排名培训,如何建立小企业网站,西安网页设计培训费用一.Flink中的状态
1.1 概述
在Flink中#xff0c;算子任务可以分为有状态和无状态两种状态。
无状态的算子任务只需要观察每个独立事件#xff0c;根据当前输入的数据直接转换输出结果。例如Map、Filter、FlatMap都是属于无状态算子。
而有状态的算子任务#xff0c;就…一.Flink中的状态
1.1 概述
在Flink中算子任务可以分为有状态和无状态两种状态。
无状态的算子任务只需要观察每个独立事件根据当前输入的数据直接转换输出结果。例如Map、Filter、FlatMap都是属于无状态算子。
而有状态的算子任务就是除了当前数据外还需要一些其他的数据来得到计算结果。这里的其他数据就是所谓的“状态”。例如聚合函数、窗口函数都属于有状态算子。
1.2 状态的分类
1.2.1 托管状态Managed State和原始状态Raw State
Flink的状态有两种托管状态Managed State和原始状态Raw State。托管状态就是由Flink统一管理的状态的存储访问、故障恢复和重组等一系列问题都由Flink实现我们只要调接口就可以而原始状态则是自定义的所有的状态具体管理则需要自行实现。
一般使用托管状态即可。后面的所有内容也仅是基于托管状态的。
1.2.2 算子状态Operator State和按键分区状态Keyed State
在Flink中一个算子任务会按照并行度分为多个并行子任务执行而不同的子任务会占据不同的任务槽task slot。由于不同的slot在计算资源上是物理隔离的所以Flink能管理的状态在并行任务间是无法共享的每个状态只能针对当前子任务的实例有效。
而很多有状态的操作比如聚合、窗口都是要先做keyBy进行按键分区的。按键分区之后任务所进行的所有计算都应该只针对当前key有效所以状态也应该按照key彼此隔离。在这种情况下状态的访问方式又会有所不同。
基于这样的想法又可以将托管状态分为两类算子状态和按键分区状态。经过KeyBy操作后的状态则被称为按键分区状态Keyed State否则就是“算子状态Operator State”。
算子状态
算子状态的状态作用范围为当前算子任务实例-即每个task(分区)间状态不共享。 算子状态可以用在所有算子上使用时与本地变量没什么区别在使用时需实现checkpoint接口。假如使用新的Source架构则需要继承SourceReaderBase抽象类。
按键分区状态
按键分区状态只有在KeyBy后才能使用因为状态是根据输入流中定义的键(Key)来维护和访问的。每个Key分区间状态不共享。 二.按键分区状态Keyed State
按键分区状态Keyed State顾名思义是任务按照键key来访问和维护的状态。它的特点非常鲜明就是以key为作用范围进行隔离。
2.1 值状态ValueState
顾名思义状态中只保存一个“值”value。ValueStateT本身是一个接口源码如下
PublicEvolving
public interface ValueStateT extends State {T value() throws IOException;void update(T var1) throws IOException;
}
这里的T是泛型表示值状态数据类型。
对值的操作主要有以下
// 获取当前状态值
T value()// 更新/覆盖状态值
update(T value)
在具体使用时为了让运行时上下文清楚到底是哪个状态我们还需要创建一个“状态描述器”StateDescriptor来提供状态的基本信息。例如源码中ValueState的状态描述器构造方法如下
public ValueStateDescriptor(String name, ClassT typeClass) {super(name, typeClass, null);
}这里需要传入状态的名称和类型——这跟我们声明一个变量时做的事情完全一样。
案例检测每种传感器的水位值如果连续的两个水位值差值超过10就输出报警。
public class KeyedValueStateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(xxx.xxx.xxx.xxx, 1234).map(new MyMapFunctionImpl()).assignTimestampsAndWatermarks(WatermarkStrategy.WaterSensorforBoundedOutOfOrderness(Duration.ofSeconds(3)) // 设置最大等待时间为3s.withTimestampAssigner((SerializableTimestampAssignerWaterSensor) (waterSensor, l) - waterSensor.getTs() * 1000L));// 对传感器做KeyBySingleOutputStreamOperatorString process sensorDS.keyBy(r - r.getId()).process(new KeyedProcessFunctionString, WaterSensor, String() {// 定义状态用于每组当前的水位线ValueStateInteger lastVcState;// 必须在open方法中初始化状态Overridepublic void open(Configuration parameters) throws Exception {// 初始化值状态需传入值状态描述器(唯一的名称值的类型)lastVcState getRuntimeContext().getState(new ValueStateDescriptor(lastVc, Types.INT));}Overridepublic void processElement(WaterSensor value, KeyedProcessFunctionString, WaterSensor, String.Context ctx, CollectorString out) throws Exception {// 1.取出上一条水位线int lastVc lastVcState.value() null ? value.getVc() : lastVcState.value();// 2.判断是否超过10if (Math.abs(value.getVc() - lastVc) 10) {out.collect(传感器 value.getId() ,上一次水位线 lastVc ,当前水位线: value.getVc() ,触发报警(相差超过10)!!!);}// 更新当前状态lastVcState.update(value.getVc());}});process.print();env.execute();}
输入
[rootVM-55-24-centos ~]# nc -lk 1234
s1,1,1
s1,2,13
s1,5,9
s1,6,22
s2,9,10
s2,10,23输出
传感器s1,上一次水位线1,当前水位线:13,触发报警(相差超过10)!!!
传感器s1,上一次水位线9,当前水位线:22,触发报警(相差超过10)!!!
传感器s2,上一次水位线10,当前水位线:23,触发报警(相差超过10)!!!如果不使用状态存储则需要定义HashMap存储每个Key的水位线没有状态高效。
2.2 列表状态ListState
将需要保存的数据以列表List的形式组织起来。在ListStateT接口中同样有一个类型参数T表示列表中数据的类型。ListState也提供了一系列的方法来操作状态使用方式与一般的List非常相似。
对 List 状态的操作主要有以下
// 获取当前的列表状态返回的是一个可迭代类型IterableT
IterableT get()// 传入一个列表values直接对状态进行覆盖
update(ListT values)// 向列表中添加多个元素以列表values形式传入
addAll(ListT values)类似地ListState的状态描述器就叫作ListStateDescriptor用法跟ValueStateDescriptor完全一致。
案例:针对每种传感器输出最高的3个水位值。
public class KeyedListStateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(xxx.xxx.xxx.xxx, 1234).map(new MyMapFunctionImpl()).assignTimestampsAndWatermarks(WatermarkStrategy.WaterSensorforBoundedOutOfOrderness(Duration.ofSeconds(3)) // 设置最大等待时间为3s.withTimestampAssigner((SerializableTimestampAssignerWaterSensor) (waterSensor, l) - waterSensor.getTs() * 1000L));// 对传感器做KeyBySingleOutputStreamOperatorString process sensorDS.keyBy(r - r.getId()).process(new KeyedProcessFunctionString, WaterSensor, String() {// 定义 ListStateListStateInteger vcListState;// 初始化 ListStateOverridepublic void open(Configuration parameters) throws Exception {super.open(parameters);vcListState getRuntimeContext().getListState(new ListStateDescriptor(vcListState,Types.INT));}Overridepublic void processElement(WaterSensor value, KeyedProcessFunctionString, WaterSensor, String.Context ctx, CollectorString out) throws Exception {// 将当前水位线存入 ListStatevcListState.add(value.getVc());// 将 ListState (迭代器)中的值取出拷贝到 List 中ListInteger vcList new ArrayListInteger();for (Integer vc : vcListState.get()) {vcList.add(vc);}// 排序vcList.sort(((o1, o2) - o2 - o1));// 取前三if (vcList.size() 3) {vcList.remove(3);}out.collect(当前传感器:value.getId(),最大的3个水位线为:vcList.toString());// 更新 ListStatevcListState.update(vcList);}});process.print();env.execute();}
}输入
[rootVM-55-24-centos ~]# nc -lk 1234
s1,1,1
s1,4,4
s1,5,3
s1,6,6
s2,5,6
s3,6,5
s2,4,7输出
当前传感器:s1,最大的3个水位线为:[1]
当前传感器:s1,最大的3个水位线为:[4, 1]
当前传感器:s1,最大的3个水位线为:[4, 3, 1]
当前传感器:s1,最大的3个水位线为:[6, 4, 3]
当前传感器:s2,最大的3个水位线为:[6]
当前传感器:s3,最大的3个水位线为:[5]
当前传感器:s2,最大的3个水位线为:[7, 6]
2.3 Map状态MapState
把一些键值对key-value作为状态整体保存起来可以认为就是一组key-value映射的列表。使用与Map非常类似。
对Map状态的操作主要有以下
// 根据key查询mapState中的value
UV get(UK key)// 向mapState中put一个键值对
put(UK key, UV value)// 向mapState中put多个键值对
putAll(MapUK, UV map)// 将指定key对应的键值对删除
remove(UK key)// 判断是否存在指定的key
boolean contains(UK key)// 获取映射状态中所有的键值对
IterableMap.EntryUK, UV entries()// 获取映射状态中所有的键key返回一个可迭代Iterable类型
IterableUK keys()// 获取映射状态中所有的值value返回一个可迭代Iterable类型
IterableUV values()// 判断映射是否为空
boolean isEmpty()案例统计每种传感器每种水位值出现的次数。
public class KeyedMapStateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(xxx.xxx.xxx.xxx, 1234).map(new MyMapFunctionImpl()).assignTimestampsAndWatermarks(WatermarkStrategy.WaterSensorforBoundedOutOfOrderness(Duration.ofSeconds(3)) // 设置最大等待时间为3s.withTimestampAssigner((SerializableTimestampAssignerWaterSensor) (waterSensor, l) - waterSensor.getTs() * 1000L));// 对传感器做KeyBySingleOutputStreamOperatorString process sensorDS.keyBy(r - r.getId()).process(new KeyedProcessFunctionString, WaterSensor, String() {// 定义Map状态键为vc(Integer),值为count(Integer)MapStateInteger,Integer vcCountMapState;// 初始化Map状态Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);vcCountMapState getRuntimeContext().getMapState(new MapStateDescriptorInteger,Integer(vcCountMapState,Types.INT,Types.INT));}Overridepublic void processElement(WaterSensor value, KeyedProcessFunctionString, WaterSensor, String.Context ctx, CollectorString out) throws Exception {// 判断map状态中是否存在该vc存在则count1否则put进map状态Integer vc value.getVc();if(vcCountMapState.contains(value.getVc())){Integer vcCount vcCountMapState.get(vc);vcCountMapState.put(vc , vcCount);}else{vcCountMapState.put(vc , 1);}StringBuilder outStr new StringBuilder();outStr.append(传感器:value.getId(),下的所有水位线及出现次数\n);// 遍历该key下的所有键值for (Map.EntryInteger, Integer entry : vcCountMapState.entries()) {outStr.append(vcentry.getKey(),countentry.getValue()\n);}outStr.append(------------------------------------------------------);out.collect(outStr.toString());}});process.print();env.execute();}
}
输入
[rootVM-55-27-centos ~]# nc -lk 1234
s1,1,1
s1,2,2
s1,3,1
s2,1,1
s1,4,1输出
传感器:s1,下的所有水位线及出现次数
vc1,count1
------------------------------------------------------
传感器:s1,下的所有水位线及出现次数
vc1,count1
vc2,count1
------------------------------------------------------
传感器:s1,下的所有水位线及出现次数
vc1,count2
vc2,count1
------------------------------------------------------
传感器:s2,下的所有水位线及出现次数
vc1,count1
------------------------------------------------------
传感器:s1,下的所有水位线及出现次数
vc1,count3
vc2,count1
------------------------------------------------------
2.4 归约状态ReducingState
类似于值状态Value不过需要对添加进来的所有数据进行归约将归约聚合之后的值作为状态保存下来。
与之前不同的是在归约状态描述器中需要传入ReduceFunction实现具体的归约逻辑。
对归约状态的操作主要有以下
// 把新数据和之前的状态进行归约并用得到的结果更新状态。
add(IN)// 获取归约状态中的值
OUT get()
案例计算每种传感器的水位和。
public class KeyedReducingStateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(xxx.xxx.xxx.xxx, 1234).map(new MyMapFunctionImpl()).assignTimestampsAndWatermarks(WatermarkStrategy.WaterSensorforBoundedOutOfOrderness(Duration.ofSeconds(3)) // 设置最大等待时间为3s.withTimestampAssigner((SerializableTimestampAssignerWaterSensor) (waterSensor, l) - waterSensor.getTs() * 1000L));// 对传感器做KeyBySingleOutputStreamOperatorString process sensorDS.keyBy(r - r.getId()).process(new KeyedProcessFunctionString, WaterSensor, String() {// 定义Reducing状态ReducingStateInteger vcSumReducingState;// 初始化Reducing状态(需要传入ReduceFunction实现具体的归约逻辑)Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);vcSumReducingState getRuntimeContext().getReducingState(new ReducingStateDescriptor(vcSumReducingState,// 归约逻辑 两数相加(v1, v2) - v1v2,Types.INT));}Overridepublic void processElement(WaterSensor value, KeyedProcessFunctionString, WaterSensor, String.Context ctx, CollectorString out) throws Exception {vcSumReducingState.add(value.getVc());out.collect(传感器:value.getId(),水位线总值为:vcSumReducingState.get());}});process.print();env.execute();}
}
输入
[rootVM-55-24-centos ~]# nc -lk 1234
s1,1,1
s1,2,2
s1,3,3
s2,4,4
s2,5,5
s1,6,6
s3,7,7输出
传感器:s1,水位线总值为:1
传感器:s1,水位线总值为:3
传感器:s1,水位线总值为:6
传感器:s2,水位线总值为:4
传感器:s2,水位线总值为:9
传感器:s1,水位线总值为:12
传感器:s3,水位线总值为:7
2.5 聚合状态AggregatingState
与归约状态非常类似聚合状态也是一个值用来保存添加进来的所有数据的聚合结果。并且允许输入、输出、中间累加器类型可以不一致。
对聚合状态的操作主要有以下
// 向聚合状态中添加元素
add(IN)// 从聚合状态中获取结果
OUT get()
案例计算每种传感器的平均水位。
public class KeyedAggregatingStateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(xxx.xxx.xxx, 1234).map(new MyMapFunctionImpl()).assignTimestampsAndWatermarks(WatermarkStrategy.WaterSensorforBoundedOutOfOrderness(Duration.ofSeconds(3)) // 设置最大等待时间为3s.withTimestampAssigner((SerializableTimestampAssignerWaterSensor) (waterSensor, l) - waterSensor.getTs() * 1000L));// 对传感器做KeyBySingleOutputStreamOperatorString process sensorDS.keyBy(r - r.getId()).process(new KeyedProcessFunctionString, WaterSensor, String() {// 定义聚合状态AggregatingStateInteger,Double vcAvgAggState;// 初始化聚合状态Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);vcAvgAggState getRuntimeContext().getAggregatingState(new AggregatingStateDescriptor(vcAvgAggState,// 聚合逻辑new AggregateFunctionInteger, Tuple2Integer, Integer, Double() {// 初始化累加器Overridepublic Tuple2Integer, Integer createAccumulator() {return Tuple2.of(0, 0);}// 累加逻辑 水位相加次数1Overridepublic Tuple2Integer, Integer add(Integer integer, Tuple2Integer, Integer accumulator) {return Tuple2.of(accumulator.f0 integer , accumulator.f1 1);}// 结果 水位 / 次数Overridepublic Double getResult(Tuple2Integer, Integer accumulator) {return (accumulator.f0 * 1D) / accumulator.f1;}Overridepublic Tuple2Integer, Integer merge(Tuple2Integer, Integer integerIntegerTuple2, Tuple2Integer, Integer acc1) {return null;}},Types.TUPLE(Types.INT, Types.INT)));}Overridepublic void processElement(WaterSensor value, KeyedProcessFunctionString, WaterSensor, String.Context ctx, CollectorString out) throws Exception {vcAvgAggState.add(value.getVc());out.collect(传感器:value.getId(),平均水位为:vcAvgAggState.get());}});process.print();env.execute();}
}
输入
[rootVM-55-24-centos ~]# nc -lk 1234
s1,1,1
s1,2,2
s1,4,4
s2,5,5
s2,6,6
s1,7,7
输出
传感器:s1,平均水位为:1.0
传感器:s1,平均水位为:1.5
传感器:s1,平均水位为:2.3333333333333335
传感器:s2,平均水位为:5.0
传感器:s2,平均水位为:5.5
传感器:s1,平均水位为:3.5
2.6 状态生存时间TTL
随着Flink程序的运行状态所消耗的存储空间也会随之增长如果不限制则可能会导致存储空间耗尽。可以使用 .clear() 方法清除状态但是不够灵活。
可以在状态描述器中通过.enableTimeToLive()方法启动TTL功能并创建一个StateTtlConfig配置对象。
StateTtlConfig ttlConfig StateTtlConfig.newBuilder(Time.seconds(10)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired).build();ValueStateDescriptorString stateDescriptor new ValueStateDescriptor(my state, String.class);// 开启TTL
stateDescriptor.enableTimeToLive(ttlConfig);主要的配置项 .newBuilder() 状态TTL配置的构造器方法,需传入Time参数设定状态过期时间 .setUpdateType()设置更新类型什么时机进行更新失效时间(重置失效时间) OnCreateAndWrite 创建状态和更改状态写操作时更新失效时间 OnReadAndWrite无论读写操作都会更新失效时间 .setStateVisibility()设置状态的可见性 NeverReturnExpired表示从不返回过期值 ReturnExpireDefNotCleanedUp如果过期状态还存在则返回 示例代码
public class StateTTLDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(xxx.xxx.xxx.xxx, 1234).map(new MyMapFunctionImpl()).assignTimestampsAndWatermarks(WatermarkStrategy.WaterSensorforBoundedOutOfOrderness(Duration.ofSeconds(3)) // 设置最大等待时间为3s.withTimestampAssigner((SerializableTimestampAssignerWaterSensor) (waterSensor, l) - waterSensor.getTs() * 1000L));SingleOutputStreamOperatorString process sensorDS.keyBy(r - r.getId()).process(new KeyedProcessFunctionString, WaterSensor, String() {ValueStateInteger lastVcState;Overridepublic void open(Configuration parameters) throws Exception {// 创建 StateTtlConfigStateTtlConfig stateTtlConfig StateTtlConfig.newBuilder(Time.seconds(10)) // 状态存活时间为10s.updateTtlOnCreateAndWrite() // 创建/更新状态时重置存活时间.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) // 不返回过期状态.build();// 启用 TTLValueStateDescriptorInteger stateDescriptor new ValueStateDescriptor(lastVc, Types.INT);stateDescriptor.enableTimeToLive(stateTtlConfig);lastVcState getRuntimeContext().getState(stateDescriptor);}Overridepublic void processElement(WaterSensor value, KeyedProcessFunctionString, WaterSensor, String.Context ctx, CollectorString out) throws Exception {out.collect(传感器:value.getId(),当前状态值:lastVcState.value());lastVcState.update(value.getVc());}});process.print();env.execute();}
}
输入
[rootVM-55-24-centos ~]# nc -lk 1234
s1,1,1
...间隔10s
s1,2,2
s1,3,3
s1,4,4
s1,5,5
输出
传感器:s1,当前状态值:null
传感器:s1,当前状态值:1
传感器:s1,当前状态值:null
传感器:s1,当前状态值:3
传感器:s1,当前状态值:4
三.算子状态Operator State
算子状态Operator State就是一个算子并行实例上定义的状态作用范围被限定为当前算子任务。每个算子子任务共享一个算子状态子任务间不共享
算子状态的实际应用场景不如Keyed State多一般用在Source或Sink等与外部系统连接的算子上一般使用不多。
当算子并行度发生变化时算子状态也支持在并行的算子子任务实例间做重新分配根据状态的类型不同重组分配的方案也会不同。
算子状态也支持不同的结构类型主要有三种ListState、UnionListState和BroadcastState。
3.1 列表状态ListState
与Keyed State中的ListState一样将状态表示为一组数据的列表。
与Keyed State中的列表状态的区别是在算子状态的上下文中不会按键key分别处理状态所以每一个并行子任务上只会保留一个“列表”list也就是当前并行子任务上所有状态项的集合。列表中的状态项就是可以重新分配的最细粒度彼此之间完全独立。
当算子并行度进行缩放调整时算子的状态列表将会被全部收集收集起来再通过轮询的方式重新依次分配给新的所有并行任务。
算子状态中不会存在“键组”key group这样的结构所以为了方便重组分配就把它直接定义成了“列表”list。这也就解释了为什么算子状态中没有最简单的值状态ValueState。
案例在map算子中计算数据的个数。
/*** 在map算子中计算数据的个数*/
public class OperatorListStateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 全局算子并行度为2env.setParallelism(2);env.socketTextStream(xxx.xxx.xxx.xxx, 1234).map(new MyCountMapFunction()).print();env.execute();}// 实现 CheckPointedFunction 接口public static class MyCountMapFunction implements MapFunctionString,Long, CheckpointedFunction {// 定义本地变量private Long count 0L;// 定义算子状态private ListStateLong state;// Map算子逻辑Overridepublic Long map(String s) throws Exception {return count;}/*** 状态快照用于将本地变量持久化至算子状态中,开启checkpoint时才会调用* param context the context for drawing a snapshot of the operator* throws Exception*/Overridepublic void snapshotState(FunctionSnapshotContext context) throws Exception {System.out.println(调用了snapshotState方法...);// 清空状态state.clear();// 将本地变量存入状态中state.add(count);}/**、* 初始化本地变量程序启动和恢复时从状态中把数据添加到本地变量每个子任务调用一次* param context the context for initializing the operator* throws Exception*/Overridepublic void initializeState(FunctionInitializationContext context) throws Exception {System.out.println(调用了initializeState方法...);// 从上下文中获取算子状态state context.getOperatorStateStore().getListState(new ListStateDescriptorLong(list-state, Types.LONG));// 从算子状态中将数据拷贝至本地变量if (context.isRestored()) { // 判断是否初始化成功for (Long v : state.get()) {count v;}}}}
}
输入
[rootVM-55-24-centos ~]# nc -lk 1234
a
b
c
d
e
f
g输出
调用了initializeState方法...
调用了initializeState方法...
1 1
2 1
1 2
2 2
1 3
2 3
1 4
3.2 联合列表状态UnionListState
与ListState类似联合列表状态也会将状态表示为一个列表。它与常规列表状态的区别在于算子并行度进行缩放调整时对于状态的分配方式不同。
在并行度进行缩放调整时联合列表与普通列表不同联合列表会将所有并行子任务的列表状态收集起来并直接向所有并行子任务广播完整的列表。如果列表中状态项太多则不推荐使用联合里欸包状态。
使用上也与ListState类似只需要在实现CheckpointedFunction类的initializeState方法时通过上下文获取算子状态使用 .getUnionListState() 即可其他与ListState无异。
state context.getOperatorStateStore().getUnionListState(new ListStateDescriptor(list-state, Types.LONG));
3.3 广播状态BroadcastState
有时我们希望算子并行子任务都保持同一份“全局”状态用来做统一的配置和规则设定。这时所有分区的所有数据都会访问到同一个状态状态就像被“广播”到所有分区一样这种特殊的算子状态就叫作广播状态BroadcastState。
在并行度进行缩放操作时由于是全局状态也不会造成影响。
案例水位超过指定的阈值发送告警阈值可以动态修改。
/*** 水位超过指定的阈值发送告警阈值可以动态修改。*/
public class OperatoBroadcastStateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);// 数据流SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(xxx.xxx.xxx.xxx, 1234).map(new MyMapFunctionImpl());// 配置流用于广播配置(阈值配置将发往这条流)DataStreamSourceString configDS env.socketTextStream(xxx.xxx.xxx.xxx, 4321);// 将配置流进行广播MapStateDescriptorString, Integer broadcastMapState new MapStateDescriptor(broadcast-state, Types.STRING, Types.INT);BroadcastStreamString configBS configDS.broadcast(broadcastMapState);// 将数据流和广播后的配置流使用connect进行连接BroadcastConnectedStreamWaterSensor, String sensorBCS sensorDS.connect(configBS);// 调用processsensorBCS.process(new BroadcastProcessFunctionWaterSensor, String, String() {/*** 数据流的处理逻辑可以通过上下文读取广播状态(只读)* param value The stream element.* param ctx* param out* throws Exception*/Overridepublic void processElement(WaterSensor value, BroadcastProcessFunctionWaterSensor, String, String.ReadOnlyContext ctx, CollectorString out) throws Exception {// 通过上下文获取广播状态的值(阈值)ReadOnlyBroadcastStateString, Integer broadcastState ctx.getBroadcastState(broadcastMapState);// 未从广播状态中读到值则设置默认值Integer threshold broadcastState.get(threshold) ! null ? broadcastState.get(threshold): 0;if(value.getVc() threshold){out.collect(传感器: value.getId(),当前水位为: value.getVc(),触发了阈值:threshold);}}/*** 配置广播流的处理逻辑可以通过上下文可以往广播状态写入值* param value The stream element.* param ctx* param out* throws Exception*/Overridepublic void processBroadcastElement(String value, BroadcastProcessFunctionWaterSensor, String, String.Context ctx, CollectorString out) throws Exception {// 读取流中的阈值写入广播状态中BroadcastStateString, Integer broadcastState ctx.getBroadcastState(broadcastMapState);broadcastState.put(threshold , Integer.valueOf(value));}}).print();env.execute();}
}
输入
// 数据流
[rootVM-55-24-centos ~]# nc -lk 1234
s1,1,1
s1,2,2输出
2 传感器:s1,当前水位为:1,触发了阈值:0
1 传感器:s1,当前水位为:2,触发了阈值:0
输入
// 广播配置流
[rootVM-55-24-centos ~]# nc -lk 4321
10// 数据流
[rootVM-55-24-centos ~]# nc -lk 1234
s1,7,7
s1,11,11输出
1 传感器:s1,当前水位为:11,触发了阈值:10 简单来说就是一条流广播后专门读取配置与普通的数据流进行连结然后广播流将配置加载到广播状态中这样普通的数据流就能够在不重启程序的情况下通过上下文动态读取配置。 应用场景MySQL定义中一张配置表定义一条配置流读取MySQL中的binlog配置表如有修改就将相应的配置广播出去更改数据库即可实现线上程序动态配置。
四.状态后端State Backends
在Flink中状态的存储、访问以及维护都是由一个可插拔的组件决定的这个组件就叫作状态后端state backend。状态后端主要负责管理本地状态的存储方式和位置。
4.1 状态后端的分类HashMapStateBackend/RocksDB
状态后端是一个“开箱即用”的组件可以在不改变应用程序逻辑的情况下独立配置。Flink中提供了两类不同的状态后端一种是“哈希表状态后端”HashMapStateBackend另一种是“内嵌RocksDB状态后端”EmbeddedRocksDBStateBackend。如果没有特别配置系统默认的状态后端是HashMapStateBackend。
4.1.1 哈希表状态后端HashMapStateBackend
HashMapStateBackend是把状态存放在内存里。具体实现上哈希表状态后端在内部会直接把状态当作对象objects保存在Taskmanager的JVM堆上。普通的状态以及窗口中收集的数据和触发器都会以键值对的形式存储起来所以底层是一个哈希表HashMap这种状态后端也因此得名。
4.1.2 内嵌RocksDB状态后端EmbeddedRocksDBStateBackend
RocksDB是一种内嵌的key-value存储介质可以把数据持久化到本地硬盘。配置EmbeddedRocksDBStateBackend后会将处理中的数据全部放入RocksDB数据库中RocksDB默认存储在TaskManager的本地数据目录里。
RocksDB的状态数据被存储为序列化的字节数组读写操作需要序列化/反序列化因此状态的访问性能要差一些。另外因为做了序列化key的比较也会按照字节进行而不是直接调用.hashCode()和.equals()方法。
EmbeddedRocksDBStateBackend始终执行的是异步快照(快照时不会阻塞任务)所以不会因为保存检查点而阻塞数据的处理而且它还提供了增量式保存检查点的机制这在很多情况下可以大大提升保存效率。
4.2 如何选择正确的状态后端
HashMapStateBackendEmbeddedRocksDBStateBackend存储介质Taskmanager的JVM堆内存Taskmanager的JVM的文件磁盘读写速度快慢
由此可以看出虽然HashMapStateBackend的读写速度快但是使用的是Taskmanager的JVM堆内存如果存储的状态较大则可能会将Taskmanager的内存耗尽。EmbeddedRocksDBStateBackend则存在Taskmanager的本地磁盘中可以存储大的状态不过牺牲了一定的读写速度。
4.3 状态后端的配置
在默认配置下应用程序使用的默认状态后端是由集群配置文件flink-conf.yaml中指定的配置名称为state.backend可修改为hashmap或rocksdb。除此之外还可以在提交作业时通过参数设置状态后端、以及在代码中指定。
4.3.1 配置默认的状态后端
在flink-conf.yaml中可以使用state.backend来配置默认状态后端。
配置项的可能值为hashmap这样配置的就是HashMapStateBackend如果配置项的值是rocksdb这样配置的就是EmbeddedRocksDBStateBackend。
# 默认状态后端
state.backend: hashmap4.2.2 为每个作业Per-job/Application单独配置状态后端
通过执行环境设置 hashMapStateBackend
StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 设置状态后端为HashMap
HashMapStateBackend hashMapStateBackend new HashMapStateBackend();
env.setStateBackend(hashMapStateBackend);
通过执行环境设置 EmbeddedRocksDBStateBackend
在IDE使用EmbeddedRocksDBStateBackend则需要导入以下依赖
dependencygroupIdorg.apache.flink/groupIdartifactIdflink-statebackend-rocksdb/artifactIdversion${flink.version}/version
/dependency
设置 EmbeddedRocksDBStateBackend 状态后端
StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 设置状态后端为RocksDB
EmbeddedRocksDBStateBackend embeddedRocksDBStateBackend new EmbeddedRocksDBStateBackend();
env.setStateBackend(embeddedRocksDBStateBackend);
4.2.3 提交参数设置状态后端
[rootVM-55-24-centos flink-1.17.0]#
bin/flink run -m localhost:1234 -D state.backendrocksdb -c com.xxx.wc.SocketStreamWordCount ./FlinkTutorial-1.0-SNAPSHOT.jar
-D 指定状态后端 文章转载自: http://www.morning.fhqdb.cn.gov.cn.fhqdb.cn http://www.morning.xxrgt.cn.gov.cn.xxrgt.cn http://www.morning.xrmwc.cn.gov.cn.xrmwc.cn http://www.morning.ruyuaixuexi.com.gov.cn.ruyuaixuexi.com http://www.morning.yqsq.cn.gov.cn.yqsq.cn http://www.morning.yhljc.cn.gov.cn.yhljc.cn http://www.morning.rxyz.cn.gov.cn.rxyz.cn http://www.morning.ybmp.cn.gov.cn.ybmp.cn http://www.morning.sfcfy.cn.gov.cn.sfcfy.cn http://www.morning.qkdcb.cn.gov.cn.qkdcb.cn http://www.morning.bhpsz.cn.gov.cn.bhpsz.cn http://www.morning.klpwl.cn.gov.cn.klpwl.cn http://www.morning.xcyzy.cn.gov.cn.xcyzy.cn http://www.morning.wfyzs.cn.gov.cn.wfyzs.cn http://www.morning.cxtbh.cn.gov.cn.cxtbh.cn http://www.morning.llsrg.cn.gov.cn.llsrg.cn http://www.morning.gqfks.cn.gov.cn.gqfks.cn http://www.morning.qcwck.cn.gov.cn.qcwck.cn http://www.morning.rjnm.cn.gov.cn.rjnm.cn http://www.morning.rxlck.cn.gov.cn.rxlck.cn http://www.morning.wphzr.cn.gov.cn.wphzr.cn http://www.morning.ygflz.cn.gov.cn.ygflz.cn http://www.morning.bfwk.cn.gov.cn.bfwk.cn http://www.morning.wsxly.cn.gov.cn.wsxly.cn http://www.morning.kgqpx.cn.gov.cn.kgqpx.cn http://www.morning.yrqb.cn.gov.cn.yrqb.cn http://www.morning.nmnhs.cn.gov.cn.nmnhs.cn http://www.morning.wschl.cn.gov.cn.wschl.cn http://www.morning.nypsz.cn.gov.cn.nypsz.cn http://www.morning.lsyk.cn.gov.cn.lsyk.cn http://www.morning.hrtfz.cn.gov.cn.hrtfz.cn http://www.morning.hcsnk.cn.gov.cn.hcsnk.cn http://www.morning.mdnnz.cn.gov.cn.mdnnz.cn http://www.morning.zkpwk.cn.gov.cn.zkpwk.cn http://www.morning.sftpg.cn.gov.cn.sftpg.cn http://www.morning.sftpg.cn.gov.cn.sftpg.cn http://www.morning.ftcrt.cn.gov.cn.ftcrt.cn http://www.morning.jytrb.cn.gov.cn.jytrb.cn http://www.morning.woyoua.com.gov.cn.woyoua.com http://www.morning.gcszn.cn.gov.cn.gcszn.cn http://www.morning.qbtkg.cn.gov.cn.qbtkg.cn http://www.morning.hjrjy.cn.gov.cn.hjrjy.cn http://www.morning.lpsjs.com.gov.cn.lpsjs.com http://www.morning.dmchips.com.gov.cn.dmchips.com http://www.morning.krfpj.cn.gov.cn.krfpj.cn http://www.morning.zcrjq.cn.gov.cn.zcrjq.cn http://www.morning.nyzmm.cn.gov.cn.nyzmm.cn http://www.morning.tqsmc.cn.gov.cn.tqsmc.cn http://www.morning.zhengdaotang.cn.gov.cn.zhengdaotang.cn http://www.morning.rbnnq.cn.gov.cn.rbnnq.cn http://www.morning.rrxgx.cn.gov.cn.rrxgx.cn http://www.morning.tnbsh.cn.gov.cn.tnbsh.cn http://www.morning.syznh.cn.gov.cn.syznh.cn http://www.morning.tjjkn.cn.gov.cn.tjjkn.cn http://www.morning.nlbw.cn.gov.cn.nlbw.cn http://www.morning.pcrzf.cn.gov.cn.pcrzf.cn http://www.morning.gtqx.cn.gov.cn.gtqx.cn http://www.morning.lzzqz.cn.gov.cn.lzzqz.cn http://www.morning.tqpds.cn.gov.cn.tqpds.cn http://www.morning.rbmm.cn.gov.cn.rbmm.cn http://www.morning.nbiotank.com.gov.cn.nbiotank.com http://www.morning.gycyt.cn.gov.cn.gycyt.cn http://www.morning.iknty.cn.gov.cn.iknty.cn http://www.morning.spbp.cn.gov.cn.spbp.cn http://www.morning.wjxyg.cn.gov.cn.wjxyg.cn http://www.morning.rkxqh.cn.gov.cn.rkxqh.cn http://www.morning.jqllx.cn.gov.cn.jqllx.cn http://www.morning.zsyqg.cn.gov.cn.zsyqg.cn http://www.morning.kwblwbl.cn.gov.cn.kwblwbl.cn http://www.morning.fdzzh.cn.gov.cn.fdzzh.cn http://www.morning.skrww.cn.gov.cn.skrww.cn http://www.morning.drqrl.cn.gov.cn.drqrl.cn http://www.morning.qxlgt.cn.gov.cn.qxlgt.cn http://www.morning.wrlxt.cn.gov.cn.wrlxt.cn http://www.morning.mztyh.cn.gov.cn.mztyh.cn http://www.morning.srckl.cn.gov.cn.srckl.cn http://www.morning.lxfyn.cn.gov.cn.lxfyn.cn http://www.morning.mspkz.cn.gov.cn.mspkz.cn http://www.morning.ndlww.cn.gov.cn.ndlww.cn http://www.morning.tnhqr.cn.gov.cn.tnhqr.cn