当前位置: 首页 > news >正文

响应式网站布局怎么给企业制作网站

响应式网站布局,怎么给企业制作网站,wordpress付费阅读chajian,wordpress菜单横排前言 由于 Kafka 的写性能非常高#xff0c;因此项目经常会碰到 Kafka 消息队列拥堵的情况。遇到这种情况#xff0c;我们可以通过并发消费、批量消费的方法进行解决。 一、新建一个maven工程#xff0c;添加kafka依赖 dependencygroupIdorg.springframe…前言 由于 Kafka 的写性能非常高因此项目经常会碰到 Kafka 消息队列拥堵的情况。遇到这种情况我们可以通过并发消费、批量消费的方法进行解决。 一、新建一个maven工程添加kafka依赖 dependencygroupIdorg.springframework.kafka/groupIdartifactIdspring-kafka/artifactId /dependency 二、yaml配置文件 spring:kafka:bootstrap-servers: 127.0.0.1:9002producer:key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerconsumer:group-id: test-consumer-group# 当 Broker 端没有 offset如第一次消费或 offset 超过7天过期时如何初始化 offset当收到 OFFSET_OUT_OF_RANGE 错误时如何重置 Offset# earliest表示自动重置到 partition 的最小 offset# latest默认为 latest表示自动重置到 partition 的最大 offset# none不自动进行 offset 重置抛auto-offset-reset: latest# 是否在消费消息后将 offset 同步到 Broker当 Consumer 失败后就能从 Broker 获取最新的 offsetenable-auto-commit: false## 当 auto.commit.enabletrue 时自动提交 Offset 的时间间隔建议设置至少1000auto-commit-interval: 2000max-poll-records: 30heartbeat-interval: 3000key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerproperties:# 使用 Kafka 消费分组机制时消费者超时时间。当 Broker 在该时间内没有收到消费者的心跳时认为该消费者故障失败Broker 发起重新 Rebalance 过程。目前该值的配置必须在 Broker 配置group.min.session.timeout.ms6000和group.max.session.timeout.ms300000 之间session.timeout.ms: 60000# 使用 Kafka 消费分组机制时消费者发送心跳的间隔。这个值必须小于 session.timeout.ms一般小于它的三分之一heartbeat.interval.ms: 3000# 使用 Kafka 消费分组机制时再次调用 poll 允许的最大间隔。如果在该时间内没有再次调用 poll则认为该消费者已经失败Broker 会重新发起 Rebalance 把分配给它的 partition 分配给其他消费者max.poll.interval.ms: 300000request.timeout.ms: 600000listener:# 在侦听器容器中运行的线程数。concurrency: 2type: batchmax-poll-records: 50#当 auto.commit.enable 设置为false时表示kafak的offset由customer手动维护#spring-kafka提供了通过ackMode的值表示不同的手动提交方式#手动调用Acknowledgment.acknowledge()后立即提交ack-mode: manual_immediate# 消费者监听的topic不存在时项目会报错设置为falsemissing-topics-fatal: false 三、消息消费 手动提交非批量消费 String 类型接入 KafkaListener(topics {test-topic}, groupId test-consumer-group)public void onMessage(String message, Consumer consumer) {System.out.println(接收到的消息 message);consumer.commitSync();} 使用注解方式获取消息头、消息体 /*** 处理消息*/KafkaListener(topics test-topic, groupId test-consumer-group)public void onMessage(Payload String message,Header(KafkaHeaders.RECEIVED_TOPIC) String topic,Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,Header(name KafkaHeaders.RECEIVED_MESSAGE_KEY, required false) String key,Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts,Acknowledgment ack) {try {ack.acknowledge();log.info(Consumerend);} catch (Exception e) {log.error(Consumer.onMessage#error . message{}, message, e);throw new BizException(事件消息消费失败, e);}} 手动提交批量消费 想要批量消费首先要开启批量消费通过listener.type属性设置为batch即可开启看下代码吧 spring:kafka:consumer:group-id: test-consumer-groupbootstrap-servers: 127.0.0.1:9092max-poll-records: 50 # 一次 poll 最多返回的记录数listener:type: batch # 开启批量消费 如上设置了启用批量消费和批量消费每次最多消费记录数。这里设置 max-poll-records是50并不是说如果没有达到50条消息我们就一直等待。而是说一次poll最多返回的记录数为50 ConsumerRecord类接收 /*** kafka的批量消费监听器*/KafkaListener(topics test-topic, groupId test-consumer-group)public void onMessage(ListConsumerRecordString, String records, Consumer consumer) {try {log.info(Consumer.batch#size{}, records null ? 0 : records.size());if (CollectionUtil.isEmpty(records)) {//分别是commitSync同步提交和commitAsync异步提交consumer.commitSync();return;}for (ConsumerRecordString, String record : records) {String message record.value();if (StringUtils.isBlank(message)) {continue;}//处理业务数据//doBuiness();}consumer.commitSync();log.info(Consumerend);} catch (Exception e) {log.error(Consumer.onMessage#error ., e);throw new BizException(事件消息消费失败, e);}} String类接收 KafkaListener(topics {test-topic}, groupId test-consumer-group)public void onMessage(ListString message, Consumer consumer) {System.out.println(接收到的消息 message);consumer.commitSync();} 使用注解方式获取消息头、消息体则也是使用 List 来接收 Component public class KafkaConsumer {// 消费监听KafkaListener(topics {test-topic})public void listen2(Payload ListString data,Header(KafkaHeaders.RECEIVED_TOPIC) ListString topics,Header(KafkaHeaders.RECEIVED_PARTITION_ID) ListInteger partitions,Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) ListString keys,Header(KafkaHeaders.RECEIVED_TIMESTAMP) ListLong tss) {System.out.println(收到 data.size() 条消息);System.out.println(data);System.out.println(topics);System.out.println(partitions);System.out.println(keys);System.out.println(tss);} } 并发消费  再来看下并发消费为了加快消费我们可以提高并发数比如下面配置我们将并发设置为 3。注意并发量根据实际分区数决定必须小于等于分区数否则会有线程一直处于空闲状态 spring:kafka:consumer:group-id: test-consumer-groupbootstrap-servers: 127.0.0.1:9092max-poll-records: 50 # 一次 poll 最多返回的记录数listener:type: batch # 开启批量监听concurrency: 3 # 设置并发数 我们设置concurrency为3也就是将会启动3条线程进行监听而要监听的topic有5个partition意味着将有2条线程都是分配到2个partition还有1条线程分配到1个partition 配置类方式 通过自定义配置类的方式也是可以的但是相对yml配置来说还是有点麻烦的(不提倡) /*** 消费者配置*/ Configuration public class KafkaConsumerConfig {/*** 消费者配置* return*/public MapString,Object consumerConfigs(){MapString,Object props new HashMap();props.put(ConsumerConfig.GROUP_ID_CONFIG, test-consumer-group);props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 127.0.0.1:9002);props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);return props;}Beanpublic KafkaListenerContainerFactoryConcurrentMessageListenerContainerString, Object batchFactory() {ConcurrentKafkaListenerContainerFactoryString, Object factory new ConcurrentKafkaListenerContainerFactory();factory.setConsumerFactory(new DefaultKafkaConsumerFactory(consumerConfigs()));//并发数量factory.setConcurrency(3);//开启批量监听factory.setBatchListener(true);return factory;} } 同时监听器通过KafkaListener注解的containerFactory 配置指定批量消费的工厂即可,如下 同时监听器通过KafkaListener注解的containerFactory 配置指定批量消费的工厂即可,如下 四、Kafka参数调优 一、Consumer参数说明 1、enable.auto.commit 该属性指定了消费者是否自动提交偏移量默认值是true。 为了尽量避免出现重复数据假如某个消费者poll消息后应用正在处理消息在3秒后kafka进行了重平衡那么由于没有更新位移导致重平衡后这部分消息重复消费和数据丢失可以把它设为 false由自己控制何时提交偏移量。 如果把它设为true还可以通过配置 auto.commit.interval.ms 属性来控制提交的频率。   2、auto.commit.interval.ms 自动提交间隔。范围[0,Integer.MAX]默认值是 5000 5 s 3、手动提交commitSync/commitAsync 手动提交offset的方法有两种分别是commitSync同步提交和commitAsync异步提交。 相同点都会将本次poll的一批数据最大的偏移量提交。 不同点commitSync会阻塞当前线程一直到提交成功并且会自动失败重试由不可控因素导致也会出现提交失败而commitAsync则没有失败重试机制故有可能提交失败导致重复消费。 4、max.poll.records Consumer每次调用poll()时取到的records的最大数。 二、Kafka消息积压、消费能力不足怎么解决 如果是Kafka消费能力不足则可以考虑增加Topic的分区数同时相应的增加消费者实例消费者数分区数二者缺一不可。 如果是下游的数据处理不及时则可以提高每批次拉取的数量通过max.poll.records这个参数可以调整。 单个消费者实例的消费能力提升可以用多线程/线程池的方式并发消费提高单机的消费能力。 三、Kafka消费者如何进行流控 将自动提交改成手动提交enable.auto.commitfalse每次消费完再手动异步提交offset之后消费者再去Broker拉取新消息这样可以做到按照消费能力拉取消息减轻消费者的压力。
http://www.tj-hxxt.cn/news/228184.html

相关文章:

  • h5网站开发培训哪里好广告设计在线
  • 佛山哪里有做网站的上海网站开发网站开发公司
  • 海洋专业做网站论坛网站建设方案
  • 网站建设上海哪家公司好黄骅市找工作
  • asp网站模板源码免费无限下载做一个网站花2万贵吗
  • 网站流量下滑有服务器如何做网站
  • 五年级信息做网站的软件html用什么软件
  • 玉溪哪有网站建设开发百度推广怎么收费
  • 校园网站方案建筑行业网
  • 做店铺首页的网站互联网运营平台
  • 网站免费建站2wordpress 评论模板
  • 厂家网站怎么做基于wed的网站开发
  • 深圳做微商网站的公司网站策划书格式及范文
  • wordpress的站点地址怎么设置文字变形logo设计
  • 二手交易平台网站的建设网站可以做被告嘛
  • 南宁网站seo大概多少钱seo的搜索排名影响因素有哪些
  • 南宁哪里有做网站的公司设计企业网店推广策略
  • 国外专业做汽配的网站广州网站建设 中网科技
  • 石家庄做网站推广排名的公司中国优秀网页设计案例
  • 网站建设门户wordpress 注册邮箱
  • 医疗机构网站备案asp网站源码下载
  • 用自己的电脑做网站服务器优化网站的技巧
  • 潍坊作风建设网站内部网站的作用
  • 做网站挣钱来个好心人指点一下呗网页制作q元素
  • 做网站要学什么软件黑蒜东莞网站建设
  • 电子商务网站用户协议百度认证平台
  • 东台网站网站建设网站开发和ipv6
  • 湛江制作企业网站信息发布网站建设
  • 网店平台网站建设需求wordpress建站的教程
  • 清华大学绿色大学建设网站微官网建设公司排行