医院网站建设方案计划书,手机app开发网站模板,梅州建网站,使用nas服务器建设网站DataStream API编程模型
1.【Flink-Scala】DataStream编程模型之数据源、数据转换、数据输出 2.【Flink-scala】DataStream编程模型之 窗口的划分-时间概念-窗口计算程序 文章目录 DataStream API编程模型前言1.触发器1.1 代码示例 2.驱逐器2.1 代码示例 总结 前言
本小节我想…DataStream API编程模型
1.【Flink-Scala】DataStream编程模型之数据源、数据转换、数据输出 2.【Flink-scala】DataStream编程模型之 窗口的划分-时间概念-窗口计算程序 文章目录 DataStream API编程模型前言1.触发器1.1 代码示例 2.驱逐器2.1 代码示例 总结 前言
本小节我想把 窗口计算中 的触发器和驱逐器讲完 然后开始水位线延迟数据处理状态编程等。 1.触发器
触发器决定了窗口何时由窗口计算函数进行处理。 触发器就类比枪的扳机触发后 计算函数开始计算计算函数在【Flink-scala】DataStream编程模型之 窗口的划分-时间概念-窗口计算程序
每个窗口分配器都带有一个默认触发器。如果默认触发器不能满足业务需求就需要自定义触发器。
实现自定义触发器的方法很简单只需要继承Trigger接口并实现它的方法即可。 Trigger接口有五种方法允许触发器对不同的事件作出反应 具体如下
onElement()方法每个元素被添加到窗口时调用onEventTime()方法当一个已注册的事件时间计时器启动时调用onProcessingTime()方法当一个已注册的处理时间计时器启动时调用onMerge()方法与状态性触发器相关当使用会话窗口时两个触发器对应的窗口合并时合并两个触发器的状态clear()方法执行任何需要清除的相应窗口。
触发器通过 TriggerContext 来管理和检查状态。 在触发器中我们通常会使用 状态 来记录窗口中的一些信息如已处理的事件数量或累计的值。这些状态决定了窗口是否应当触发计算。
触发器中的 TriggerResult 有几个重要的结果
CONTINUE表示窗口继续等待更多的事件不触发计算。 FIRE表示触发窗口计算并输出结果。 PURGE表示删除某些数据通常在某些特殊场景下使用。
1.1 代码示例
假设股票价格数据流连续不断到达系统现在需要对到达的数据进行监控每到达5条数据就触发计算。实现该功能的代码如下
import java.util.Calendar
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.api.common.state.ReducingStateDescriptor
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.source.RichSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import scala.util.Randomcase class StockPrice(stockId:String,timeStamp:Long,price:Double)object TriggerTest {def main(args: Array[String]) {//创建执行环境val env StreamExecutionEnvironment.getExecutionEnvironment//设置程序并行度env.setParallelism(1)//设置为处理时间env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)//创建数据源股票价格数据流val source env.socketTextStream(localhost, 9999)//指定针对数据流的转换操作逻辑val stockPriceStream source.map(s s.split(,)).map(sStockPrice(s(0).toString,s(1).toLong,s(2).toDouble))val sumStream stockPriceStream.keyBy(s s.stockId).timeWindow(Time.seconds(50)).trigger(new MyTrigger(5)).reduce((s1, s2) StockPrice(s1.stockId,s1.timeStamp, s1.price s2.price))//打印输出sumStream.print()//程序触发执行env.execute(Trigger Test)}class MyTrigger extends Trigger[StockPrice, TimeWindow] {//触发计算的最大数量private var maxCount: Long _//记录当前数量的状态
private lazy val countStateDescriptor: ReducingStateDescriptor[Long] new ReducingStateDescriptor[Long](counter, new Sum, classOf[Long])def this(maxCount: Int) {this()this.maxCount maxCount
}override def onProcessingTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult {TriggerResult.CONTINUE
}override def onEventTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult {TriggerResult.CONTINUE
} override def onElement(element: StockPrice, timestamp: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult {val countState ctx.getPartitionedState(countStateDescriptor)//计数状态加1countState.add(1L) if (countState.get() this.maxCount) {//达到指定指定数量 //清空计数状态countState.clear()//触发计算 TriggerResult.FIRE} else {TriggerResult.CONTINUE}}//窗口结束时清空状态override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit {println(窗口结束时清空状态)ctx.getPartitionedState(countStateDescriptor).clear()}//更新状态为累加值class Sum extends ReduceFunction[Long] {override def reduce(value1: Long, value2: Long): Long value1 value2} }
}
注意 这一行代码
val sumStream stockPriceStream.keyBy(s s.stockId).timeWindow(Time.seconds(50)).trigger(new MyTrigger(5)).reduce((s1, s2) StockPrice(s1.stockId,s1.timeStamp, s1.price s2.price))在该代码中MyTrigger 是一个自定义触发器它控制在窗口中积累一定数量的事件后触发计算。
具体来说窗口中的数据会根据 StockPrice 的数量来决定是否触发计算而不是依赖于时间 。
接下来分析代码
def this(maxCount: Int) {this()this.maxCount maxCount
}这是它的主构造函数它接受一个 maxCount 参数表示在触发器中窗口内允许的最大元素数量。也就是说窗口中的元素数达到 maxCount 时触发器会触发计算即调用 TriggerResult.FIRE。 .trigger(new MyTrigger(5))表示创建一个 MyTrigger 的实例并传入一个 maxCount 为 5 的参数意思是窗口中最大允许 5 个元素达到该数量后窗口会触发计算。
private lazy val countStateDescriptor: ReducingStateDescriptor[Long] new ReducingStateDescriptor[Long](counter, new Sum, classOf[Long])lazy表示这是一个延迟初始化的变量。只有在第一次使用 countStateDescriptor 时才会初始化它。这在性能优化上有作用避免了不必要的初始化开销。
ReducingStateDescriptor 是 Flink 提供的一个状态描述符它用于定义一个可减少的状态。这个状态会随着事件的到来不断累积并且可以执行自定义的聚合操作。在 ReducingStateDescriptor 中状态值的更新是通过 ReduceFunction 来实现的。
在这里ReducingStateDescriptor[Long] 定义了一个状态它的值是 Long 类型并且该状态将执行 聚合操作即对 Long 类型的值进行合并。
ReducingStateDescriptor 的构造函数接受三个参数
1.状态名称“counter” 这是该状态的名称用于在 Flink 的状态后端存储中标识该状态。
2.聚合操作new Sum 这是一个 ReduceFunction 的实例它定义了如何合并状态。在这个例子中Sum 类是一个自定义的 ReduceFunction用于对 Long 类型的值进行加法操作。
3.状态类型classOf[Long] 这是状态的类型。classOf[Long] 表示状态值的类型是 Long用于在 Flink 的状态管理中描述状态类型。
ClassOf(Long)这是 Scala 反射机制的语法用于获取 Long 类型的 Class 对象。它在这里用于指定状态值的类型以便 Flink 的状态管理能够正确地处理状态。
接下来的几个方法**TriggerResult.CONTINUE* 表示继续不触发计算。 接下来是onElement方法
val countState ctx.getPartitionedState(countStateDescriptor)然后计数器1加到5就触发。 看是否到达maxcount。到达就触发不到达就不触发
自己的疑惑才开始看代码的时候我一直纠结ctx ctx.getPartitionedState(countStateDescriptor)这是从哪里来的这个是
org.apache.flink.streaming.api.windowing.triggers.Trigger下面的实例化对象直接使用即可。
代码最后 Sum类它的作用是在状态更新时执行对状态的累加操作。
为什么用 Sum 作为累加器
由于我们在 Trigger 中的 onElement 方法使用了 ctx.getPartitionedState(countStateDescriptor) 来获取一个 ReducingState累加状态这个状态将会不断地被更新每次一个新元素进入时都会触发 reduce 操作。
Sum 类就是定义了如何对这个 ReducingState 状态进行累加操作。 ReduceFunction 提供了累加器的逻辑这样当多个元素进来时value1 和 value2 就会被相加最终在窗口中保持一个累积的状态。
2.驱逐器
学完上面的学个单词Evictor 驱逐器是Flink窗口机制的一个可选择组件。
驱逐 汉语意思就是 赶走作用就是对进入窗口前后的数据进行驱逐就是不接收不要你走
Flink内部实现了三种驱逐器包括CountEvictor、DeltaEvictor和TimeEvictor。
三种驱逐器的功能如下 CountEvictor保持在窗口中具有固定数量的记录将超过指定大小的数据在窗口计算之前删除
DeltaEvictor使用DeltaFunction和一个阈值来计算窗口缓冲区中的最后一个元素与其余每个元素之间的差值并删除差值大于或等于阈值的元素
TimeEvictor以毫秒为单位的时间间隔interval作为参数对于给定的窗口找到元素中的最大的时间戳max_ts并删除时间戳小于max_ts - interval的所有元素。
在使用窗口函数之前被逐出的元素将不被处理。 默认情况下所有内置的驱逐器在窗口函数之前使用。
和触发器一样用户也可以通过实现Evictor接口完成自定义的驱逐器。 自定义驱逐器时需要复写Evictor接口的两个方法 evictBefore()和evictAfter()。 其中evictBefore()方法定义数据在进入窗口函数计算之前执行驱逐操作的逻辑 evictAfter()方法定义数据在进入窗口函数计算之后执行驱逐操作的逻辑。
2.1 代码示例
这个代码在做的事统计窗口内股票价的平均值并删除小于0的记录
import java.time.Duration
import java.util
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.evictors.Evictor
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue
import org.apache.flink.util.Collectorcase class StockPrice(stockId:String,timeStamp:Long,price:Double)
object EvictorTest {def main(args: Array[String]) {//设置执行环境val env StreamExecutionEnvironment.getExecutionEnvironment//设置程序并行度env.setParallelism(1)//设置为处理时间env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)//创建数据源股票价格数据流val source env.socketTextStream(localhost, 9999)//指定针对数据流的转换操作逻辑val stockPriceStream source.map(s s.split(,)).map(sStockPrice(s(0).toString,s(1).toLong,s(2).toDouble))val sumStream stockPriceStream.assignTimestampsAndWatermarks(WatermarkStrategy//为了测试方便这里把水位线设置为0.forBoundedOutOfOrderness[StockPrice](Duration.ofSeconds(0)).withTimestampAssigner(new SerializableTimestampAssigner[StockPrice] {override def extractTimestamp(element: StockPrice, recordTimestamp: Long): Long element.timeStamp})).keyBy(s s.stockId).timeWindow(Time.seconds(3)).evictor(new MyEvictor()) //自定义驱逐器.process(new MyProcessWindowFunction()) //自定义窗口计算函数//打印输出sumStream.print() //程序触发执行 env.execute(Evictor Test)}class MyEvictor() extends Evictor[StockPrice, TimeWindow] {override def evictBefore(iterable: java.lang.Iterable[TimestampedValue[StockPrice]], i: Int, w: TimeWindow, evictorContext: Evictor.EvictorContext): Unit {val ite: util.Iterator[TimestampedValue[StockPrice]] iterable.iterator()while (ite.hasNext) {val elment: TimestampedValue[StockPrice] ite.next()println(驱逐器获取到的股票价格 elment.getValue().price)//模拟去掉非法参数数据if (elment.getValue().price 0) {println(股票价格小于0删除该记录)ite.remove()}}}
override def evictAfter(iterable: java.lang.Iterable[TimestampedValue[StockPrice]], i: Int, w: TimeWindow, evictorContext: Evictor.EvictorContext): Unit {
//不做任何操作}}class MyProcessWindowFunction extends ProcessWindowFunction[StockPrice, (String, Double), String, TimeWindow] {// 一个窗口结束的时候调用一次一个分组执行一次不适合大量数据全量数据保存在内存中会造成内存溢出override def process(key: String, context: Context, elements: Iterable[StockPrice], out: Collector[(String, Double)]): Unit {// 聚合注意整个窗口的数据保存到Iterable里面有很多行数据var sumPrice 0.0;elements.foreach(stock {sumPrice sumPrice stock.price})out.collect(key, sumPrice/elements.size)}}
}
myEvictor 是我们自定义的驱逐器类它实现了 Evictor[StockPrice, TimeWindow] 接口. 在Evictor类中定义了before和after两个方法。 这着重看beforeafter没做什么。
override def evictBefore(iterable: java.lang.Iterable[TimestampedValue[StockPrice]], i: Int, w: TimeWindow, evictorContext: Evictor.EvictorContext): Unit {val ite: util.Iterator[TimestampedValue[StockPrice]] iterable.iterator()while (ite.hasNext) {val elment: TimestampedValue[StockPrice] ite.next()println(驱逐器获取到的股票价格 elment.getValue().price)if (elment.getValue().price 0) {println(股票价格小于0删除该记录)ite.remove()}}参数的迭代器里每个元素都是TimestampedValue[StockPrice]。 i表示要处理的数据量。 evictorContext这是 Evictor.EvictorContext 类型它提供了访问驱逐器上下文的接口。在此方法中并未使用通常它可以用于管理驱逐操作的状态或者提供更多的上下文信息。 while里面的代码就不解释啦。
这里的参数太长且陌生多解释一下。 在窗口中运行输入
stock_1,1602031567000,8
stock_1,1602031568000,-4
stock_1,1602031569000,3
stock_1,1602031570000,-8
stock_1,1602031571000,9
stock_1,1602031572000,10输出后的结果是
驱逐器获取到的股票价格8.0
驱逐器获取到的股票价格-4.0
股票价格小于0删除该记录
(stock_1,8.0)
驱逐器获取到的股票价格3.0
驱逐器获取到的股票价格-8.0
股票价格小于0删除该记录
驱逐器获取到的股票价格9.0
(stock_1,6.0)
驱逐器获取到的股票价格10.0
(stock_1,10.0)驱逐器会在每个窗口开始时检查所有输入的事件并对那些满足特定条件的事件在本例中股票价格 0进行处理并移除。
stock_1, 1602031567000, 8 — 股票价格大于0保留。 stock_1, 1602031568000, -4 — 股票价格小于0驱逐。 stock_1, 1602031569000, 3 — 股票价格大于0保留。 stock_1, 1602031570000, -8 — 股票价格小于0驱逐。 stock_1, 1602031571000, 9 — 股票价格大于0保留。 stock_1, 1602031572000, 10 — 股票价格大于0保留。 驱逐器输出的处理结果 窗口 1时间区间1602031567000 - 1602031570000窗口大小 3秒
该窗口内剩下的有效记录stock_1, 1602031567000, 8 和 stock_1, 1602031569000, 3-4 和 -8 被驱逐。 计算平均价格(8 3) / 2 5.5 输出 (stock_1, 8.0)按窗口内第一个事件的 stockId 输出 窗口 2时间区间1602031569000 - 1602031572000窗口大小 3秒
该窗口内剩下的有效记录stock_1, 1602031569000, 3 和 stock_1, 1602031571000, 9-8 被驱逐。 计算平均价格(3 9) / 2 6.0 输出 (stock_1, 6.0)按窗口内第一个事件的 stockId 输出 窗口 3时间区间1602031571000 - 1602031574000窗口大小 3秒
该窗口内剩下的有效记录stock_1, 1602031571000, 9 和 stock_1, 1602031572000, 10。 计算平均价格(9 10) / 2 9.5 输出 (stock_1, 10.0)按窗口内第一个事件的 stockId 输出
输出后的结果是
驱逐器获取到的股票价格8.0
驱逐器获取到的股票价格-4.0
股票价格小于0删除该记录
(stock_1,8.0)
驱逐器获取到的股票价格3.0
驱逐器获取到的股票价格-8.0
股票价格小于0删除该记录
驱逐器获取到的股票价格9.0
(stock_1,6.0)
驱逐器获取到的股票价格10.0
(stock_1,10.0)总结
以上就是今天要讲的内容触发器和驱逐器。下一小节应该讲水位线啦。 文章转载自: http://www.morning.rzbgn.cn.gov.cn.rzbgn.cn http://www.morning.hcwlq.cn.gov.cn.hcwlq.cn http://www.morning.nldsd.cn.gov.cn.nldsd.cn http://www.morning.wmnpm.cn.gov.cn.wmnpm.cn http://www.morning.rxdsq.cn.gov.cn.rxdsq.cn http://www.morning.hysqx.cn.gov.cn.hysqx.cn http://www.morning.sxfnf.cn.gov.cn.sxfnf.cn http://www.morning.tckxl.cn.gov.cn.tckxl.cn http://www.morning.hxbps.cn.gov.cn.hxbps.cn http://www.morning.rwxnn.cn.gov.cn.rwxnn.cn http://www.morning.rksg.cn.gov.cn.rksg.cn http://www.morning.nlkm.cn.gov.cn.nlkm.cn http://www.morning.lekbiao.com.gov.cn.lekbiao.com http://www.morning.xltdh.cn.gov.cn.xltdh.cn http://www.morning.brcdf.cn.gov.cn.brcdf.cn http://www.morning.xprzq.cn.gov.cn.xprzq.cn http://www.morning.blxlf.cn.gov.cn.blxlf.cn http://www.morning.nkqnn.cn.gov.cn.nkqnn.cn http://www.morning.wzwyz.cn.gov.cn.wzwyz.cn http://www.morning.tblbr.cn.gov.cn.tblbr.cn http://www.morning.bklkt.cn.gov.cn.bklkt.cn http://www.morning.mkyny.cn.gov.cn.mkyny.cn http://www.morning.lkhgq.cn.gov.cn.lkhgq.cn http://www.morning.kpfds.cn.gov.cn.kpfds.cn http://www.morning.ynrzf.cn.gov.cn.ynrzf.cn http://www.morning.yrctp.cn.gov.cn.yrctp.cn http://www.morning.mtgnd.cn.gov.cn.mtgnd.cn http://www.morning.fqtdz.cn.gov.cn.fqtdz.cn http://www.morning.zmbzl.cn.gov.cn.zmbzl.cn http://www.morning.wzwpz.cn.gov.cn.wzwpz.cn http://www.morning.yfmlj.cn.gov.cn.yfmlj.cn http://www.morning.flncd.cn.gov.cn.flncd.cn http://www.morning.ngmjn.cn.gov.cn.ngmjn.cn http://www.morning.nqcwz.cn.gov.cn.nqcwz.cn http://www.morning.znpyw.cn.gov.cn.znpyw.cn http://www.morning.ffwrq.cn.gov.cn.ffwrq.cn http://www.morning.qcsbs.cn.gov.cn.qcsbs.cn http://www.morning.bfbl.cn.gov.cn.bfbl.cn http://www.morning.nhrkl.cn.gov.cn.nhrkl.cn http://www.morning.qstjr.cn.gov.cn.qstjr.cn http://www.morning.lnrr.cn.gov.cn.lnrr.cn http://www.morning.mprky.cn.gov.cn.mprky.cn http://www.morning.btqrz.cn.gov.cn.btqrz.cn http://www.morning.bcjbm.cn.gov.cn.bcjbm.cn http://www.morning.xhqr.cn.gov.cn.xhqr.cn http://www.morning.yxplz.cn.gov.cn.yxplz.cn http://www.morning.gxtfk.cn.gov.cn.gxtfk.cn http://www.morning.rnht.cn.gov.cn.rnht.cn http://www.morning.bscsp.cn.gov.cn.bscsp.cn http://www.morning.hphrz.cn.gov.cn.hphrz.cn http://www.morning.pwggd.cn.gov.cn.pwggd.cn http://www.morning.cfocyfa.cn.gov.cn.cfocyfa.cn http://www.morning.wjhdn.cn.gov.cn.wjhdn.cn http://www.morning.npgwb.cn.gov.cn.npgwb.cn http://www.morning.kfbth.cn.gov.cn.kfbth.cn http://www.morning.kqkmx.cn.gov.cn.kqkmx.cn http://www.morning.lqjpb.cn.gov.cn.lqjpb.cn http://www.morning.tzmjc.cn.gov.cn.tzmjc.cn http://www.morning.bhpsz.cn.gov.cn.bhpsz.cn http://www.morning.mjzcp.cn.gov.cn.mjzcp.cn http://www.morning.yydzk.cn.gov.cn.yydzk.cn http://www.morning.trjdr.cn.gov.cn.trjdr.cn http://www.morning.rkwwy.cn.gov.cn.rkwwy.cn http://www.morning.cxlys.cn.gov.cn.cxlys.cn http://www.morning.qkqzm.cn.gov.cn.qkqzm.cn http://www.morning.kwxr.cn.gov.cn.kwxr.cn http://www.morning.rongxiaoman.com.gov.cn.rongxiaoman.com http://www.morning.khcpx.cn.gov.cn.khcpx.cn http://www.morning.cfocyfa.cn.gov.cn.cfocyfa.cn http://www.morning.hpdpp.cn.gov.cn.hpdpp.cn http://www.morning.hbxnb.cn.gov.cn.hbxnb.cn http://www.morning.nkqnn.cn.gov.cn.nkqnn.cn http://www.morning.jcxqc.cn.gov.cn.jcxqc.cn http://www.morning.wjxtq.cn.gov.cn.wjxtq.cn http://www.morning.wwthz.cn.gov.cn.wwthz.cn http://www.morning.qqhmg.cn.gov.cn.qqhmg.cn http://www.morning.pzcjq.cn.gov.cn.pzcjq.cn http://www.morning.mfqmk.cn.gov.cn.mfqmk.cn http://www.morning.jyfrz.cn.gov.cn.jyfrz.cn http://www.morning.brcdf.cn.gov.cn.brcdf.cn