当前位置: 首页 > news >正文

烟台广告公司网站建设p2p网站建设公司哪家好

烟台广告公司网站建设,p2p网站建设公司哪家好,wordpress禁止用户留言,建设网站收费标准文章目录 一、Docker安装Kafka1. 创建网络2. 安装zookeeper3. 安装Kafka 二、Kafka介绍1. Kafka简介 三、SpringBoot整合Kafka1. 引入pom依赖2. application.propertise配置3. Hello Kafka(Producer)4. Consumer Kafka5. 带回调的生产者6. 自定义分区器7. kafka事务提交8. 指定… 文章目录 一、Docker安装Kafka1. 创建网络2. 安装zookeeper3. 安装Kafka 二、Kafka介绍1. Kafka简介 三、SpringBoot整合Kafka1. 引入pom依赖2. application.propertise配置3. Hello Kafka(Producer)4. Consumer Kafka5. 带回调的生产者6. 自定义分区器7. kafka事务提交8. 指定topic、partition、offset消费9. ConsumerAwareListenerErrorHandler 异常处理器10. 消息过滤器11. 消息转发12. 定时启动、停止监听器 一、Docker安装Kafka 1. 创建网络 app-tier网络名称 –driver网络类型为bridge docker network create app-tier --driver bridge2. 安装zookeeper Kafka依赖zookeeper所以先安装zookeeper -p设置映射端口默认2181 -d后台启动 docker run -d --name zookeeper-server \--network app-tier \-e ALLOW_ANONYMOUS_LOGINyes \bitnami/zookeeper:latest3. 安装Kafka 安装并运行Kafka –name容器名称 -p设置映射端口默认9092 -d后台启动 ALLOW_PLAINTEXT_LISTENER任何人可以访问 KAFKA_CFG_ZOOKEEPER_CONNECT链接的zookeeper KAFKA_ADVERTISED_HOST_NAME当前主机IP或地址重点如果是服务器部署则配服务器IP或域名否则客户端监听消息会报地址错误 -e KAFKA_CFG_ADVERTISED_LISTENERSPLAINTEXT://192.168.0.101:9092 \ docker run -d --name kafka-server \--network app-tier \-p 9092:9092 \-e ALLOW_PLAINTEXT_LISTENERyes \-e KAFKA_CFG_ZOOKEEPER_CONNECTzookeeper-server:2181 \-e KAFKA_CFG_ADVERTISED_LISTENERSPLAINTEXT://127.0.0.1:9092 \bitnami/kafka:latest二、Kafka介绍 1. Kafka简介 Kafka是一种消息队列主要用来处理大量数据状态下的消息队列一般用来做日志的处理。既然是消息队列那么Kafka也就拥有消息队列的相应的特性了。 消息队列的好处 解耦合 耦合的状态表示当你实现某个功能的时候是直接接入当前接口而利用消息队列可以将相应的消息发送到消息队列这样的话如果接口出了问题将不会影响到当前的功能。 异步处理 异步处理替代了之前的同步处理异步处理不需要让流程走完就返回结果可以将消息发送到消息队列中然后返回结果剩下让其他业务处理接口从消息队列中拉取消费处理即可。 流量削峰 高流量的时候使用消息队列作为中间件可以将流量的高峰保存在消息队列中从而防止了系统的高请求减轻服务器的请求处理压力。 Kafka像其他Mq一样也有自己的基础架构主要存在生产者Producer、Kafka集群Broker、消费者Consumer、注册消息Zookeeper. Producer消息生产者向Kafka中发布消息的角色。Consumer消息消费者即从Kafka中拉取消息消费的客户端。Consumer Group消费者组消费者组则是一组中存在多个消费者消费者消费Broker中当前Topic的不同分区中的消息消费者组之间互不影响所有的消费者都属于某个消费者组即消费者组是逻辑上的一个订阅者。某一个分区中的消息只能够一个消费者组中的一个消费者所消费Broker经纪人一台Kafka服务器就是一个Broker一个集群由多个Broker组成一个Broker可以容纳多个Topic。Topic主题可以理解为一个队列生产者和消费者都是面向一个TopicPartition分区为了实现扩展性一个非常大的Topic可以分布到多个Broker上一个Topic可以分为多个Partition每个Partition是一个有序的队列(分区有序不能保证全局有序)Replica副本Replication为保证集群中某个节点发生故障节点上的Partition数据不丢失Kafka可以正常的工作Kafka提供了副本机制一个Topic的每个分区有若干个副本一个Leader和多个FollowerLeader每个分区多个副本的主角色生产者发送数据的对象以及消费者消费数据的对象都是Leader。Follower每个分区多个副本的从角色实时的从Leader中同步数据保持和Leader数据的同步Leader发生故障的时候某个Follower会成为新的Leader。 Kafka 基础概念 生产者与消费者 对于 Kafka 来说客户端有两种基本类型生产者Producer和消费者Consumer。除此之外还有用来做数据集成的 Kafka Connect API 和流式处理的 Kafka Streams 等高阶客户端但这些高阶客户端底层仍然是生产者和消费者API它们只不过是在上层做了封装。 这很容易理解生产者也称为发布者创建消息而消费者也称为订阅者负责消费or读取消息。 主题Topic与分区Partition 在 Kafka 中消息以主题Topic来分类每一个主题都对应一个「消息队列」这有点儿类似于数据库中的表。但是如果我们把所有同类的消息都塞入到一个“中心”队列中势必缺少可伸缩性无论是生产者/消费者数目的增加还是消息数量的增加都可能耗尽系统的性能或存储。 我们使用一个生活中的例子来说明现在 A 城市生产的某商品需要运输到 B 城市走的是公路那么单通道的高速公路不论是在「A 城市商品增多」还是「现在 C 城市也要往 B 城市运输东西」这样的情况下都会出现「吞吐量不足」的问题。所以我们现在引入分区Partition的概念类似“允许多修几条道”的方式对我们的主题完成了水平扩展。 Broker 和集群Cluster 一个 Kafka 服务器也称为 Broker它接受生产者发送的消息并存入磁盘Broker 同时服务消费者拉取分区消息的请求返回目前已经提交的消息。使用特定的机器硬件一个 Broker 每秒可以处理成千上万的分区和百万量级的消息。现在动不动就百万量级…我特地去查了一把好像确实集群的情况下吞吐量挺高的…摁… 若干个 Broker 组成一个集群Cluster其中集群内某个 Broker 会成为集群控制器Cluster Controller它负责管理集群包括分配分区到 Broker、监控 Broker 故障等。在集群内一个分区由一个 Broker 负责这个 Broker 也称为这个分区的 Leader当然一个分区可以被复制到多个 Broker 上来实现冗余这样当存在 Broker 故障时可以将其分区重新分配到其他 Broker 来负责。下图是一个样例 Kafka 的一个关键性质是日志保留retention我们可以配置主题的消息保留策略譬如只保留一段时间的日志或者只保留特定大小的日志。当超过这些限制时老的消息会被删除。我们也可以针对某个主题单独设置消息过期策略这样对于不同应用可以实现个性化。 多集群 随着业务发展我们往往需要多集群通常处于下面几个原因 基于数据的隔离 基于安全的隔离 多数据中心容灾 当构建多个数据中心时往往需要实现消息互通。举个例子假如用户修改了个人资料那么后续的请求无论被哪个数据中心处理这个更新需要反映出来。又或者多个数据中心的数据需要汇总到一个总控中心来做数据分析。 上面说的分区复制冗余机制只适用于同一个 Kafka 集群内部对于多个 Kafka 集群消息同步可以使用 Kafka 提供的 MirrorMaker 工具。本质上来说MirrorMaker 只是一个 Kafka 消费者和生产者并使用一个队列连接起来而已。它从一个集群中消费消息然后往另一个集群生产消息。 三、SpringBoot整合Kafka 1. 引入pom依赖 dependencygroupIdorg.springframework.kafka/groupIdartifactIdspring-kafka/artifactId /dependency2. application.propertise配置 ###########【Kafka集群】########### spring.kafka.bootstrap-servers112.126.74.249:9092,112.126.74.249:9093 ###########【初始化生产者配置】########### # 重试次数 spring.kafka.producer.retries0 # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1) spring.kafka.producer.acks1 # 批量大小 spring.kafka.producer.batch-size16384 # 提交延时 spring.kafka.producer.properties.linger.ms0 # 当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka # linger.ms为0表示每接收到一条消息就提交给kafka,这时候batch-size其实就没用了 ​ # 生产端缓冲区大小 spring.kafka.producer.buffer-memory 33554432 # Kafka提供的序列化和反序列化类 spring.kafka.producer.key-serializerorg.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializerorg.apache.kafka.common.serialization.StringSerializer # 自定义分区器 # spring.kafka.producer.properties.partitioner.classcom.felix.kafka.producer.CustomizePartitioner ​ ###########【初始化消费者配置】########### # 默认的消费组ID spring.kafka.consumer.properties.group.iddefaultConsumerGroup # 是否自动提交offset spring.kafka.consumer.enable-auto-committrue # 提交offset延时(接收到消息后多久提交offset) spring.kafka.consumer.auto.commit.interval.ms1000 # 当kafka中没有初始offset或offset超出范围时将自动重置offset # earliest:重置为分区中最小的offset; # latest:重置为分区中最新的offset(消费分区中新产生的数据); # none:只要有一个分区不存在已提交的offset,就抛出异常; spring.kafka.consumer.auto-offset-resetlatest # 消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作) spring.kafka.consumer.properties.session.timeout.ms120000 # 消费请求超时时间 spring.kafka.consumer.properties.request.timeout.ms180000 # Kafka提供的序列化和反序列化类 spring.kafka.consumer.key-deserializerorg.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializerorg.apache.kafka.common.serialization.StringDeserializer # 消费端监听的topic不存在时项目启动会报错(关掉) spring.kafka.listener.missing-topics-fatalfalse # 设置批量消费 # spring.kafka.listener.typebatch # 批量消费每次最多消费多少条消息 # spring.kafka.consumer.max-poll-records503. Hello Kafka(Producer) 创建生产者用于给Kafka发送消息 RestController public class KafkaProducer {Autowiredprivate KafkaTemplateString, Object kafkaTemplate; ​// 发送消息GetMapping(/kafka/normal/{message})public void sendMessage1(PathVariable(message) String normalMessage) {kafkaTemplate.send(topic1, normalMessage);} }4. Consumer Kafka 消费者 Component public class KafkaConsumer {// 消费监听KafkaListener(topics {topic1})public void onMessage1(ConsumerRecord?, ? record){// 消费的哪个topic、partition的消息,打印出消息内容System.out.println(简单消费record.topic()-record.partition()-record.value());} }上面示例创建了一个生产者发送消息到topic1消费者监听topic1消费消息。监听器用KafkaListener注解topics表示监听的topic支持同时监听多个用英文逗号分隔。启动项目postman调接口触发生产者发送消息 5. 带回调的生产者 kafkaTemplate提供了一个回调方法addCallback我们可以在回调方法中监控消息是否发送成功 或 失败时做补偿处理. GetMapping(/kafka/callbackOne/{message}) public void sendMessage2(PathVariable(message) String callbackMessage) {kafkaTemplate.send(topic1, callbackMessage).addCallback(success - {// 消息发送到的topicString topic success.getRecordMetadata().topic();// 消息发送到的分区int partition success.getRecordMetadata().partition();// 消息在分区内的offsetlong offset success.getRecordMetadata().offset();System.out.println(发送消息成功: topic - partition - offset);}, failure - {System.out.println(发送消息失败: failure.getMessage());}); }6. 自定义分区器 我们知道kafka中每个topic被划分为多个分区那么生产者将消息发送到topic时具体追加到哪个分区呢这就是所谓的分区策略Kafka 为我们提供了默认的分区策略同时它也支持自定义分区策略。其路由机制为 ① 若发送消息时指定了分区即自定义分区策略则直接将消息append到指定分区 ② 若发送消息时未指定 patition但指定了 keykafka允许为每条消息设置一个key则对key值进行hash计算根据计算结果路由到指定分区这种情况下可以保证同一个 Key 的所有消息都进入到相同的分区 ③ patition 和 key 都未指定则使用kafka默认的分区策略轮询选出一个 patition ※ 我们来自定义一个分区策略将消息发送到我们指定的partition首先新建一个分区器类实现Partitioner接口重写方法其中partition方法的返回值就表示将消息发送到几号分区 public class CustomizePartitioner implements Partitioner {Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {// 自定义分区规则(这里假设全部发到0号分区)// ......return 0;} ​Overridepublic void close() { ​} ​Overridepublic void configure(MapString, ? configs) { ​} }在application.propertise中配置自定义分区器配置的值就是分区器类的全路径名 spring.kafka.producer.properties.partitioner.classcom.example.kafkademo.conf.CustomizePartitioner7. kafka事务提交 如果在发送消息时需要创建事务可以使用 KafkaTemplate 的 executeInTransaction 方法来声明事务. GetMapping(/kafka/transaction) public void sendMessage7(){// 声明事务后面报错消息不会发出去kafkaTemplate.executeInTransaction(operations - {operations.send(topic1,test executeInTransaction);throw new RuntimeException(fail);}); ​// 不声明事务后面报错但前面消息已经发送成功了kafkaTemplate.send(topic1,test executeInTransaction);throw new RuntimeException(fail); }8. 指定topic、partition、offset消费 /*** Title 指定topic、partition、offset消费* Description 同时监听topic1和topic2监听topic1的0号分区、topic2的 0号和1号 分区指向1号分区的offset初始值为8* Author long.yuan* Date 2020/3/22 13:38* Param [record]* return void**/ KafkaListener(id consumer1,groupId felix-group,topicPartitions {TopicPartition(topic topic1, partitions { 0 }),TopicPartition(topic topic2, partitions 0, partitionOffsets PartitionOffset(partition 1, initialOffset 8)) }) public void onMessage2(ConsumerRecord?, ? record) {System.out.println(topic:record.topic()|partition:record.partition()|offset:record.offset()|value:record.value()); }9. ConsumerAwareListenerErrorHandler 异常处理器 通过异常处理器我们可以处理consumer在消费时发生的异常。 新建一个 ConsumerAwareListenerErrorHandler 类型的异常处理方法用Bean注入,BeanName默认就是方法名然后我们将这个异常处理器的BeanName放到KafkaListener注解的errorHandler属性里面当监听抛出异常的时候则会自动调用异常处理器 // 新建一个异常处理器用Bean注入 Bean public ConsumerAwareListenerErrorHandler consumerAwareErrorHandler() {return (message, exception, consumer) - {System.out.println(消费异常message.getPayload());return null;}; } ​ // 将这个异常处理器的BeanName放到KafkaListener注解的errorHandler属性里面 KafkaListener(topics {topic1},errorHandler consumerAwareErrorHandler) public void onMessage4(ConsumerRecord?, ? record) throws Exception {throw new Exception(简单消费-模拟异常); } ​ // 批量消费也一样异常处理器的message.getPayload()也可以拿到各条消息的信息 KafkaListener(topics topic1,errorHandlerconsumerAwareErrorHandler) public void onMessage5(ListConsumerRecord?, ? records) throws Exception {System.out.println(批量消费一次...);throw new Exception(批量消费-模拟异常); }10. 消息过滤器 消息过滤器可以在消息抵达consumer之前被拦截在实际应用中我们可以根据自己的业务逻辑筛选出需要的信息再交由KafkaListener处理不需要的消息则过滤掉。 配置消息过滤只需要为 监听器工厂 配置一个RecordFilterStrategy消息过滤策略返回true的时候消息将会被抛弃返回false时消息能正常抵达监听容器。 Component public class KafkaConsumer {AutowiredConsumerFactory consumerFactory; ​// 消息过滤器Beanpublic ConcurrentKafkaListenerContainerFactory filterContainerFactory() {ConcurrentKafkaListenerContainerFactory factory new ConcurrentKafkaListenerContainerFactory();factory.setConsumerFactory(consumerFactory);// 被过滤的消息将被丢弃factory.setAckDiscarded(true);// 消息过滤策略factory.setRecordFilterStrategy(consumerRecord - {if (Integer.parseInt(consumerRecord.value().toString()) % 2 0) {return false;}//返回true消息则被过滤return true;});return factory;} ​// 消息过滤监听KafkaListener(topics {topic1},containerFactory filterContainerFactory)public void onMessage6(ConsumerRecord?, ? record) {System.out.println(record.value());} }11. 消息转发 在实际开发中我们可能有这样的需求应用A从TopicA获取到消息经过处理后转发到TopicB再由应用B监听处理消息即一个应用处理完成后将该消息转发至其他应用完成消息的转发。 在SpringBoot集成Kafka实现消息的转发也很简单只需要通过一个SendTo注解被注解方法的return值即转发的消息内容 /*** Title 消息转发* Description 从topic1接收到的消息经过处理后转发到topic2* Author long.yuan* Date 2020/3/23 22:15* Param [record]* return void**/ KafkaListener(topics {topic1}) SendTo(topic2) public String onMessage7(ConsumerRecord?, ? record) {return record.value()-forward message; }12. 定时启动、停止监听器 默认情况下当消费者项目启动的时候监听器就开始工作监听消费发送到指定topic的消息那如果我们不想让监听器立即工作想让它在我们指定的时间点开始工作或者在我们指定的时间点停止工作该怎么处理呢——使用KafkaListenerEndpointRegistry下面我们就来实现 ① 禁止监听器自启动 ② 创建两个定时任务一个用来在指定时间点启动定时器另一个在指定时间点停止定时器 新建一个定时任务类用注解EnableScheduling声明KafkaListenerEndpointRegistry 在SpringIO中已经被注册为Bean直接注入设置禁止KafkaListener自启动 EnableScheduling Component public class CronTimer { ​/*** KafkaListener注解所标注的方法并不会在IOC容器中被注册为Bean* 而是会被注册在KafkaListenerEndpointRegistry中* 而KafkaListenerEndpointRegistry在SpringIOC中已经被注册为Bean**/Autowiredprivate KafkaListenerEndpointRegistry registry; ​Autowiredprivate ConsumerFactory consumerFactory; ​// 监听器容器工厂(设置禁止KafkaListener自启动)Beanpublic ConcurrentKafkaListenerContainerFactory delayContainerFactory() {ConcurrentKafkaListenerContainerFactory container new ConcurrentKafkaListenerContainerFactory();container.setConsumerFactory(consumerFactory);//禁止KafkaListener自启动container.setAutoStartup(false);return container;} ​// 监听器KafkaListener(idtimingConsumer,topics topic1,containerFactory delayContainerFactory)public void onMessage1(ConsumerRecord?, ? record){System.out.println(消费成功record.topic()-record.partition()-record.value());} ​// 定时启动监听器Scheduled(cron 0 42 11 * * ? )public void startListener() {System.out.println(启动监听器...);// timingConsumer是KafkaListener注解后面设置的监听器ID,标识这个监听器if (!registry.getListenerContainer(timingConsumer).isRunning()) {registry.getListenerContainer(timingConsumer).start();}//registry.getListenerContainer(timingConsumer).resume();} ​// 定时停止监听器Scheduled(cron 0 45 11 * * ? )public void shutDownListener() {System.out.println(关闭监听器...);registry.getListenerContainer(timingConsumer).pause();} }
http://www.tj-hxxt.cn/news/224211.html

相关文章:

  • 梅州企业网站该工具支持 preview 功能
  • 商贸公司的网站建设2021网站建设前景怎么样
  • 四川省建设厅网站打不开网站运营推广该如何做
  • 淘宝便宜的团购网站建设制作网站公司合同注意事项
  • wordpress建的大型网站哪个网站帮别人做ppt
  • 新闻门户网站psd模板泸州网站公司
  • 成都网站建设新线加如何进入wordpress
  • 做个淘宝客网站怎么做的电子商务网站建设方案范文
  • 住房城乡建设部官网站做海报免费素材网站有哪些
  • 模板网站可以优化吗电子请柬网站开发
  • 网站开发佛山哈尔滨网站建设培训学校
  • 国外 素材 网站微信小程序官网网址
  • 电商网站开发分享wordpress 如何添加模板
  • 嘉兴建网站网站模板用什么软件做
  • 服务器搭建网站视频教程重庆电力建设设计公司网站
  • 新网网站十堰网站设计公司
  • 网站用开源cms哔哩哔哩h5播放器
  • 福州p2p网站建设公司网站功能模块是什么
  • 网站开发合同下载京东网站是哪个公司做的
  • 旅游公司网站开发用dw做的网页怎么连到网站上
  • 网站外链建设记住5种外链方式不可用泉州做网站工作室
  • 亚马逊网站做外贸浮雕模东莞网站建设
  • 做网站一个人能做吗wordpress 微信二维码登录
  • 购物网站如何建设廊坊关键词排名推广
  • 专注七星彩网站开发网站建设费如何入帐
  • 学做网站在什么地方学品牌公司设计
  • 网站建设和网络营销温州哪里有网站
  • 住房城乡建设厅网站准考证温州 建网站的公司 新
  • 合肥建设工程交易网站wordpress 默认图片路径
  • frontpage新建网站中国城乡建设部网站证书查询