景德镇建设网站,中国建设招标网是什么网站,俄罗斯外贸网站,网页设计模板html代码音乐Kafka在大数据处理中的应用 一、Kafka简介1. 基础概念2. Kafka的主要功能3. Kafka的特点 二、应用场景1. 数据采集和消费2. 数据存储和持久化3. 实时数据处理和流计算4. 数据通信和协同 三、技术融合1. Kafka与Hadoop生态技术的融合1) 使用Kafka作为Hadoop的数据源2) 使用Hadoo… Kafka在大数据处理中的应用 一、Kafka简介1. 基础概念2. Kafka的主要功能3. Kafka的特点 二、应用场景1. 数据采集和消费2. 数据存储和持久化3. 实时数据处理和流计算4. 数据通信和协同 三、技术融合1. Kafka与Hadoop生态技术的融合1) 使用Kafka作为Hadoop的数据源2) 使用Hadoop作为Kafka的消费者 2. Kafka与Spark、Flink等流处理框架的融合3. Kafka与Elasticsearch等日志搜索引擎的融合 四、性能优化1. Kafka性能调优主要过程2. 生产者性能调优2.1 批量发送消息2.2 压缩消息2.3 异步发送消息 3. 消费者性能调优3.1 提高分区数和消费者数3.2 提高拉取数据大小3.3 提高拉取数据间隔 4. 集群性能调优4.1 提高恢复速度4.2 分配分区均衡 五、存储管理1. 消息压缩配置2. 存储清理策略3. 消息存储和检索原理 六、Kafka安全性1. 认证、授权和加密2. SSL/TLS加密保证数据传输安全3. ACL机制实现细粒度授权管理 七、Kafka管理工具1. ZooKeeper集群管理工具1.1 ZooKeeper简介1.2 ZooKeeper与Kafka 2. Kafka自带的管理工具2.1 Kafka Manager2.2 Kafka Connect 3. 第三方Kafka监控工具3.1 Burrow3.2 Kafka Web Console 一、Kafka简介
1. 基础概念
Kafka是一种高可用的分布式消息系统主要负责支持在不同应用程序之间进行可靠且持续的消息传输。这一过程中消息数据的分摊、均衡和存储都是由Kafka负责完成的。
2. Kafka的主要功能
Kafka的主要功能包括消息的生产和消费。在消息生产方面Kafka支持将消息发送到多个接收端实现了应用程序之间的高效传输在消息消费方面Kafka可以对消费进度进行控制确保每个消费者都能够按照其预期的方式接收到消息。
3. Kafka的特点
Kafka具有如下几个特点
高可用支持分区和副本机制可以保障高可用性。高伸缩性支持水平扩展可支持PB级别的数据处理。持久性消息被持久化到磁盘上并且在一定时间内不会失效。高性能Kafka的IO实现采用了顺序读写的方式有很高的读写速度能够满足高吞吐量的需求。多语言支持Kafka提供了诸如Java、C、Python等多种语言的API适用于各种不同的开发场景。
二、应用场景
1. 数据采集和消费
Kafka作为一种高效的消息传输机制在数据采集过程中有着广泛的应用。数据生产者可以将原始的数据发送到Kafka中各种数据消费者再通过Kafka进行消费从而构建起一个完整的数据采集和传输系统。
下面展示如何对Kafka进行生产和消费操作
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.clients.consumer.*;public class KafkaDemo {public static void main(String[] args) throws Exception {// 生产者Properties producerProps new Properties();producerProps.put(bootstrap.servers, localhost:9092);producerProps.put(key.serializer, org.apache.kafka.common.serialization.StringSerializer);producerProps.put(value.serializer, org.apache.kafka.common.serialization.StringSerializer);ProducerString, String producer new KafkaProducer(producerProps);producer.send(new ProducerRecord(test_topic, key, value));// 消费者Properties consumerProps new Properties();consumerProps.put(bootstrap.servers, localhost:9092);consumerProps.put(key.deserializer, org.apache.kafka.common.serialization.StringDeserializer);consumerProps.put(value.deserializer, org.apache.kafka.common.serialization.StringDeserializer);ConsumerString, String consumer new KafkaConsumer(consumerProps);consumer.subscribe(Arrays.asList(test_topic));while (true) {ConsumerRecordsString, String records consumer.poll(100);for (ConsumerRecordString, String record : records) {System.out.println(record.key() : record.value());}}}
}2. 数据存储和持久化
Kafka还可以作为一种高效的数据存储和持久化机制。利用Kafka提供的持久化机制可以将不同类型的数据以日志形式存储到Kafka Broker中并在需要的时候进行查找、检索。
3. 实时数据处理和流计算
Kafka通过支持流数据架构(Streaming Data Architecture)来进行实时数据处理和流计算。用户可以使用Kafka Streams API来实现实时应用程序同时Kafka也支持一些流式处理框架如Storm和Flink的集成。
4. 数据通信和协同
Kafka作为一种强大的消息队列系统可以支持不同分布式组件之间的数据通信和协同。例如用户可以使用Kafka将数据发送到各个端点从而实现不同组件之间的互动。
三、技术融合
1. Kafka与Hadoop生态技术的融合
Kafka作为一个高吞吐量的分布式发布-订阅消息系统可以很好地与Hadoop生态技术融合。常用的两种方式为
1) 使用Kafka作为Hadoop的数据源
我们可以将Kafka作为Hadoop的数据源用于数据采集、数据传输等场景
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.kafka.*;
import kafka.serializer.StringDecoder;public class KafkaStreamingApp {public static void main(String[] args) throws Exception {String brokers localhost:9092;String topics testTopic;SparkConf conf new SparkConf().setAppName(KafkaStreamingApp).setMaster(local[2]);JavaStreamingContext context new JavaStreamingContext(conf, Durations.seconds(10));MapString, String kafkaParams new HashMap();kafkaParams.put(metadata.broker.list, brokers);SetString topicsSet new HashSet(Arrays.asList(topics.split(,)));JavaPairInputDStreamString, String stream KafkaUtils.createDirectStream(context,String.class,String.class,StringDecoder.class,StringDecoder.class,kafkaParams,topicsSet);stream.print();context.start();context.awaitTermination();}
}2) 使用Hadoop作为Kafka的消费者
在Hadoop中使用Kafka作为数据源后我们还可以将Hadoop中的数据通过Kafka发送给其他消费者
import org.apache.kafka.clients.producer.*;
import java.util.Properties;public class KafkaProducerExample {public static void main(String[] args) throws Exception{String topicName testTopic;String key Key1;String value Value-99;Properties props new Properties();props.put(bootstrap.servers, localhost:9092);props.put(key.serializer,org.apache.kafka.common.serialization.StringSerializer); props.put(value.serializer, org.apache.kafka.common.serialization.StringSerializer);ProducerString, String producer new KafkaProducer(props);ProducerRecordString, String record new ProducerRecord(topicName,key,value);producer.send(record);producer.close();System.out.println(A message has been successfully sent!);}
}2. Kafka与Spark、Flink等流处理框架的融合
Kafka可以很好地与流处理框架如Spark、Flink等进行融合。在这些流处理框架中Kafka被广泛用于数据输入源和输出源并且具有高效率和稳定性。下面以Spark Streaming为例
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka.*;
import kafka.serializer.DefaultDecoder;
import scala.Tuple2;import java.util.*;public class KafkaStreamingApp {public static void main(String[] args) throws InterruptedException {String brokers localhost:9092;//设置Kafka连接信息 String topics testTopic;//设置订阅的主题名称 SparkConf conf new SparkConf().setAppName(KafkaStreamingApp).setMaster(local[2]);//设置Spark配置信息 JavaStreamingContext jssc new JavaStreamingContext(conf, new Duration(2000));//设置数据流间隔时间 MapString, String kafkaParams new HashMapString, String();kafkaParams.put(metadata.broker.list, brokers);//设置连接的Kafka Broker地址列表SetString topicsSet new HashSetString(Arrays.asList(topics.split(,)));//设置订阅主题集合 JavaPairInputDStreambyte[], byte[] messages KafkaUtils.createDirectStream(jssc,byte[].class,byte[].class,DefaultDecoder.class,DefaultDecoder.class,kafkaParams,topicsSet);//创建输入数据流 JavaDStreamString lines messages.map(new FunctionTuple2byte[], byte[], String() {public String call(Tuple2byte[], byte[] tuple2) {//将元组转化为字符串 return new String(tuple2._2());}});lines.print();//打印数据流中的数据 jssc.start();//开始运行Spark Streaming应用程序 jssc.awaitTermination();//等待应用程序终止}
}3. Kafka与Elasticsearch等日志搜索引擎的融合
Kafka可以很好地与日志搜索引擎如Elasticsearch进行融合用于实时处理和搜索。在使用过程中我们需要使用Kafka Connect来连接Kafka和Elasticsearch并对数据进行批量处理和导入具体代码如下
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;import java.util.*;public class ElasticsearchSinkExample implements SinkConnector {private MapString, String configProps;public void start(MapString, String props) {this.configProps props;}public Class? extends Task taskClass() {return ElasticsearchSinkTask.class;}public ListMapString, String taskConfigs(int i) {ListMapString, String configs new ArrayList(i);for (int x 0; x i; x) {configs.add(configProps);}return configs;}public void stop() {}public ConfigDef config() {return new ConfigDef();}public String version() {return 1;}public static class ElasticsearchSinkTask extends SinkTask {private String hostname;private int port;private String indexPrefix;public String version() {return 1;}public void start(MapString, String props) {hostname props.get(address);port Integer.parseInt(props.get(port));indexPrefix props.get(index);// Connect to Elasticsearch and create index if not exists//...}public void put(CollectionSinkRecord records) {// Convert records to JSONSchema schema SchemaBuilder.struct().name(record).version(1).field(id, Schema.STRING_SCHEMA).field(name, Schema.STRING_SCHEMA).field(age, Schema.INT32_SCHEMA).build()JsonConverter converter new JsonConverter();converter.configure(new HashMap(), false);ListMapString, String convertedRecords new ArrayList(records.size());for (SinkRecord record: records) {String json converter.fromConnectData(topic, schema, record.value())MapString, String convertedRecord new HashMap();convertedRecord.put(id, record.key().toString());convertedRecord.put(json, json);convertedRecords.add(convertedRecord);}// Write records to Elasticsearch//...}public void flush(MapTopicPartition, OffsetAndMetadata offsets) {}public void stop() {}}
}四、性能优化
Kafka在高并发、大流量场景下需要进行性能优化才能保障其稳定性和可靠性。本文将讨论Kafka的性能调优过程包括生产者、消费者、以及集群中的性能参数调整。
1. Kafka性能调优主要过程
Kafka性能调优主要分为以下两个过程
确定当前瓶颈在进行任何性能调优之前首先需要明确当前的瓶颈是什么。比如是由于网络传输速度慢导致的吞吐量下降还是由于消息生产和消费速度不匹配造成的堆积情况。调整性能参数当明确了当前的瓶颈之后就需要根据具体情况进行性能参数调整在优化瓶颈的同时提高Kafka的性能。
2. 生产者性能调优
2.1 批量发送消息
生产者在向Kafka发送消息时可以一次性发送多条消息而不是一条一条地发送。这样可以减少网络传输和IO的开销从而提高吞吐量。我们可以通过设置batch.size参数来控制批量发送的消息数量。
2.2 压缩消息
压缩消息也能大幅降低网络传输的开销从而提高吞吐量。Kafka支持多种压缩算法如gzip、snappy和lz4等。我们可以通过设置compression.type参数来控制要使用哪种压缩算法。
2.3 异步发送消息
在生产者发送消息时可以选择同步方式和异步方式同步方式会阻塞线程直到消息发送成功而异步方式则不会。异步方式可以极大地提高吞吐量但是会增加消息传递时失败的风险。我们可以通过设置linger.ms参数来调整异步发送消息的时间间隔。
3. 消费者性能调优
3.1 提高分区数和消费者数
在Kafka的消费者群组中每个消费者实例只能处理某些分区的消息如果某个分区数量过多可能会影响消费者的效率。我们可以通过增加分区数和消费者数来提高消费者的效率。
3.2 提高拉取数据大小
在消费者获取数据的过程中每次拉取的数据大小也会对性能产生影响一般情况下拉取的数据越多消费者的吞吐量就越高。我们可以通过设置 fetch.max.bytes 参数来增大每次拉取数据的大小。
3.3 提高拉取数据间隔
在消费者获取数据的过程中每个消费者拉取数据的间隔也是可以调整的。我们可以通过设置 fetch.max.wait.ms 参数来调整拉取数据的时间间隔。
4. 集群性能调优
4.1 提高恢复速度
Kafka的集群由多个Broker组成在其中一个Broker宕机时Kafka需要进行数据的恢复工作。为了提高恢复速度我们可以采用同步副本的方式使得副本和主节点之间的数据一致性更加保障在主节点宕机时能够快速切换到副本节点上。
4.2 分配分区均衡
在Kafka的集群中存在多个Broker和多个Topic为了保证各个Broker上的分区数相对均衡我们可以使用Kafka的工具包来处理分区的分配问题确保每个Broker负载均衡。
五、存储管理
1. 消息压缩配置
在 Kafka 中可以对消息进行压缩以节省磁盘空间和网络带宽。Kafka 支持多种压缩算法包括 GZIP、Snappy 和 LZ4。当 Kafka Broker 接收到写入的消息时可以根据 Kafka Producer 的配置对消息进行压缩。当 Consumer 从 Kafka 中读取消息时Kafka 会自动解压缩消息并将其传递给 Consumer 进行处理。
以下是一个使用 Java API 配置消息压缩的示例
Properties props new Properties();
props.put(compression.type, gzip); // 设置压缩类型为 GZIP
ProducerString, String producer new KafkaProducer(props, new StringSerializer(), new StringSerializer());2. 存储清理策略
Kafka 的消息是存储在 Broker 上的如果消息不断写入而不进行删除会导致磁盘空间占用越来越大。因此Kafka 提供了几种存储清理策略来控制 Broker 上的消息存储量。
Kafka 支持两种存储清理策略删除策略和压缩策略。删除策略会删除一些老的或过期的消息从而释放磁盘空间而压缩策略则会对数据进行压缩进一步节省磁盘空间。
以下是一个使用 Java API 配置存储清理策略的示例
Properties props new Properties();
props.put(log.cleanup.policy, delete); // 设置清理策略为删除策略
props.put(log.retention.hours, 24); // 设置留存的小时数为 24 小时
props.put(log.cleanup.policy, compact); // 设置清理策略为压缩策略
ProducerString, String producer new KafkaProducer(props, new StringSerializer(), new StringSerializer());3. 消息存储和检索原理
Kafka 的消息存储和检索原理非常简单。在 Kafka 中每个 Topic 都被分成多个 Partition。当 Producer 往某个 Topic 写入消息时Kafka 会将消息存储到该 Topic 指定的一个或多个 Partition 上。每个 Partition 中的消息都按顺序进行存储并且每个消息都有一个唯一的 offset。Consumer 可以从任意 offset 开始读取消息这使得 Consumer 可以逐条处理消息、实现重复消费等功能。
Kafka 的消息存储方式为日志型存储。在 Kafka 中每个 Partition 都维护了一个消息日志Message Log该日志是一个按时间排序的消息集合所有的消息都以追加方式写入该日志中。由于消息的写入操作只涉及追加操作而不涉及修改和删除操作因此能够实现高效的数据写入和读取。
Kafka 通过 mmap 操作将消息日志映射到内存中从而实现高效的消息读取。此外Kafka 在数据组织上采用了时间序列索引的方式可以快速地定位某个 offset 对应的消息。
六、Kafka安全性
Kafka提供了多种安全特性包括认证、授权和加密。其中SSL/TLS加密用于保证数据传输的安全ACL机制实现了细粒度的授权管理。
1. 认证、授权和加密
在Kafka中认证、授权和加密是允许配置的。其中认证和授权的实现主要依赖于JaasJava Authentication and Authorization Service框架而加密使用SSL/TLS协议。
认证可以使用Kafka内置的认证方式或者自定义的方式例如使用LDAP或者Kerberos等。授权则是通过ACLaccess control lists机制实现的用于限制用户对Kafka集群、topic、partition等资源的访问权限。
2. SSL/TLS加密保证数据传输安全
Kafka提供了SSL/TLS协议加密数据传输在网络传输过程中保护数据的安全性。可以通过配置SSL证书、私钥等参数启用SSL/TLS功能。
下面是一个使用SSL/TLS传输数据的示例代码
Properties props new Properties();
props.setProperty(bootstrap.servers, kafka1.example.com:9093);
props.setProperty(security.protocol, SSL);
props.setProperty(ssl.truststore.location, /path/to/truststore);
props.setProperty(ssl.truststore.password, password);ProducerString, String producer new KafkaProducer(props, new StringSerializer(), new StringSerializer());producer.send(new ProducerRecord(my-topic, my-message));在此示例代码中我们设置了security.protocol为“SSL”并指定了SSL证书所在的路径和密码。通过这些配置我们可以使用SSL/TLS协议传输数据。
3. ACL机制实现细粒度授权管理
Kafka的ACL机制提供了细粒度的授权管理可以限制用户对不同资源的访问权限。ACL机制是基于资源的可以对Kafka集群、topic、partition等资源进行授权管理。
下面是一个使用ACL机制进行授权管理的示例代码
Properties props new Properties();
props.setProperty(bootstrap.servers, kafka1.example.com:9092);AdminClient adminClient KafkaAdminClient.create(props);ResourcePattern pattern new ResourcePattern(ResourceType.TOPIC, my-topic);
AccessControlEntry entry new AccessControlEntry(User:alice, *, AclOperation.READ, AclPermissionType.ALLOW);
KafkaPrincipal principal new KafkaPrincipal(KafkaPrincipal.USER_TYPE, alice);
Resource resource new Resource(pattern.resourceType().name(), pattern.name());SetAclBinding acls new HashSet();
acls.add(new AclBinding(resource, entry));adminClient.createAcls(acls);在此示例代码中我们创建了一个名为“my-topic”的topic并为用户“alice”授予了对该topic的读取权限。使用ACL机制我们可以实现对用户在Kafka集群中的操作进行细粒度的控制和管理。
七、Kafka管理工具
1. ZooKeeper集群管理工具
1.1 ZooKeeper简介
ZooKeeper是一个分布式的开放源代码的分布式应用程序协调服务它是Google的Chubby一个开源的实现是Hadoop和Kafka等分布式系统的重要组件之一。
1.2 ZooKeeper与Kafka
在Kafka中ZooKeeper负责管理broker的状态信息以及选举controller管理Topic和Partition的元数据等。ZooKeeper对于Kafka的正常运行至关重要一旦ZooKeeper出现异常或故障会导致Kafka集群不可用。
2. Kafka自带的管理工具
2.1 Kafka Manager
Kafka Manager是由Yahoo开发的一个基于Web的Kafka管理工具能够方便地查看和管理Kafka集群中的Topic、Broker、Partition等信息。用户可以使用Kafka Manager来监控Kafka集群的健康状况并根据需要对其进行配置和管理。
2.2 Kafka Connect
Kafka Connect是Kafka提供的一个统一的数据传输框架用于将Kafka与其他数据源或数据存储系统进行连接。用户可以通过Kafka Connect快速地从数据源或到数据存储系统中读取或写入数据并将数据直接注入到Kafka中。
3. 第三方Kafka监控工具
3.1 Burrow
Burrow是由Linkedin开发的一个先进的Kafka Consumer监控工具可以用于监控和管理Kafka Consumer Group的健康状况以及消息处理的进度状态等信息。Burrow能够提供非常详细的分区信息、已消费消息数量、剩余消息数量等数据信息。
3.2 Kafka Web Console
Kafka Web Console是一款免费、开源、基于Web的Kafka管理工具可以方便地查看和管理Kafka集群的Topic、Broker、Partition等信息。用户可以通过Kafka Web Console随时了解集群的状态并进行相关配置和管理操作。 文章转载自: http://www.morning.lthgy.cn.gov.cn.lthgy.cn http://www.morning.rmpkn.cn.gov.cn.rmpkn.cn http://www.morning.xhqwm.cn.gov.cn.xhqwm.cn http://www.morning.lyhry.cn.gov.cn.lyhry.cn http://www.morning.clpfd.cn.gov.cn.clpfd.cn http://www.morning.fjzlh.cn.gov.cn.fjzlh.cn http://www.morning.fbqr.cn.gov.cn.fbqr.cn http://www.morning.tktcr.cn.gov.cn.tktcr.cn http://www.morning.rrhfy.cn.gov.cn.rrhfy.cn http://www.morning.lnsnyc.com.gov.cn.lnsnyc.com http://www.morning.hpmzs.cn.gov.cn.hpmzs.cn http://www.morning.drzkk.cn.gov.cn.drzkk.cn http://www.morning.hncrc.cn.gov.cn.hncrc.cn http://www.morning.tbjb.cn.gov.cn.tbjb.cn http://www.morning.drwpn.cn.gov.cn.drwpn.cn http://www.morning.lwwnq.cn.gov.cn.lwwnq.cn http://www.morning.xpzrx.cn.gov.cn.xpzrx.cn http://www.morning.kjyfq.cn.gov.cn.kjyfq.cn http://www.morning.qbwbs.cn.gov.cn.qbwbs.cn http://www.morning.yxdrf.cn.gov.cn.yxdrf.cn http://www.morning.ytnn.cn.gov.cn.ytnn.cn http://www.morning.rjrz.cn.gov.cn.rjrz.cn http://www.morning.zylrk.cn.gov.cn.zylrk.cn http://www.morning.xjtnp.cn.gov.cn.xjtnp.cn http://www.morning.grjh.cn.gov.cn.grjh.cn http://www.morning.qinhuangdjy.cn.gov.cn.qinhuangdjy.cn http://www.morning.kyhnl.cn.gov.cn.kyhnl.cn http://www.morning.rpkg.cn.gov.cn.rpkg.cn http://www.morning.srwny.cn.gov.cn.srwny.cn http://www.morning.ykgp.cn.gov.cn.ykgp.cn http://www.morning.gpnfg.cn.gov.cn.gpnfg.cn http://www.morning.qgjp.cn.gov.cn.qgjp.cn http://www.morning.ngcth.cn.gov.cn.ngcth.cn http://www.morning.rbkml.cn.gov.cn.rbkml.cn http://www.morning.rmxgk.cn.gov.cn.rmxgk.cn http://www.morning.mrccd.cn.gov.cn.mrccd.cn http://www.morning.sxbgc.cn.gov.cn.sxbgc.cn http://www.morning.dmzfz.cn.gov.cn.dmzfz.cn http://www.morning.jhgxh.cn.gov.cn.jhgxh.cn http://www.morning.xqmd.cn.gov.cn.xqmd.cn http://www.morning.mytmx.cn.gov.cn.mytmx.cn http://www.morning.yqrfn.cn.gov.cn.yqrfn.cn http://www.morning.znpyw.cn.gov.cn.znpyw.cn http://www.morning.cwrpd.cn.gov.cn.cwrpd.cn http://www.morning.nmnhs.cn.gov.cn.nmnhs.cn http://www.morning.srbsr.cn.gov.cn.srbsr.cn http://www.morning.rdnjc.cn.gov.cn.rdnjc.cn http://www.morning.zsgbt.cn.gov.cn.zsgbt.cn http://www.morning.stlgg.cn.gov.cn.stlgg.cn http://www.morning.bkqw.cn.gov.cn.bkqw.cn http://www.morning.wlqll.cn.gov.cn.wlqll.cn http://www.morning.leeong.com.gov.cn.leeong.com http://www.morning.wgzzj.cn.gov.cn.wgzzj.cn http://www.morning.gwmjy.cn.gov.cn.gwmjy.cn http://www.morning.znnsk.cn.gov.cn.znnsk.cn http://www.morning.jtjmz.cn.gov.cn.jtjmz.cn http://www.morning.ryysc.cn.gov.cn.ryysc.cn http://www.morning.tyjnr.cn.gov.cn.tyjnr.cn http://www.morning.tlbhq.cn.gov.cn.tlbhq.cn http://www.morning.swkpq.cn.gov.cn.swkpq.cn http://www.morning.ytnn.cn.gov.cn.ytnn.cn http://www.morning.fkwp.cn.gov.cn.fkwp.cn http://www.morning.qztsq.cn.gov.cn.qztsq.cn http://www.morning.mdpkf.cn.gov.cn.mdpkf.cn http://www.morning.hbqhz.cn.gov.cn.hbqhz.cn http://www.morning.pjrgb.cn.gov.cn.pjrgb.cn http://www.morning.qwdqq.cn.gov.cn.qwdqq.cn http://www.morning.mmxt.cn.gov.cn.mmxt.cn http://www.morning.wxgd.cn.gov.cn.wxgd.cn http://www.morning.cdlewan.com.gov.cn.cdlewan.com http://www.morning.qpzjh.cn.gov.cn.qpzjh.cn http://www.morning.kndt.cn.gov.cn.kndt.cn http://www.morning.qkqzm.cn.gov.cn.qkqzm.cn http://www.morning.fhtbk.cn.gov.cn.fhtbk.cn http://www.morning.xcnwf.cn.gov.cn.xcnwf.cn http://www.morning.shyqcgw.cn.gov.cn.shyqcgw.cn http://www.morning.hxwhyjh.com.gov.cn.hxwhyjh.com http://www.morning.gbwfx.cn.gov.cn.gbwfx.cn http://www.morning.rngyq.cn.gov.cn.rngyq.cn http://www.morning.mm27.cn.gov.cn.mm27.cn