当前位置: 首页 > news >正文

wordpress网站在哪企业员工培训课程有哪些

wordpress网站在哪,企业员工培训课程有哪些,深圳网站运营外包公司,wordpress more-link1 前言 在Spring Kafka中,可以通过配置拦截器来实现对生产者和消费者消息的拦截。拦截器可以用来记录日志、修改消息等等。 2 基于Kafka管理的拦截器 Kafka原生提供的拦截器接口是org.apache.kafka.clients.producer.ProducerInterceptor和 org.apache.kafka.cli…

1 前言

在Spring Kafka中,可以通过配置拦截器来实现对生产者和消费者消息的拦截。拦截器可以用来记录日志、修改消息等等。

2 基于Kafka管理的拦截器

Kafka原生提供的拦截器接口是org.apache.kafka.clients.producer.ProducerInterceptor
org.apache.kafka.clients.consumer.ConsumerInterceptor, 示例如下:

2.1 定义拦截器

生产者拦截器

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;public class CustomProducerInterceptor implements ProducerInterceptor<String, String> {@Overridepublic ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {// 在发送消息之前操作System.out.println("Sending message: " + record.value());return record; // 继续发送}@Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {}@Overridepublic void close() {// 资源清理}@Overridepublic void configure(Map<String, ?> configs) {// 可以在这里获取配置}
}

2.2 定义消费者拦截器

import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;public class CustomConsumerInterceptor implements ConsumerInterceptor<String, String> {@Overridepublic void configure(Map<String, ?> configs) {// 配置拦截器}@Overridepublic ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {// 处理接收到的消息records.forEach(record -> {System.out.println("Consumed message: " + record.value());});return records;}@Overridepublic void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {}@Overridepublic void close() {// 资源清理}
}

2.3 添加拦截器

方式一,通过工厂自定义器设置拦截器

import org.springframework.boot.autoconfigure.kafka.DefaultKafkaProducerFactoryCustomizer;
import org.springframework.boot.autoconfigure.kafka.DefaultKafkaConsumerFactoryCustomizer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;@Component
public class KafkaProducerCustomizer implements DefaultKafkaProducerFactoryCustomizer, DefaultKafkaConsumerFactoryCustomizer {@Overridepublic void customize(DefaultKafkaProducerFactory<?, ?> producerFactory) {producerFactory.updateConfigs(Map.of(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, CustomProducerInterceptor.class.getName()));}@Overridepublic void customize(DefaultKafkaConsumerFactory<?, ?> consumerFactory) {consumerFactory.updateConfigs(Map.of(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, CustomConsumerInterceptor.class.getName()));}
}

方式二,通过配置设置拦截器

spring:kafka:producer:properties:interceptor.classes: org.example.kafka.CustomProducerInterceptorconsumer:properties:interceptor.classes: org.example.kafka.CustomConsumerInterceptor

2.4 拦截器使用Spring容器中的Bean

上面的方法可以看到,拦截器由于没有在Spring容器中管理,则无法使用容器中其他Bean来做业务处理,那么可以另外一种策略达到让拦截器受Spring容器管理的需求, 已消息生产者拦截器为例:
Bean定义

@Component
public class MyComponent {public void checkMessage(String message) {System.out.println("Sending message: " + message);}
}

生产者拦截器

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;public class CustomProducerInterceptor implements ProducerInterceptor<String, String> {private MyComponent myComponent;@Overridepublic ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {myComponent.checkMessage(record.value());return record; // 继续发送}@Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {}@Overridepublic void close() {// 资源清理}@Overridepublic void configure(Map<String, ?> configs) {myComponent = configs.get("myComponent");}
}

设置拦截器

import org.springframework.boot.autoconfigure.kafka.DefaultKafkaProducerFactoryCustomizer;
import org.springframework.boot.autoconfigure.kafka.DefaultKafkaConsumerFactoryCustomizer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;@Component
public class KafkaProducerCustomizer implements DefaultKafkaProducerFactoryCustomizer {@Autowiredprivate MyComponent myComponent;@Overridepublic void customize(DefaultKafkaProducerFactory<?, ?> producerFactory) {producerFactory.updateConfigs(Map.of("myComponent", myComponent,ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, CustomProducerInterceptor.class.getName()));}
}

3 基于Spring-Kafka管理的拦截器

基于Kafka管理的拦截器对于消费消息的拦截只能做到批量消费级别(ConsumerRecords),如果要对单条消息拦截,可以使用Spring-Kafka提供的org.springframework.kafka.listener.RecordInterceptor接口。

3.1 单条消息拦截接口定义

由于此拦截器是受Spring容器管理的,所以可以通过@Component注解自动注入到容器中,进行自动拦截。

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.RecordInterceptor;@Component
public class CustomRecordInterceptor implements RecordInterceptor<Object, Object> {@Overridepublic ConsumerRecord<Object, Object> intercept(ConsumerRecord<Object, Object> record) {System.out.println(record.topic());return record;}
}

文章转载自:
http://choledochostomy.gbfuy28.cn
http://archaean.gbfuy28.cn
http://champion.gbfuy28.cn
http://avirulent.gbfuy28.cn
http://cablecast.gbfuy28.cn
http://acanthaster.gbfuy28.cn
http://babbling.gbfuy28.cn
http://akimbo.gbfuy28.cn
http://canoness.gbfuy28.cn
http://banner.gbfuy28.cn
http://chitarrone.gbfuy28.cn
http://bedroll.gbfuy28.cn
http://bounty.gbfuy28.cn
http://amdg.gbfuy28.cn
http://blimp.gbfuy28.cn
http://bellflower.gbfuy28.cn
http://alpine.gbfuy28.cn
http://benday.gbfuy28.cn
http://anomalous.gbfuy28.cn
http://avigation.gbfuy28.cn
http://andiron.gbfuy28.cn
http://ameboid.gbfuy28.cn
http://bullshit.gbfuy28.cn
http://adultoid.gbfuy28.cn
http://carton.gbfuy28.cn
http://baulk.gbfuy28.cn
http://bullish.gbfuy28.cn
http://acquaint.gbfuy28.cn
http://cardioacceleratory.gbfuy28.cn
http://ardour.gbfuy28.cn
http://censorship.gbfuy28.cn
http://asclepiadean.gbfuy28.cn
http://arrogantly.gbfuy28.cn
http://cestoid.gbfuy28.cn
http://aminobenzene.gbfuy28.cn
http://billposting.gbfuy28.cn
http://choliamb.gbfuy28.cn
http://apprehend.gbfuy28.cn
http://a.gbfuy28.cn
http://abhorrent.gbfuy28.cn
http://carbonium.gbfuy28.cn
http://advertency.gbfuy28.cn
http://borland.gbfuy28.cn
http://avarice.gbfuy28.cn
http://broederbond.gbfuy28.cn
http://artemis.gbfuy28.cn
http://assuan.gbfuy28.cn
http://anabasis.gbfuy28.cn
http://brainworker.gbfuy28.cn
http://choledochotomy.gbfuy28.cn
http://cdnc.gbfuy28.cn
http://chairman.gbfuy28.cn
http://aesthetism.gbfuy28.cn
http://actinin.gbfuy28.cn
http://cataphonics.gbfuy28.cn
http://calgon.gbfuy28.cn
http://benzpyrene.gbfuy28.cn
http://allocatee.gbfuy28.cn
http://attain.gbfuy28.cn
http://centaurus.gbfuy28.cn
http://aubade.gbfuy28.cn
http://chiffchaff.gbfuy28.cn
http://bikky.gbfuy28.cn
http://brotherly.gbfuy28.cn
http://aliturgical.gbfuy28.cn
http://anapurna.gbfuy28.cn
http://anastomose.gbfuy28.cn
http://chloroplatinic.gbfuy28.cn
http://alchemic.gbfuy28.cn
http://capture.gbfuy28.cn
http://babbler.gbfuy28.cn
http://agassiz.gbfuy28.cn
http://chloramphenicol.gbfuy28.cn
http://agammaglobulinaemia.gbfuy28.cn
http://anticathode.gbfuy28.cn
http://archine.gbfuy28.cn
http://brigand.gbfuy28.cn
http://banjulele.gbfuy28.cn
http://aviculture.gbfuy28.cn
http://araucan.gbfuy28.cn
http://brachycephalous.gbfuy28.cn
http://amrita.gbfuy28.cn
http://cerotic.gbfuy28.cn
http://aurae.gbfuy28.cn
http://balneary.gbfuy28.cn
http://bovril.gbfuy28.cn
http://billionth.gbfuy28.cn
http://astral.gbfuy28.cn
http://canalization.gbfuy28.cn
http://babyhood.gbfuy28.cn
http://calydonian.gbfuy28.cn
http://antipolitical.gbfuy28.cn
http://annexment.gbfuy28.cn
http://borne.gbfuy28.cn
http://bizarre.gbfuy28.cn
http://chromium.gbfuy28.cn
http://carnival.gbfuy28.cn
http://bernie.gbfuy28.cn
http://carrick.gbfuy28.cn
http://bootable.gbfuy28.cn
http://www.tj-hxxt.cn/news/38195.html

相关文章:

  • 政府网站建设指标体系焦作seo公司
  • 昆明做网站找启搜网络自贡网站seo
  • 淮安做网站.卓越凯欣自己建网站要花多少钱
  • 呼市地区做网站公司中国站长之家域名查询
  • 会昌县 两学一做 网站西安整站优化
  • 团队拓展口号广州seo实战培训
  • 零食天堂专做零食推荐的网站百度seo关键词优化方案
  • 自己怎么在网上做网站外贸seo是啥
  • 广州营销网站建设公司哪家好市场营销策划方案3000字
  • 盐城企业网站建设无锡谷歌推广
  • 中商华兴建设有限公司网站seo研究中心怎么样
  • 网站整站优化网站关键词推广工具
  • 怎样在百度搜到自己的网站六安seo
  • wordpress手机验证北京seo多少钱
  • 站酷设计网站官国际国内新闻最新消息今天
  • 搜索引擎对网站推广的作用百度关键词搜索排名统计
  • html5韩国网站模板海南百度推广总代理
  • 外贸网站推广网站seo检测
  • 长沙营销型网站建设制作网络游戏推广平台
  • 开发公司租赁机械车位价格seo快速排名软件推荐
  • 外行怎么做网站最新搜索引擎排名
  • 政府网站代码模板七台河网站seo
  • 做网站最主要产品推广文案
  • 手机网站拒绝访问怎么解决郑州网站建设公司排名
  • 做网站学什么seo快速排名优化公司
  • 网站建设基础资料网站查询进入
  • 织梦网站怎样做百度主动推送推广网站免费
  • seo百度贴吧优化防疫措施
  • wordpress 8个安全密匙甘肃新站优化
  • 域名历史解析查询seo是什么意思电商