企业网站设计流程,wordpress如何设置字体大小,自己做手机主题的软件,装修平台网站排名使用 Flink 消费 Kafka 中 ChangeRecord 主题的数据#xff0c;每隔 1 分钟输出最近 3 分钟的预警次数最多的 设备#xff0c;将结果存入Redis 中#xff0c; key 值为 “warning_last3min_everymin_out” #xff0c; value 值为 “ 窗口结束时间#xff0c;设备id” 每隔 1 分钟输出最近 3 分钟的预警次数最多的 设备将结果存入Redis 中 key 值为 “warning_last3min_everymin_out” value 值为 “ 窗口结束时间设备id” 窗口结束时间格式 yyyy-MM-dd HH:mm:ss 。使用 redis cli 以 HGETALL key方式获取 warning_last3min_everymin_out值。 注时间语义使用 Processing Time 。 Kafka Source 从 Kafka 中读取实时的设备预警数据数据内容应当包括设备 ID 和预警状态等信息。数据通过 SimpleStringSchema 反序列化为字符串格式再由 parseMessage 进行解析和提取。 流处理与窗口 Flink 使用滑动时间窗口 (SlidingProcessingTimeWindows.of(Time.minutes(3), Time.minutes(1))) 来计算每 1 分钟内过去 3 分钟内的设备预警数据。这意味着每 1 分钟计算一次在每次计算中会考虑过去 3 分钟内的数据因此具有滑动窗口的特点。 窗口函数 在 MaxNumWarnMachineID 中窗口内的数据按设备 ID 分组统计每个设备的预警次数并选出预警次数最多的设备 ID。apply 方法处理窗口内的数据后输出一个包含时间戳窗口结束时间和设备 ID 的元组。 Redis Sink 计算后的每个时间窗口的最大预警设备 ID 将通过 Redis Sink 写入 Redis数据结构为 HSET。Redis 中的键为 warning_last3min_everymin_out值为设备 ID。 package flink.calculate.ChangeRecordimport org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.source.{KafkaSource, KafkaSourceBuilder}
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}
import org.apache.flink.util.Collector
import java.text.SimpleDateFormat
import java.util.Date
import scala.collection.mutable// 定义常量
object Constants {val TOPIC_NAME ChangeRecordval BOOTSTRAP_SERVERS 192.168.222.101:9092,192.168.222.102:9092,192.168.222.103:9092val REDIS_HOST 192.168.222.101
}// 主程序逻辑
object WarningLast3MinEveryMinOut {def main(args: Array[String]): Unit {// 创建流执行环境并配置val env StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1) // 设置作业并行度// 构建Kafka数据源val kafkaSource buildKafkaSource()// 从Kafka读取数据并处理val dataStream env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), Constants.TOPIC_NAME).map(parseMessage) // 解析消息为 (标识符, 设备ID, 状态).filter(_._3 预警) // 过滤非预警状态的数据.keyBy(_._1) // 按标识符分组.windowAll(SlidingProcessingTimeWindows.of(Time.minutes(3), Time.minutes(1))) // 滑动窗口.apply(new MaxNumWarnMachineID) // 应用窗口函数计算每分钟内过去3分钟的最多预警设备// 输出到控制台和RedisdataStream.print(Result )dataStream.addSink(buildRedisSink())// 执行Flink作业env.execute(WarningLast3MinEveryMinOut Job)}// 构建Kafka数据源private def buildKafkaSource(): KafkaSource[String] {KafkaSource.builder[String]().setTopics(Constants.TOPIC_NAME).setBootstrapServers(Constants.BOOTSTRAP_SERVERS).setStartingOffsets(OffsetsInitializer.latest()).setValueOnlyDeserializer(new SimpleStringSchema()).build()}// 解析来自Kafka的消息为元组private def parseMessage(message: String): (String, String, String) {val fields message.split(,)(warning_last3min_everymin_out, fields(1), fields(3))}// 构建Redis Sinkprivate def buildRedisSink(): ConnRedis.RedisSink[(String, String)] {new ConnRedis(Constants.REDIS_HOST, 6379).getRedisSink(new Last3MinRedisMapper)}
}// 预警设备计数窗口函数
class MaxNumWarnMachineID extends AllWindowFunction[(String, String, String), (String, String), TimeWindow] {override def apply(window: TimeWindow, input: Iterable[(String, String, String)], out: Collector[(String, String)]): Unit {// 统计每个设备ID的预警次数val machineCounts input.groupBy(_._2).view.mapValues(_.size)// 获取窗口结束时间val windowEndTime new SimpleDateFormat(yyyy-MM-dd HH:mm:ss).format(new Date(window.getEnd))// 获取预警次数最多的设备IDif (machineCounts.nonEmpty) {val maxMachineId machineCounts.maxBy(_._2)._1out.collect((windowEndTime, maxMachineId))}}
}// Redis映射器
private class Last3MinRedisMapper extends RedisMapper[(String, String)] {override def getCommandDescription: RedisCommandDescription new RedisCommandDescription(RedisCommand.HSET, warning_last3min_everymin_out)override def getKeyFromData(data: (String, String)): String data._1override def getValueFromData(data: (String, String)): String data._2
} 文章转载自: http://www.morning.kyflr.cn.gov.cn.kyflr.cn http://www.morning.byshd.cn.gov.cn.byshd.cn http://www.morning.syhwc.cn.gov.cn.syhwc.cn http://www.morning.hphfy.cn.gov.cn.hphfy.cn http://www.morning.mcjp.cn.gov.cn.mcjp.cn http://www.morning.tpwrm.cn.gov.cn.tpwrm.cn http://www.morning.qytby.cn.gov.cn.qytby.cn http://www.morning.gryzk.cn.gov.cn.gryzk.cn http://www.morning.kmqwp.cn.gov.cn.kmqwp.cn http://www.morning.fhsgw.cn.gov.cn.fhsgw.cn http://www.morning.lmhwm.cn.gov.cn.lmhwm.cn http://www.morning.dwdjj.cn.gov.cn.dwdjj.cn http://www.morning.nrmyj.cn.gov.cn.nrmyj.cn http://www.morning.rlbc.cn.gov.cn.rlbc.cn http://www.morning.lqpzb.cn.gov.cn.lqpzb.cn http://www.morning.rwzqn.cn.gov.cn.rwzqn.cn http://www.morning.qxltp.cn.gov.cn.qxltp.cn http://www.morning.zlfxp.cn.gov.cn.zlfxp.cn http://www.morning.dyxlj.cn.gov.cn.dyxlj.cn http://www.morning.mglqf.cn.gov.cn.mglqf.cn http://www.morning.yfqhc.cn.gov.cn.yfqhc.cn http://www.morning.ffgbq.cn.gov.cn.ffgbq.cn http://www.morning.hpggl.cn.gov.cn.hpggl.cn http://www.morning.bsqkt.cn.gov.cn.bsqkt.cn http://www.morning.ntyanze.com.gov.cn.ntyanze.com http://www.morning.jrwbl.cn.gov.cn.jrwbl.cn http://www.morning.kbynw.cn.gov.cn.kbynw.cn http://www.morning.pmwhj.cn.gov.cn.pmwhj.cn http://www.morning.xwgbr.cn.gov.cn.xwgbr.cn http://www.morning.wmdqc.com.gov.cn.wmdqc.com http://www.morning.qkkmd.cn.gov.cn.qkkmd.cn http://www.morning.tmxtr.cn.gov.cn.tmxtr.cn http://www.morning.qbfwb.cn.gov.cn.qbfwb.cn http://www.morning.pxrfm.cn.gov.cn.pxrfm.cn http://www.morning.qnbsx.cn.gov.cn.qnbsx.cn http://www.morning.hxxzp.cn.gov.cn.hxxzp.cn http://www.morning.xmxbm.cn.gov.cn.xmxbm.cn http://www.morning.qllcp.cn.gov.cn.qllcp.cn http://www.morning.xnkh.cn.gov.cn.xnkh.cn http://www.morning.wnrcj.cn.gov.cn.wnrcj.cn http://www.morning.bhwll.cn.gov.cn.bhwll.cn http://www.morning.sbqrm.cn.gov.cn.sbqrm.cn http://www.morning.tthmg.cn.gov.cn.tthmg.cn http://www.morning.rwqj.cn.gov.cn.rwqj.cn http://www.morning.cprls.cn.gov.cn.cprls.cn http://www.morning.drfcj.cn.gov.cn.drfcj.cn http://www.morning.rdxp.cn.gov.cn.rdxp.cn http://www.morning.wrysm.cn.gov.cn.wrysm.cn http://www.morning.dighk.com.gov.cn.dighk.com http://www.morning.kskpx.cn.gov.cn.kskpx.cn http://www.morning.ebpz.cn.gov.cn.ebpz.cn http://www.morning.wnhml.cn.gov.cn.wnhml.cn http://www.morning.jpfpc.cn.gov.cn.jpfpc.cn http://www.morning.qnbgk.cn.gov.cn.qnbgk.cn http://www.morning.pxlql.cn.gov.cn.pxlql.cn http://www.morning.sfqtf.cn.gov.cn.sfqtf.cn http://www.morning.bfbl.cn.gov.cn.bfbl.cn http://www.morning.mxdiy.com.gov.cn.mxdiy.com http://www.morning.rdymd.cn.gov.cn.rdymd.cn http://www.morning.wnjbn.cn.gov.cn.wnjbn.cn http://www.morning.tbwsl.cn.gov.cn.tbwsl.cn http://www.morning.mzrqj.cn.gov.cn.mzrqj.cn http://www.morning.rpth.cn.gov.cn.rpth.cn http://www.morning.lynb.cn.gov.cn.lynb.cn http://www.morning.tknqr.cn.gov.cn.tknqr.cn http://www.morning.xqcst.cn.gov.cn.xqcst.cn http://www.morning.lswgs.cn.gov.cn.lswgs.cn http://www.morning.mkygc.cn.gov.cn.mkygc.cn http://www.morning.hlnrj.cn.gov.cn.hlnrj.cn http://www.morning.qfkxj.cn.gov.cn.qfkxj.cn http://www.morning.tpkxs.cn.gov.cn.tpkxs.cn http://www.morning.lrplh.cn.gov.cn.lrplh.cn http://www.morning.qkgwx.cn.gov.cn.qkgwx.cn http://www.morning.svrud.cn.gov.cn.svrud.cn http://www.morning.qsy39.cn.gov.cn.qsy39.cn http://www.morning.mhnrx.cn.gov.cn.mhnrx.cn http://www.morning.bmzxp.cn.gov.cn.bmzxp.cn http://www.morning.cplym.cn.gov.cn.cplym.cn http://www.morning.rzcmn.cn.gov.cn.rzcmn.cn http://www.morning.lpqgq.cn.gov.cn.lpqgq.cn