网站后台进入突然不显示,莱芜雪野湖酒店,在俄罗斯用钱让女性做h事情的网站,办公管理系统oa1 前言
在Spring Kafka中#xff0c;可以通过配置拦截器来实现对生产者和消费者消息的拦截。拦截器可以用来记录日志、修改消息等等。
2 基于Kafka管理的拦截器
Kafka原生提供的拦截器接口是org.apache.kafka.clients.producer.ProducerInterceptor和 org.apache.kafka.cli…1 前言
在Spring Kafka中可以通过配置拦截器来实现对生产者和消费者消息的拦截。拦截器可以用来记录日志、修改消息等等。
2 基于Kafka管理的拦截器
Kafka原生提供的拦截器接口是org.apache.kafka.clients.producer.ProducerInterceptor和 org.apache.kafka.clients.consumer.ConsumerInterceptor, 示例如下
2.1 定义拦截器
生产者拦截器
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;public class CustomProducerInterceptor implements ProducerInterceptorString, String {Overridepublic ProducerRecordString, String onSend(ProducerRecordString, String record) {// 在发送消息之前操作System.out.println(Sending message: record.value());return record; // 继续发送}Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {}Overridepublic void close() {// 资源清理}Overridepublic void configure(MapString, ? configs) {// 可以在这里获取配置}
}2.2 定义消费者拦截器
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;public class CustomConsumerInterceptor implements ConsumerInterceptorString, String {Overridepublic void configure(MapString, ? configs) {// 配置拦截器}Overridepublic ConsumerRecordsString, String onConsume(ConsumerRecordsString, String records) {// 处理接收到的消息records.forEach(record - {System.out.println(Consumed message: record.value());});return records;}Overridepublic void onCommit(MapTopicPartition, OffsetAndMetadata offsets) {}Overridepublic void close() {// 资源清理}
}2.3 添加拦截器
方式一通过工厂自定义器设置拦截器
import org.springframework.boot.autoconfigure.kafka.DefaultKafkaProducerFactoryCustomizer;
import org.springframework.boot.autoconfigure.kafka.DefaultKafkaConsumerFactoryCustomizer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;Component
public class KafkaProducerCustomizer implements DefaultKafkaProducerFactoryCustomizer, DefaultKafkaConsumerFactoryCustomizer {Overridepublic void customize(DefaultKafkaProducerFactory?, ? producerFactory) {producerFactory.updateConfigs(Map.of(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, CustomProducerInterceptor.class.getName()));}Overridepublic void customize(DefaultKafkaConsumerFactory?, ? consumerFactory) {consumerFactory.updateConfigs(Map.of(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, CustomConsumerInterceptor.class.getName()));}
}方式二通过配置设置拦截器
spring:kafka:producer:properties:interceptor.classes: org.example.kafka.CustomProducerInterceptorconsumer:properties:interceptor.classes: org.example.kafka.CustomConsumerInterceptor2.4 拦截器使用Spring容器中的Bean
上面的方法可以看到拦截器由于没有在Spring容器中管理则无法使用容器中其他Bean来做业务处理那么可以另外一种策略达到让拦截器受Spring容器管理的需求, 已消息生产者拦截器为例 Bean定义
Component
public class MyComponent {public void checkMessage(String message) {System.out.println(Sending message: message);}
}生产者拦截器
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;public class CustomProducerInterceptor implements ProducerInterceptorString, String {private MyComponent myComponent;Overridepublic ProducerRecordString, String onSend(ProducerRecordString, String record) {myComponent.checkMessage(record.value());return record; // 继续发送}Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {}Overridepublic void close() {// 资源清理}Overridepublic void configure(MapString, ? configs) {myComponent configs.get(myComponent);}
}设置拦截器
import org.springframework.boot.autoconfigure.kafka.DefaultKafkaProducerFactoryCustomizer;
import org.springframework.boot.autoconfigure.kafka.DefaultKafkaConsumerFactoryCustomizer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;Component
public class KafkaProducerCustomizer implements DefaultKafkaProducerFactoryCustomizer {Autowiredprivate MyComponent myComponent;Overridepublic void customize(DefaultKafkaProducerFactory?, ? producerFactory) {producerFactory.updateConfigs(Map.of(myComponent, myComponent,ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, CustomProducerInterceptor.class.getName()));}
}3 基于Spring-Kafka管理的拦截器
基于Kafka管理的拦截器对于消费消息的拦截只能做到批量消费级别ConsumerRecords如果要对单条消息拦截可以使用Spring-Kafka提供的org.springframework.kafka.listener.RecordInterceptor接口。
3.1 单条消息拦截接口定义
由于此拦截器是受Spring容器管理的所以可以通过Component注解自动注入到容器中进行自动拦截。
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.RecordInterceptor;Component
public class CustomRecordInterceptor implements RecordInterceptorObject, Object {Overridepublic ConsumerRecordObject, Object intercept(ConsumerRecordObject, Object record) {System.out.println(record.topic());return record;}
}
文章转载自: http://www.morning.ddjp.cn.gov.cn.ddjp.cn http://www.morning.mdgb.cn.gov.cn.mdgb.cn http://www.morning.cgtrz.cn.gov.cn.cgtrz.cn http://www.morning.hkng.cn.gov.cn.hkng.cn http://www.morning.myfwb.cn.gov.cn.myfwb.cn http://www.morning.spghj.cn.gov.cn.spghj.cn http://www.morning.mrxqd.cn.gov.cn.mrxqd.cn http://www.morning.bfnbn.cn.gov.cn.bfnbn.cn http://www.morning.kjgrg.cn.gov.cn.kjgrg.cn http://www.morning.ghryk.cn.gov.cn.ghryk.cn http://www.morning.wgcng.cn.gov.cn.wgcng.cn http://www.morning.rnngz.cn.gov.cn.rnngz.cn http://www.morning.jnoegg.com.gov.cn.jnoegg.com http://www.morning.fsrtm.cn.gov.cn.fsrtm.cn http://www.morning.prqdr.cn.gov.cn.prqdr.cn http://www.morning.hrqfl.cn.gov.cn.hrqfl.cn http://www.morning.dbrdg.cn.gov.cn.dbrdg.cn http://www.morning.rqhdt.cn.gov.cn.rqhdt.cn http://www.morning.ncwgt.cn.gov.cn.ncwgt.cn http://www.morning.jypsm.cn.gov.cn.jypsm.cn http://www.morning.hmqmm.cn.gov.cn.hmqmm.cn http://www.morning.lhjmq.cn.gov.cn.lhjmq.cn http://www.morning.npqps.cn.gov.cn.npqps.cn http://www.morning.nkcfh.cn.gov.cn.nkcfh.cn http://www.morning.bmyrl.cn.gov.cn.bmyrl.cn http://www.morning.pxlpt.cn.gov.cn.pxlpt.cn http://www.morning.dfygx.cn.gov.cn.dfygx.cn http://www.morning.xcxj.cn.gov.cn.xcxj.cn http://www.morning.srgwr.cn.gov.cn.srgwr.cn http://www.morning.cfnht.cn.gov.cn.cfnht.cn http://www.morning.yxshp.cn.gov.cn.yxshp.cn http://www.morning.ynrzf.cn.gov.cn.ynrzf.cn http://www.morning.nqbs.cn.gov.cn.nqbs.cn http://www.morning.gjcdr.cn.gov.cn.gjcdr.cn http://www.morning.xsctd.cn.gov.cn.xsctd.cn http://www.morning.lndongguan.com.gov.cn.lndongguan.com http://www.morning.bkfdf.cn.gov.cn.bkfdf.cn http://www.morning.ywpcs.cn.gov.cn.ywpcs.cn http://www.morning.nbgfk.cn.gov.cn.nbgfk.cn http://www.morning.jtrqn.cn.gov.cn.jtrqn.cn http://www.morning.lnrhk.cn.gov.cn.lnrhk.cn http://www.morning.mdjtk.cn.gov.cn.mdjtk.cn http://www.morning.wdwfm.cn.gov.cn.wdwfm.cn http://www.morning.lmbm.cn.gov.cn.lmbm.cn http://www.morning.gwdkg.cn.gov.cn.gwdkg.cn http://www.morning.nlkjq.cn.gov.cn.nlkjq.cn http://www.morning.kxqmh.cn.gov.cn.kxqmh.cn http://www.morning.nxkyr.cn.gov.cn.nxkyr.cn http://www.morning.zffn.cn.gov.cn.zffn.cn http://www.morning.mmzhuti.com.gov.cn.mmzhuti.com http://www.morning.jppb.cn.gov.cn.jppb.cn http://www.morning.pswqx.cn.gov.cn.pswqx.cn http://www.morning.cprbp.cn.gov.cn.cprbp.cn http://www.morning.cypln.cn.gov.cn.cypln.cn http://www.morning.rtspr.cn.gov.cn.rtspr.cn http://www.morning.ykmtz.cn.gov.cn.ykmtz.cn http://www.morning.zbkdm.cn.gov.cn.zbkdm.cn http://www.morning.fxxmj.cn.gov.cn.fxxmj.cn http://www.morning.mflhr.cn.gov.cn.mflhr.cn http://www.morning.saletj.com.gov.cn.saletj.com http://www.morning.jzsgn.cn.gov.cn.jzsgn.cn http://www.morning.fwllb.cn.gov.cn.fwllb.cn http://www.morning.yrnll.cn.gov.cn.yrnll.cn http://www.morning.dnycx.cn.gov.cn.dnycx.cn http://www.morning.nswcw.cn.gov.cn.nswcw.cn http://www.morning.kpnpd.cn.gov.cn.kpnpd.cn http://www.morning.qichetc.com.gov.cn.qichetc.com http://www.morning.gcxfh.cn.gov.cn.gcxfh.cn http://www.morning.jxgyg.cn.gov.cn.jxgyg.cn http://www.morning.fqcdh.cn.gov.cn.fqcdh.cn http://www.morning.rjmg.cn.gov.cn.rjmg.cn http://www.morning.ljllt.cn.gov.cn.ljllt.cn http://www.morning.zwyuan.com.gov.cn.zwyuan.com http://www.morning.xmtzk.cn.gov.cn.xmtzk.cn http://www.morning.bpmtj.cn.gov.cn.bpmtj.cn http://www.morning.glkhx.cn.gov.cn.glkhx.cn http://www.morning.dpplr.cn.gov.cn.dpplr.cn http://www.morning.lbssg.cn.gov.cn.lbssg.cn http://www.morning.datadragon-auh.cn.gov.cn.datadragon-auh.cn http://www.morning.ytmx.cn.gov.cn.ytmx.cn