做业帮网站,英文网站建设szjijie,专业柳州网站建设多少钱,电子商务网站的特点一#xff0c;基础知识 1#xff0c;消费者与消费组 每个消费者都有对应的消费组#xff0c;不同消费组之间互不影响。 Partition的消息只能被一个消费组中的一个消费者所消费#xff0c; 但Partition也可能被再平衡分配给新的消费者。 一个Topic的不同Partition会根据分配…一基础知识 1消费者与消费组 每个消费者都有对应的消费组不同消费组之间互不影响。 Partition的消息只能被一个消费组中的一个消费者所消费 但Partition也可能被再平衡分配给新的消费者。 一个Topic的不同Partition会根据分配策略消费者客户端参数partition.assignment strategy分给不同消费者。 2Kafka的消息模式
消息中间件一般有两种消息投递模式点对点模式和发布订阅模式Kafka 同时支持两种。 如果所有的消费者都属于同一消费组那么所有的消息都会被均衡地投递给每个消费者即每条消息只会被一个消费者处理这就相当于点对点模式 如果所有的消费者都隶属于不同的消费组那么所有的消息都会被广播给所有的消费者即每条消息会被所有的消费者处理这就相当于发布订阅模式 二Client开发 1消费逻辑需要具备以下几个步骤 配置消费者参数及创建消费者实例 订阅主题 拉取消息并消费 提交消费位移 关闭消费者实例 public class Consumer {private static final String BROKER_LIST localhost:9092;private static final String TOPIC TOPIC-A;private static final String GROUP_ID GROUP-A;private static final AtomicBoolean IS_RUNNING new AtomicBoolean(true);public static Properties initConfig() {Properties properties new Properties();// 以下3个必须properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 客户端IDproperties.put(ConsumerConfig.CLIENT_ID_CONFIG, eris-kafka-consumer);// 消费组IDproperties.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);// 自动提交默认为trueproperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);return properties;}public static void main(String[] args) {Properties properties initConfig();KafkaConsumerString, String kafkaConsumer new KafkaConsumerString, String(properties);kafkaConsumer.subscribe(Arrays.asList(TOPIC));try {while (IS_RUNNING.get()) {// poll内部封装了消费位移提交、消费者协调器、组协调器、消费者的选举、分区分配与再均衡、心跳等// Duration用来控制在消费者的缓冲区里没有可用数据时阻塞等待的时间0表示不等待直接返回ConsumerRecordsString, String records kafkaConsumer.poll(Duration.ofMillis(1000));for (ConsumerRecordString, String r : records) {print(topic: r.topic() , patition: r.partition() , offset: r.offset());print(key: r.key() , value: r.value());}}} catch (WakeupException e) {// wakeup方法是KafkaConsumer中唯一可以从其他线程里安全调用的方法调用wakeup后可以退出poll的逻辑并抛出WakeupException。我们也不需处理WakeupException它只是一种跳出循环的方式。} catch (Exception e) {e.printStackTrace();} finally {// maybe commit offset.kafkaConsumer.close();}}
} 注 意KafkaConsumer是非线程安全的wakeup()方法是 KafkaConsume 中唯一可以从其他线程里安全调用的方法。 2subscribe有4个重载方法 public void subscribe(CollectionString topics, ConsumerRebalanceListener listener)
public void subscribe(CollectionString topics)// 在之后如果又创建了新主题并且与正表达式相匹配那么这个消费者也可以消费到新添加的Topic
public void subscribe (Pattern pattern, ConsumerRebalanceListener listener)
public void subscribe (Pattern pattern) 3assign订阅指定的分区
消费者不仅可以通过 subscribe方法订阅还可以直接订阅指定分区 如下 consumer.assign(Arrays.asList(new TopicPartition (topic-demo, 0))) ; 通过subscribe订阅主题具有消费者自动再均衡功能 可以根据分区分配策略来自动分配各个消费者与分区的关系 通过assign来订阅分区时是不具备消费者自动均衡的功能的。 4取消订阅
以下三行代码等效。 consumer.unsubscribe();
consumer.subscribe(new ArrayListString()) ;
consumer.assign(new ArrayListTopicPartition()); 5消息消费 消费者消费到的每条消息的类型为ConsumerRecord 这个和生产者发送的ProducerRecord相对应。 注意与ConsumerRecords区别ConsumerRecords实现了Iterable是poll返回的对象。 public class ConsumerRecordK, V {private final String topic;private final int partition;pr vate final long offset;private final long timestamp;private final TimestampType timestampType;private final int serializedKeySize;private final int serializedValueSize;private final Headers headers;private final K key;private final V value;private volatile Long checksum;省略若干方法
} 根据Partition或Topic来消费消息 ① 根据分区对当前批次消息分类public ListConsumerRecordK, V records(TopicPart tion partition)for (TopicPartition tp : records.partitions()) {for (ConsumerRecordString, String record : records.records(tp)) {System.out.println(record.partition() : record.value());}
}② 根据主题对当前批次消息分类public IterableConsumerRecordK, V records(String topic)// ConsumerRecords类中并没提供与partitions()类似的topics()方法来查看拉取的消息集中所含的主题列表。
for (String topic : Arrays.asList(TOPIC)) {for (ConsumerRecordString, String record : records.records(topic)) {System.out.println(record.topic() : record.value());}
} 6反序列化 ConsumerRecord里的value就是经过反序列化后的业务对象。 Kafka所提供的反序列器有ByteBufferDeserializer、ByteArrayDeserializer、BytesDeserializer、DoubleDeserializer、FloatDeserializer、IntegerDeserializer、LongDeserializer、ShortDeserializer、StringDeserializer。 在实际应用中在Kafka提供的序列化器和反序列化器满足不了应用需求的前提下推荐使用 Avro、JSON、Thrift、 ProtoBuf、Protostuff等通用的序列化工具来包装不建议使用自定义的序列化器或反序列化器。 三位移提交 1消费位移提交
在每次调用poll方法时返回的是还没有被消费过的消息集。 消费位移必须做持久化保存否则消费者重启之后就无法知晓之前的消费位移。再者当有新的消费者加入时那么必然会再均衡某个分区可能在再均衡之后分配给新的消费者如果不持久化保存消费位移那么这个新消费者也无法知晓之前的消费位移。 在旧消费者客户端中消费位移存储在ZooKeeper中而在新消费者客户端中则存储在Kafka内部的主题 __consumer_offsets中。 这里把将消费位移持久化的动作称为“提交” 消费者在消费完消息之后需要执行消费位移的提交。 2三个位移的关系 lastConsumedOffset当前消费到的位置即poll拉到的该分区最后一条消息的offset committed offset提交的消费位移 position下次拉取的位置 position committed offset lastConsumedOffset 1当然position和committed offset 并不会一直相同 TopicPartition tp new TopicPartition(topic, 0);
kafkaConsumer.assign(Arrays.asList(tp));
long lastConsumedOffset 0;while (true) {ConsumerRecordsString, String consumerRecords kafkaConsumer.poll(Duration.ofMillis(1000));ListConsumerRecordString, String partitionRecords consumerRecords.records(tp);lastConsumedOffset partitionRecords.get(partitionRecords.size() - 1).offset();// 同步提交消费位移kafkaConsumer.commitSync();System.out.println(consumed off set is lastConsumedOffset);OffsetAndMetadata offsetAndMetadata kafkaConsumer.committed(tp);System.out.println(commited offset is offsetAndMetadata.offset());long posititon kafkaConsumer.position(tp);System.out.println(he offset of t he next record is posititon);
}输出结果
consumed offset is 377
commited offset is 378
the offset of the next record is 378 3消息丢失与重复消费 如果poll后立马提交位移之后业务异常再次拉取就从新位移开始就丢失了数据。 如果poll后先处理数据处理到一半异常了或者最后提交位移异常重新拉取会从之前的位移拉就重复消费了。 4自动提交位移原理 Kafka中默认的消费位移的提交方式是自动提交这个由消费者客户端参数enable.auto.commit配置默认值为true。 不是每消费一条消息就提交一次而是定期提交这个定期的周期时间由客户端参数auto.commit.interval.ms配置默认值为5秒此参数生效的前提是enable.auto.commit参数为true。 在默认的方式下消费者每隔5秒会将拉取到的每个分区中最大的消息位移进行提交。动位移提交的动作是在poll()方法的逻辑里完成的在每次真正向服务端发起拉取请求之前会检查是否可以进行位移提交如果可以那么就会提交上一次轮询的位移。 自动提交让编码更简洁但随之而来的是重复消费和消息丢失的问题。 5手动提交位移
① 同步提交 ConsumerRecordsString, String records kafkaConsumer.poll(Duration.ofMillis(1000));
for (ConsumerRecordString, String record : records) {//do some logical processing kafkaConsumer.commitSync();
}1commitSync会根据poll拉取的最新位移来进行提交注意提交的值对应于图3-6 position的位置〉。2可以使用带参方法提交指定位移commitSync(final MapTopicPartition OffsetAndMetadata offsets)3没必要每条消息提交一次可以改为批量提交。 ② 异步提交 public void commitAsync()
public void commitAsync(OffsetCommitCallback callback)
public void commitAsync(final MapTopicPartition, OffsetAndMetadata offsets, OffsetCommitCallback callback)1异步提交在执行的时候消费者线程不会被阻塞可能在提交消费位移的结果还未返回之前就开始了新一次的poll操作。2提交也可能会异常可引入重试机制。但重试可能出现问题若第一次commitAsync失败在重试第二次成功了然后第一次重试也成功了就会覆盖位移为之前。解决方案可以提交时维护一个序号如果发现过期的序号就不再重试。 总结不管是自动提交还是手动提交同步、异步都可能出现漏消费和重复消费一般情况提交位移这一步操作很少失败至于业务异常如何影响提交需要结合具体情况分析。 可以引入重试机制重试提交或者业务处理。但重试会增加代码逻辑复杂度。 还有对于消费者异常退出重复消费的问题就很难避免因为这种情况下无法及时提交消费位移需要消费者的幂等处理。 如果消费者正常退出或发生再均衡况那么可以 在退出或再均衡执行之前使用同步提交的方式做最后的把关。 try {while (IS_RUNNING.get()) {// poll records and do some logical processing .kafkaConsumer.commitAsync();}
} finally {try {kafkaConsumer.commitSync();} finally {kafkaConsumer.close();}
} 四暂停或恢复消费 暂停某些分区在poll时返回数据给客户端和恢复某些分区返回数据给客户端。 public void pause(CollectionTopicPartition partitions)
public roid resume(CollectionTopicPartition partitions) 五指定位移消费 有了消费位移的持久化才使消费者在关闭、崩溃或者在遇到再均衡的时候可以让接替的消费者能够根据存储的消费位移继续进行消费。 当新的消费组建立的时候它根本没有可以查找的消费位移。或者消费组内的新消费者订阅了一个新的主题它也没有可以查找的消费位移。 auto.offset.reset 配置 ① 作用 决定从何处消费
② 何时生效 找不到消费位移记录时 位移越界时seek
③ 配置值 默认值auto.offset.resetlatest从分区末尾开始消费 auto.offset.resetearliest从分区起始开始消费; seek方法 ① 定义指定消费的位置可以向前跳过若干消息或回溯消息。 // partition分区offset从哪个位置消费
public void seek(TopicPartition partition, long offset) ② seek方法只能重置消费者分配到的分区的消费位置而分区的分配是在 poll方法过程中实现的也就是说在执行seek之前需要先执行一次poll等到分配到分区之后才可以重置消费位置。 SetTopicPartition assignment new HashSet();
while (assignment.size() 0) {// 如采不为空则说明已经成功分配到了分区kafkaConsumer.poll(Duration.ofMillis(1000));assignment kafkaConsumer.assignment();
}for (TopicPartition tp : assignment) {kafkaConsumer.seek(tp, 10);
}
while (true) {ConsumerRecordsString, String records kafkaConsumer.poll(Duration.ofMillis(1000));//consume the record
} ③ 如果对当前消费者未分配到的分区执行 seek方法那么会报IllegalStateException。 ④ 如果消费者启动能找到位移记录又想从头或者尾消费可以通过seek结合endOffsets、beginningOffsets或者直接seekToBeginning、seekToEnd实现。 注意分区的起始位置是0但并不代表每时每刻都为0因为日志清理会清理旧的数据 所以分区的起始位置自然会增加。 ⑤ 按时间戳指定消费位置 public MapTopicPartition, OffsetAndTimestamp offsetsForTimes(MapTopicPartition, Long timestampsToSearch)
public MapTopicPartition, OffsetAndTimestamp offsetsForTimes(MapTopicPartition, Long timestampsToSearch, Duration timeout)给定待查分区和时间戳返回大于等于该时间戳的第一条消息对应的offset和timestamp对应于OffsetAndTimestamp中的offset、timestamp字段。 ⑥ 将分区消费位移存储在数据库、文件等外部介质再通过seek指定消费可以配合再均衡监听器来实现新消费者的继续消费。 六消费再均衡 再均衡是指分区的所属权从一个消费者转移到另一消费者的行为。它为消费组具备高可用性和伸缩性提供保障使我们可以既方便又安全地删除消费组内的消费者或往消费组内添加消费者。 不过 在再均衡发生期间的这一小段时间消费组会变得不可用。而且再均衡容易导致重复消费等问题一般情况应尽量避免不必要的再均衡。 再均衡监听器 ConsumerRebalanceListener 注册
subscribe(CollectionString topics, ConsumerRebalanceListener listener) 和 subscribe(Patten pattern, ConsumerRebalanceListener listener)ConsumerRebalanceListener是一个接口有2个方法。(1) void onPartitionsRevoked(CollectionTopicPartition partitions)
再均衡开始之前和消费者停止读取消息之后被调用。(2) void onPartitionsAssigned(CollectionTopicPartition partitions)
新分配分区之后和消费者开始拉取消费之前被调用 。 示例①消费时把位移暂存在Map再均衡之前同步提交避免发送再均衡的同时异步提交还没提交上去重复消费。 MapTopicPartition, OffsetAndMetadata currentOffsets new HashMap();
kafkaConsumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {Overridepublic void onPartitionsRevoked(CollectionTopicPartition partitions) {kafkaConsumer.commitSync(currentOffsets);currentOffsets.clear();}Overridepublic void onPartitionsAssigned(CollectionTopicPartition partitions) {//do nothing .}
});
ConsumerRecordsString, String records kafkaConsumer.poll(Duration.ofMillis(1000));
for (ConsumerRecordString, String record : records) {//process the recordcurrentOffsets.put(new TopicPartition(record.topic(), record.partition()),new OffsetAndMetadata(record.offset() 1));
}
kafkaConsumer.commitAsync(currentOffsets, null); 示例②把位移存在db再均衡后通过seek定位继续消费 kafkaConsumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {Overridepublic void onPartitionsRevoked(CollectionTopicPartition partitions) {// store offset in DB}Overridepublic void onPartitionsAssigned(CollectionTopicPartition partitions) {for (TopicPartition tp : partitions) {// 从DB中读取消费位移kafkaConsumer.seek(tp, getOffsetFromDB(tp));}}
} 七消费者拦截器 与生产者拦截器对应消费者拦截器需要自定义实现org.apache.kafka.clients.consumer.Consumerlnterceptor接口。 Consumerlnterceptor有3个方法 // poll方法返回之前调用可以修改返回的消息内容、按照某种规则过滤消息等
public ConsumerRecordsK, V onConsume(ConsumerRecordsK , V records);// 提交完消费位移之后调用可以用来记录跟踪所提交的位移信息比如当使用commitSync的无参方法时我们不知道提交的消费位移而onCommit方法却可以做到这一点
public void onCommit(MapTopicPartition, OffsetAndMetadata offsets);public void close(); 在消费者中也有拦截链的概念和生产者的拦截链一样 也是按照interceptor.classes参数配置的拦截器的顺序来一一执行的。如果在拦截链中某个拦截器执行失败那么下一个拦截器会接着从上一个执行成功的拦截器继续执行。 八消费者多线程模型 ① 每个消费者单独线程只消费一个分区 模型简单自动、手动提交位移都很简单。 优点每个分区可以按顺序消费 缺点多个TCP连接消耗 ② 一个消费者拉取提交线程池处理 各分区的消费是散乱在各线程提交位移顺序复杂。 优点拉取快TCP连接少 缺点分区消费顺序不好保障。 上图是自动提交位移。如果要手动提交可考虑共享offsets方式同时为了避免对同一个分区后序批次提交了更大的位移前序批次处理失败造成的消息丢失可以考虑滑动窗口机制。参考《深入理解Kafka核心设计与实践原理》