当前位置: 首页 > news >正文

尤溪网站开发公司如何建立网站

尤溪网站开发,公司如何建立网站,网页美工设计参考文献,谷歌做公司网站需要多少钱Interceptors ProducerInterceptor https://www.cnblogs.com/huxi2b/p/7072447.html Producer拦截器(interceptor)是个相当新的功能#xff0c;它和consumer端interceptor是在Kafka 0.10版本被引入的#xff0c;主要用于实现clients端的定制化控制逻辑。 对于producer而言它和consumer端interceptor是在Kafka 0.10版本被引入的主要用于实现clients端的定制化控制逻辑。 对于producer而言interceptor使得用户在消息发送前以及producer回调逻辑前有机会对消息做一些定制化需求比如修改消息等。同时producer允许用户指定多个interceptor按序作用于同一条消息从而形成一个拦截链(interceptor chain),按照指定顺序调用它们. API public interface ProducerInterceptorK, V extends Configurable, AutoCloseable {//该方法封装进KafkaProducer.send方法中即它运行在用户主线程中的。Producer确保在消息被序列化以计算分区**前**调用该方法。用户可以在该方法中对消息做任何操作但最好保证不要修改消息所属的topic和分区否则会影响目标分区的计算ProducerRecordK, V onSend(ProducerRecordK, V record);//该方法会在消息被应答之前或消息发送失败时调用并且通常都是在producer回调逻辑触发之前。onAcknowledgement运行在producer的IO线程中因此不要在该方法中放入很重的逻辑否则会拖慢producer的消息发送效率void onAcknowledgement(RecordMetadata metadata, Exception exception);//关闭interceptor主要用于执行一些资源清理工作void close(); } demo public static void main(String[] args) throws ExecutionException, InterruptedException {MapString, Object props new HashMap();props.put(bootstrap.servers, localhost:9092);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, RawSerializer.class);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, RawSerializer.class);ListString interceptors new ArrayList();interceptors.add(cn.jhs.kakfa.p.interceptor.TimeStampInterceptor); // interceptor 1interceptors.add(cn.jhs.kakfa.p.interceptor.CounterInterceptor); // interceptor 2props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);String topic test-topic;ProducerString, String producer new KafkaProducer(props);for (int i 0; i 10; i) {ProducerRecordString, String record new ProducerRecord(topic, message: i);producer.send(record).get();}// 一定要关闭producer这样才会调用interceptor的close方法producer.close();} }ConsumerInterceptor https://blog.csdn.net/warybee/article/details/121980296 消费者Consumer在拉取了分区消息之后要首先经过反序列化器对key和value进行反序列化处理处理完之后如果消费端设置了拦截器则需要经过拦截器的处理之后才能返回给消费者应用程 序进行处理。 ConsumerInterceptor允许拦截甚至更改消费者接收到的消息。常用在于将第三方组件引入 消费者应用程序用于定制的监控、日志处理等。ConsumerInterceptor方法抛出的异常会被捕获、记录但是不会向下传播。如果用户配置了 错误的key或value类型参数消费者不会抛出异常而仅仅是记录下来。如果有多个拦截器则该方法按照KafkaConsumer的configs中配置的顺序调用。从调用 KafkaConsumer.poll(long) 的同一线程调用 ConsumerInterceptor 回调。 API public interface ConsumerInterceptorK, V extends Configurable, AutoCloseable {/**该方法在poll方法返回之前调用。调用结束后poll方法就返回消息了。该方法可以修改消费者消息返回新的消息。拦截器可以过滤收到的消息或生成新的消息。*/ConsumerRecordsK, V onConsume(ConsumerRecordsK, V records);/**当消费者提交偏移量时调用该方法。通常你可以在该方法中做一些记账类的动作比如打日志等。调用者将忽略此方法抛出的任何异常。*/void onCommit(MapTopicPartition, OffsetAndMetadata offsets);/*** 关闭Interceptor之前调用*/void close(); } 配置 //如果有多个拦截器用分割即可 configs.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,com.warybee.interceptor.MyConsumerInterceptor);Listeners ProducerListener https://blog.csdn.net/u014494148/article/details/125344184 Kafka提供了生产者监听器 ProducerListener他的作用类似于带回调的KafkaTemplate#send(callback) ; 可以监听到消息发送成功或者失败。ProducerListener 提供了onSuccess 成功回调和 onError 失败回调如下 API public interface ProducerListenerK, V {/*** Invoked after the successful send of a message (that is, after it has been acknowledged by the broker).*/default void onSuccess(ProducerRecordK, V producerRecord, RecordMetadata recordMetadata) {}/*** Invoked after an attempt to send a message has failed.*/default void onError(ProducerRecordK, V producerRecord, Nullable RecordMetadata recordMetadata,Exception exception) {}}自定义Listener public class MyProducerListenerK, V implements ProducerListenerK, V {private FallbackHandlerK, V fallbackHandler;Overridepublic void onError(ProducerRecordK, V producerRecord, Nullable RecordMetadata recordMetadata, Exception exception) {//fallbackHandler.process.//write error metrics...}Overridepublic void onSuccess(ProducerRecordK, V producerRecord, RecordMetadata recordMetadata) {//write success metrics...} }demo(KafkaTemplate.setProducerListener()) public KafkaTemplateObject, Object buildKafkaTemplate(MapString, Object props) {ProducerFactoryObject, Object factory new DefaultKafkaProducerFactory(props);KafkaTemplateObject, Object kafkaTemplate new KafkaTemplate(factory);MyProducerListenerObject, Object listener1 new MyProducerListener();listener1.setFallbackHandler(fallbackHandler);kafkaTemplate.setProducerListener(listener1);return kafkaTemplate;}KafkaListenerErrorHandler 当KafkaListener方法抛出异常时调用的错误处理程序. API FunctionalInterface public interface KafkaListenerErrorHandler {/*** Handle the error.*/Object handleError(Message? message, ListenerExecutionFailedException exception); }自定义CustomKafkaListenerErrorHandler(当异常过多时,暂停消费) /*** 可以通过* org.springframework.kafka.annotation.KafkaListener(errorHandlercustomKafkaListenerErrorHandler)* 来引入该配置*/ Component public class CustomKafkaListenerErrorHandler implements KafkaListenerErrorHandler {//记录了所有的 kafka MessageListenerContainerprivate final KafkaListenerEndpointRegistry endpointRegistry;public CustomKafkaListenerErrorHandler(KafkaListenerEndpointRegistry endpointRegistry) {this.endpointRegistry endpointRegistry;}Overridepublic Object handleError(Message? message, ListenerExecutionFailedException exception) {// 处理异常// 暂停消费者String listenerId exception.getGroupId();MessageListenerContainer listenerContainer endpointRegistry.getListenerContainer(listenerId);listenerContainer.pause();//滑动窗口算法 ---// 休眠一段时间例如 30秒try {Thread.sleep(30000); // 暂停 30 秒} catch (InterruptedException e) {Thread.currentThread().interrupt();}// 恢复消费者listenerContainer.resume();return null;} }demo org.springframework.kafka.annotation.KafkaListener(errorHandlercustomKafkaListenerErrorHandler)Callback producer.Callback public interface Callback {//processed befeore listener...void onCompletion(RecordMetadata metadata, Exception exception); }demo producer.send(producerRecord, (recordMetadata, exception) - {if (exception null) {System.out.println(Record written to offset recordMetadata.offset() timestamp recordMetadata.timestamp());} else {System.err.println(An error occurred);exception.printStackTrace(System.err);} });
http://www.tj-hxxt.cn/news/134130.html

相关文章:

  • 常州网站开发培训郴州做网站的公司
  • 蛋品 东莞网站建设网站设计是用什么做的
  • 排名好的青岛网站建设营销网站建设专业服务公司
  • 洛阳网站推广怎么做永康住房城乡建设局网站
  • 免费的黄冈网站有哪些平台呢永久久微信网页版手机端
  • 电商网站开发实战视频教程深圳网站建设最专业
  • html网站开发基础siteapp wordpress
  • 做网站有什么js特效室内设计软件免费下载
  • 公司网站的建设与运营管理制度网页升级访问新域名
  • 电子商务网站建设需求概述网站制作是怎么学的
  • 如何做国外网站推广wordpress增加底部导航
  • 做本地网站赚钱吗?泉州app开发
  • 苏州网站网络推广WordPress友情链接添加
  • 怎么把做的网站放到腾讯云里面什么网站可以做实验室
  • 婚纱摄影网站论文电子商务网站建设的期中考试
  • 气象网站建设管理的不足如何做网站seo优化
  • 网站域名怎么写好建立外贸网站
  • 酒店网站建设策划书wordpress 做问卷
  • 电子商务网站开发与设计报告上海公司网站建设价格
  • 建设信用网站的目的网页设计师培训班招生
  • 高密市建设局网站软文广告经典案例300
  • dw做网站如何让用户可编辑搜索引擎营销方案
  • 南山做网站教程四川省建设厅招标网站
  • 文明网站建设方案榆林市建设局网站
  • 开封市建设局网站wordpress tag调用文章
  • 百度seo优化网站wordpress文章位置
  • 上海奉贤网站建设建设工程法律法规
  • 动效网站建筑人才网官网96877
  • 宣传电脑的网站开发百度seo排名优化
  • 站长工具高清有吗网络组建与维护实训报告