当前位置: 首页 > news >正文

温州网站设计服务商个人网站备案信息

温州网站设计服务商,个人网站备案信息,qian p.wordpress/,wordpress升级提示文件流的目标1、生产者 从编程的角度而言#xff0c;生产者是一个消息的生产者#xff0c;它负责创建消息并发送到Kafka集群中的一个或多个topic中。 1.1、客户端开发 一个正常的生产逻辑需要具备以下几个步骤#xff1a; 配置生产者客户端参数及创建相应的生产者实例构建待发送的消…1、生产者 从编程的角度而言生产者是一个消息的生产者它负责创建消息并发送到Kafka集群中的一个或多个topic中。 1.1、客户端开发 一个正常的生产逻辑需要具备以下几个步骤 配置生产者客户端参数及创建相应的生产者实例构建待发送的消息 public ProducerRecord(String topic, V value)public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable headers) public ProducerRecord(String topic, Integer partition, K key, V value)public ProducerRecord(String topic, K key, V value)public ProducerRecord(String topic, V value, Iterable headers) public ProducerRecord(String topic, K key, V value, Iterable headers) public ProducerRecord(String topic, Integer partition, K key, V value, Iterable headers) public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) 发送消息 同步发送sync异步发送async producer.send(record).get() 发后即忘fire-and-forget 关闭生产者实例 配置生产者客户端参数及创建相应的生产者实例 /*** author supanpan* date 2023/11/20*/ public class KafkaProducerAnalysis {public static final String brokerList localhost:9092;public static final String topic topic-demo;/*** bootstrap.servers 该参数用来指定生产者客户端连接Kafka集群所需的broker地址清单格式为host:port,host2:port2* serializer 该参数指定了用来对消息key进行序列化的序列化器类key.serializer和value.serializer两个参数需要设置必须填写序列化器的全限定类名* client.id 该参数用来设置生产者客户端的ID是一个字符串如果不设置KafkaProducer会自动生成一个非空字符串格式为producer-1、producer-2等**/public static Properties initConfig() {Properties props new Properties();props.put(bootstrap.servers, brokerList);props.put(key.serializer,org.apache.kafka.common.serialization.StringSerializer);props.put(value.serializer,org.apache.kafka.common.serialization.StringSerializer);props.put(client.id, producer.client.id.demo);return props;}/*** 防止配置书写错误使用ProducerConfig类中的常量来设置参数* return*/public static Properties initNewConfig() {Properties props new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringSerializer);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringSerializer);props.put(ProducerConfig.CLIENT_ID_CONFIG, producer.client.id.demo);return props;}/*** 通过反射的方式来设置参数获取序列化器的全限定类名**/public static Properties initPerferConfig() {Properties props new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());return props;}public static void main(String[] args) throws InterruptedException {Properties props initConfig();KafkaProducerString, String producer new KafkaProducer(props);// KafkaProducerString, String producer new KafkaProducer(props, // new StringSerializer(), new StringSerializer());// 创建ProducerRecord对象其中topic、value是必填项其余属性都是可选项partition、timestamp、key、headersProducerRecordString, String record new ProducerRecord(topic, hello, Kafka!);try {producer.send(record);// 异步发送,获取回调对象获取发送结果 // producer.send(record, new Callback() { // Override // public void onCompletion(RecordMetadata metadata, Exception exception) { // if (exception null) { // System.out.println(metadata.partition() : metadata.offset()); // } // } // });} catch (Exception e) {e.printStackTrace();}finally {// 关闭生产者实例producer.close();}// TimeUnit.SECONDS.sleep(5);} }1.2、序列化 生产者需要用序列化器Serializer把对象转换成字节数组才能通过网路发送给Kafka。 消费者需要用反序列化器Deserializer把字节数组转换成相应的对象才能使用。 生产者使用的序列化器和消费者使用的反序列化器必须是一致的否则消费者无法正常消费生产者发送的消息。 常见序列化器 ByteArrayByteBufferBytesDoubleIntegerLongString 上面列举的序列化器都是Kafka提供的如果需要自定义序列化器需要实现Serializer接口 org.apache.kafka.common.serialization.Serializer,此接口有三个方法 configure(MapString, ? configs, boolean isKey) 该方法主要用来配置当前类通过传入的configs参数获取配置信息isKey参数用来指明当前配置的是key的序列化器还是value的序列化器 serialize(String topic, T data) 该方法用来将给定的对象序列化成字节数组 close() 该方法用来关闭当前序列化器一般情况下可以空实现如果实现了此方法则必须保证此方法的幂等性 1.3、分区器 分区器Partitioner是生产者在将消息发送到Kafka集群时根据分区策略选择消息发送的分区。 Kafka提供了默认的分区策略即DefaultPartitioner该分区器会根据ProducerRecord对象中的key来计算分区号。 如果key为null则使用轮询的方式选择分区如果key不为null则使用key的hash值来计算分区号。如果需要自定义分区器需要实现Partitioner接口org.apache.kafka.clients.producer.Partitioner该接口有两个方法 Partitioner接口的方法 configure(MapString, ? configs) 该方法主要用来配置当前类通过传入的configs参数获取配置信息该方法在创建分区器实例时调用一次用来初始化分区器这个方法来自Partitioner的父接口Configurable该接口还有一个方法void close()用来关闭当前分区器一般情况下可以空实现 partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) 该方法用来计算分区号topic当前消息所属的topickey当前消息的keykeyBytes当前消息key的字节数组value当前消息的valuevalueBytes当前消息value的字节数组cluster当前Kafka集群的信息返回值当前消息的分区号 自定义分区器 /*** 自定义分区器**/ public class DemoPartitioner implements Partitioner {private final AtomicInteger counter new AtomicInteger(0);Overridepublic int partition(String topic, Object key, byte[] keyBytes,Object value, byte[] valueBytes, Cluster cluster) {ListPartitionInfo partitions cluster.partitionsForTopic(topic);int numPartitions partitions.size();if (null keyBytes) {return counter.getAndIncrement() % numPartitions;} elsereturn Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;}Overridepublic void close() {}Overridepublic void configure(MapString, ? configs) {} }实现自定义的DemoPartitioner分区器后需要在配置文件中指定分区器的全限定类名即partitioner.class属性。 配置添加方式 props.put(“partitioner.class”, “com.supanpan.kafka.demo.partitioner.DemoPartitioner”); props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, DemoPartitioner.class.getName()); 1.4、生产者拦截器 拦截器Interceptor是在消息在序列化和反序列化过程中对消息进行处理的组件它是在消息生产者和消费者与Kafka集群之间的一个拦截点可以在消息发送前和消费之后对消息进行一些定制化的操作。 Kafka拦截器有两种类型 生产者拦截器消费者拦截器 拦截器是Producer和Consumer的一个公共接口分别对应两个子接口ProducerInterceptor和ConsumerInterceptor。 ProducerInterceptor public ProducerRecordK, V onSend(ProducerRecordK, V record);public void onAcknowledgement(RecordMetadata metadata, Exception exception);public void close();public void configure(MapString, ? configs); ConsumerInterceptor public ConsumerRecordsK, V onConsume(ConsumerRecordsK, V records);public void onCommit(MapTopicPartition, OffsetAndMetadata offsets);public void close();public void configure(MapString, ? configs); onSend方法会在消息被序列化以前和封装成ProducerRecord对象之后调用可以利用该方法对消息进行定制化操作比如修改消息的某些内容或者增加消息的头部信息等。 生产者拦截器示例 public class ProducerInterceptorPrefix implements ProducerInterceptorString, String {private volatile long sendSuccess 0;private volatile long sendFailure 0;Overridepublic ProducerRecordString, String onSend(ProducerRecordString, String record) {String modifiedValue prefix1- record.value();return new ProducerRecord(record.topic(),record.partition(), record.timestamp(),record.key(), modifiedValue, record.headers()); // if (record.value().length() 5) { // throw new RuntimeException(); // } // return record;}Overridepublic void onAcknowledgement(RecordMetadata recordMetadata,Exception e) {if (e null) {sendSuccess;} else {sendFailure;}}Overridepublic void close() {double successRatio (double) sendSuccess / (sendFailure sendSuccess);System.out.println([INFO] 发送成功率 String.format(%f, successRatio * 100) %);}Overridepublic void configure(MapString, ? map) {} }在KafkaProducer的配置参数中指定拦截器的全限定类名即interceptor.classes属性。 配置方式 props.put(“interceptor.classes”, “com.supanpan.kafka.demo.interceptor.ProducerInterceptorPrefix”); props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptorPrefix.class.getName()) KafkaProducer中不仅可以指定一个拦截器还可以指定多个拦截器形成拦截链 多个拦截器的执行顺序与它们在配置文件中的顺序一致即先配置的拦截器先执行后配置的拦截器后执行 配置的时候各个拦截器之间使用逗号隔开 1.5、原理分析 整个生产者客户端由两个线程协调运行这两个线程分别是main线程主线程和Sender线程发送线程。 main线程负责接收客户端的请求将请求转发给Sender线程然后等待Sender线程的响应结果。 在主线程中由KafkaProducer创建消息然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累加器RecordAccumulator也称为消息收集器中。 Sender线程负责从RecordAccumulator中拉取消息批次Batch并将消息批次发送给Kafka集群。 Sender线程将消息批次发送给Kafka集群后会根据Kafka集群的响应结果对消息批次中的消息进行分类分为发送成功的消息和发送失败的消息。Sender线程会将发送失败的消息重新放入RecordAccumulator中等待下次发送。Sender线程会将发送成功的消息提交给RecordAccumulatorRecordAccumulator会将消息从消息缓冲区中移除。 RecordAccumulator 主要用来缓存消息以便Sender线程可以批量发送进而减少网络传输的资源消耗以提升性能 RecordAccumulator内部维护了一个消息缓冲区该缓冲区由多个消息批次组成每个消息批次中可以存放多条消息。 RecordAccumulator内部的消息缓冲区是一个双端队列每个消息批次都是一个双端队列中的一个元素。 主线程中发送过来的消息都会被追加到RecordAccumulator的某个双端队列Deque中在RecordAccumulator的内部为每个分区都维护了一个双端队列队列中的内容就是ProducerBatch即Deque ProducerBatch 。消息写入缓存时追加到双端队列的尾部Sender读取消息时从双端队列的头部读取 RecordAccumulator内部的消息缓冲区中的消息批次是按照消息的topic和partition进行组织的即每个topic-partition对应一个消息批次。 消息发送流程 KafkaProducer.send()方法将消息发送给KafkaProducer内部的RecordAccumulator消息累加器。KafkaProducer内部的Sender线程不断从RecordAccumulator中拉取消息批次Batch并将消息批次发送给Kafka集群。KafkaProducer内部的Sender线程将消息批次发送给Kafka集群后会根据Kafka集群的响应结果对消息批次中的消息进行分类分为发送成功的消息和发送失败的消息。KafkaProducer内部的Sender线程会将发送失败的消息重新放入RecordAccumulator中等待下次发送。KafkaProducer内部的Sender线程会将发送成功的消息提交给RecordAccumulatorRecordAccumulator会将消息从消息缓冲区中移除。 消息发送失败的情况 消息发送失败的情况 消息发送失败的情况主要有两种 一种是消息发送失败但是可以重试比如网络异常等。另一种是消息发送失败且不可重试比如消息太大、消息格式错误等。 对于第一种情况KafkaProducer内部的Sender线程会将发送失败的消息重新放入RecordAccumulator中等待下次发送。对于第二种情况KafkaProducer内部的Sender线程会将发送失败的消息放入RecordAccumulator中但是不会重试发送因为这种情况下消息是不可恢复的。 消息发送失败的处理 消息发送失败的处理主要有两种方式 一种是将消息发送失败的异常抛出给用户由用户来决定如何处理。另一种是将消息发送失败的异常记录到日志中然后由KafkaProducer内部的Sender线程来处理。 KafkaProducer内部的Sender线程会将发送失败的消息重新放入RecordAccumulator中等待下次发送。KafkaProducer内部的Sender线程会将发送成功的消息提交给RecordAccumulatorRecordAccumulator会将消息从消息缓冲区中移除。 1.6、重要的生产者参数 acks 这个参数用来指定分区中必须要有多少个副本收到这条消息之后生产者才会认为这条消息是成功写入的acks默认值 acks0 生产者在成功写入消息之前不会等待任何来自服务器的响应acks设置为0可以达到最大的吞吐量但是会丢失一些消息 acks1 默认值即为1生产者发送消息之后只要分区的leader副本成功写入消息那么它就会收到来自服务端的成功响应如果leader副本在成功写入消息之后但是还没有来得及将消息同步到所有的follower副本之前这时候如果leader副本宕机那么这条消息就会丢失为了避免消息丢失生产者可以选择重发消息 acksall 或 acks-1 生产者发送消息之后只有当分区的leader副本成功写入消息并且所有的follower副本都成功写入消息之后生产者才会收到来自服务端的成功响应 这种情况下只要有一个副本存活那么这条消息就不会丢失但是这种情况下由于需要等待所有的副本都成功写入消息之后生产者才会收到来自服务端的成功响应所以生产者的吞吐量会受到影响在这个模式下可以达到最强的可靠性 参数配置方式(注意都是字符串形式) properties.put(“acks”, “0”);properties.put(ProducerConfig.ACKS_CONFIG, “0”); max.request.size 该参数用来指定生产者发送消息的最大值默认值为1048576字节即1MB如果生产者发送的消息大小超过了max.request.size参数指定的值那么生产者会抛出RecordTooLargeException异常参数配置方式 properties.put(“max.request.size”, “10485760”);properties.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 10485760); retries retry.backoff.ms retries参数用来指定生产者发送消息失败后重试发送的次数默认值为0即不进行重试retry.backoff.ms参数用来指定两次重试发送消息的间隔默认值为100ms避免无效的频繁重试参数配置方式 properties.put(“retries”, “3”);properties.put(ProducerConfig.RETRIES_CONFIG, 3);properties.put(“retry.backoff.ms”, “500”);properties.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 500); compression.type 该参数用来指定消息的压缩类型默认值为none即不压缩常见配置 none不压缩gzip使用GZIP算法压缩snappy使用Snappy算法压缩lz4使用LZ4算法压缩 参数配置方式 properties.put(“compression.type”, “gzip”);properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, “gzip”); connections.max.idle.ms 该参数用来指定生产者与Kafka集群建立连接的空闲时间默认值为540000即9分钟 linger.ms 该参数用来指定生产者在发送消息前等待一段时间希望可以等到更多的消息一起发送以减少网络请求的次数从而提升性能默认值为0即立即发送参数配置方式 properties.put(“linger.ms”, “1000”);properties.put(ProducerConfig.LINGER_MS_CONFIG, 1000); receive.buffer.bytes 该参数用来指定Socket接收消息缓冲区SO_RECBUF大小默认值为32768字节即32KB参数配置方式 properties.put(“receive.buffer.bytes”, “65536”);properties.put(ProducerConfig.RECEIVE_BUFFER_CONFIG, 65536); send.buffer.bytes 该参数用来指定Socket发送消息缓冲区SO_SNDBUF大小默认值为131072字节即128KB参数配置方式 properties.put(“send.buffer.bytes”, “131072”);properties.put(ProducerConfig.SEND_BUFFER_CONFIG, 131072); request.timeout.ms 该参数用来指定生产者发送消息到Kafka集群时等待响应的最大时间默认值为30000即30秒参数配置方式 properties.put(“request.timeout.ms”, “60000”);properties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000);
http://www.tj-hxxt.cn/news/142778.html

相关文章:

  • 连城县住房和城乡建设局 网站网页升级中每天自动更新什么意思
  • 网站会员发展计划基于大数据的精准营销
  • 郑州模板建站无锡网站建设 微信公众号
  • 北京网站建设+知乎导航网站html模板
  • 网站和自媒体都可以做招工 最新招聘信息
  • 有没有免费的企业网站建设做网站 有哪些问题
  • 广西医院响应式网站建设方案中国最新新闻头条
  • 金山区网站制作建设电影网站视频
  • 汕头seo建站wordpress开发公司
  • 黄州网站建设百度2022年版本下载
  • 网站开发关键技术开发网站年度工作总结及明年工作计划
  • 黄页88网站关键词怎么做人才网站开发
  • 外贸多语言网站建设贷款申请入口
  • 主题资源网站建设作业北京工程质量建设协会网站
  • 一般门户网站深情密码免费观看网站
  • 联通网站备案系统WordPress响应式幻灯片
  • 上海网站制作优化免费舆情网站直接打开
  • 大连科技公司建设网站个人如何做微商城网站
  • 网站建设工作计划表网站建设立项申请书
  • 自己可以做网站空间吗中国建设银行2024版本
  • 自己做网站系统首选平台网站发展阶段怎么做
  • 榆社网站建设河南省建设监理协会网站人才十
  • 网站推广计划书具体包含哪些基本内容?阜阳网站制作公司多少钱
  • 亿网行网站建设一个人做网站 没有人写文章怎么办
  • 那个网站专利分析做的好做动漫网站的意义
  • 百度描述 网站做的网站怎么样才能再网上看到
  • 建立个人网站流程建设工程检测报告查询网站
  • wordpress 数据库 恢复seo云优化是什么意思
  • 怎么开发个人网站wordpress适合seo
  • 可以做外链网站黄页网站建设