dedecms搭建购物网站,ie浏览器网页版,域名购买万网,制作公司网页图片1.Flink的特点
Apache Flink 是一个框架和分布式处理引擎#xff0c;用于对无界和有界数据流进行状态计算。 Flink主要特点如下#xff1a; 高吞吐和低延迟。每秒处理数百万个事件#xff0c;毫秒级延迟。结果的准确性。Flink提供了事件时间(event--time)和处理时间(proces…1.Flink的特点
Apache Flink 是一个框架和分布式处理引擎用于对无界和有界数据流进行状态计算。 Flink主要特点如下 高吞吐和低延迟。每秒处理数百万个事件毫秒级延迟。结果的准确性。Flink提供了事件时间(event--time)和处理时间(processing-time)语义。
对于乱序事件流事件时间语义仍然能提供一致且准确的结果。 精确一次(exactly-once)的状态一致性保证。可以连接到最常用的外部系统如Kafka、Hive、JDBC、HDFS、Redis等。高可用。本身高可用的设置加上与K8s,YARN和Mesos的紧密集成再加上从故障中
快速恢复和动态扩展任务的能力Flink能做到以极少的停机时间7×24全天候运行。 2.分层API 有状态流处理通过底层API〔处理函数对最原始数据加工处理。底层API与DataStream API相集成可以处理复杂的计算。 DataStream API流处理和DataSet API批处理封装了底层处理函数提供了通用的模块比如转换transformations,包括map、flatmap等连接(joins),聚合(aggregations),窗口(windows)操作等。注意Flink1.l2以后DataStream API已经实现真正的流批一体所以DataSet API已经过时。 Table API是以表为中心的声明式编程其中表可能会动态变化。Table API遵循关系模型表有二维数据结构类似于关系数据库中的表同时API提供可比较的操作例如select,project、join,group-by,aggregate等。我们可以在表与DataStream/DataSet之间无缝切换以允许程序将Table API与DataStream以及DataSet混合使用。 SQL这一层在语法与表达能力上与Table API类似但是是以SQL查询表达式的形式表现程序。SQL抽象与Table API交互密切同时SQL查询可以直接在Table API定义的表上执行。
3.流式数据批量数据
批处理的特点是有界、大量非常适合需要访问全套记录才能完成的计算工作一般用于离线统计。
流处理的特点是无界、实时, 无需针对整个数据集执行操作而是对通过系统传输的每个数据项执行操作一般用于实时统计。
在spark的世界观中一切都是由批次组成的离线数据是一个大批次而实时数据是由一个一个无限的小批次组成的。
而在flink的世界观中一切都是由流组成的离线数据是有界限的流实时数据是一个没有界限的流这就是所谓的有界流和无界流。 无界数据流
无界数据流有一个开始但是没有结束它们不会在生成时终止并提供数据必须连续处理无界流也就是说必须在获取后立即处理event。对于无界数据流我们无法等待所有数据都到达因为输入是无界的并且在任何时间点都不会完成。处理无界数据通常要求以特定顺序例如事件发生的顺序获取event以便能够推断结果完整性。
有界数据流
有界数据流有明确定义的开始和结束可以在执行任何计算之前通过获取所有数据来处理有界流处理有界流不需要有序获取因为可以始终对有界数据集进行排序有界流的处理也称为批处理。
4.单作业模式
会话模式因为资源共享会导致很多问题所以为了更好地隔离资源我们可以考虑为每个提交的作业启动一个集群这就是所谓的单作业Per-Job模式。 单作业模式就是严格的一对一集群只为这个作业而生。同样由客户端运行应用程序然后启动集群作业被提交给 JobManager进而分发TaskManager 执行。作业作业完成后集群就会关闭所有资源也会释放。这样一来每个作业都有它自己的 JobManager管理占用独享的资源即使发生故障它的 TaskManager 宕机也不会影响其他作业。
这些特性使得单作业模式在生产环境运行更加稳定所以是实际应用的首选模式。
需要注意的是Flink 本身无法直接这样运行所以单作业模式一般需要借助一些资源管理框架来启动集群比如 YARN、Kubernetes。
5.并行度优先级 • 每一个算子 operator 可以包含一个或多个子任务 operator subtask 这些子任务在不同的线程、不同的物理机或不同的容器中完全独立地执行。 • 一个特定算子的 子任务subtask的个数 被称为其并行度 parallelism 。 一般情况下一个流程序的并行度可以认为就是其所有算子中最大的并行度。一个程序中不同的算子可能具有不同的并行度。 Stream在算子之间传输数据的形式可以是one-to-one(forwarding)的模式也可以是redistributing的模式具体是哪一种形式取决于算子的种类。 One-to-one stream(比如在source和map operator之间)维护着分区以及元素的顺序。那意味着flatmap 算子的子任务看到的元素的个数以及顺序跟source 算子的子任务生产的元素的个数、顺序相同map、fliter、flatMap等算子都是one-to-one的对应关系。类似于spark中的窄依赖 Redistributing stream(map()跟keyBy/window之间或者keyBy/window跟sink之间)的分区会发生改变。每一个算子的子任务依据所选择的transformation发送数据到不同的目标任务。例如keyBy()基于hashCode重分区、broadcast和rebalance会随机重新分区这些算子都会引起redistribute过程而redistribute过程就类似于Spark中的shuffle过程。类似于spark中的宽依赖 并行度可以有如下几种指定方式 Operator Level算子级别(可以使用) 一个算子、数据源和sink的并行度可以通过调用 setParallelism()方法来指定 Execution Environment LevelEnv级别(可以使用) 执行环境(任务)的默认并行度可以通过调用setParallelism()方法指定。为了以并行度3来执行所有的算子、数据源和data sink 可以通过如下的方式设置执行环境的并行度执行环境的并行度可以通过显式设置算子的并行度而被重写 Client Level(客户端级别,推荐使用)(可以使用) 并行度可以在客户端将job提交到Flink时设定。 对于CLI客户端可以通过-p参数指定并行度 ./bin/flink run -p 10 WordCount-java.jar System Level系统默认级别,尽量不使用 在系统级可以通过设置flink-conf.yaml文件中的parallelism.default属性来指定所有执行环境的默认并行度 并行度的优先级算子级别 env级别 Client级别 系统默认级别 (越靠前具体的代码并行度的优先级越高) 6.任务调度执行图
逻辑流图StreamGraph→ 作业图JobGraph→ 执行图ExecutionGraph→ 物理图Physical Graph。 1逻辑流图StreamGraph
这是根据用户通过 DataStream API编写的代码生成的最初的DAG图用来表示程序的拓扑结构。这一步一般在客户端完成。
2作业图JobGraph
StreamGraph经过优化后生成的就是作业图JobGraph这是提交给 JobManager 的数据结构确定了当前作业中所有任务的划分。主要的优化为将多个符合条件的节点链接在一起合并成一个任务节点形成算子链这样可以减少数据交换的消耗。JobGraph一般也是在客户端生成的在作业提交时传递给JobMaster。
我们提交作业之后打开Flink自带的Web UI点击作业就能看到对应的作业图。 3执行图ExecutionGraph
JobMaster收到JobGraph后会根据它来生成执行图ExecutionGraph。ExecutionGraph是JobGraph的并行化版本是调度层最核心的数据结构。与JobGraph最大的区别就是按照并行度对并行子任务进行了拆分并明确了任务间数据传输的方式。
4物理图Physical Graph
JobMaster生成执行图后会将它分发给TaskManager各个TaskManager会根据执行图部署任务最终的物理执行过程也会形成一张“图”一般就叫作物理图Physical Graph。这只是具体执行层面的图并不是一个具体的数据结构。
物理图主要就是在执行图的基础上进一步确定数据存放的位置和收发的具体方式。有了物理图TaskManager就可以对传递来的数据进行处理计算了。
7.转换算子
数据源读入数据之后我们就可以使用各种转换算子将一个或多个DataStream转换为新的DataStream。 基本转换算子
1. 映射map
map 是大家非常熟悉的大数据操作算子主要用于将数据流中的数据进行转换形成新的数据流。简单来说就是一个“一 一映射”消费一个元素就产出一个元素 我们只需要基于 DataStrema 调用 map()方法就可以进行转换处理。方法需要传入的参数是接口 MapFunction 的实现返回值类型还是 DataStream。
public class TransMapTest {public static void main(String[] args) throws Exception{StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSourceEvent stream env.fromElements(new Event(Mary, ./home, 1000L),new Event(Bob, ./cart, 2000L));//1.自定义类实现mapfunction接口SingleOutputStreamOperatorString result1 stream.map(new UserExtractor());// 2.传入匿名类实现MapFunctionSingleOutputStreamOperatorString result2 stream.map(new MapFunctionEvent, String() {Overridepublic String map(Event value) throws Exception {return value.user;}});//3.传入lambda表达式SingleOutputStreamOperatorString result3 stream.map(data - data.user);result1.print();//result2.print();//result3.print();env.execute();}public static class UserExtractor implements MapFunctionEvent, String {Overridepublic String map(Event value) throws Exception {return value.user;}}
}
MapFunction 实现类的泛型类型与输入数据类型和输出数据的类型有关。
在实现 MapFunction 接口的时候需要指定两个泛型分别是输入事件和输出事件的类型还
需要重写一个 map()方法定义从一个输入事件转换为另一个输出事件的具体逻辑。
2. 过滤filter
filter转换操作顾名思义是对数据流执行一个过滤通过一个布尔条件表达式设置过滤条件对于每一个流内元素进行判断若为true则元素正常输出若为false则元素被过滤掉。 进行filter转换之后的新数据流的数据类型与原数据流是相同的。filter转换需要传入的参数需要实现FilterFunction接口而FilterFunction内要实现filter()方法就相当于一个返回布尔类型的条件表达式。
public class TransFilterTest {public static void main(String[] args) throws Exception{StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSourceEvent stream env.fromElements(new Event(Mary, ./home, 1000L),new Event(Bob, ./cart, 2000L));//1.传入实现filterfunction的自定义类SingleOutputStreamOperatorEvent result1 stream.filter(new UserFilter());// 2.传入匿名类实现FilterFunctionSingleOutputStreamOperatorEvent result2 stream.filter(new FilterFunctionEvent() {Overridepublic boolean filter(Event e) throws Exception {return e.user.equals(Bob);}});//3.传入lambda表达式SingleOutputStreamOperatorEvent result3 stream.filter(data - data.user.equals(Bob));result1.print();// result2.print();// result3.print();env.execute();}public static class UserFilter implements FilterFunctionEvent {Overridepublic boolean filter(Event e) throws Exception {return e.user.equals(Mary);}}
}
3. 扁平映射flatMap
flatMap 操作又称为扁平映射主要是将数据流中的整体一般是集合类型拆分成一个一个的个体使用。消费一个元素可以产生 0 到多个元素。flatMap 可以认为是“扁平化”flatten和“映射”map两步操作的结合也就是先按照某种规则对数据进行打散拆分再对拆分后的元素做转换处理. 同 map 一样flatMap 也可以使用 Lambda 表达式或者 FlatMapFunction 接口实现类的方式
来进行传参返回值类型取决于所传参数的具体逻辑可以与原数据流相同也可以不同。
public class TransFlatmapTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSourceEvent stream env.fromElements(new Event(Mary, ./home, 1000L),new Event(Bob, ./cart, 2000L));//1.实现自定义的flatmapfunctionstream.flatMap(new MyFlatMap()).print(1);//2.传入lambda表达式stream.flatMap((Event value,CollectorString out)- {if (value.user.equals(Marry))out.collect(value.url);else if(value.user.equals(Bob))out.collect(value.user);out.collect(value.url);out.collect(value.timestamp.toString());}).returns(new TypeHintString() {}).print(2);env.execute();}public static class MyFlatMap implements FlatMapFunctionEvent, String {Overridepublic void flatMap(Event value, CollectorString out) throws Exception {out.collect(value.user);out.collect(value.url);out.collect(value.timestamp.toString());}}}flatMap 操作会应用在每一个输入事件上面FlatMapFunction 接口中定义了 flatMap 方法
用户可以重写这个方法在这个方法中对输入数据进行处理并决定是返回 0 个、1 个或多个
结果数据。因此 flatMap 并没有直接定义返回值类型而是通过一个“收集器”Collector来
指定输出。希望输出结果时只要调用收集器的.collect()方法就可以了这个方法可以多次调
用也可以不调用。所以 flatMap 方法也可以实现 map 方法和 filter 方法的功能当返回结果
是 0 个的时候就相当于对数据进行了过滤当返回结果是 1 个的时候相当于对数据进行了
简单的转换操作。
聚合算子Aggregation
1. 按键分区keyBy
对于Flink而言DataStream是没有直接进行聚合的API的。因为我们对海量数据做聚合肯定要进行分区并行处理这样才能提高效率。所以在Flink中要做聚合需要先进行分区这个操作就是通过keyBy来完成的。
keyBy是聚合前必须要用到的一个算子。keyBy通过指定键key可以将一条流从逻辑上划分成不同的分区partitions。这里所说的分区其实就是并行处理的子任务。
基于不同的key流中的数据将被分配到不同的分区中去这样一来所有具有相同的key的数据都将被发往同一个分区。 2. 简单聚合
有了按键分区的数据流KeyedStream我们就可以基于它进行聚合操作了。Flink为我们内置实现了一些最基本、最简单的聚合API主要有以下几种
sum()在输入流上对指定的字段做叠加求和的操作。min()在输入流上对指定的字段求最小值。max()在输入流上对指定的字段求最大值。minBy()与min()类似在输入流上针对指定字段求最小值。不同的是min()只计算指定字段的最小值其他字段会保留最初第一个数据的值而minBy()则会返回包含字段最小值的整条数据。maxBy()与max()类似在输入流上针对指定字段求最大值。两者区别与min()/minBy()完全一致。
简单聚合算子使用非常方便语义也非常明确。这些聚合方法调用时也需要传入参数但并不像基本转换算子那样需要实现自定义函数只要说明聚合指定的字段就可以了。指定字段的方式有两种指定位置和指定名称。
public class TransformSimpleAggTest {public static void main(String[] args) throws Exception{StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSourceEvent stream env.fromElements(new Event(Mary, ./home, 1000L),new Event(Bob, ./cart, 2000L),new Event(Alice,./prod?id100, 3000L),new Event(Bob,./prod?id1, 3300L),new Event(Bob, ./home, 3500L),new Event(Alice,./prod?id200, 3200L),new Event(Bob,./prod?id2, 3800L),new Event(Bob,./prod?id3, 4200L));// 按键分组之后进行聚合提取当前用户最后一次访问数据stream.keyBy(new KeySelectorEvent, String() {Overridepublic String getKey(Event value) throws Exception {return value.user;}}).max(timestamp).print(max:);stream.keyBy(data - data.user).maxBy(timestamp).print(maxBy:);env.execute();}
}3. 归约聚合reduce
与简单聚合类似reduce 操作也会将 KeyedStream 转换为 DataStream。它不会改变流的元素数据类型所以输出类型和输入类型是一样的。
调用 KeyedStream 的 reduce 方法时需要传入一个参数实现 ReduceFunction 接口。接口在源码中的定义如下
public interface ReduceFunctionT extends Function, Serializable {
T reduce(T value1, T value2) throws Exception;
}ReduceFunction 接口里需要实现 reduce()方法这个方法接收两个输入事件经过转换处理之后输出一个相同类型的事件所以对于一组数据我们可以先取两个进行合并然后再将合并的结果看作一个数据、再跟后面的数据合并最终会将它“简化”成唯一的一个数据这也就是 reduce“归约”的含义。在流处理的底层实现过程中实际上是将中间“合并的结果”作为任务的一个状态保存起来的之后每来一个新的数据就和之前的聚合状态进一步做归约。
public class TransformReduceTest {public static void main(String[] args) throws Exception{StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSourceEvent stream env.fromElements(new Event(Mary, ./home, 1000L),new Event(Bob, ./cart, 2000L),new Event(Alice,./prod?id100, 3000L),new Event(Bob,./prod?id1, 3300L),new Event(Alice,./prod?id200, 3200L),new Event(Bob, ./home, 3500L),new Event(Bob,./prod?id2, 3800L),new Event(Bob,./prod?id3, 4200L));//1. 统计每个用户的访问频次SingleOutputStreamOperatorTuple2String, Long clicksByUser stream.map(new MapFunctionEvent, Tuple2String, Long() {Overridepublic Tuple2String, Long map(Event value) throws Exception {return Tuple2.of(value.user, 1L);}}).keyBy(data - data.f0).reduce(new ReduceFunctionTuple2String, Long() {Overridepublic Tuple2String, Long reduce(Tuple2String, Long value1, Tuple2String, Long value2) throws Exception {return Tuple2.of(value1.f0, value1.f1 value2.f1);}});//2. 选取当前最活跃的用户SingleOutputStreamOperatorTuple2String, Long result clicksByUser.keyBy(data - key).reduce(new ReduceFunctionTuple2String, Long() {Overridepublic Tuple2String, Long reduce(Tuple2String, Long value1, Tuple2String, Long value2) throws Exception {return value1.f1 value2.f1 ? value1 : value2;}});result.print();env.execute();}
}8.输出算子sink
连接到外部系统
Flink作为数据处理框架最终还是要把计算处理的结果写入外部存储为外部应用提供支持。 Flink的DataStream API专门提供了向外部写入数据的方法addSink。与addSource类似addSink方法对应着一个“Sink”算子主要就是用来实现与外部系统连接、并将数据提交写入的Flink程序中所有对外的输出操作一般都是利用Sink算子完成的。
Sink 一词有“下沉”的意思有些资料会相对于“数据源”把它翻译为“数据汇”。不论怎样理解Sink 在 Flink 中代表了将结果数据收集起来、输出到外部的意思所以我们这里统一把它直观地叫作“输出算子”。
与 Source 算子非常类似除去一些 Flink 预实现的 Sink一般情况下 Sink 算子的创建是通过调用 DataStream 的.addSink()方法实现的。
stream.addSink(new SinkFunction(…));
addSource 的参数需要实现一个 SourceFunction 接口类似地addSink 方法同样需要传入一个参数实现的是 SinkFunction 接口。在这个接口中只需要重写一个方法 invoke(),用来将指定的值写入到外部系统中。这个方法在每条数据记录到来时都会调用
default void invoke(IN value, Context context) throws Exception SinkFuntion 多数情况下同样并不需要我们自己实现。Flink 官方提供了一部分的框架的 Sink 连接器。
列出了 Flink 官方目前支持的第三方系统连接器 除 Flink 官方之外Apache Bahir 作为给 Spark 和 Flink 提供扩展支持的项目也实现了一些其他第三方系统与 Flink 的连接器 除此以外就需要用户自定义实现 sink 连接器了。
输出到文件
Flink 为此专门提供了一个流式文件系统的连接器StreamingFileSink它继承自抽象类RichSinkFunction而且集成了 Flink 的检查点checkpoint机制用来保证精确一次exactly once的一致性语义。
StreamingFileSink 为批处理和流处理提供了一个统一的 Sink它可以将分区文件写入 Flink支持的文件系统。它可以保证精确一次的状态一致性大大改进了之前流式文件 Sink 的方式。它的主要操作是将数据写入桶buckets每个桶中的数据都可以分割成一个个大小有限的分区文件这样一来就实现真正意义上的分布式文件存储。我们可以通过各种配置来控制“分桶”的操作默认的分桶方式是基于时间的我们每小时写入一个新的桶。换句话说每个桶内保存的文件记录的都是 1 小时的输出数据。
StreamingFileSink 支持行编码Row-encoded和批量编码Bulk-encoded比如 Parquet格式。这两种不同的方式都有各自的构建器builder调用方法也非常简单可以直接调用StreamingFileSink 的静态方法
• 行编码StreamingFileSink.forRowFormatbasePathrowEncoder。 • 批量编码StreamingFileSink.forBulkFormatbasePathbulkWriterFactory。 public class SinkToFileTest {public static void main(String[] args) throws Exception{StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(4);DataStreamEvent stream env.fromElements(new Event(Mary, ./home, 1000L),new Event(Bob, ./cart, 2000L),new Event(Alice, ./prod?id100, 3000L),new Event(Bob, ./prod?id1, 3300L),new Event(Alice, ./prod?id300, 3200L),new Event(Bob, ./home, 3500L),new Event(Bob, ./prod?id2, 3800L),new Event(Bob, ./prod?id3, 4200L));StreamingFileSinkString fileSink StreamingFileSink.StringforRowFormat(new Path(./output),new SimpleStringEncoder(UTF-8)).withRollingPolicy(DefaultRollingPolicy.builder().withRolloverInterval(TimeUnit.MINUTES.toMillis(15)).withInactivityInterval(TimeUnit.MINUTES.toMillis(5)).withMaxPartSize(1024 * 1024 * 1024).build()).build();// 将Event转换成String写入文件stream.map(data - data.toString()).addSink(fileSink);env.execute();}
} 输出到Kafka
我们要将数据输出到 Kafka整个数据处理的闭环已经形成所以可以完整测试如下
1添加 Kafka 连接器依赖
由于我们已经测试过从 Kafka 数据源读取数据连接器相关依赖已经引入这里就不重复
介绍了。
2启动 Kafka 集群
3编写输出到 Kafka 的示例代码
1.启动zookeeper
命令bin/zkServer.sh start
2.启动kafka
命令bin/kafka-server-start.sh -daemon config/server.properties
3.创建生产者
命令./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic clicks
addSink 传入的参数是一个 FlinkKafkaProducer。这也很好理解因为需要向 Kafka 写入数据自然应该创建一个生产者。FlinkKafkaProducer 继承了抽象类TwoPhaseCommitSinkFunction这是一个实现了“两阶段提交”的 RichSinkFunction。两阶段提交提供了 Flink 向 Kafka 写入数据的事务性保证能够真正做到精确一次exactly once的状态一致性。
4. 运行代码另开启一个master01窗口启动一个消费者, 查看是否收到数据
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic events
public class SinkToKafkaTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);Properties properties new Properties();properties.put(bootstrap.servers, master01:9092);DataStreamSourceString stream env.addSource(new FlinkKafkaConsumerString(clicks,new SimpleStringSchema(),properties));SingleOutputStreamOperatorString result stream.map(new MapFunctionString, String() {Overridepublic String map(String value) throws Exception {String[] fields value.split(,);return new Event(fields[0].trim(), fields[1].trim(), Long.valueOf(fields[2].trim())).toString();}});result.addSink(new FlinkKafkaProducerString(master01:9092, events, new SimpleStringSchema()));env.execute();}
}
结果 2025年的第一天我居然还在复习【枯死】
新的一年梦虽遥追则能达。愿虽艰持则可圆。祝大家所愿皆所成多喜乐长安宁。 文章转载自: http://www.morning.mlcnh.cn.gov.cn.mlcnh.cn http://www.morning.ymrq.cn.gov.cn.ymrq.cn http://www.morning.mnbgx.cn.gov.cn.mnbgx.cn http://www.morning.zcmpk.cn.gov.cn.zcmpk.cn http://www.morning.ghxzd.cn.gov.cn.ghxzd.cn http://www.morning.qkpzq.cn.gov.cn.qkpzq.cn http://www.morning.skqfx.cn.gov.cn.skqfx.cn http://www.morning.wfykn.cn.gov.cn.wfykn.cn http://www.morning.ckzjl.cn.gov.cn.ckzjl.cn http://www.morning.lskrg.cn.gov.cn.lskrg.cn http://www.morning.brkc.cn.gov.cn.brkc.cn http://www.morning.bsrqy.cn.gov.cn.bsrqy.cn http://www.morning.zwyuan.com.gov.cn.zwyuan.com http://www.morning.lsssx.cn.gov.cn.lsssx.cn http://www.morning.i-bins.com.gov.cn.i-bins.com http://www.morning.hnrls.cn.gov.cn.hnrls.cn http://www.morning.homayy.com.gov.cn.homayy.com http://www.morning.wrdlf.cn.gov.cn.wrdlf.cn http://www.morning.fswml.cn.gov.cn.fswml.cn http://www.morning.ggcjf.cn.gov.cn.ggcjf.cn http://www.morning.dmwjl.cn.gov.cn.dmwjl.cn http://www.morning.wdrxh.cn.gov.cn.wdrxh.cn http://www.morning.fjfjm.cn.gov.cn.fjfjm.cn http://www.morning.gpsr.cn.gov.cn.gpsr.cn http://www.morning.fbfnk.cn.gov.cn.fbfnk.cn http://www.morning.syznh.cn.gov.cn.syznh.cn http://www.morning.kchwr.cn.gov.cn.kchwr.cn http://www.morning.mbhdl.cn.gov.cn.mbhdl.cn http://www.morning.ktlxk.cn.gov.cn.ktlxk.cn http://www.morning.jpdbj.cn.gov.cn.jpdbj.cn http://www.morning.qsfys.cn.gov.cn.qsfys.cn http://www.morning.xzsqb.cn.gov.cn.xzsqb.cn http://www.morning.yzmzp.cn.gov.cn.yzmzp.cn http://www.morning.tkkjl.cn.gov.cn.tkkjl.cn http://www.morning.gjzwj.cn.gov.cn.gjzwj.cn http://www.morning.brwnd.cn.gov.cn.brwnd.cn http://www.morning.rfrxt.cn.gov.cn.rfrxt.cn http://www.morning.xflzm.cn.gov.cn.xflzm.cn http://www.morning.mypxm.com.gov.cn.mypxm.com http://www.morning.rfmzc.cn.gov.cn.rfmzc.cn http://www.morning.lkhgq.cn.gov.cn.lkhgq.cn http://www.morning.ryjl.cn.gov.cn.ryjl.cn http://www.morning.gmmxh.cn.gov.cn.gmmxh.cn http://www.morning.wwxg.cn.gov.cn.wwxg.cn http://www.morning.pmrlt.cn.gov.cn.pmrlt.cn http://www.morning.kzbpx.cn.gov.cn.kzbpx.cn http://www.morning.dlrsjc.com.gov.cn.dlrsjc.com http://www.morning.xbrxk.cn.gov.cn.xbrxk.cn http://www.morning.xkjrs.cn.gov.cn.xkjrs.cn http://www.morning.srjbs.cn.gov.cn.srjbs.cn http://www.morning.yfmlj.cn.gov.cn.yfmlj.cn http://www.morning.kxscs.cn.gov.cn.kxscs.cn http://www.morning.nlrxh.cn.gov.cn.nlrxh.cn http://www.morning.wrqw.cn.gov.cn.wrqw.cn http://www.morning.wbnsf.cn.gov.cn.wbnsf.cn http://www.morning.tphrx.cn.gov.cn.tphrx.cn http://www.morning.tjjkn.cn.gov.cn.tjjkn.cn http://www.morning.lsnnq.cn.gov.cn.lsnnq.cn http://www.morning.dzrcj.cn.gov.cn.dzrcj.cn http://www.morning.ngmjn.cn.gov.cn.ngmjn.cn http://www.morning.kvzvoew.cn.gov.cn.kvzvoew.cn http://www.morning.gcxfh.cn.gov.cn.gcxfh.cn http://www.morning.dkfb.cn.gov.cn.dkfb.cn http://www.morning.yqqgp.cn.gov.cn.yqqgp.cn http://www.morning.rshkh.cn.gov.cn.rshkh.cn http://www.morning.snyqb.cn.gov.cn.snyqb.cn http://www.morning.mxptg.cn.gov.cn.mxptg.cn http://www.morning.bhdtx.cn.gov.cn.bhdtx.cn http://www.morning.rtpw.cn.gov.cn.rtpw.cn http://www.morning.czzpm.cn.gov.cn.czzpm.cn http://www.morning.okiner.com.gov.cn.okiner.com http://www.morning.kxxld.cn.gov.cn.kxxld.cn http://www.morning.yrnyz.cn.gov.cn.yrnyz.cn http://www.morning.wftrs.cn.gov.cn.wftrs.cn http://www.morning.zthln.cn.gov.cn.zthln.cn http://www.morning.rykmz.cn.gov.cn.rykmz.cn http://www.morning.wkknm.cn.gov.cn.wkknm.cn http://www.morning.rwbx.cn.gov.cn.rwbx.cn http://www.morning.pfkrw.cn.gov.cn.pfkrw.cn http://www.morning.dwwbt.cn.gov.cn.dwwbt.cn