电子商务网站建设的核心,网站前端工资,注册安全工程师条件,wordpress上传到哪个面试题#xff1a;Flink数据延迟怎么处理#xff1f;
将迟到数据直接丢弃【默认方案】将迟到数据收集起来另外处理#xff08;旁路输出#xff09;重新激活已经关闭的窗口并重新计算以修正结果#xff08;Lateness#xff09;
Flink数据延迟处理方案
用一个案例说明三…面试题Flink数据延迟怎么处理
将迟到数据直接丢弃【默认方案】将迟到数据收集起来另外处理旁路输出重新激活已经关闭的窗口并重新计算以修正结果Lateness
Flink数据延迟处理方案
用一个案例说明三种处理方式
举个例子左流跟右流按照5秒的时间窗口进行coGroup操作按单词进行关联超过5秒进行丢弃。
结果说明在Socket数据源输入 1005000 java 后会统计1005000时间戳之前的数据而在1005000时间戳之后输入的hello就没有被统计输出。当输入 1010000 xixi 后触发了第2个窗口只输出了java还是没有后输入的hello统计结果这也更明确了1005000时间戳之后输入的hello被丢弃了。
object MyCoGroupJoin {def main(args: Array[String]): Unit {//创建环境变量val env StreamExecutionEnvironment.getExecutionEnvironment//指定事件时间env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)//创建Socket源数据流内容格式 时间戳 单词val s1 env.socketTextStream(127.0.0.1, 9000)// 设置事件时间戳字段.assignAscendingTimestamps(_.split( )(0).toLong).map(line {val strs line.split( )(strs(0).toLong, strs(1))})//创建Socket源数据流内容格式 时间戳 单词val s2 env.socketTextStream(127.0.0.1, 8888)// 设置事件时间戳字段.assignAscendingTimestamps(_.split( )(0).toLong).map(line {val strs line.split( )(strs(0).toLong, strs(1))})//将两个数据流进行合并统计这里是将两数据流利用窗口进行单词拼串处理s1.coGroup(s2).where(_._2).equalTo(_._2).window(TumblingEventTimeWindows.of(Time.seconds(5))) //滚动窗口窗口大小5秒.apply(new CoGroupFunction[(Long, String), (Long, String), String] {override def coGroup(first: lang.Iterable[(Long, String)], second: lang.Iterable[(Long, String)], out: Collector[String]): Unit {//对两数据流中的数据进行循环遍历并拼串下发first.forEach(r1 {second.forEach(r2 {println(s${r1}::${r2})val str r1._2 r2._2out.collect(str)})})}}).print()env.execute(cogroupjoin)}
} 运行效果
左流
nc -l 99991001000 hello
1005000 java
1003000 hello [这条被丢弃了]
1010000 xixi右流
nc -l 88881002000 hello
1005000 java
1001000 hello [这条被丢弃了]
1010000 xixi程序控制台输出结果
(1001000,hello)::(1002000,hello)
4 hellohello
(1005000,java)::(1005000,java)
2 javajava
设置Watermark
时间语义
Event Time事件时间每条数据或事件自带的时间属性。由于时间属性依附于数据本身在高并发的情况下可能存在Event Time的到达为乱序的即一个较早发生的数据延迟到达Process Time处理时间对于某个算子来说Processing Time指算子使用当前机器的系统时钟时间Ingestion Time接入时间事件到达Flink Source的时间
Flink的三种时间语义中Processing Time和Ingestion Time都是基于Flink本身所产生的时间可以不用设置时间字段和Watermark。如果要使用Event Time以下两项配置缺一不可第一使用一个时间戳为数据流中每个事件的Event Time赋值第二生成Watermark。
Event Time是每个事件的元数据如果不设置Flink并不知道每个事件的发生时间我们必须要为每个事件的Event Time赋值一个时间戳。关于时间戳包括Flink在内的绝大多数系统都使用Unix时间戳系统Unix time或Unix epoch。Unix时间戳系统以1970-01-01 00:00:00.000 为起始点其他时间记为距离该起始时间的整数差值一般是毫秒millisecond精度。
有了Event Time时间戳我们还必须生成Watermark。Watermark是Flink插入到数据流中的一种特殊的数据结构它包含一个时间戳并假设后续不会有小于该时间戳的数据如果后续数据存在小于该时间戳的数据则视为延迟数据需另外处理。下图展示了一个乱序数据流其中方框是单个事件方框中的数字是其对应的Event Time时间戳圆圈为Watermark圆圈中的数字为Watermark对应的时间戳。 将之前的例子进行处理说明
object MyCoGroupJoin {def main(args: Array[String]): Unit {// 创建环境变量val env StreamExecutionEnvironment.getExecutionEnvironment// 指定事件时间env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)// 创建Socket源数据流内容格式 时间戳 单词val s1 env.socketTextStream(127.0.0.1, 9999)// 设置事件时间戳字段// .assignAscendingTimestamps(_.split( )(0).toLong)// 这里可以指定周期性产生WaterMark 或 间歇性产生WaterMark分别使用AssignerWithPeriodicWatermarks和AssignerWithPunctuatedWatermarks来实现// 这里使用周期性产生WaterMark,延长2秒.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[String](Time.seconds(2)) {override def extractTimestamp(element: String): Long {val strs element.split( )strs(0).toLong}}).map(line {val strs line.split( )(strs(0).toLong, strs(1))})// 创建Socket源数据流内容格式 时间戳 单词val s2 env.socketTextStream(127.0.0.1, 8888)// 设置事件时间戳字段// .assignAscendingTimestamps(_.split( )(0).toLong)// 这里可以指定周期性产生WaterMark 或 间歇性产生WaterMark分别使用AssignerWithPeriodicWatermarks和AssignerWithPunctuatedWatermarks来实现// 这里使用周期性产生WaterMark,延长2秒.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[String](Time.seconds(2)) {override def extractTimestamp(element: String): Long {val strs element.split( )strs(0).toLong}}).map(line {val strs line.split( )(strs(0).toLong, strs(1))})//将两个数据流进行合并统计这里是将两数据流利用窗口进行单词拼串处理s1.coGroup(s2).where(_._2).equalTo(_._2).window(TumblingEventTimeWindows.of(Time.seconds(5))) //滚动窗口窗口大小5秒.apply(new CoGroupFunction[(Long, String), (Long, String), String] {override def coGroup(first: lang.Iterable[(Long, String)], second: lang.Iterable[(Long, String)], out: Collector[String]): Unit {// 对两数据流中的数据进行循环遍历并拼串下发first.forEach(r1 {second.forEach(r2 {println(s${r1}::${r2})val str r1._2 r2._2out.collect(str)})})}}).print()env.execute(cogroupjoin)}
}
执行效果
左流
nc -l 9999
1001000 hello
1005000 java
1003000 hello
1007000 java右流
nc -l 8888
1002000 hello
1005000 java
1001000 hello
1007000 java程序控制台输出
(1001000,hello)::(1002000,hello)
4 hellohello
(1001000,hello)::(1001000,hello)
4 hellohello
(1003000,hello)::(1002000,hello)
4 hellohello
(1003000,hello)::(1001000,hello)
4 hellohello
当我们使用Watermark后我们可以发现在两个Socket终端输入1005000 java时控制台并没有立刻统计输出信息。而是在两个Socket终端输入 1007000 java后控制台才将统计结果输出出来且在时间戳1005000之后输入的hello也同时给统计出来了上面的问题可以解决了但是 1007000 java 之后我们再输入 hello 你会发现还是存在问题没有输出又给丢弃了。继续测试如下。 左流
nc -l 9999
1001000 hello
1005000 java
1003000 hello
1007000 java
1003000 hello
1012000 spark右流
nc -l 8888
1002000 hello
1005000 java
1001000 hello
1007000 java
1004000 hello
1012000 spark程序执行控制台输出结果
(1001000,hello)::(1002000,hello)
4 hellohello
(1001000,hello)::(1001000,hello)
4 hellohello
(1003000,hello)::(1002000,hello)
4 hellohello
(1003000,hello)::(1001000,hello)
4 hellohello
(1005000,java)::(1005000,java)
2 javajava
(1005000,java)::(1007000,java)
2 javajava
(1007000,java)::(1005000,java)
2 javajava
(1007000,java)::(1007000,java)
2 javajava
所以waterMark只能在一定程度上解决这种问题。我们再来看看allowedLateness机制。
设置Lateness
object MyCoGroupJoin {def main(args: Array[String]): Unit {// 创建环境变量val env StreamExecutionEnvironment.getExecutionEnvironment// 指定事件时间env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)// 创建Socket源数据流内容格式 时间戳 单词val s1 env.socketTextStream(127.0.0.1, 9999)// 设置事件时间戳字段// .assignAscendingTimestamps(_.split( )(0).toLong)// 这里可以指定周期性产生WaterMark 或 间歇性产生WaterMark分别使用AssignerWithPeriodicWatermarks和AssignerWithPunctuatedWatermarks来实现// 这里使用周期性产生WaterMark,延长2秒.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[String](Time.seconds(2)) {override def extractTimestamp(element: String): Long {val strs element.split( )strs(0).toLong}}).map(line {val strs line.split( )(strs(0).toLong, strs(1))})// 创建Socket源数据流内容格式 时间戳 单词val s2 env.socketTextStream(127.0.0.1, 8888)// 设置事件时间戳字段// .assignAscendingTimestamps(_.split( )(0).toLong)// 这里可以指定周期性产生WaterMark 或 间歇性产生WaterMark分别使用AssignerWithPeriodicWatermarks和AssignerWithPunctuatedWatermarks来实现// 这里使用周期性产生WaterMark,延长2秒.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[String](Time.seconds(2)) {override def extractTimestamp(element: String): Long {val strs element.split( )strs(0).toLong}}).map(line {val strs line.split( )(strs(0).toLong, strs(1))})//将两个数据流进行合并统计这里是将两数据流利用窗口进行单词拼串处理s1.coGroup(s2).where(_._2).equalTo(_._2).window(TumblingEventTimeWindows.of(Time.seconds(5))) //滚动窗口窗口大小5秒// 允许数据迟到2秒窗口触发后2秒内过来的数据还可以重新被计算.allowedLateness(Time.seconds(2)).apply(new CoGroupFunction[(Long, String), (Long, String), String] {override def coGroup(first: lang.Iterable[(Long, String)], second: lang.Iterable[(Long, String)], out: Collector[String]): Unit {// 对两数据流中的数据进行循环遍历并拼串下发first.forEach(r1 {second.forEach(r2 {println(s${r1}::${r2})val str r1._2 r2._2out.collect(str)})})}}).print()env.execute(cogroupjoin)}
}
执行效果
左流
nc -l 9999
1001000 hello
1007000 java右流
nc -l 8888
1002000 hello
1007000 java
1003000 hello程序执行控制台输出结果
(1001000,hello)::(1002000,hello)
4 hellohello
(1001000,hello)::(1002000,hello)
4 hellohello
(1001000,hello)::(1003000,hello)
4 hellohello
到这里估计有朋友又有疑问了allowedLateness机制解决数据延迟设置的时间段那之后再来的延迟数据呢还是被丢弃了并没有彻底解决问题。别慌针对allowedLateness机制之后来的延迟数据Flink还提供了另一种方案就是sideOutput机制。
旁路输出
Side Output简单来说就是在程序执行过程中,将主流stream流中的不同的业务类型或者不同条件的数据分别输出到不同的地方。如果我们想对没能及时在Flink窗口计算的延迟数据专门处理也就是窗口已经计算了但后面才来的数据专门处理我们可以使用旁路输出到侧流中去处理。
object MyCoGroupJoin {def main(args: Array[String]): Unit {// 创建环境变量val env StreamExecutionEnvironment.getExecutionEnvironment// 指定事件时间env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)// 创建Socket源数据流内容格式 时间戳 单词val s1 env.socketTextStream(127.0.0.1, 9999)// 这里可以指定周期性产生WaterMark 或 间歇性产生WaterMark分别使用AssignerWithPeriodicWatermarks和AssignerWithPunctuatedWatermarks来实现// 这里使用周期性产生WaterMark,延长2秒.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[String](Time.seconds(2)) {override def extractTimestamp(element: String): Long {val strs element.split( )strs(0).toLong}}).map(line {val strs line.split( )(strs(0).toLong, strs(1))})// 定义一个侧输出流val lateData: OutputTag[(Long, String)] new OutputTag[(Long, String)](late)val s2 s1.timeWindowAll(Time.seconds(5))//允许迟到2秒数据.allowedLateness(Time.seconds(2))//迟到长于2秒的数据将被保存到lateData侧数据流中.sideOutputLateData(lateData).process(new ProcessAllWindowFunction[(Long, String), (Long, String), TimeWindow] {override def process(context: Context, elements: Iterable[(Long, String)], out: Collector[(Long, String)]): Unit {elements.foreach(ele {out.collect(ele)})}})s2.print(主流)s2.getSideOutput(lateData).print(侧流)env.execute(cogroupjoin)}
}
数据流
nc -l 9999
1001000 hello
1005000 java
1007000 python
1002000 hello
1009000 java
1001000 xixi
1002000 haha程序执行控制台输出结果
主流:10 (1001000,hello)
主流:11 (1001000,hello)
主流:12 (1002000,hello)
侧流:1 (1001000,xixi)
侧流:2 (1002000,haha)
通过上面测试可以发现晚于allowedLateness机制的延迟数据Flink没有丢弃而是输出到了侧输出流中等待处理了这样延迟数据就完美解决了。
双流Join中的数据延迟处理
数据质量问题流式数据到达计算引擎的时间不一定比如 A 流的数据先到了A 流不知道 B 流对应同 key 的数据什么时候到没法关联数据时效问题流式数据不知何时、下发怎样的数据A 流的数据到达后如果 B 流的数据永远不到那么 A 流的数据在什么时候以及是否要填充一个 null 值下发下去。
Window Join
Flink Window Join。就是将两条流的数据从无界数据变为有界数据即划分出时间窗口然后将同一时间窗口内的两条流的数据做 Join这里的时间窗口支持 Tumbling、Sliding、Session。
解法说明
流式数据到达计算引擎的时间不一定数据已经被划分为窗口无界数据变为有界数据就和离线批处理的方式一样了两个窗口的数据简单的进行关联即可流式数据不知何时、下发怎样的数据窗口结束就把数据下发下去关联到的数据就下发 [A, B]没有关联到的数据取决于是否是 outer join 然后进行数据下发
Tumbling Sliding Session Flink SQLFlink 1.14 版本 Window TVF 中支持
SELECT L.num as L_Num, L.id as L_Id, R.num as R_Num, R.id as R_Id, L.window_start, L.window_end
FROM (SELECT * FROM TABLE(TUMBLE(TABLE LeftTable, DESCRIPTOR(row_time), INTERVAL 5 MINUTES))
) L
FULL JOIN (SELECT * FROM TABLE(TUMBLE(TABLE RightTable, DESCRIPTOR(row_time), INTERVAL 5 MINUTES))
) R
ON L.num R.num
AND L.window_start R.window_start
AND L.window_end R.window_end;
方案特点
⭐ 产出数据质量低 ⭐ 产出数据时效性中 当我们的窗口大小划分的越细时在窗口边缘关联不上的数据就会越多数据质量就越差。窗口大小划分的越宽时窗口内关联上的数据就会越多数据质量越好但是产出时效性就会越差。所以要注意取舍。
举个例子以曝光关联点击来说如果我们划分的时间窗口为 1 分钟那么一旦出现曝光在 0:59点击在 1:01 的情况就会关联不上当我们的划分的时间窗口 1 小时时只有在每个小时的边界处的数据才会出现关联不上的情况。
适用场景
该种解决方案适用于可以评估出窗口内的关联率高的场景如果窗口内关联率不高则不建议使用。注意这种方案由于上面说到的数据质量和时效性问题在实际生产环境中很少使用。
Interval Join
Interval Join。其也是将两条流的数据从无界数据变为有界数据但是这里的有界和Window Join 的有界的概念是不一样的这里的有界是指两条流之间的有界。
以 A 流 join B 流举例interval join 可以让 A 流可以关联 B 流一段时间区间内的数据比如 A 流关联 B 流前后 5 分钟的数据。 解法说明
流式数据到达计算引擎的时间不一定数据已经被划分为窗口无界数据变为有界数据就和离线批处理的方式一样了两个窗口的数据简单的进行关联即可流式数据不知何时、下发怎样的数据窗口结束这里的窗口结束是指 interval 区间结束区间的结束是利用 watermark 来判断的就把数据下发下去关联到的数据就下发 [A, B]没有关联到的数据取决于是否是 outer join 然后进行数据下发
CREATE TABLE show_log_table (log_id BIGINT,show_params STRING,row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),WATERMARK FOR row_time AS row_time) WITH (connector datagen,rows-per-second 1,fields.show_params.length 1,fields.log_id.min 1,fields.log_id.max 10);CREATE TABLE click_log_table (log_id BIGINT,click_params STRING,row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),WATERMARK FOR row_time AS row_time)WITH (connector datagen,rows-per-second 1,fields.click_params.length 1,fields.log_id.min 1,fields.log_id.max 10);CREATE TABLE sink_table (s_id BIGINT,s_params STRING,c_id BIGINT,c_params STRING) WITH (connector print);INSERT INTO sink_tableSELECTshow_log_table.log_id as s_id,show_log_table.show_params as s_params,click_log_table.log_id as c_id,click_log_table.click_params as c_paramsFROM show_log_table FULL JOIN click_log_table ON show_log_table.log_id click_log_table.log_idAND show_log_table.row_time BETWEEN click_log_table.row_time - INTERVAL 5 SECOND AND click_log_table.row_time
方案特点
⭐ 产出数据质量中 ⭐ 产出数据时效性中 interval join 的方案比 window join 方案在数据质量上好很多但是其也是存在 join 不到的情况的。并且如果为 outer join 的话outer 一测的流数据需要要等到区间结束才能下发。
适用场景
该种解决方案适用于两条流之间可以明确评估出相互延迟的时间是多久的这里我们可以使用离线数据进行评估使用离线数据的两条流的时间戳做差得到一个分布区间。
比如在 A 流和 B 流时间戳相差在 1min 之内的有 95%在 1-4 min 之内的有 4.5%则我们就可以认为两条流数据时间相差在 4 min 之内的有 99.5%这时我们将上下界设置为 4min 就是一个能保障 0.5% 误差的合理区间。
注意这种方案在生产环境中还是比较常用的。
Regular Join
Regular Join。上面两节说的两种 Join 都是基于划分窗口将无界数据变为有界数据进行关联机制但是本节说的 regular join 则还是基于无界数据进行关联。
以 A 流 left join B 流举例A 流数据到来之后直接去尝试关联 B 流数据。 1. 如果关联到了则直接下发关联到的数据 2. 如果没有关联到则也直接下发没有关联到的数据后续 B 流中的数据到来之后会把之前下发下去的没有关联到数据撤回然后把关联到的数据数据进行下发。由此可以看出这是基于 Flink SQL 的 retract 机制则也就说明了其目前只支持 Flink SQL。
解法说明 流式数据到达计算引擎的时间不一定两条流的数据会尝试关联能关联到直接下发关联不到先下发一个目前的结果数据 流式数据不知何时、下发怎样的数据两条流的数据会尝试关联能关联到直接下发关联不到先下发一个目前的结果数据
CREATE TABLE show_log_table (log_id BIGINT,show_params STRING
) WITH (connector datagen,rows-per-second 1,fields.show_params.length 3,fields.log_id.min 1,fields.log_id.max 10
);CREATE TABLE click_log_table (log_id BIGINT,click_params STRING
)
WITH (connector datagen,rows-per-second 1,fields.click_params.length 3,fields.log_id.min 1,fields.log_id.max 10
);CREATE TABLE sink_table (s_id BIGINT,s_params STRING,c_id BIGINT,c_params STRING
) WITH (connector print
);INSERT INTO sink_table
SELECTshow_log_table.log_id as s_id,show_log_table.show_params as s_params,click_log_table.log_id as c_id,click_log_table.click_params as c_params
FROM show_log_table
LEFT JOIN click_log_table ON show_log_table.log_id click_log_table.log_id;
方案特点
产出数据质量高 产出数据时效性高 数据质量和时效性高的原因都是因为 regular join 会保障目前 Flink 任务已经接收到的数据中能关联的一定是关联上的即使关联不上数据也会下发完完全全保障了当前数据的客观性和时效性。
适用场景
该种解决方案虽然是目前在产出质量、时效性上最好的一种解决方案但是在实际场景中使用时也存在一些问题
基于 retract 机制所有的数据都会存储在 state 中以判断能否关联到所以我们要设置合理的 state ttl 来避免大 state 问题导致的任务不稳定 基于 retract 机制所以在数据发生更新时会下发回撤数据、最新数据 2 条消息当我们的关联层级越多则下发消息量的也会放大 sink 组件要支持 retract我们不要忘了最终数据是要提供数据服务给需求方进行使用的所以我们最终写入的数据组件也需要支持 retract比如 MySQL。如果写入的是 Kafka则下游消费这个 Kafka 的引擎也需要支持回撤\更新机制。 文章转载自: http://www.morning.wgqtt.cn.gov.cn.wgqtt.cn http://www.morning.dzzjq.cn.gov.cn.dzzjq.cn http://www.morning.xpwdf.cn.gov.cn.xpwdf.cn http://www.morning.bloao.com.gov.cn.bloao.com http://www.morning.nhgfz.cn.gov.cn.nhgfz.cn http://www.morning.wclxm.cn.gov.cn.wclxm.cn http://www.morning.xysxj.com.gov.cn.xysxj.com http://www.morning.jjzbx.cn.gov.cn.jjzbx.cn http://www.morning.knlgk.cn.gov.cn.knlgk.cn http://www.morning.pnljy.cn.gov.cn.pnljy.cn http://www.morning.nqwkn.cn.gov.cn.nqwkn.cn http://www.morning.fgxnb.cn.gov.cn.fgxnb.cn http://www.morning.hrtwt.cn.gov.cn.hrtwt.cn http://www.morning.cwjxg.cn.gov.cn.cwjxg.cn http://www.morning.gsjfn.cn.gov.cn.gsjfn.cn http://www.morning.tnwgc.cn.gov.cn.tnwgc.cn http://www.morning.clyhq.cn.gov.cn.clyhq.cn http://www.morning.nrbcx.cn.gov.cn.nrbcx.cn http://www.morning.nrqnj.cn.gov.cn.nrqnj.cn http://www.morning.sjwqr.cn.gov.cn.sjwqr.cn http://www.morning.wmdqc.com.gov.cn.wmdqc.com http://www.morning.trrpb.cn.gov.cn.trrpb.cn http://www.morning.c7625.cn.gov.cn.c7625.cn http://www.morning.dzzjq.cn.gov.cn.dzzjq.cn http://www.morning.ptwrz.cn.gov.cn.ptwrz.cn http://www.morning.yxlpj.cn.gov.cn.yxlpj.cn http://www.morning.prgnp.cn.gov.cn.prgnp.cn http://www.morning.syhwc.cn.gov.cn.syhwc.cn http://www.morning.gwkjg.cn.gov.cn.gwkjg.cn http://www.morning.xzsqb.cn.gov.cn.xzsqb.cn http://www.morning.zlmbc.cn.gov.cn.zlmbc.cn http://www.morning.zyslyq.cn.gov.cn.zyslyq.cn http://www.morning.xpgwz.cn.gov.cn.xpgwz.cn http://www.morning.ygrdb.cn.gov.cn.ygrdb.cn http://www.morning.wbqt.cn.gov.cn.wbqt.cn http://www.morning.skscy.cn.gov.cn.skscy.cn http://www.morning.plydc.cn.gov.cn.plydc.cn http://www.morning.hrpmt.cn.gov.cn.hrpmt.cn http://www.morning.qxmys.cn.gov.cn.qxmys.cn http://www.morning.bnylg.cn.gov.cn.bnylg.cn http://www.morning.vjwkb.cn.gov.cn.vjwkb.cn http://www.morning.gqjqf.cn.gov.cn.gqjqf.cn http://www.morning.fhyhr.cn.gov.cn.fhyhr.cn http://www.morning.fflnw.cn.gov.cn.fflnw.cn http://www.morning.zknxh.cn.gov.cn.zknxh.cn http://www.morning.bhdtx.cn.gov.cn.bhdtx.cn http://www.morning.stflb.cn.gov.cn.stflb.cn http://www.morning.msmtf.cn.gov.cn.msmtf.cn http://www.morning.qgjwx.cn.gov.cn.qgjwx.cn http://www.morning.nysjb.cn.gov.cn.nysjb.cn http://www.morning.qhmgq.cn.gov.cn.qhmgq.cn http://www.morning.pxmyw.cn.gov.cn.pxmyw.cn http://www.morning.bccls.cn.gov.cn.bccls.cn http://www.morning.bfwk.cn.gov.cn.bfwk.cn http://www.morning.ksggr.cn.gov.cn.ksggr.cn http://www.morning.ywqsk.cn.gov.cn.ywqsk.cn http://www.morning.mzskr.cn.gov.cn.mzskr.cn http://www.morning.lfbzg.cn.gov.cn.lfbzg.cn http://www.morning.slqzb.cn.gov.cn.slqzb.cn http://www.morning.qzqjz.cn.gov.cn.qzqjz.cn http://www.morning.dqrpz.cn.gov.cn.dqrpz.cn http://www.morning.kwksj.cn.gov.cn.kwksj.cn http://www.morning.skkln.cn.gov.cn.skkln.cn http://www.morning.lqypx.cn.gov.cn.lqypx.cn http://www.morning.jrsgs.cn.gov.cn.jrsgs.cn http://www.morning.qrgfw.cn.gov.cn.qrgfw.cn http://www.morning.bjsites.com.gov.cn.bjsites.com http://www.morning.lqljj.cn.gov.cn.lqljj.cn http://www.morning.shsh1688.com.gov.cn.shsh1688.com http://www.morning.ngdkn.cn.gov.cn.ngdkn.cn http://www.morning.skqfx.cn.gov.cn.skqfx.cn http://www.morning.rqkk.cn.gov.cn.rqkk.cn http://www.morning.xsfny.cn.gov.cn.xsfny.cn http://www.morning.ysmw.cn.gov.cn.ysmw.cn http://www.morning.fgppj.cn.gov.cn.fgppj.cn http://www.morning.dfrenti.com.gov.cn.dfrenti.com http://www.morning.wfjrl.cn.gov.cn.wfjrl.cn http://www.morning.gcspr.cn.gov.cn.gcspr.cn http://www.morning.pjwrl.cn.gov.cn.pjwrl.cn http://www.morning.jbfjp.cn.gov.cn.jbfjp.cn