直播网站开发,无备案网站可以做百度推广,常德网站制作公司,wordpress文章idFlink学习笔记 前言#xff1a;今天是学习 flink 的第 11 天啦#xff01;学习了 flink 四大基石之 Checkpoint #xff08;检查点#xff09;#xff0c;主要是解决大数据领域持久化中间结果数据#xff0c;以及取消任务#xff0c;下次启动人可以恢复累加数据问题今天是学习 flink 的第 11 天啦学习了 flink 四大基石之 Checkpoint 检查点主要是解决大数据领域持久化中间结果数据以及取消任务下次启动人可以恢复累加数据问题结合自己实验猜想和代码实践总结了很多自己的理解和想法希望和大家多多交流 Tips检查点检查的是历史记录过去的时间值得怀念未来的道路运气会继续累加能力会继续提升明天也要继续努力 文章目录 Flink学习笔记三、Flink 高级 API 开发4. Checkpoint4.1 Checkpoint 介绍4.1.1 流程简介4.1.2 单流的 barrier4.1.3 双流的 checkpoint 实现 4.2 持久化存储4.2.1 MemStateBackend4.2.2 FsStateBackend4.2.3 RocksDBStateBackend4.2.4 配置参数用法4.2.5 修改 State Backend 的两种方式简略新版本前(1) 单任务调整灵活(2) 全局调整 4.2.6 从传统后端迁移的三种情况详细新版本后(1) MemoryStateBackend(2) FsStateBackend(3) RocksDBStateBackend 4.6.7 Checkpoint 案例演示 4.3 Flink 的重启策略4.3.1 概述4.3.2 固定延迟重启策略(1) 全局配置 flink-conf.yaml(2) 代码设置 4.3.3 失败率重启策略(1) 全局配置 flink-conf.yaml(2) 代码设置 4.3.4 无重启策略(1) 全局配置 flink-conf.yaml(2) 代码设置 4.3.5 重启策略的案例演示 4.4 Savepoint4.4.1 应用场景4.4.2 面试问题4.4.3 案例演示 三、Flink 高级 API 开发
4. Checkpoint
4.1 Checkpoint 介绍
4.1.1 流程简介
1- Flink 检查点机制CheckpointCoordinator 协调者定期在数据流上生成 checkpoint barrier2- 当某个算子接收到 barrier数据栅栏 时会基于当前状态生成一份快照3- 后将 barrier 传递给下游算子依次快照并传递下去直到 sink4- 出现异常时根据最近的快照数据将所有算子恢复到之前状态5- CheckpointCoordinator 收到所有算子的报告后才认为该周期快照成功。 4.1.2 单流的 barrier
每个 barier 都带有快照 id并且 barrier之前的数据都进入了该快照一个数据源可以有多个 barrier工作独立互不干扰 4.1.3 双流的 checkpoint 实现
一个算子有两个数据源阻塞先收到 barrier 的快数据源等慢数据源接到相同编号的 barrier再制作自身快照。1快慢不一阻塞快源 2双源到齐制作快照 4.2 持久化存储
4.2.1 MemStateBackend
1- 将状态维护在 Java 堆上的一个内部状态后端2- 大小限制 默认状态限制 5Mb可以通过其构造函数增加大小状态大小 akka 帧大小akka 是并发框架状态大小 job manager 内存大小 3- 适用场景 debug 模式下使用不适合生产环境使用 4.2.2 FsStateBackend 1- 将状态存储在 [本地文件/ HDFS] 使用本地文件new FsStateBackend(“file:///Data”))不推荐集群间读文件难使用 HDFSnew FsStateBackend(“hdfs:///lql/checkpoint”)) 2- 适用场景 具有大状态长窗口大键 / 值状态的作业所有高可用性设置 3- 弊端 分布式文件持久化每次读写都会产生网络 IO整体性能不佳 4.2.3 RocksDBStateBackend 1- RockDB 是一种嵌入式的本地数据库默认是配置成异步快照不需要等待所有信号结束才开始状态拷贝 2- 适用场景 最适合用于处理大状态长窗口或大键值状态的有状态处理任务非常适合用于高可用方案 3- 也需要配置一个文件系统本地 / HDFS 4- RocksDBStateBackendflink1.13 唯一支持增量 checkpoint 的后端 4.2.4 配置参数用法
1- 设置checkpoint周期执行时间即两个 checkpoint 之间的间距 env.enableCheckpointing(5000); 2- 设置checkpoint的执行模式最多执行一次默认值或者至少执行一次 env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)env.getCheckpointConifg().setCheckpointingMode(ChexkpointingMode.AT_LEAST_ONCE) 3- 设置checkpoint的超时时间即一个checkpoint操作的最大允许时间 env.getCheckpointConfig().setCheckpointTimeout(60000); 4- 如果在只做快照过程中出现错误是否让整体任务失败true是 false不是 env.getCheckpointConfig().setFailOnCheckpointingErrors(false); 5- 设置同一时间有多少 个checkpoint可以同时执行 env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); 6- 任务被 cancel检查点被自动删除了保留以前做的 checkpoint 可以启动外部检查点持久化 env**.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.**RETAIN_ON_CANCELLATION)在作业取消时保留检查点注意您必须在取消后手动清理检查点状态ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION当作业被cancel时删除检查点检查点仅在作业失败时可用 4.2.5 修改 State Backend 的两种方式简略新版本前
(1) 单任务调整灵活
修改方案1env.setStateBackend(new FsStateBackend(“hdfs://node01:8020/flink/checkpoints”));修改方案2new MemoryStateBackend()修改方案3new RocksDBStateBackend(filebackend, true) 需要添加依赖 dependencygroupIdorg.apache.flink/groupIdartifactIdflink-statebackend-rocksdb_2.11/artifactIdversion1.7.0/version/dependency(2) 全局调整
修改flink-conf.yaml state.backend: filesystemstate.checkpoints.dir: hdfs://namenode:9000/flink/checkpoints state.backend的值可以是下面几种 jobmanager (MemoryStateBackend)filesystem (FsStateBackend)rocksdb (RocksDBStateBackend) 4.2.6 从传统后端迁移的三种情况详细新版本后
从 Flink 1.13 版本开始社区重新设计了其公共状态后端类用户可以迁移现有应用程序以使用新 API而不会丢失任何状态或一致性。
(1) MemoryStateBackend
以前的 MemoryStateBackend 相当于使用 HashMapStateBackend 和 JobManagerCheckpointStorage
全局配置 flink-conf.yaml
state.backend: hashmap# Optional, Flink will automatically default to JobManagerCheckpointStorage
# when no checkpoint directory is specified.
state.checkpoint-storage: jobmanager代码设置
StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 因为使用新的类HashMapStateBackend
env.setStateBackend(new HashMapStateBackend());// 因为 JobManagerCheckpointStorage 即checkpoint 与 jobmanager 有关
env.getCheckpointConfig().setCheckpointStorage(new JobManagerStateBackend());(2) FsStateBackend
以前的 FsStateBackend 相当于使用 HashMapStateBackend 和 FileSystemCheckpointStorage
全局配置 flink-conf.yaml
state.backend: hashmap
state.checkpoints.dir: file:///checkpoint-dir/# Optional, Flink will automatically default to FileSystemCheckpointStorage
# when a checkpoint directory is specified.
state.checkpoint-storage: filesystem代码设置
StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 因为使用新的类HashMapStateBackend
env.setStateBackend(new HashMapStateBackend());// 因为 JobManagerCheckpointStorage 即checkpoint 与 jobmanager 有关
env.getCheckpointConfig().setCheckpointStorage(file:///checkpoint-dir);// 下面的设置更高级一点可以传入参数建议
// Advanced FsStateBackend configurations, such as write buffer size
// can be set by manually instantiating a FileSystemCheckpointStorage object.
env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage(file:///checkpoint-dir));(3) RocksDBStateBackend
以前的 RocksDBStateBackend相当于使用 EmbeddedRocksDBStateBackend 和 FileSystemCheckpointStorage
全局配置 flink-conf.yaml
state.backend: rocksdb
state.checkpoints.dir: file:///checkpoint-dir/# Optional, Flink will automatically default to FileSystemCheckpointStorage
# when a checkpoint directory is specified.
state.checkpoint-storage: filesystem代码设置
StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 因为使用新的类EmbeddedRocksDBStateBackend
env.setStateBackend(new EmbeddedRocksDBStateBackend());env.getCheckpointConfig().setCheckpointStorage(file:///checkpoint-dir);// If you manually passed FsStateBackend into the RocksDBStateBackend constructor
// to specify advanced checkpointing configurations such as write buffer size,
// you can achieve the same results by using manually instantiating a FileSystemCheckpointStorage object.
env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage(file:///checkpoint-dir));4.6.7 Checkpoint 案例演示
例子socket 数据源词频统计开启 checkpoint每隔 5s 写入 HDFS
package cn.itcast.day10.checkpoint;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;/*** 读取服务器node01中端口9999的内容,并切分单词,统计数量* 要求: 开启checkpoint支持,每隔5s钟写入HDFS一次*/
public class StreamCheckpointDemo {public static void main(String[] args) throws Exception {//todo 1获取flink流处理的运行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();//todo 2开启checkpoint//每隔5s周期性的生成barrier栅栏默认情况下没有开启checkpointenv.enableCheckpointing(5000L);//设置checkpoint的超时时间env.getCheckpointConfig().setCheckpointTimeout(2000L);//同一个时间只能有一个栅栏在运行env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);//设置checkpoint的执行模式。仅执行一次env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);//设置checkpoint最小时间间隔env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000L);//env.getCheckpointConfig().setFailOnCheckpointingErrors(false);//指定checkpoint的存储位置if(args.length 1){//env.setStateBackend(new FsStateBackend(file:///D:\\checkpoint));env.setStateBackend(new HashMapStateBackend());env.getCheckpointConfig().setCheckpointStorage(file:///D:\\checkpoint);}else{//env.setStateBackend(new FsStateBackend(args[0]));env.setStateBackend(new HashMapStateBackend());env.getCheckpointConfig().setCheckpointStorage(args[0]);} // 设置任务失败时候能够外部持久化检查点env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);//todo 2接入数据源读取文件获取数据DataStreamSourceString lines env.socketTextStream(node01, 7777);//todo 3数据处理// 3.1使用flatMap对单词进行拆分SingleOutputStreamOperatorString words lines.flatMap(new FlatMapFunctionString, String() {Overridepublic void flatMap(String line, CollectorString out) throws Exception {String[] words line.split( );//返回数据for (String word : words) {out.collect(word);}}});// 3.2对拆分后的单词进行记一次数SingleOutputStreamOperatorTuple2String, Integer wordAndOne words.map(new MapFunctionString, Tuple2String, Integer() {Overridepublic Tuple2String, Integer map(String word) throws Exception {return Tuple2.of(word, 1);}});KeyedStreamTuple2String, Integer, String grouped wordAndOne.keyBy(t - t.f0);// 3.4对分组后的key进行聚合操作SingleOutputStreamOperatorTuple2String, Integer sumed grouped.sum(1);//todo 4构建sink输出结果sumed.print();//todo 5启动运行env.execute();}
}结果
将程序打包成jar包放在flink的提交页面
# 启动 jobmanager 和 taskmanager
bin/start-cluster.sh输入地址参数最终发现
1. 手动取消作业检查点文件没有消失
2. 输入hdfs地址参数检查点文件生成在hdfs目录上总结
理解配置参数修改后端的方法 4.3 Flink 的重启策略
4.3.1 概述
flink-conf.yaml 配置文件的 restart-strategy 配置参数决定重启策略。
重启策略重启策略值说明Fixed delayfixed-delay固定延迟重启策略Failure ratefailure-rate失败率重启策略No restartNone无重启策略 4.3.2 固定延迟重启策略
配置参数描述默认值restart-strategy.fixed-delay.attempts在 Job 最终失败前Flink 尝试执行的次数如果启用 checkpoint 的话是Integer.MAX_VALUErestart-strategy.fixed-delay.delay延迟重启指一个执行失败后不立即重启等待一段时间。akka.ask.timeout,如果启用checkpoint的话是1s
(1) 全局配置 flink-conf.yaml
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s(2) 代码设置
// 1. 初始化流处理环境
StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, // 尝试重启的次数Time.of(10, TimeUnit.SECONDS) // 延时
))4.3.3 失败率重启策略
配置参数描述默认值restart-strategy.failure-rate.max-failures-per-interval在一个Job认定为失败之前最大的重启次数1restart-strategy.fixed-delay.delay计算失败率的时间间隔1分钟restart-strategy.failure-rate.delay两次连续重启尝试之间的时间间隔akka.ask.timeout
(1) 全局配置 flink-conf.yaml
restart-strategy: failure-rate
restart-strategy.failure-rate.max-failures-per-interval: 3
restart-strategy.failure-rate.failure-rate-interval: 5 min
restart-strategy.failure-rate.delay: 10 s意思就表明为
失败重启之间的间隔是10秒
如果5分钟内失败3次就不会在重启了直接结束任务(2) 代码设置
env.setRestartStrategy(RestartStrategies.failureRateRestart(3, // 每个测量时间间隔最大失败次数Time.of(5, TimeUnit.MINUTES), //失败率测量的时间间隔Time.of(10, TimeUnit.SECONDS) // 两次连续重启尝试的时间间隔
))4.3.4 无重启策略
(1) 全局配置 flink-conf.yaml
restart-strategy: none(2) 代码设置
// 直接失败不会重启
env.setRestartStrategy(RestartStrategies.noRestart());4.3.5 重启策略的案例演示
例子基于之前的单词统计案例改造,当遇到laowang字符串的时候,程序抛出异常,出现3次异常后,程序退出.
/*** 演示flink的重启策略* flink的重启策略是在配置了checkpoint的前提下不停的重启的重启如果不配置checkpoint不能使用重启策略作业直接停止* flink有三种重启策略的方式* 固定延迟重启策略* 设置失败重启的次数以及两次重启的时间间隔如设置重启失败次数是3次每次间隔5秒钟那么输入三次异常以后尝试重启三次第四次依然失败则作业停止* 失败率重启策略* 给定一定时间如果这个时间内设置了n次失败重启一旦超过了N次则作业停止如3分钟失败五次每次时间间隔10秒则任务结束* 无重启策略* 表示运行失败以后立刻停止作业运行*/
public class FixedDelayRestartStrategyDemo {public static void main(String[] args) throws Exception {/*** 实现步骤* 1初始化flink的流处理的运行环境* 2开启checkpint* 3配置重启策略* 4接入数据源* 5对字符串进行空格拆分每个单词记一次数* 6分组聚合* 7打印测试* 8运行作业*///TODO 1初始化flink流处理的运行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();//TODO 2开启checkpoint//周期性的生成barrier栅栏默认情况下checkpoint是没有开启的env.enableCheckpointing(5000L);//设置checkpoint的超时时间env.getCheckpointConfig().setCheckpointTimeout(6000L);//设置同一个时间只能有一个栅栏在运行env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);// 设置checkpoint的执行模式最多执行一次或者至少执行一次env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);//Checkpointing最小时间间隔env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);//指定checkpoint的存储位置if(args.length 1) {env.setStateBackend(new FsStateBackend(file:///E:\\checkpoint));}else{env.setStateBackend(new FsStateBackend(args[0]));}env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);//TODO 3配置重启策略//固定延迟重启策略程序出现异常的时候重启三次每次延迟五秒钟重启超过三次则程序退出env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(5)));//TODO 4接入数据源DataStreamSourceString socketTextStream env.socketTextStream(node1, 9999);//TODO 5对字符串进行空格拆分每个单词记一次数SingleOutputStreamOperatorTuple2String, Long wordAndOne socketTextStream.flatMap(new FlatMapFunctionString, Tuple2String, Long() {Overridepublic void flatMap(String line, CollectorTuple2String, Long collector) throws Exception {if(line.startsWith(laowang)){System.out.println(line);int i 1/0;System.out.println(i);throw new RuntimeException(老王驾到程序挂了);}String[] words line.split( );for (String word : words) {collector.collect(Tuple2.of(word, 1L));}}});//TODO 6分组聚合SingleOutputStreamOperatorTuple2String, Long sumed wordAndOne.keyBy(0).sum(1);//TODO 7打印测试sumed.print();//TODO 8运行作业env.execute();}
}总结
抛出异常throw new RuntimeException(); 4.4 Savepoint
4.4.1 应用场景
savepoint的目的是为了从上一次保存的中间结果中恢复过来比如在生产环境中运行着一个作业因为今晚要升级作业因此需要将生产环境的作业停止掉将升级后的jar进行部署和发布希望重新发布以后可以将上一个作业的运行结果恢复后继续运行 4.4.2 面试问题
checkpoint和savepoint的区别
checkpoint周期性定期运行生成barrier栅栏发送到job作业的每个算子当算子收到 barrier以后会将state的中间计算结果快照存储到分布式文件系统中savepoint将指定的checkpoint的结果恢复过来恢复到当前的作业中继续运行 4.4.3 案例演示
例子代码和之前一样
/*** savepoint的目的是为了从上一次保存的中间结果中恢复过来* 举例* 在生产环境中运行着一个作业因为今晚要升级作业因此需要将生产环境的作业停止掉将升级后的jar进行部署和发布* 希望重新发布以后可以将上一个作业的运行结果恢复后继续运行** 所以这时候可以使用savepoint进行解决这个问题问题** 面试问题* checkpoint和savepoint的区别* checkpoint周期性定期运行生成barrier栅栏发送到job作业的每个算子当算子收到barrier以后会将state的中间计算结果快照存储到分布式文件系统中* savepoint将指定的checkpoint的结果恢复过来恢复到当前的作业中继续运行** TODO 当作业重新递交的时候并行度发生了概念在flink1.10版本中可以正常的递交作业且能够恢复历史的累加结果* 但是之前版本中一旦发生并行度的变化作业无法递交*/
public class SavepointDemo {public static void main(String[] args) throws Exception {/*** 实现步骤* 1初始化flink流处理的运行环境* 2开启checkpoint* 3指定数据源* 4对字符串进行空格拆分然后每个单词记一次数* 5对每个单词进行分组聚合操作* 6) 打印测试* 7执行任务递交作业*///TODO 1初始化flink流处理的运行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();//TODO 2开启checkpoint//周期性的生成barrier栅栏默认情况下checkpoint是没有开启的env.enableCheckpointing(5000L);//设置checkpoint的超时时间env.getCheckpointConfig().setCheckpointTimeout(6000L);//设置同一个时间只能有一个栅栏在运行env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);// 设置checkpoint的执行模式最多执行一次或者至少执行一次env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);//Checkpointing最小时间间隔env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);//指定checkpoint的存储位置if(args.length 1) {env.setStateBackend(new FsStateBackend(file:///E:\\checkpoint));}else{env.setStateBackend(new FsStateBackend(args[0]));}env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);//TODO 3指定数据源DataStreamSourceString socketTextStream env.socketTextStream(node1, 9999);//TODO 4对字符串进行空格拆分然后每个单词记一次数SingleOutputStreamOperatorTuple2String, Long wordAndOne socketTextStream.flatMap(new FlatMapFunctionString, Tuple2String, Long() {Overridepublic void flatMap(String line, CollectorTuple2String, Long collector) throws Exception {String[] words line.split( );for (String word : words) {collector.collect(Tuple2.of(word, 1L));}}});//TODO 5对每个单词进行分组聚合操作SingleOutputStreamOperatorTuple2String, Long sumed wordAndOne.keyBy(0).sum(1);//TODO 6) 打印测试sumed.print();//TODO 7执行任务递交作业env.execute();}
}结果
(hadoop,4)
取消程序后再次运行
结果累加上一次取消任务的结果
(hadoop,5)总结
打包 jar 包到 flink 平台时不可以修改并行度并行度保持一致不然会报错指定 flink 平台提交任务的 savepoint 地址即指定为上一次取消程序的 checkpoint 地址即可恢复上一次的累加数据上一次取消程序的 checkpoint 地址flink 8081 web页面上 checkpoint 页面有历史记录地址哦
文章转载自: http://www.morning.fktlg.cn.gov.cn.fktlg.cn http://www.morning.cwwts.cn.gov.cn.cwwts.cn http://www.morning.eronghe.com.gov.cn.eronghe.com http://www.morning.fpqsd.cn.gov.cn.fpqsd.cn http://www.morning.wqpb.cn.gov.cn.wqpb.cn http://www.morning.rpwm.cn.gov.cn.rpwm.cn http://www.morning.llxyf.cn.gov.cn.llxyf.cn http://www.morning.dxgt.cn.gov.cn.dxgt.cn http://www.morning.cnvlog.cn.gov.cn.cnvlog.cn http://www.morning.bhrkx.cn.gov.cn.bhrkx.cn http://www.morning.ynlbj.cn.gov.cn.ynlbj.cn http://www.morning.nnpfz.cn.gov.cn.nnpfz.cn http://www.morning.plxnn.cn.gov.cn.plxnn.cn http://www.morning.fqklt.cn.gov.cn.fqklt.cn http://www.morning.jhrtq.cn.gov.cn.jhrtq.cn http://www.morning.wnbqy.cn.gov.cn.wnbqy.cn http://www.morning.ampingdu.com.gov.cn.ampingdu.com http://www.morning.xfxqj.cn.gov.cn.xfxqj.cn http://www.morning.ccsdx.cn.gov.cn.ccsdx.cn http://www.morning.gcqs.cn.gov.cn.gcqs.cn http://www.morning.mbrbg.cn.gov.cn.mbrbg.cn http://www.morning.hjlsll.com.gov.cn.hjlsll.com http://www.morning.fjglf.cn.gov.cn.fjglf.cn http://www.morning.xsjfk.cn.gov.cn.xsjfk.cn http://www.morning.rfbpq.cn.gov.cn.rfbpq.cn http://www.morning.pgzgy.cn.gov.cn.pgzgy.cn http://www.morning.lwbhw.cn.gov.cn.lwbhw.cn http://www.morning.qyglt.cn.gov.cn.qyglt.cn http://www.morning.zcfmb.cn.gov.cn.zcfmb.cn http://www.morning.yrsg.cn.gov.cn.yrsg.cn http://www.morning.hqsnt.cn.gov.cn.hqsnt.cn http://www.morning.gbpanel.com.gov.cn.gbpanel.com http://www.morning.xskbr.cn.gov.cn.xskbr.cn http://www.morning.jbqwb.cn.gov.cn.jbqwb.cn http://www.morning.rpkl.cn.gov.cn.rpkl.cn http://www.morning.gmjkn.cn.gov.cn.gmjkn.cn http://www.morning.tpqrc.cn.gov.cn.tpqrc.cn http://www.morning.wmqrn.cn.gov.cn.wmqrn.cn http://www.morning.qcsbs.cn.gov.cn.qcsbs.cn http://www.morning.rttkl.cn.gov.cn.rttkl.cn http://www.morning.bangaw.cn.gov.cn.bangaw.cn http://www.morning.dwmtk.cn.gov.cn.dwmtk.cn http://www.morning.jfymz.cn.gov.cn.jfymz.cn http://www.morning.xhxsr.cn.gov.cn.xhxsr.cn http://www.morning.tturfsoc.com.gov.cn.tturfsoc.com http://www.morning.pgmyn.cn.gov.cn.pgmyn.cn http://www.morning.mwkwg.cn.gov.cn.mwkwg.cn http://www.morning.mjglk.cn.gov.cn.mjglk.cn http://www.morning.hcrxn.cn.gov.cn.hcrxn.cn http://www.morning.xhddb.cn.gov.cn.xhddb.cn http://www.morning.sfgzx.cn.gov.cn.sfgzx.cn http://www.morning.qztsq.cn.gov.cn.qztsq.cn http://www.morning.zmnyj.cn.gov.cn.zmnyj.cn http://www.morning.vjwkb.cn.gov.cn.vjwkb.cn http://www.morning.ndynz.cn.gov.cn.ndynz.cn http://www.morning.zhqfn.cn.gov.cn.zhqfn.cn http://www.morning.ytmx.cn.gov.cn.ytmx.cn http://www.morning.dyght.cn.gov.cn.dyght.cn http://www.morning.lmyq.cn.gov.cn.lmyq.cn http://www.morning.rqjxc.cn.gov.cn.rqjxc.cn http://www.morning.jkwwm.cn.gov.cn.jkwwm.cn http://www.morning.yqqxj26.cn.gov.cn.yqqxj26.cn http://www.morning.hqgxz.cn.gov.cn.hqgxz.cn http://www.morning.tkztx.cn.gov.cn.tkztx.cn http://www.morning.hbqhz.cn.gov.cn.hbqhz.cn http://www.morning.bwkzn.cn.gov.cn.bwkzn.cn http://www.morning.qbgff.cn.gov.cn.qbgff.cn http://www.morning.ntnml.cn.gov.cn.ntnml.cn http://www.morning.mhbcy.cn.gov.cn.mhbcy.cn http://www.morning.yrjym.cn.gov.cn.yrjym.cn http://www.morning.lwwnq.cn.gov.cn.lwwnq.cn http://www.morning.bydpr.cn.gov.cn.bydpr.cn http://www.morning.rghkg.cn.gov.cn.rghkg.cn http://www.morning.yzsdp.cn.gov.cn.yzsdp.cn http://www.morning.qllcp.cn.gov.cn.qllcp.cn http://www.morning.fxpyt.cn.gov.cn.fxpyt.cn http://www.morning.qbpqw.cn.gov.cn.qbpqw.cn http://www.morning.pwlxy.cn.gov.cn.pwlxy.cn http://www.morning.hjwkq.cn.gov.cn.hjwkq.cn http://www.morning.qwwcf.cn.gov.cn.qwwcf.cn