当前位置: 首页 > news >正文

网站 粘度wordpress 在线教育主题

网站 粘度,wordpress 在线教育主题,中国芯片三巨头,网络推广培训机构哪个比较好文章目录 一、原生 KafkaConsumer1、pom文件引入kafka2、拉取数据3、发送数据二、在spring boot中使用@KafkaListener1、添加依赖2、application.yml3、消息拉取:consumer4、自定义ListenerContainerFactory5、消息发送:producer6、kafka通过clientId鉴权时的鉴权失败问题一、… 文章目录 一、原生 KafkaConsumer1、pom文件引入kafka2、拉取数据3、发送数据 二、在spring boot中使用@KafkaListener1、添加依赖2、application.yml3、消息拉取:consumer4、自定义ListenerContainerFactory5、消息发送:producer6、kafka通过clientId鉴权时的鉴权失败问题 一、原生 KafkaConsumer 1、pom文件引入kafka dependencygroupIdorg.apache.kafka/groupIdartifactIdkafka_2.12/artifactId /dependency2、拉取数据 简单说只要以下几个步骤: 1、获取kafka地址,并设置Properties 2、获取consumer:KafkaConsumerString, String consumer = new KafkaConsumer(props); 3、订阅topic:consumer.subscribe(topic); 4、拉取数据:consumer.poll() 5、遍历数据 6、示例: package com.yogi.test.consumer;import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.springframework.beans.factory.InitializingBean; import java.util.Properties; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.CreateTopicsResult; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.Node; import org.apache.kafka.common.config.SaslConfigs; import org.apache.kafka.common.serialization.StringSerializer;@Component public class TestMsgConsumer implements InitializingBean {@Value("${test.kafka.address:127.0.0.1:9092}")private String kafkaAddress;@Value("${test.kafka.msg.topic:topic_test_1,topic_test_2}")private String msgTopic;@Value("${test.consumer.name:yogima}")private String consumerGroupId;/*** 消费开关: true-消费,false-暂停消费* 在服务正常停止时用于停止继续消费数据,将缓存中的数据发送完即可*/private Boolean consumeSwitch = true;public void consumerMessage(ListString topic, String groupId) {LOGGER.info("consumer topic list1:{}",topic.toString());Properties props = new Properties();/*** 指定一组host:port对,用于创建与Kafka broker服务器的Socket连接,可以指定多组,使用逗号分隔,对于多broker集群,只需配置* 部分broker地址即可,consumer启动后可以通过这些机器找到完整的broker列表*/LOGGER.info("test.kafka.address:{}",kafkaAddress);props.put("bootstrap.servers", kafkaAddress);/*** 指定group名字,能唯一标识一个consumer group,如果不显示指定group.id会抛出InvalidGroupIdException异常,通常为group.id* 设置一个有业务意义的名字即可*/props.put("group.id", groupId);/*** 自动提交位移*/props.put("enable.auto.commit", Boolean.TRUE);/*** 位移提交超时时间*/props.put("auto.commit.interval.ms", "1000");/*** 从最早的消息开始消费* 1,earliest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费* 2,latest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据*/props.put("auto.offset.reset", "latest");/*** 指定消费解序列化操作。consumer从broker端获取的任何消息都是字节数组的格式,因此需要指定解序列化操作才能还原为原本对象,* Kafka对绝大部分初始类型提供了解序列化器,consumer支持自定义解序列化器org.apache.kafka.common.serialization.Deserializer* org.apache.kafka.common.serialization.ByteArrayDeserializer* StringDeserializer*/props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");/*** 对消息体进行解序列化,与key解序列化类似*/props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");//一次从kafka中poll出来的数据条数,max.poll.records条数据需要在在session.timeout.ms这个时间内处理完props.put("max.poll.records", "500");//fetch.message.max.bytes (默认 1MB) – 消费者能读取的最大消息。这个值应该大于或等于message.max.bytes。props.put("fetch.message.max.bytes", "300000000");KafkaConsumerString, String consumer;try{/*** 通过Properties实例对象构建KafkaConsumer对象,可同时指定key、value序列化器*/LOGGER.info("start set consumer,props:{}",props.toString());consumer = new KafkaConsumer(props);LOGGER.info("set consumer finished");/*** 订阅consumer group需要消费的topic列表*/LOGGER.info("consumer topic list:{}",topic.toString());consumer.subscribe(topic);}catch (Exception e){LOGGER.info("consumer subscribe failed,msg:{},cause:{},e:{}",e.getMessage(),e.getCause(),e);return;}/*** 并行从订阅topic获取多个分区消息,为此新版本consumer的poll方法使用类似Linux的 selec I/O机制,* 所有相关的事件都发生在一个事件循环中,这样consuner端只使用一个线程就能完成所有类型I/o操作*/try {while (true) {if (!consumeSwitch) {try {Thread.sleep(30000);} catch (InterruptedException e) {LOGGER.error("err msg:" + e.getMessage());}}/*** 指定超时时间,通常情况下consumer拿到了足够多的可用数据,会立即从该方法返回,但若当前没有足够多数据* consumer会处于阻塞状态,但当到达设定的超时时间,则无论数据是否足够都为立即返回*/ConsumerRecordsString, String records = consumer.poll(Duration.ofSeconds(1L));/*** poll调用返回ConsumerRecord类分装的Kafka消息,之后会根据自己业务实现信息处理,对于consumer而言poll方法* 返回即认为consumer成功消费了消息*/for (ConsumerRecordString, String record : records) {LOGGER.debug("offset = {}, key = {}, value = {}"
http://www.tj-hxxt.cn/news/220351.html

相关文章:

  • 百度收录网站定位地址十大外贸论坛
  • 建设通官方网站下载e航seo的排名机制
  • 做二手车有哪些网站有哪些手续网站开发湛江
  • 可以看帖子的网站毕业答辩ppt模板免费下载 素材
  • 网站排名优化要多少钱网站不备案备案
  • 网站怎么做会让神马搜索到旺店通app手机企业版下载
  • 兴义网站seo开一个网站需要什么
  • 网站标题logo怎么做樱花代码html
  • 周杰伦做的广告网站网站建设除了中企动力
  • 做网站后台教程视频中国建设工程监理协会网站
  • 网站开发工具.晴天娃娃wordpress采集微信公众号
  • 做免费网站建设银行银行官网网站
  • 网站后台怎么做企业文化培训
  • 基于php的网站建设思路方案图文制作app
  • 重庆网站的制作价格网站怎么后台登陆
  • 买东西的网站深圳公司注册地址
  • 青岛谁优化网站做的好如何看网站的浏览量
  • 站长检测同ip网站很多怎么办android安卓软件下载
  • 做网站维护的人叫啥做网站需要字体授权
  • 中学网站域名用什么用软文推广多少钱一篇
  • 网站改版的意义公司网站如何做宣传
  • 动漫网站建站目的湖南网站建设欧黎明
  • 网站注册建设如何建设网站山东济南兴田德润官网
  • 网站建设试题网络销售怎么做自己的网站
  • 邢台网站开发有没专门做二手的家具网站
  • 建站网站设计建设银行理财网站
  • 有什么可以做cad赚钱的网站网站 做英文 翻译 规则
  • 深圳建设品牌网站wordpress底部版权代码
  • 宿迁做网站多少钱上海哪里有做网站的
  • 佛山网站设计联系方式广西住房城乡建设培训中心