当前位置: 首页 > news >正文

手机建网站 优帮云wordpress design

手机建网站 优帮云,wordpress design,国内网站速度慢,小说网站怎么做空间小目录 生产者发送数据原理参数说明代码示例#xff08;同步发送数据#xff09;代码示例#xff08;异步#xff09; 异步和同步的区别同步发送定义与流程特点 异步发送定义与流程特点 异步回调描述代码示例 拦截器描述代码示例 消息序列化描述代码示例#xff08;自定义序… 目录 生产者发送数据原理参数说明代码示例同步发送数据代码示例异步 异步和同步的区别同步发送定义与流程特点 异步发送定义与流程特点 异步回调描述代码示例 拦截器描述代码示例 消息序列化描述代码示例自定义序列化 分区描述分区策略代码示例写入默认分区0号分区自定义分区机制 消息丢失消息绝对不丢失的条件 数据去重描述幂等性事务代码示例事务 生产者 发送数据原理 说明 拦截器允许有多个可以组成拦截器链生产者发送的消息会被分配到不同的分区Partition。每个分区在内存中都有一个对应的缓冲区RecordAccumulator用于暂存即将发送的消息。sender线程从中读取数据sender线程两个重要参数大小默认16KB当数据量达到16KB时读取读取时间默认0ms当达到读取时间时自动读取数据不管大小有没有达到这两个参数均可以调整NetworkClient负责将生产者的请求如发送消息、获取元数据等发送到相应的broker并存储这些请求的响应NetworkClient还负责处理网络连接的建立、维护和关闭。当生产者发送消息到broker时可以选择不同的应答级别acks参数 acks0生产者不等待broker的应答直接认为消息发送成功。这种方式性能最高但可靠性最低。 acks1生产者等待leader broker的应答只要leader broker确认收到消息就认为消息发送成功。这种方式性能较高但可靠性略低。 acksall或acks-1leader和follower都落地回应才认为消息发送成功。这种方式性能最低但可靠性最高。 Broker在收到消息后会根据配置的应答机制向生产者发送应答或错误信息。请注意关于broker的落地是指数据存储到磁盘或持久化数据发送成功后接收到应答删除缓冲区内对应的数据 参数说明 参数默认值作用描述bootstrap.serversnode2:9092[,node3:9092][,node4:9092]生产者连接集群所需的broker地址清单一个或多个逗号隔开key.serializer无指定发送消息的key的序列化类型必须写全类名value.serializer无指定发送消息的value的序列化类型必须写全类名buffer.memory32MRecordAccumulator缓冲区总大小batch.size16K缓冲区一批数据最大值适当增加可提高吞吐量但可能增加延迟linger.ms0ms表示没有延迟如果数据未达到batch.sizesender等待linger.time后发送数据acks-1应答机制0-不需要应答1-Leader应答-1all-所有节点应答max.in.flight.requests.per.connection5允许最多没有返回ack的次数开启幂等性时建议1-5之间enable.idempotencetrue是否开启幂等性默认开启retries2147483647int最大值消息发送错误时的重试次数retry.backoff.ms100ms两次重试之间的时间间隔compression.typenone生产者发送的所有数据的压缩方式默认不压缩 代码示例同步发送数据 package com.wunaiieq;import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; import java.util.concurrent.ExecutionException;public class SyncCustomProducer {public static void main(String[] args) throws ExecutionException, InterruptedException {//TODO 1.声明并实例化Kafka Producer的配置文件对象Properties prop new Properties();//TODO 2.为配置文件对象设置参数//TODO 2.1 配置bootstrap_server(生产者连接集群所需的broker地址清单)prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,192.168.16.100:9092,192.168.16.101:9092,192.168.16.102:9092);//TODO 2.2 配置key和value的序列化类// 设置序列化器指定key和value的序列化类为StringSerializer用于将字符串类型的key和value转换为字节数组以便发送到Kafka。prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());//TODO 3.声明并实例化生产者对象KafkaProducerString,String producer new KafkaProducerString, String(prop);//TODO 4.发送消息// producer.send(...).get()同步发送消息send()方法返回一个Future对象调用get()方法等待发送完成并获取结果。for(int i 0;i5;i){//同步发送消息producer.send(new ProducerRecord(topicA,sync_msgi)).get();}//TODO 5.关闭生产者producer.close();} }代码说明 类/对象描述用途PropertiesJava标准库中的类用于维护键值对列表。 Properties类提供了一种方便的方式来读取和写入属性文件通常是.properties文件在本代码中用于存储Kafka生产者的配置参数。KafkaProducerK, VKafka客户端库中的类用于向Kafka主题发送消息。泛型参数K和V分别表示消息键和值的类型。创建生产者实例发送消息到Kafka主题。ProducerConfigKafka客户端库中的类包含生产者配置的常量。提供配置参数的常量值如broker地址、序列化器等。ProducerRecordK, VKafka客户端库中的类表示要发送到Kafka主题的消息记录。泛型参数K和V分别表示消息键和值的类型。创建消息记录对象包含主题、键和值。StringSerializerKafka客户端库中的类实现了SerializerString接口用于将字符串类型的键或值序列化为字节数组。作为键和值的序列化器将字符串转换为字节数组进行传输。 效果 代码示例异步 package com.wunaiieq;import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;public class UnSyncCustomProducer {public static void main(String[] args) {//实例化PropertiesProperties prop new Properties();//集群节点prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,192.168.16.100:9092,192.168.16.101:9092,192.168.16.102:9092);//key和valueprop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());//创建kafka生产者对象并写入响应参数KafkaProducerString, String producer new KafkaProducerString, String(prop);//发送数据for (int i 0; i 5; i) {//异步发送数据不调用get方法producer.send(new ProducerRecord(topicA, unsync_msg i));}producer.close();} } 异步和同步的区别 同步发送 定义与流程 定义 同步发送是指生产者在发送一条消息后会立即等待Kafka服务器的响应。只有在服务器返回成功响应后生产者才会继续发送下一条消息。 流程 生产者调用send()方法发送消息。 send()方法返回一个Future对象。 生产者调用Future对象的get()方法该方法会阻塞当前线程直到Kafka服务器返回响应或抛出异常。 生产者收到响应后根据结果成功或失败进行后续操作。 特点 高可靠性同步发送确保每条消息都被Kafka集群接收并持久化。生产者等待Kafka确认消息已经成功写入指定分区且复制到满足副本因子的节点上从而提高了消息的可靠性。 异常处理如果发送过程中发生异常生产者可以立即感知并处理避免了消息的丢失。 性能较低由于同步发送需要阻塞等待响应因此会增加消息的延迟降低系统的吞吐量。特别是在高并发场景下可能会导致线程资源的大量占用和性能瓶颈。 易调试便于发现和处理异常有利于开发和测试阶段的调试工作。 异步发送 定义与流程 定义 异步发送是指生产者在发送一条消息后不会立即等待Kafka服务器的响应而是继续发送下一条消息。发送方通过传递一个回调函数给send()方法该回调函数将在消息发送结果成功或失败可用时被异步调用。 流程 生产者调用send()方法发送消息并传递一个回调函数。 Kafka客户端将消息放入内部缓冲区并立即返回。 Sender线程负责将缓冲区中的消息批量发送到Kafka集群。 当消息发送成功或失败时Kafka客户端调用之前传递的回调函数通知生产者消息发送的结果。 特点 高性能异步发送方式下生产者无需等待每个消息的确认即可继续发送下一条消息从而提高了消息的发送效率适用于高吞吐量场景。 灵活性通过回调函数生产者可以对消息发送的结果进行异步处理如记录日志、重试发送等。 可靠性相对较低由于生产者不会立即得知消息是否成功写入Kafka因此消息的可靠性需要额外关注。如果生产者在发送消息后立即崩溃可能会导致部分消息丢失。 调试复杂由于消息发送和结果是异步的因此调试时可能需要更多的日志记录和监控手段来确保消息的可靠性和完整性。 异步回调 描述 回调函数会在 producer 收到 ack 时调用为异步调用该方法有两个参数分别是元数据信息RecordMetadata和异常信息Exception如果 Exception 为 null说明消息发送成功如果 Exception 不为 null说明消息发送失败。消息发送失败后会自动重试不需要再回调函数中手动重试。 代码示例 package com.wunaiieq; import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; import java.util.concurrent.ExecutionException; public class UnSyncCallBackCustomProducer {public static void main(String[] args) throws ExecutionException, InterruptedException {//TODO 1.声明并实例化Kafka Producer的配置文件对象Properties prop new Properties();//TODO 2.为配置文件对象设置参数// 2.1 配置bootstrap_serversprop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,192.168.16.100:9092,192.168.16.101:9092,192.168.16.102:9092);// 2.2 配置key和value的序列化类prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());//TODO 3.声明并实例化生产者对象KafkaProducerString,String producer new KafkaProducerString, String(prop);//TODO 4.发送消息for(int i 0;i5;i){//异步发送消息 不调用get()方法producer.send(new ProducerRecord(topicA, unsync_msg i),new Callback() {//如下方法在生产者收到acks确认时异步调用Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if(e null){//无异常信息,笑死发送成功,输出主题和分区信息到控制台System.out.println(topic:recordMetadata.topic(),partition:recordMetadata.partition());}else{//打印异常信息System.out.println(e.getMessage());}}});Thread.sleep(5);}//TODO 5.关闭生产者producer.close();} }拦截器 描述 拦截器(Interceptor)是kafka0.10.0.0版本中引入的新功能主要用于实现clients端的定制化控制逻辑。它可以使得用户在消息发送前以及producer回调逻辑前有机会对消息做一些定制化需求比如修改消息等。同时允许指定多个Interceptor按序作用于同一条消息从而形成一个拦截器链Interceptor Chain。 自定义拦截器需要实现org.apache.kafka.clients.producer.ProducerInterceptor接口。 拦截器内部方法 onSend 方法 作用在消息发送之前进行拦截允许对消息进行修改或处理。参数接收一个ProducerRecord对象。返回值返回一个ProducerRecord对象可能是修改后的记录。应用场景添加消息头、修改消息内容、过滤消息等。 onAcknowledgement 方法 作用在消息发送成功或失败后进行回调。参数 RecordMetadata包含消息的元数据。Exception发送过程中可能抛出的异常成功时为null。 返回值无。应用场景记录发送结果、统计发送成功率、处理发送失败等。 close 方法 作用在拦截器不再使用时进行资源清理。参数无。返回值无。应用场景关闭打开的文件、释放内存、断开网络连接等。 拦截器Interceptor可能运行在多个线程中因此在具体实现时用户需要自行确保线程安全。另外若指定了多个Interceptor,则producer将按照指定顺序调用它们同时把每个Interceptor中捕获的异常记录写入到错误日志中而不是向上传递。这在使用过程中要特别留意。 代码示例 实现一个简单的双interceptor组成的拦截链。 第一个Interceptor会在消息发送前将时间戳信息加到消息value的前面 第二个Interceptor在消息发送后更新成功发送消息数或失败发送消息数。 第一个拦截器示例 package com.wunaiieq;import org.apache.kafka.clients.producer.ProducerInterceptor; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata;import java.util.Map;public class TimeStampInterceptor implements ProducerInterceptorString,String {/**初始化拦截器并接收Kafka生产者的配置参数。* */Overridepublic void configure(MapString, ? configs) {}/**发送之前被调用对消息进行处理。* */Overridepublic ProducerRecordString, String onSend(ProducerRecordString, String record) {return new ProducerRecordString, String(//原始消息记录的主题、分区、时间戳和键。record.topic(),record.partition(),record.timestamp(), record.key(),//将当前系统时间戳System.currentTimeMillis()和原始消息值拼接成新的消息值中间用逗号分隔。System.currentTimeMillis(),record.value());}/**消息发送成功或失败后被调用。* */Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {}/**在拦截器不再使用时进行资源清理。* */Overridepublic void close() {} } 第二个拦截器示例 package com.wunaiieq;import org.apache.kafka.clients.producer.ProducerInterceptor; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata;import java.util.Map;public class CounterIntercepter implements ProducerInterceptorString,String {private int errorCounter 0;private int successCounter 0;/**onSend方法该方法在消息发送之前被调用用于对消息进行处理。* 由于这是第二个拦截器因此这里接受的是前一个拦截器的输出* */Overridepublic ProducerRecordString, String onSend(ProducerRecordString, String record) {return record;}/**在消息发送成功或失败后被调用。* 统计消息发送成功或失败的数量* */Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {if(exceptionnull){successCounter;}else{errorCounter;}}/**拦截器关闭时会进行的额外操作* 打印成功或失败的消息数量* */Overridepublic void close() {System.out.println(successful send:successCounter);System.out.println(failed send:errorCounter);}Overridepublic void configure(MapString, ? configs) {} }拦截器调用 package com.wunaiieq;import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer;import java.util.ArrayList; import java.util.List; import java.util.Properties; import java.util.concurrent.ExecutionException;public class SyncCustomProducerInterceptor {public static void main(String[] args) throws ExecutionException, InterruptedException {Properties prop new Properties();prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,192.168.16.100:9092,192.168.16.101:9092,192.168.16.102:9092);prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 构造拦截器链ListString interceptors new ArrayList();interceptors.add(com.wunaiieq.TimeStampInterceptor);interceptors.add(com.wunaiieq.CounterIntercepter);//配置拦截器链(将拦截器链加入到配置文件中)prop.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);KafkaProducerString, String producer new KafkaProducerString, String(prop);for (int i 5; i 10; i) {//同步发送消息producer.send(new ProducerRecord(topicA, sync_msg i)).get();}//一定要关闭生产者这样才会调用interceptor的close方法producer.close();} }效果 消息序列化 描述 消息序列化是将对象转换为字节流的过程。在Kafka中生产者需要将消息对象序列化为字节流以便通过网络发送给Kafka集群而消费者则需要将从Kafka集群接收到的字节流反序列化为对象以便进行后续处理。 代码示例自定义序列化 pom.xml 增加依赖 dependencygroupIdorg.codehaus.jackson/groupIdartifactIdjackson-mapper-asl/artifactIdversion1.9.13/version/dependencyUserVo.java 值对象 package com.wunaiieq;public class UserVo {private String name;private int age;private String address;public UserVo(String name, int age, String address) {this.name name;this.age age;this.address address;}Overridepublic String toString() {return UserVo{ name name \ , age age , address address \ };}public String getName() {return name;}public void setName(String name) {this.name name;}public int getAge() {return age;}public void setAge(int age) {this.age age;}public String getAddress() {return address;}public void setAddress(String address) {this.address address;} }UserSerializer.java 重写Serializer接口实现序列化操作 package com.wunaiieq;import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.serialization.Serializer; import org.codehaus.jackson.map.ObjectMapper; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.Map;public class UserSerializer implements SerializerUserVo {private ObjectMapper objectMapper;Overridepublic void configure(MapString, ? configs, boolean isKey) {objectMapper new ObjectMapper();//Serializer.super.configure(configs, isKey);}/*** param topic 消息将要发送到的主题名* param data 需要序列化的UserVo对象。* */Overridepublic byte[] serialize(String topic, UserVo data) {//存储序列化后的字节数组byte[] ret null;try {//data写成JSON字符串再写成UTF_8的字节数组ret objectMapper.writeValueAsString(data).getBytes(StandardCharsets.UTF_8);} catch (IOException e) {throw new SerializationException(Error when serializing UserVo to byte[],exception is e.getMessage());}return ret;}Overridepublic void close() {objectMapper null;//Serializer.super.close();} } UserSerProducer.java 调用自定义序列化机制 package com.wunaiieq;import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties; import java.util.concurrent.ExecutionException;public class UserSerProducer {public static void main(String[] args) throws ExecutionException, InterruptedException {Properties prop new Properties();prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,192.168.16.100:9092,192.168.16.101:9092,192.168.16.102:9092);// TODO 不用修改key的序列化机制后续没用到prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// TODO 修改value的序列化机制prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, UserSerializer.class.getName());KafkaProducerString,UserVo producer new KafkaProducerString, UserVo(prop);UserVo userVo new UserVo(wunaiieq,18,北京);producer.send(// TODO 关于消息记录的构造中可以指定 1.主题、值 2.主题、键、值new ProducerRecordString,UserVo(topicA, userVo),new Callback() {//如下方法在生产者收到acks确认时异步调用Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if(e null){//无异常信息输出主题和分区信息到控制台System.out.println(topic:recordMetadata.topic(),partition:recordMetadata.partition());}else{//打印异常信息System.out.println(e.getMessage());}}});Thread.sleep(50);producer.close();} } 效果 前面的不用管只是没清空而已 分区 描述 分区位于拦截器链后面 生产者分区的优势 便于合理使用存储资源每个Partition在一个Broker上存储可以把海量的数据按照分区切割成一块一块数据存储在多台Broker上。合理控制分区的任务可以实现负载均衡的效果。 提高并行度生产者可以以分区为单位发送数据消费者可以以分区为单位进行消费数据。 分区后更方便于做副本备份提高了数据安全性。 分区策略 以下为提供的默认分区策略自行选择即可 轮询策略Round-Robin Strategy 原理按照顺序将消息发送到不同的分区每个消息被发送到其对应分区循环轮询每个分区确保消息在所有分区之间均匀分布。 特点适用于生产者不需要根据消息内容或键选择特定分区的场景能够实现负载均衡最大限度地利用集群资源。 默认情况这是Kafka Java生产者API默认提供的分区策略。如果没有指定分区策略则会默认使用轮询策略。按键分配策略Key-Based Partitioning 原理消息的键被用作决定消息分区的依据。生产者将消息的键发送给KafkaKafka根据键的哈希值将消息路由到相应的分区。 特点适用于键值对的数据结构通过将具有相同键的消息发送到同一分区可以提高数据局部性和处理效率。同时能够保证具有相同键的消息顺序性。范围分区策略Range Partitioning 原理根据消息键的范围将消息分配到不同的分区。每个分区包含一个键值范围内的消息。 特点适用于有序数据的处理如时间戳或递增的ID。通过将具有相似时间戳或递增ID的消息分配到同一分区可以提高处理效率并保证数据的顺序性。自定义分区策略Custom Partitioning 原理用户可以根据特定的业务逻辑或规则来决定消息的分区。通过实现自定义的分区器类根据应用程序的需求来定义分区的逻辑。 特点提供了更高的灵活性可以根据地理位置、用户ID或其他业务规则来决定消息的分区。 实现方式实现org.apache.kafka.clients.producer.Partitioner接口并重写partition、close和configure方法。其中partition方法是核心用于根据给定的键、值和分区信息来计算分区号。粘性分区策略Sticky Partitioning 原理尽可能将消息分配到与之前消息相同的分区以减少跨分区的数据移动和复制。通过维护一个分区和消费者的映射关系来实现。 特点在消费者组或分区数量发生变化时能够尽可能减少对现有分区分配的影响减少负载均衡的开销提高处理效率。 代码示例 写入默认分区0号分区 消息只会发送到指定的分区内部 package com.wunaiieq.partition;import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties; import java.util.concurrent.ExecutionException;public class ProducerToPartition {public static void main(String[] args) throws ExecutionException, InterruptedException {//TODO 1.声明并实例化Kafka Producer的配置文件对象Properties prop new Properties();//TODO 2.为配置文件对象设置参数// 2.1 配置bootstrap_serversprop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,192.168.16.100:9092,192.168.16.101:9092,192.168.16.102:9092);// 2.2 配置key和value的序列化类prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());//TODO 3.声明并实例化生产者对象KafkaProducerString,String producer new KafkaProducerString, String(prop);//TODO 4.发送消息for(int i 0;i5;i){//指定数据发送到0号分区key为nullproducer.send(new ProducerRecord(topicA,0,null, unsync_msg i),new Callback() {//如下方法在生产者收到acks确认时异步调用Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if(e null){//无异常信息输出主题和分区信息到控制台System.out.println(topic:recordMetadata.topic(),partition:recordMetadata.partition());}else{//打印异常信息System.out.println(e.getMessage());}}});Thread.sleep(5);}//TODO 5.关闭生产者producer.close();} } 自定义分区机制 部分消息可能需要额外的处理内容比如审计等等这类消息的key会携带关键字符串“wunaiieq”现在让其发送到topicA主题的最后一个分区上以便于后续处理其他的消息则随机发送不包括最后一个分区 WunaiieqPartitioner.java 分区写入策略 package com.wunaiieq.partition;import org.apache.kafka.clients.producer.Partitioner; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.PartitionInfo;import java.util.List; import java.util.Map; import java.util.Random;public class WunaiieqPartitioner implements Partitioner {private Random random;Overridepublic void configure(MapString, ? configs) {//该方法实现必要资源的初始化工作random new Random();}/** 计算信息对应的分区* param topic 主题* param key 消息的key* param keyBytes 消息的key序列化后的字节数组* param value 消息的value* param valueBytes 消息value序列化后的字节数组* param cluster 集群元数据 可以获取分区信息* return 息对应的分区号*/Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {//将key转换为字符串String keyInfo (String)key;//获取主题的分区对象列表ListPartitionInfo partitionInfoList cluster.availablePartitionsForTopic(topic);//获取主题下的分区总数量int partCount partitionInfoList.size();if (partCount 1) {System.out.println(1 partition);return 0; // 只有一个分区时直接返回0}//最后一个分区号int wunaiieqPartition partCount-1;//如果 key 为空、key 为空字符串或 key 不包含 wunaiieq则随机选择一个除最后一个分区外的分区否则消息发送到最后一个分区。return keyInfonull || keyInfo.isEmpty()||!keyInfo.contains(wunaiieq)? random.nextInt(partCount-1) : wunaiieqPartition ;}Overridepublic void close() {//该方法实现必要资源的清理工作random null;} } CustomPartitionerProducer.java 调用分区策略 package com.wunaiieq.partition;import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; import java.util.concurrent.ExecutionException;public class CustomPartitionerProducer {public static void main(String[] args) throws ExecutionException, InterruptedException {Properties prop new Properties();prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,192.168.16.100:9092,192.168.16.101:9092,192.168.16.102:9092);prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// TODO 在用于构造KafkaProducer的Properties对象中设置partitioner.class参数prop.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,com.wunaiieq.partition.WunaiieqPartitioner);KafkaProducerString,String producer new KafkaProducerString, String(prop);for(int i 0;i5;i){// TODO 不指定分区号key为wunaiieq测试运行一次改为kafka后再测试一次。producer.send(new ProducerRecord(topicA,aa, unsync_msg i),new Callback() {//如下方法在生产者收到acks确认时异步调用Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if(e null){//无异常信息输出主题和分区信息到控制台System.out.println(topic:recordMetadata.topic(),partition:recordMetadata.partition());}else{//打印异常信息System.out.println(e.getMessage());}}});Thread.sleep(5);}producer.close();} } 消息丢失 判断消息丢失时一般看应答机制 acks0因为不需要等待leader数据持久化就完成应答leader宕机后可能存在数据丢失follower内部数据从leader中同步leader没有----follower也没有acks1此时leader持久化完成应答但是follower可能没有完成数据同步leader宕机导致数据丢失一般用于传输普通日志acksall或acks-1leader和follower都持久化后并回应才认为消息发送成功。这种方式性能最低但可靠性最高。传输重要数据。 特殊情况 在acks-1或all的情况下Leader接收到数据并持久化后所有Follower开始同步Leader刚刚持久化的数据但是有一个Follower因故障迟迟不能进行数据同步该问题应该怎么解决 Leader维护了一个动态的in-sync replica setISR意为和Leader保持同步的FollowerLeader集合(leader0isr:0,1,2)。 如果Follower长时间未向Leader发送通信请求或同步数据则该Follower将被踢出ISR。 该时间阈值由replica.lag.time.max.ms参数设定默认30000ms。例如1超时(leader:0, isr:0,2)。这样就不用等长期联系不上或者已经故障的节点。 消息绝对不丢失的条件 ACK级别设置为-1分区副本2ISR应答的最小副本数2 最小副本数有min.insync.replicas设置默认为1 生产中配置响应级别代码块 // 设置 acks prop.put(ProducerConfig.ACKS_CONFIG, all); //重试次数 retries默认是 int 最大值2147483647 prop.put(ProducerConfig.RETRIES_CONFIG, 3);数据去重 描述 数据重复的原因 当生产者发送消息到Kafka集群时如果由于网络故障或Kafka Broker的临时问题导致消息发送失败生产者通常会进行重试。如果重试时Kafka Broker已经成功处理了之前的消息但尚未发送确认ACK那么重试发送的消息就会导致数据重复。网络延迟或不稳定可能导致消息发送失败生产者会进行重试从而增加消息重复的风险。如果Kafka Broker在消息发送成功后崩溃但在发送确认ACK之前崩溃生产者可能会重试发送相同的消息导致消息重复。 数据去重 至少一次At Least OnceACK级别设置为-1 分区副本大于等于2 ISR里应答的最小副本数量大于等于2保证数据绝对不丢失但不能保证数据不重复 最多一次At Most OnceACK级别设置为0保证数据绝对不重复但不能保证数据不丢失 精确一次Exactly Once对于一些非常重要的信息比如和钱相关的数据要求数据既不能重复也不丢失。 幂等性 幂等性就是指Producer不论向Broker发送多少次重复数据Broker端都只会持久化一条保证了不重复。 精确一次Exactly Once 幂等性 至少一次 ack-1 分区副本数2 ISR最小副本数量2 。重复数据的判断标准具有PID, Partition, SeqNumber相同主键的消息提交时Broker只会持久化一条。其中PID是Kafka每次重启都会分配一个新的Partition 表示分区号Sequence Number是单调自增的。所以幂等性只能保证的是在单分区单会话内不重复。 使用幂等性的使用 开启参数 enable.idempotence 默认为 truefalse 关闭。 事务 寻找事务协调器KafkaProducer 使用 trans.id 寻找事务协调器 (Transaction Coordinator)。协调器通过 broker0 返回事务协调器的地址包括事务信息的主题和分区领导者 (transaction_state-分区-Leader)。请求 partId开启幂等性 KafkaProducer向事务协调器请求 partId并开启幂等性。事务协调器接收到请求后将请求持久化并返回 partId给 KafkaProducer。发送消息 KafkaProducer 使用返回的 partId 发送消息到指定主题的指定分区(topicA-Partition0 或 topicA-Partition1)。 6.发送 commit 请求 KafkaProducer发送 commit 请求到事务协调器以提交事务。 7.事务协调器接收到 commit 请求后将其持久化。事务成功事务协调器确认事务成功并返回成功信息给 KafkaProducer。 代码示例事务 package com.wunaiieq; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; import java.util.concurrent.ExecutionException; public class ProducerTransaction {public static void main(String[] args) throws ExecutionException, InterruptedException {Properties prop new Properties();prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,192.168.16.100:9092,192.168.16.101:9092,192.168.16.102:9092);prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());//TODO 设置事务idprop.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,transaction_id_topicA_0);KafkaProducerString,String producer new KafkaProducerString, String(prop);//TODO 初始化事务producer.initTransactions();//TODO 开启事务producer.beginTransaction();//TODO 添加异常处理成功提交事务失败回滚事务try {//发送消息for (int i 0; i 5; i) {//同步发送消息producer.send(new ProducerRecord(topicA, sync_msg i)).get();}//TODO 提交事务producer.commitTransaction();}catch (Exception e){//TODO 放弃事务producer.abortTransaction();}producer.close();} }
http://www.tj-hxxt.cn/news/227583.html

相关文章:

  • 网站开发结构图深圳市建设工程交易服务中心网站
  • 广告公司管理制度兴安盟seo
  • 人工做流量的网站购物网站最近浏览怎么做
  • 河北高阳做网站的wordpress防止机器评论
  • 怎样做辅导班的网站保定网站建设的过程
  • 花都网站开发公司电脑培训中心
  • 建设企业网站的价格智慧团建网站注册登录入口
  • 沧州商城网站开发设计早教网站模板
  • 网站设计培训班前台公司部门解散 转岗不同意会怎么样
  • 永州网站建设多少钱最近发生的重大新闻事件
  • python 做 网站网页快速收录
  • 海口网站建设联系方式中国电信黄页网
  • 建设企业网站中国建设银行批量替换wordpress页面文字
  • 重庆手机版建站系统信息抖音代运营报价明细表
  • 可以做动态影集的网站响应式网站排名
  • 温州网站建设 温州网站制作东莞营销网站建设服务
  • 做网站需要商标注册吗友链交换网站源码
  • 上海网站开发定制手机膜+东莞网站建设
  • 做网站给源码吗网站开发研究前景
  • 免费网站推广app河北石家庄地图
  • 搭建科技网站价格天津公共资源交易平台官网
  • 网站文字规范网站维护知识
  • 遂昌网站建设人人网
  • 网站域名如何起温州整站推广咨询
  • 建设京东商城网站淄博网站开发
  • 虚拟网站佛山应用软件开发
  • 服务器php网站打不开网络营销推广的主要形式为
  • 济南网站建站公司企业展馆展厅设计公司
  • 自创网站娄底网站建设建站
  • 做新浪网网站所需的条件网站建设经验典型