网站服务器干啥,潍坊网站制作案例,新河企业做网站,怎么做外贸网站需注意哪些Table API 和 DataStream API 在定义数据处理管道时同样重要。DataStream API 提供了流处理的基本操作#xff08;即时间、状态和数据流管理#xff09;#xff0c;并且是一个相对低级的命令式编程 API。而 Table API 抽象了许多内部实现#xff0c;提供了一个结构化和声明…Table API 和 DataStream API 在定义数据处理管道时同样重要。DataStream API 提供了流处理的基本操作即时间、状态和数据流管理并且是一个相对低级的命令式编程 API。而 Table API 抽象了许多内部实现提供了一个结构化和声明式的 API。这两个 API 都可以处理有界流和无界流。
有界流需要在处理历史数据时进行管理。无界流通常出现在实时处理场景中可能会先通过历史数据初始化。为了高效执行两个 API 都提供了通过优化的批处理执行模式来处理有界流。然而由于批处理只是流处理的一个特殊情况处理有界流的管道也可以在常规流处理模式下运行。在一个 API 中可以定义端到端的管道而无需依赖另一个 API。然而混合使用这两个 API 在某些情况下可能会有益
在实现 DataStream API 的主要管道之前使用 Table API 访问目录或连接外部系统。在实现 DataStream API 的主要管道之前使用一些 SQL 函数进行无状态的数据归一化和清洗。如果需要更低级的操作例如自定义定时器处理可以切换到 DataStream API因为 Table API 中没有提供这些功能。Flink 提供了特别的桥接功能使得与 DataStream API 的集成尽可能顺畅。
1. Converting between DataStream and Table
Flink 提供了一个专门的 StreamTableEnvironment 用于与 DataStream API 集成。这个环境在常规的 TableEnvironment 基础上扩展了额外的方法并以 DataStream API 中使用的 StreamExecutionEnvironment 作为参数。
以下代码展示了如何在两个 API 之间来回转换。表的列名和类型会自动从 DataStream 的 TypeInformation 中推导出来。由于 DataStream API 本身不支持变更日志处理因此在流到表和表到流的转换过程中假设使用的是仅追加/仅插入的语义。
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;// create environments of both APIs
StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv StreamTableEnvironment.create(env);// create a DataStream
DataStreamString dataStream env.fromElements(Alice, Bob, John);// interpret the insert-only DataStream as a Table
Table inputTable tableEnv.fromDataStream(dataStream);// register the Table object as a view and query it
tableEnv.createTemporaryView(InputTable, inputTable);
Table resultTable tableEnv.sqlQuery(SELECT UPPER(f0) FROM InputTable);// interpret the insert-only Table as a DataStream again
DataStreamRow resultStream tableEnv.toDataStream(resultTable);// add a printing sink and execute in DataStream API
resultStream.print();
env.execute();// prints:
// I[ALICE]
// I[BOB]
// I[JOHN]
根据查询的类型在许多情况下结果动态表是一个管道在将 Table 转换为 DataStream 时不仅会生成仅插入的变化还可能生成撤回和其他类型的更新。在表到流的转换过程中这可能会导致类似的异常。表接收器 Unregistered_DataStream_Sink 不支持消费更新变化 [...] 在这种情况下需要重新审查查询或切换到 toChangelogStream。
以下示例展示了如何转换更新表。每个结果行表示变更日志中的一条记录变更标志可以通过调用 row.getKind() 来查询。在示例中Alice 的第二个分数会在变更前创建一个更新标志 (-U)并在变更后创建一个更新标志 (U)。
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;// create environments of both APIs
StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv StreamTableEnvironment.create(env);// create a DataStream
DataStreamRow dataStream env.fromElements(Row.of(Alice, 12),Row.of(Bob, 10),Row.of(Alice, 100));// interpret the insert-only DataStream as a Table
Table inputTable tableEnv.fromDataStream(dataStream).as(name, score);// register the Table object as a view and query it
// the query contains an aggregation that produces updates
tableEnv.createTemporaryView(InputTable, inputTable);
Table resultTable tableEnv.sqlQuery(SELECT name, SUM(score) FROM InputTable GROUP BY name);// interpret the updating Table as a changelog DataStream
DataStreamRow resultStream tableEnv.toChangelogStream(resultTable);// add a printing sink and execute in DataStream API
resultStream.print();
env.execute();// prints:
// I[Alice, 12]
// I[Bob, 10]
// -U[Alice, 12]
// U[Alice, 112]
上面的示例展示了如何通过持续地逐行发布更新来增量地计算最终结果。然而在输入流是有限的即有界的情况下可以利用批处理原则更高效地计算结果。
在批处理处理中操作符可以在多个阶段中执行这些阶段会先消费整个输入表然后再发出结果。例如连接操作符可以在执行实际连接之前对两个有界输入进行排序即排序合并连接算法或者在消费另一个输入之前从一个输入构建哈希表即哈希连接算法的构建/探测阶段。
DataStream API 和 Table API 都提供了专门的批处理运行时模式。
以下示例说明了通过简单地切换标志统一管道能够同时处理批处理和流数据。
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;// setup DataStream API
StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// set the batch runtime mode
env.setRuntimeMode(RuntimeExecutionMode.BATCH);// uncomment this for streaming mode
// env.setRuntimeMode(RuntimeExecutionMode.STREAMING);// setup Table API
// the table environment adopts the runtime mode during initialization
StreamTableEnvironment tableEnv StreamTableEnvironment.create(env);// define the same pipeline as above// prints in BATCH mode:
// I[Bob, 10]
// I[Alice, 112]// prints in STREAMING mode:
// I[Alice, 12]
// I[Bob, 10]
// -U[Alice, 12]
// U[Alice, 112]
一旦变更日志应用到外部系统例如键值存储可以看到两种模式都能够产生完全相同的输出表。通过在发出结果之前消费所有输入数据批处理模式的变更日志仅包含插入类型的变化。
1.1 Dependencies and Imports
结合 Table API 和 DataStream API 的项目需要添加以下其中一个桥接模块。这些模块包括了 flink-table-api-java 或 flink-table-api-scala 的传递依赖以及相应语言特定的 DataStream API 模块。
dependencygroupIdorg.apache.flink/groupIdartifactIdflink-table-api-java-bridge_2.12/artifactIdversion1.20.0/versionscopeprovided/scope
/dependency
以下导入是声明使用 Java 或 Scala 版本的 DataStream API 和 Table API 的公共管道所必需的。
// imports for Java DataStream API
import org.apache.flink.streaming.api.*;
import org.apache.flink.streaming.api.environment.*;// imports for Table API with bridging to Java DataStream API
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.*;
1.2 Configuration
TableEnvironment 将采用传入的 StreamExecutionEnvironment 的所有配置选项。然而在实例化之后无法保证对 StreamExecutionEnvironment 配置的进一步更改会传播到 StreamTableEnvironment。建议在切换到 Table API 之前尽早在 DataStream API 中设置所有配置选项。
import java.time.ZoneId;
import org.apache.flink.core.execution.CheckpointingMode.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;// create Java DataStream APIStreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// set various configuration earlyenv.setMaxParallelism(256);env.getConfig().addDefaultKryoSerializer(MyCustomType.class, CustomKryoSerializer.class);env.getCheckpointConfig().setCheckpointingConsistencyMode(CheckpointingMode.EXACTLY_ONCE);// then switch to Java Table APIStreamTableEnvironment tableEnv StreamTableEnvironment.create(env);// set configuration earlytableEnv.getConfig().setLocalTimeZone(ZoneId.of(Europe/Berlin));// start defining your pipelines in both APIs...
1.3 Execution Behavior
两个 API 都提供了执行管道的方法。换句话说如果需要它们会编译一个作业图并提交到集群触发执行。结果将被流式传输到声明的接收器。通常两个 API 都在方法名称中使用 execute 来标识这种行为。然而Table API 和 DataStream API 之间的执行行为略有不同。
1.3.1 DataStream API
DataStream API 的 StreamExecutionEnvironment 使用构建器来构建复杂的管道。该管道可能会拆分成多个分支这些分支可能会或可能不会以Sink结束。执行环境会缓冲所有这些定义的分支直到提交作业为止。StreamExecutionEnvironment.execute() 提交整个构建的管道并在之后清除构建器。换句话说不再声明Source和Sink可以将新的管道添加到构建器中。因此每个 DataStream 程序通常以调用 StreamExecutionEnvironment.execute() 结束。或者DataStream.executeAndCollect() 会隐式地为结果流式传输到本地客户端定义一个Sink
1.3.2 Table API
在 Table API 中分支管道仅在 StatementSet 中受到支持其中每个分支必须声明一个最终的Sink。TableEnvironment 和 StreamTableEnvironment 都不提供专门的通用 execute() 方法。相反它们提供了提交单个Source到Sink管道或StatementSet的方法
// execute with explicit sink
tableEnv.from(InputTable).insertInto(OutputTable).execute();tableEnv.executeSql(INSERT INTO OutputTable SELECT * FROM InputTable);tableEnv.createStatementSet().add(tableEnv.from(InputTable).insertInto(OutputTable)).add(tableEnv.from(InputTable).insertInto(OutputTable2)).execute();tableEnv.createStatementSet().addInsertSql(INSERT INTO OutputTable SELECT * FROM InputTable).addInsertSql(INSERT INTO OutputTable2 SELECT * FROM InputTable).execute();// execute with implicit local sinktableEnv.from(InputTable).execute().print();tableEnv.executeSql(SELECT * FROM InputTable).print();
为了结合这两种执行行为每次调用 StreamTableEnvironment.toDataStream 或 StreamTableEnvironment.toChangelogStream 时会将 Table API 子管道物化即编译并插入到 DataStream API 管道构建器中。这意味着之后必须调用 StreamExecutionEnvironment.execute() 或 DataStream.executeAndCollect。在 Table API 中的执行不会触发这些“外部部分”。
// (1)// adds a branch with a printing sink to the StreamExecutionEnvironment
tableEnv.toDataStream(table).print();// (2)// executes a Table API end-to-end pipeline as a Flink job and prints locally,
// thus (1) has still not been executed
table.execute().print();// executes the DataStream API pipeline with the sink defined in (1) as a
// Flink job, (2) was already running before
env.execute();
2. Batch Runtime Mode
批处理运行时模式是针对有界 Flink 程序的专门执行模式。一般来说有界性是数据源的一个属性它告诉我们该数据源中所有记录在执行前是否已知或者是否会出现新的数据可能是无限期的。一个作业如果其所有数据源都是有界的则该作业是有界的否则就是无界的。另一方面流处理运行时模式可以用于有界和无界作业。Table API 和 SQL planner为这两种模式提供了一组专门的优化器规则和运行时操作符。目前运行时模式不会自动从数据源推导出来因此必须显式设置或者在实例化 StreamTableEnvironment 时从 StreamExecutionEnvironment 继承。
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;// adopt mode from StreamExecutionEnvironment
StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
StreamTableEnvironment tableEnv StreamTableEnvironment.create(env);// or// set mode explicitly for StreamTableEnvironment
// it will be propagated to StreamExecutionEnvironment during planning
StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv StreamTableEnvironment.create(env, EnvironmentSettings.inBatchMode());
在将运行时模式设置为 BATCH 之前必须满足以下前提条件
所有数据源必须声明自己为有界的。当前表数据源必须仅发出插入insert-only变化。操作符需要足够的堆外内存来进行排序和处理其他中间结果。所有表操作必须在批处理模式下可用。当前某些操作仅在流处理模式下可用。
批处理执行具有以下含义其中包括但不限于
操作符中不会生成或使用渐进watermarks。然而数据源在关闭之前会发出一个最大watermarks。任务之间的交换可能是阻塞的这取决于 execution.batch-shuffle-mode。这也意味着与在流处理模式下执行相同管道相比可能会需要更少的资源。检查点Checkpointing被禁用。插入了人工状态后端Artificial state backends。表操作不会产生增量更新而是只生成一个完整的最终结果并转换为一个仅插入的变更日志流。
由于批处理可以视为流处理的特例我们建议首先实现一个流处理管道因为它是有界和无界数据的最通用实现。理论上流处理管道可以执行所有操作符。然而实际上某些操作可能没有太大意义因为它们会导致状态不断增长因此不被支持。全局排序就是一个例子它仅在批处理模式下可用。简而言之应该可以在批处理模式下运行一个正常工作的流处理管道但不一定能够反过来运行。
以下示例展示了如何使用 DataGen 表数据源在批处理模式下进行操作。许多数据源提供的选项可以隐式地使连接器变为有界例如通过定义终止偏移量或时间戳。在我们的示例中我们通过 number-of-rows 选项限制了行数。
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableDescriptor;Table table tableEnv.from(TableDescriptor.forConnector(datagen).option(number-of-rows, 10) // make the source bounded.schema(Schema.newBuilder().column(uid, DataTypes.TINYINT()).column(payload, DataTypes.STRING()).build()).build());// convert the Table to a DataStream and further transform the pipeline
tableEnv.toDataStream(table).keyBy(r - r.BytegetFieldAs(uid)).map(r - My custom operator: r.StringgetFieldAs(payload)).executeAndCollect().forEachRemaining(System.out::println);// prints:
// My custom operator: 9660912d30a43c7b035e15bd...
// My custom operator: 29f5f706d2144f4a4f9f52a0...
// ...
2.1 Changelog Unification
在大多数情况下从流处理模式切换到批处理模式反之亦然管道定义本身在 Table API 和 DataStream API 中可以保持不变。然而如前所述由于批处理模式避免了增量操作最终的变更日志流可能会有所不同。依赖于事件时间并利用水印作为完整性标记的基于时间的操作能够生成一个仅插入的变更日志流这与运行时模式无关。
以下 Java 示例展示了一个 Flink 程序它不仅在 API 层面上是统一的而且在最终生成的变更日志流中也是统一的。该示例使用基于两张表UserTable 和 OrderTable中时间属性ts的区间连接interval join来连接两张表。它使用 DataStream API 实现了一个自定义操作符该操作符通过 KeyedProcessFunction 和值状态去重用户名。
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import java.time.LocalDateTime;// setup DataStream API
StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// use BATCH or STREAMING mode
env.setRuntimeMode(RuntimeExecutionMode.BATCH);// setup Table API
StreamTableEnvironment tableEnv StreamTableEnvironment.create(env);// create a user stream
DataStreamRow userStream env.fromElements(Row.of(LocalDateTime.parse(2021-08-21T13:00:00), 1, Alice),Row.of(LocalDateTime.parse(2021-08-21T13:05:00), 2, Bob),Row.of(LocalDateTime.parse(2021-08-21T13:10:00), 2, Bob)).returns(Types.ROW_NAMED(new String[] {ts, uid, name},Types.LOCAL_DATE_TIME, Types.INT, Types.STRING));// create an order stream
DataStreamRow orderStream env.fromElements(Row.of(LocalDateTime.parse(2021-08-21T13:02:00), 1, 122),Row.of(LocalDateTime.parse(2021-08-21T13:07:00), 2, 239),Row.of(LocalDateTime.parse(2021-08-21T13:11:00), 2, 999)).returns(Types.ROW_NAMED(new String[] {ts, uid, amount},Types.LOCAL_DATE_TIME, Types.INT, Types.INT));// create corresponding tables
tableEnv.createTemporaryView(UserTable,userStream,Schema.newBuilder().column(ts, DataTypes.TIMESTAMP(3)).column(uid, DataTypes.INT()).column(name, DataTypes.STRING()).watermark(ts, ts - INTERVAL 1 SECOND).build());tableEnv.createTemporaryView(OrderTable,orderStream,Schema.newBuilder().column(ts, DataTypes.TIMESTAMP(3)).column(uid, DataTypes.INT()).column(amount, DataTypes.INT()).watermark(ts, ts - INTERVAL 1 SECOND).build());// perform interval join
Table joinedTable tableEnv.sqlQuery(SELECT U.name, O.amount FROM UserTable U, OrderTable O WHERE U.uid O.uid AND O.ts BETWEEN U.ts AND U.ts INTERVAL 5 MINUTES);DataStreamRow joinedStream tableEnv.toDataStream(joinedTable);joinedStream.print();// implement a custom operator using ProcessFunction and value state
joinedStream.keyBy(r - r.StringgetFieldAs(name)).process(new KeyedProcessFunctionString, Row, String() {ValueStateString seen;Overridepublic void open(OpenContext openContext) {seen getRuntimeContext().getState(new ValueStateDescriptor(seen, String.class));}Overridepublic void processElement(Row row, Context ctx, CollectorString out)throws Exception {String name row.getFieldAs(name);if (seen.value() null) {seen.update(name);out.collect(name);}}}).print();// execute unified pipeline
env.execute();// prints (in both BATCH and STREAMING mode):
// I[Bob, 239]
// I[Alice, 122]
// I[Bob, 999]
//
// Bob
// Alice 文章转载自: http://www.morning.mprky.cn.gov.cn.mprky.cn http://www.morning.hsrch.cn.gov.cn.hsrch.cn http://www.morning.xq3nk42mvv.cn.gov.cn.xq3nk42mvv.cn http://www.morning.gjmbk.cn.gov.cn.gjmbk.cn http://www.morning.plqsz.cn.gov.cn.plqsz.cn http://www.morning.bqwrn.cn.gov.cn.bqwrn.cn http://www.morning.jqmqf.cn.gov.cn.jqmqf.cn http://www.morning.juju8.cn.gov.cn.juju8.cn http://www.morning.pwwdp.cn.gov.cn.pwwdp.cn http://www.morning.dfckx.cn.gov.cn.dfckx.cn http://www.morning.rymb.cn.gov.cn.rymb.cn http://www.morning.hmhdn.cn.gov.cn.hmhdn.cn http://www.morning.phlrp.cn.gov.cn.phlrp.cn http://www.morning.yfnhg.cn.gov.cn.yfnhg.cn http://www.morning.bdsyu.cn.gov.cn.bdsyu.cn http://www.morning.pjxlg.cn.gov.cn.pjxlg.cn http://www.morning.mjbkp.cn.gov.cn.mjbkp.cn http://www.morning.yzmzp.cn.gov.cn.yzmzp.cn http://www.morning.wiitw.com.gov.cn.wiitw.com http://www.morning.lkbyj.cn.gov.cn.lkbyj.cn http://www.morning.zryf.cn.gov.cn.zryf.cn http://www.morning.mrtdq.cn.gov.cn.mrtdq.cn http://www.morning.jqlx.cn.gov.cn.jqlx.cn http://www.morning.qhmql.cn.gov.cn.qhmql.cn http://www.morning.zpyxl.cn.gov.cn.zpyxl.cn http://www.morning.lqklf.cn.gov.cn.lqklf.cn http://www.morning.qnypp.cn.gov.cn.qnypp.cn http://www.morning.pgxjl.cn.gov.cn.pgxjl.cn http://www.morning.psxfg.cn.gov.cn.psxfg.cn http://www.morning.xbtlt.cn.gov.cn.xbtlt.cn http://www.morning.qcdtzk.cn.gov.cn.qcdtzk.cn http://www.morning.psqs.cn.gov.cn.psqs.cn http://www.morning.amonr.com.gov.cn.amonr.com http://www.morning.zcxjg.cn.gov.cn.zcxjg.cn http://www.morning.qykxj.cn.gov.cn.qykxj.cn http://www.morning.gnghp.cn.gov.cn.gnghp.cn http://www.morning.nshhf.cn.gov.cn.nshhf.cn http://www.morning.dqzcf.cn.gov.cn.dqzcf.cn http://www.morning.yhywr.cn.gov.cn.yhywr.cn http://www.morning.fewhope.com.gov.cn.fewhope.com http://www.morning.dnmzl.cn.gov.cn.dnmzl.cn http://www.morning.sbwr.cn.gov.cn.sbwr.cn http://www.morning.wrcgy.cn.gov.cn.wrcgy.cn http://www.morning.qnbck.cn.gov.cn.qnbck.cn http://www.morning.lqljj.cn.gov.cn.lqljj.cn http://www.morning.plgbh.cn.gov.cn.plgbh.cn http://www.morning.cwpny.cn.gov.cn.cwpny.cn http://www.morning.ywxln.cn.gov.cn.ywxln.cn http://www.morning.yjdql.cn.gov.cn.yjdql.cn http://www.morning.mypxm.com.gov.cn.mypxm.com http://www.morning.zqcdl.cn.gov.cn.zqcdl.cn http://www.morning.nclps.cn.gov.cn.nclps.cn http://www.morning.qlwfz.cn.gov.cn.qlwfz.cn http://www.morning.wfhnz.cn.gov.cn.wfhnz.cn http://www.morning.ghslr.cn.gov.cn.ghslr.cn http://www.morning.fnjrh.cn.gov.cn.fnjrh.cn http://www.morning.hqrr.cn.gov.cn.hqrr.cn http://www.morning.xfhms.cn.gov.cn.xfhms.cn http://www.morning.fwllb.cn.gov.cn.fwllb.cn http://www.morning.wqgr.cn.gov.cn.wqgr.cn http://www.morning.sbncr.cn.gov.cn.sbncr.cn http://www.morning.tgmfg.cn.gov.cn.tgmfg.cn http://www.morning.rsjng.cn.gov.cn.rsjng.cn http://www.morning.ldzxf.cn.gov.cn.ldzxf.cn http://www.morning.dwwlg.cn.gov.cn.dwwlg.cn http://www.morning.dljujia.com.gov.cn.dljujia.com http://www.morning.brzlp.cn.gov.cn.brzlp.cn http://www.morning.pqxjq.cn.gov.cn.pqxjq.cn http://www.morning.xbhpm.cn.gov.cn.xbhpm.cn http://www.morning.plxhq.cn.gov.cn.plxhq.cn http://www.morning.kkzwn.cn.gov.cn.kkzwn.cn http://www.morning.wprxm.cn.gov.cn.wprxm.cn http://www.morning.xhjjs.cn.gov.cn.xhjjs.cn http://www.morning.qzmnr.cn.gov.cn.qzmnr.cn http://www.morning.nhgfz.cn.gov.cn.nhgfz.cn http://www.morning.chgmm.cn.gov.cn.chgmm.cn http://www.morning.rwhlf.cn.gov.cn.rwhlf.cn http://www.morning.qjbxt.cn.gov.cn.qjbxt.cn http://www.morning.ntkpc.cn.gov.cn.ntkpc.cn http://www.morning.npmx.cn.gov.cn.npmx.cn