宁夏住房和建设厅官方网站,手机怎样建立网站,重庆市建设工程信息网综合网,商业网站建设与维护Flink主要有两种基础类型的状态#xff1a;keyed state 和operator state。 Keyed State Keyed State总是和keys相关#xff0c;并且只能用于KeyedStream上的函数和操作。 你可以将Keyed State视为是已经被分片或分区的Operator State#xff0c;每个key都有且仅有一个状态分…Flink主要有两种基础类型的状态keyed state 和operator state。 Keyed State Keyed State总是和keys相关并且只能用于KeyedStream上的函数和操作。 你可以将Keyed State视为是已经被分片或分区的Operator State每个key都有且仅有一个状态分区(state-partition)。每个keyed-state逻辑上绑定到一个唯一的parallel-operator-instance, key组合上由于每个key“属于”keyed operator的一个并行实例所以我们可以简单的认为是operator,key。 Keyed State进一步被组织到所谓的Key Groups中。Key Groups是Flink能够重新分配keyed State的原子单元。Key Groups的数量等于定义的最大并行度。在一个keyed operator的并行实例执行期间它与一个或多个Key Groups配合工作。 Raw and Managed State Keyed State 和 Operator State 有两种形式: managed和raw。 Managed State表示数据结构由Flink runtime控制例如内部哈希表或者RocksDB。例如“ValueState”“ListState”等等。Flink的runtime层会编码State并将其写入checkpoint中。 Raw State是操作算子保存在它的数据结构中的state。当进行checkpoint时它只写入字节序列到checkpoint中。Flink并不知道状态的数据结构并且只能看到raw字节。 所有的数据流函数都可以使用managed state但是raw state接口只可以在操作算子的实现类中使用。推荐使用managed state(而不是raw state)因为使用managed state当并行度变化时Flink可以自动的重新分布状态也可以做更好的内存管理。 注意 如果你的managed state需要自定义序列化逻辑请参见managed state的自定义序列化以确保未来的兼容性。Flink默认的序列化不需要特殊处理。 Managed Keyed State managed keyed state接口提供了对当前输入元素的key的不同类型的状态的访问。这意味着这种类型的状态只能在KeyedStream中使用它可以通过stream.keyBy(…)创建。 现在我们首先看下不同类型的状态然后展示如何在程序中使用它们。可用的状态原语是 ValueState:它会保存一个可以被更新和查询的值(受限于上面提到的输入元素的key算子看到的每个key可能仅一个值)。可使用update(T) 和 T value() 更新和查询值。 ListState: 它保存了一个元素列表。你可以添加元素和检索Iterable来获取所有当前存储的元素。添加元素使用add(T)或者addAll(List)方法获取Iterable使用Iterable get()方法。也可以使用update(List)覆盖已有的list。
ReducingState: 它保存了一个聚合了所有添加到这个状态的值的结果。接口和ListState相同但是使用add(T)方法本质是使用指定ReduceFunction的聚合行为。
AggregatingStateIN, OUT: 它保存了一个聚合了所有添加到这个状态的值的结果。与ReducingState有些不同聚合类型可能不同于添加到状态的元素的类型。接口和ListState相同但是使用add(IN)添加的元素本质是通过使用指定的AggregateFunction进行聚合。
FoldingStateT, ACC:它保存了一个聚合了所有添加到这个状态的值的结果。与ReducingState有些不同聚合类型可能不同于添加到状态的元素的类型。接口和ListState相同但是使用add(IN)添加的元素本质是通过使用指定的FoldFunction折叠进行聚合。
MapStateUK, UV:它保存了一个映射列表。你可以将key-value对放入状态中并通过Iterable检索所有当前存储的映射关系。使用put(UK, UV) 或 putAll(MapUK, UV)添加映射关系。使用get(UK)获取key相关的value。分别使用entries(), keys() 和 values() 获取映射关系key和value的视图。
所有类型的状态都有一个clear()方法用以清除当前活跃key(即输入元素的key)的状态。
注意 FoldingState 和 FoldingStateDescriptor在Flink1.4中已经被废弃并且可能在将来完全删除。请使用AggregatingState和 AggregatingStateDescriptor替代。
首先需要记住的是这些状态对象只能用来与状态进行交互。状态不一定存储在内存中但是可能存储在磁盘或者其他地方。第二个需要记住的是从状态获取的值依赖于输入元素的key。因此如果包含不同的key那么在你的用户函数中的一个调用获得的值和另一个调用获得值可能不同。
为了获得状态句柄必须创建一个StateDescriptor。它维护了状态的名称(稍后将看到你可以创建多个状态因此他们必须有唯一的名称以便你可以引用它们)状态维护的值的类型和可用户定义function例如ReduceFunction。根据你想要查询的状态的类型你可以创建ValueStateDescriptorListStateDescriptorReducingStateDescriptorFoldingStateDescriptor或MapStateDescriptor。
使用RuntimeContext访问状态因此它只有在richfunction中才可以使用。rich function的相关信息请看这里但是我们也很快会看到一个示例。RichFunction中RuntimeContext有这些访问状态的方法: ValueState getState(ValueStateDescriptor) ReducingState getReducingState(ReducingStateDescriptor) ListState getListState(ListStateDescriptor) AggregatingStateIN, OUT getAggregatingState(AggregatingStateIN, OUT) FoldingStateT, ACC getFoldingState(FoldingStateDescriptorT, ACC) MapStateUK, UV getMapState(MapStateDescriptorUK, UV)
public class CountWindowAverage extends RichFlatMapFunctionTuple2Long, Long, Tuple2Long, Long {
/*** The ValueState handle. The first field is the count, the second field a running sum.*/
private transient ValueStateTuple2Long, Long sum;Override
public void flatMap(Tuple2Long, Long input, CollectorTuple2Long, Long out) throws Exception {// access the state valueTuple2Long, Long currentSum sum.value();// update the countcurrentSum.f0 1;// add the second field of the input valuecurrentSum.f1 input.f1;// update the statesum.update(currentSum);// if the count reaches 2, emit the average and clear the stateif (currentSum.f0 2) {out.collect(new Tuple2(input.f0, currentSum.f1 / currentSum.f0));sum.clear();}
}Override
public void open(Configuration config) {ValueStateDescriptorTuple2Long, Long descriptor new ValueStateDescriptor(average, // the state nameTypeInformation.of(new TypeHintTuple2Long, Long() {}), // type informationTuple2.of(0L, 0L)); // default value of the state, if nothing was setsum getRuntimeContext().getState(descriptor);
}}
// this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env) env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L)) .keyBy(0) .flatMap(new CountWindowAverage()) .print();
// the printed output will be (1,4) and (1,5)
State Time-To-Live(TTL) 任何类型的keyed state都可以使用TTL。如果配置了TTL一个状态值超时了储存的值就会在恰当的时候被删除后面会说到。
所有状态集合类型都支持 per-entry TTL。意味着list的元素和map的entry可以单独设置超时。
TTL的使用也很简单可以参考如下代码 import org.apache.flink.api.common.state.StateTtlConfig; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.time.Time;
StateTtlConfig ttlConfig StateTtlConfig .newBuilder(Time.seconds(1)) .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) .build();
ValueStateDescriptor stateDescriptor new ValueStateDescriptor(“text state”, String.class); stateDescriptor.enableTimeToLive(ttlConfig);
newBuilder方法是必须的。 Update类型的配置有以下两种 StateTtlConfig.UpdateType.OnCreateAndWrite 创建和写入
StateTtlConfig.UpdateType.OnReadAndWrite 也有读取功能
可视也即是在超时之后删除之前数据是否还能被读取可以配置的
StateTtlConfig.StateVisibility.NeverReturnExpired – 超时元素绝不返回 StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp – 如果数据没被删除可以返回。 NeverReturnExpired该参数一旦配置超时的状态可以视为不存在了即使还没有被删除。该选项是在一些TTL超时要求严格的场景还是很靠谱的比如处理隐私敏感的数据。
小提示 状态后端(statebackend)会给用户的每个value存储一个时间戳这就意味着会增加存储成本。堆状态后端(heap state backend)会在内存里存储一个额外的java对象(该对象带有指向用户状态对象的引用)和一个原始long值。RocksDB状态后端会为每个存储的值(list entry或者map entry)增加8byte。 当前TTL仅仅支持处理时间。 假如想用没有用TTL的savepoint去恢复当前指定了TTL的应用程序会报异常。 带TTL的map状态只有在序列化器支持处理null值的时候支持用户的null值。如果序列化器不支持null值可以使用nullableSerializer取包裹null值当然会带来额外的存储开销。 超时状态清除 当前的情况下超时值状态仅仅在读取的时候删除例如调用ValueState.value().
注意这意味着如果超时状态没有被读取的话就不会被删除然后状态会一直增大.期待将来会有改变吧.
另外可以配置在完成全量状态快照(full state snapshot)的时候删除状态这也可以减少状态大小。在当前的实现机制下本地状态不会被清除但是从之前快照里恢复的过程中不会保护已经删除的超时快照。配置方法如下 import org.apache.flink.api.common.state.StateTtlConfig; import org.apache.flink.api.common.time.Time;
StateTtlConfig ttlConfig StateTtlConfig .newBuilder(Time.seconds(1)) .cleanupFullSnapshot() .build(); 该配置不适合增量的快照机制也即是状态后端不能是RocksDB。 文章转载自: http://www.morning.pwfwk.cn.gov.cn.pwfwk.cn http://www.morning.syhwc.cn.gov.cn.syhwc.cn http://www.morning.rbtny.cn.gov.cn.rbtny.cn http://www.morning.qqhersx.com.gov.cn.qqhersx.com http://www.morning.cjwkf.cn.gov.cn.cjwkf.cn http://www.morning.rwnx.cn.gov.cn.rwnx.cn http://www.morning.dmzfz.cn.gov.cn.dmzfz.cn http://www.morning.xbwqg.cn.gov.cn.xbwqg.cn http://www.morning.gsjzs.cn.gov.cn.gsjzs.cn http://www.morning.nbnq.cn.gov.cn.nbnq.cn http://www.morning.ntffl.cn.gov.cn.ntffl.cn http://www.morning.jgncd.cn.gov.cn.jgncd.cn http://www.morning.sbkb.cn.gov.cn.sbkb.cn http://www.morning.zstbc.cn.gov.cn.zstbc.cn http://www.morning.fkgqn.cn.gov.cn.fkgqn.cn http://www.morning.qnbzs.cn.gov.cn.qnbzs.cn http://www.morning.kzyr.cn.gov.cn.kzyr.cn http://www.morning.srbfp.cn.gov.cn.srbfp.cn http://www.morning.fqljq.cn.gov.cn.fqljq.cn http://www.morning.dkzwx.cn.gov.cn.dkzwx.cn http://www.morning.jsmyw.cn.gov.cn.jsmyw.cn http://www.morning.fflnw.cn.gov.cn.fflnw.cn http://www.morning.zjrnq.cn.gov.cn.zjrnq.cn http://www.morning.ztrht.cn.gov.cn.ztrht.cn http://www.morning.lthpr.cn.gov.cn.lthpr.cn http://www.morning.jxmjr.cn.gov.cn.jxmjr.cn http://www.morning.fbdtd.cn.gov.cn.fbdtd.cn http://www.morning.mm27.cn.gov.cn.mm27.cn http://www.morning.zqzzn.cn.gov.cn.zqzzn.cn http://www.morning.gjqwt.cn.gov.cn.gjqwt.cn http://www.morning.pjqxk.cn.gov.cn.pjqxk.cn http://www.morning.wmnpm.cn.gov.cn.wmnpm.cn http://www.morning.lmfxq.cn.gov.cn.lmfxq.cn http://www.morning.lonlie.com.gov.cn.lonlie.com http://www.morning.wgdnd.cn.gov.cn.wgdnd.cn http://www.morning.ktyww.cn.gov.cn.ktyww.cn http://www.morning.lmtbl.cn.gov.cn.lmtbl.cn http://www.morning.lthpr.cn.gov.cn.lthpr.cn http://www.morning.clkyw.cn.gov.cn.clkyw.cn http://www.morning.npcxk.cn.gov.cn.npcxk.cn http://www.morning.tkcct.cn.gov.cn.tkcct.cn http://www.morning.tstkr.cn.gov.cn.tstkr.cn http://www.morning.iterlog.com.gov.cn.iterlog.com http://www.morning.nyplp.cn.gov.cn.nyplp.cn http://www.morning.dmsxd.cn.gov.cn.dmsxd.cn http://www.morning.nwbnt.cn.gov.cn.nwbnt.cn http://www.morning.gtylt.cn.gov.cn.gtylt.cn http://www.morning.jbnss.cn.gov.cn.jbnss.cn http://www.morning.fxpyt.cn.gov.cn.fxpyt.cn http://www.morning.qhjkz.cn.gov.cn.qhjkz.cn http://www.morning.ylxgw.cn.gov.cn.ylxgw.cn http://www.morning.rrwft.cn.gov.cn.rrwft.cn http://www.morning.yuanshenglan.com.gov.cn.yuanshenglan.com http://www.morning.tnwgc.cn.gov.cn.tnwgc.cn http://www.morning.rzbgn.cn.gov.cn.rzbgn.cn http://www.morning.fwnyz.cn.gov.cn.fwnyz.cn http://www.morning.ltypx.cn.gov.cn.ltypx.cn http://www.morning.msgnx.cn.gov.cn.msgnx.cn http://www.morning.clpkp.cn.gov.cn.clpkp.cn http://www.morning.qhln.cn.gov.cn.qhln.cn http://www.morning.ppzgr.cn.gov.cn.ppzgr.cn http://www.morning.kkysz.cn.gov.cn.kkysz.cn http://www.morning.nrmyj.cn.gov.cn.nrmyj.cn http://www.morning.dfltx.cn.gov.cn.dfltx.cn http://www.morning.mywmb.cn.gov.cn.mywmb.cn http://www.morning.mkxxk.cn.gov.cn.mkxxk.cn http://www.morning.nkkpp.cn.gov.cn.nkkpp.cn http://www.morning.rpjyl.cn.gov.cn.rpjyl.cn http://www.morning.qshxh.cn.gov.cn.qshxh.cn http://www.morning.hhfwj.cn.gov.cn.hhfwj.cn http://www.morning.etsaf.com.gov.cn.etsaf.com http://www.morning.czcbl.cn.gov.cn.czcbl.cn http://www.morning.phxdc.cn.gov.cn.phxdc.cn http://www.morning.jikuxy.com.gov.cn.jikuxy.com http://www.morning.rqqmd.cn.gov.cn.rqqmd.cn http://www.morning.dkbsq.cn.gov.cn.dkbsq.cn http://www.morning.sdhmn.cn.gov.cn.sdhmn.cn http://www.morning.fbylq.cn.gov.cn.fbylq.cn http://www.morning.dgfpp.cn.gov.cn.dgfpp.cn http://www.morning.cbynh.cn.gov.cn.cbynh.cn