当前位置: 首页 > news >正文

网站后台编辑器不能用个人主页html源码

网站后台编辑器不能用,个人主页html源码,山东咕果做网站怎么样,外贸网站建设长沙一、上下文 之前的博客我们分析了Kafka的设计思想、Kafka的Producer端、Kafka的Server端的分析#xff0c;为了完整性#xff0c;我们接下来分析下Kafka的Consumer。《Kafka-代码示例》中有对应的Consumer示例代码#xff0c;我们以它为入口进行分析 二、KafkaConsumer是什…一、上下文 之前的博客我们分析了Kafka的设计思想、Kafka的Producer端、Kafka的Server端的分析为了完整性我们接下来分析下Kafka的Consumer。《Kafka-代码示例》中有对应的Consumer示例代码我们以它为入口进行分析 二、KafkaConsumer是什么 一个从Kafka集群中消费记录的客户端KafkaConsumer与broker交互会随着获取的TopicPartition在集群中因故障迁移而自己调整。 KafkaConsumer允许消费者组使用consumer groups对消费进行负载平衡 三、offset Kafka为分区中的每条记录维护一个offset该offset充当该分区内记录的唯一标识符也表示consumer的消费进度。例如如果offset5就表示offset为0-4的记录已经被消费。 consumer的position(TopicPartition)给出了下一条记录的offset这将比消费者在该分区中看到的最高偏移量大一个。每次consumer在调用poll(Duration) 收到消息时都会推进offset的增长。 consumer可以交由 kafka 定期自动提交offset 。也可以 调用 commitSync() 手动提交。 如果consumer进程失败并重新启动会从维护的offset处开始继续消费。 1、自动提交offset 如下的示例依赖于自动提交offset Properties props new Properties(); //指定broker列表用于连接kafka集群1个或多个都可以尽量多个 props.setProperty(bootstrap.servers, localhost:9092); //定义的组为test props.setProperty(group.id, test); //配置自动提交 props.setProperty(enable.auto.commit, true); //自动提交频率1000表示1秒提交1次 props.setProperty(auto.commit.interval.ms, 1000); props.setProperty(key.deserializer, org.apache.kafka.common.serialization.StringDeserializer); props.setProperty(value.deserializer, org.apache.kafka.common.serialization.StringDeserializer); KafkaConsumerString, String consumer new KafkaConsumer(props); //订阅 foo bar 两个topic的数据 consumer.subscribe(Arrays.asList(foo, bar)); while (true) {ConsumerRecordsString, String records consumer.poll(Duration.ofMillis(100));for (ConsumerRecordString, String record : records)System.out.printf(offset %d, key %s, value %s%n, record.offset(), record.key(), record.value()); } 当程序正常运行offset会随着consumer的消费自动向前推进如果consumer发生故障会出现重复消费或数据丢失的情况。 如果Kafka以auto.commit.interval.ms的频率提交了offsetconsumer在下一次自动提交前出现故障当consumer再次启动后会出现重复消费的现象。 如果consumer刚刚拉回来一批数据准备处理此刻Kafka正好自动提交offset但是consumer出现了故障当consumer再次启动后会出现这批数据丢失的现象。 2、手动提交offset 如下的示例依赖于手动提交offset Properties props new Properties(); props.setProperty(bootstrap.servers, localhost:9092); props.setProperty(group.id, test); //关闭自动提交 props.setProperty(enable.auto.commit, false); props.setProperty(key.deserializer, org.apache.kafka.common.serialization.StringDeserializer); props.setProperty(value.deserializer, org.apache.kafka.common.serialization.StringDeserializer); KafkaConsumerString, String consumer new KafkaConsumer(props); consumer.subscribe(Arrays.asList(foo, bar)); final int minBatchSize 200; ListConsumerRecordString, String buffer new ArrayList(); while (true) {ConsumerRecordsString, String records consumer.poll(Duration.ofMillis(100));for (ConsumerRecordString, String record : records) {buffer.add(record);}if (buffer.size() minBatchSize) {insertIntoDb(buffer);//手动提交consumer.commitSync();buffer.clear();} } 用户可以根据自己处理数据的逻辑控制何时提交offset。 在这个例子中我们需要积累一定数量的数据后再批量插入数据库中。因此只有插入数据库后才需要提交offset。 这里还会发生一个小概率事件这批数据插入数据库成功但是再手动提交offset时consumer发生故障。这会导致consumer重启后发生重复消费现象。因此最好把插入数据和提交offset放入到一个事务中要么都成功要么都失败。 上面的示例使用 commitSync() 将所有接收到的记录标记为已提交。在某些情况下可能希望通过明确指定offset来更好地控制已提交的记录。比如下面示例 try {while(running) {ConsumerRecordsString, String records consumer.poll(Duration.ofMillis(Long.MAX_VALUE));for (TopicPartition partition : records.partitions()) {ListConsumerRecordString, String partitionRecords records.records(partition);for (ConsumerRecordString, String record : partitionRecords) {System.out.println(record.offset() : record.value());}long lastOffset partitionRecords.get(partitionRecords.size() - 1).offset();//提交的偏移量应始终是应用程序将读取的下一条消息的偏移量//因此这里给的是lastOffset 1consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset 1)));}} } finally {consumer.close(); } 四、consumer groups Kafka使用consumer groups的概念来允许一个进程池来划分消费和处理记录的工作。这些进程可以在同一台机器上运行也可以分布在许多机器上为处理提供可扩展性和容错性。 共享统一 group.id  的所有consumer 都将属于同一消费者组。组中的每个消费者都可以通过其中一个subscribe(Collection, ConsumerRebalanceListener) API动态设置要订阅的topic列表。Kafka会将订阅topic中的每条消息传递给每个消费者组中的一个进程。这是通过平衡消费者组中所有成员之间的分区来实现的这样每个分区都被分配给组中的一个消费者。 如果一个topic有四个分区一个消费者组有两个进程那么每个进程会被分配两个分区来消费。 consumer group中的consumer是动态维护的如果要给consumer出现故障分配给它的分区将被重新分配给同一组中的其他consumer。同样如果新的consumer加入该组分区将从现有consumer移动到新的consumer。这被称 重新平衡 组。 当topic中的分区增多时也会触发 组的 重新平衡 当组重新分配自动发生时可以通过ConsumerRebalanceListener通知消费者这允许他们完成必要的应用程序级逻辑如状态清理、手动偏移提交等。 消费者也可以使用assign()手动分配特定分区。在这种情况下动态分区分配和消费者组协调将被禁用。 五、监测consumer故障 consumer订阅了一组topic后当调用poll(Duration)  时消费者将自动加入一个组。poll(Duration) 可以让Kafka认为该consumer是活动状态consumer就会留在组中并继续从分配给它的分区接收消息。 consumer定期向服务器发送心跳如果consumer崩溃或在 session.timeout.ms  的持续时间内无法发送心跳则consumer将被视为已死亡其分区将被重新分配。 consumer也可能遇到“livelock”情况即它继续发送心跳但没有调用poll(Duration) 为了防止consumer在这种情况下无限期地保留其分区Kafka使用 max.poll.interval.ms 设置提供了一种活性检测机制。如果consumer在max.poll.interval.ms内没有执行poll(Duration)消费数据那么consumer将主动离开该组以便另一个consumer可以接管其分区。 当这种情况发生时可能会看到偏移提交失败如调用commitSync() 时抛出的CommitFailedException 异常。这是一种安全机制保证只有组中的活跃成员才能提交偏移。因此要想留在组中就必须持续调用 poll(Duration) consumer提供两个配置来控制轮询循环的行为 max.poll.interval.ms 增加轮询之间的间隔让consumer可以有更多的事件来处理拉取的数据缺点增加此值可能会延迟组重新平衡。 max.poll.records 使用此设置可限制单次轮询调用返回的总记录数通过调整此值您可以减少轮询间隔这将减少组重新平衡的影响 如果拉回的消息处理时间变化不可预测这两种选项可能都不能完美的解决。处理这种情况的推荐方法是将消息处理转移到另一个线程或者线程池。此consumer只负责拉取数据。如果使用这种方法需要关闭自动提交offset当数据处理完成进行手动提交offset。 六、为consumer手动指定分区 一般情况下consumer只要订阅topic或topic列表并让Kafka根据组中的活动consumer为这些主题动态分配公平的分区份额。但某些情况下可能需要对特定分区进行更精细的控制。 1、如果进程正在维护与该分区相关联的某种本地状态如本地磁盘键值存储那么它应该只获取它在磁盘上维护的分区的记录。 2、如果进程本身具有高可用性并且在失败时将重新启动可能使用YARN、Mesos或AWS设施等集群管理框架或者作为流处理框架的一部分。在这种情况下Kafka不需要检测故障并重新分配分区因为消费进程将在另一台机器上重新启动。 使用示例 String topic foo; TopicPartition partition0 new TopicPartition(topic, 0); TopicPartition partition1 new TopicPartition(topic, 1); consumer.assign(Arrays.asList(partition0, partition1)); 手动分区分配不使用组协调因此消费者故障不会导致分配的分区重新平衡。每个消费者独立行动即使它与另一个消费者共享一个groupId。为了避免偏移提交冲突通常应该确保每个消费者实例的groupId都是唯一的。 请注意不可能将手动分区分配和订阅topic混合使用。 七、将offset维护转移到Kafka外部 Kafka自身会维护一个内部topic用于存储offset。当然我们也可以选择将offset存储在外部存储系统例如mysql。也可以将消费结果和offset存放在同一个存储系统以原子方式存储结果和偏移量用数据库事务 将 数据消费和offset提交绑定在一起。这将使消费完全原子化并给出“恰好一次”的语义该语义比您使用Kafka的偏移提交功能获得的默认“至少一次”语义更强。 此时的做法是 1、配置手动提交 enable.auto.commitfalse 2、使用每个 ConsumerRecord提供的偏移量来保存您的位置 3、重新启动时使用seek(TopicPartition, long)恢复消费者的位置 如果consumer手动指定分区那么将offset维护到外部是简单的因为指定的这个分区一直都是归属与你这个consumer来消费的。如果consumer消费的分区时Kafka给你自动分配的则会因为consumer的变化等因素导致你当下的consumer被分到的分区变化。这时需要通过对subscribe(Collection, ConsumerRebalanceListener)和subscribe(Pattern, ConsumerRebalanceListener)的调用提供一个ConsumerRebalanceListener 例如当consumer获取分区时consumer希望通过实现ConsumerRebalanceListener.onPartitionsRevoked(Collection)来提交这些分区的偏移量。当将分区分配给消费者时消费者将希望查找这些新分区的偏移量并通过实现ConsumerRebalanceListener.onPartitionsAssigned(Collection)将consumer正确初始化到该位置。 ConsumerRebalanceListener的另一个常见用途是清除应用程序为移动到其他地方的分区维护的任何缓存 八、控制consumer的消费位置 在大多数用例中consumer只需从头到尾消费记录定期提交其位置自动或手动。然而Kafka允许consumer手动控制其位置在分区中随意向前或向后移动。这意味着consumer可以重新消费旧记录或者跳到最近的记录而无需实际消费中间记录。 例如对于时间敏感的记录处理如果数据已经生产了很长时间对于一个新的consumer来说就很有意义。它可以直接处理最新的数据或者它只关心中午12点之后的数据。 九、流量控制 如果一个consumer被分配了多个分区来获取数据它将尝试同时从所有分区中消费从而有效地赋予这些分区相同的消费优先级。然而在某些情况下consumer可能希望首先全速处理某一个分区。当这个分区没有数据时才处理其他分区。 还有一种情况那就是流处理一个consumer从两个topic消费数据并对这两个流进行连接。当其中一个topic远远落后与另一个topic时consumer会暂停从速度快的topic获取数据以便让滞后的topic跟上。 Kafka支持消费流的动态控制在未来的 poll(Duration)调用中分别使用pause(Collection)和resume()来暂停指定分配分区上的消费和恢复指定暂停分区上的消费。 十、消费事务性消息 Kafka 0.11.0引入了事务其中producer可以原子地写入多个topic和partition。为了实现这一点从这些分区读取的consumer应该被配置为只读取已提交的数据。既isolation.levelread_committed 在read_committed模式下consumer将只读取已成功提交的事务消息。consumer的分区结束偏移量将是分区中属于打开事务的第一条消息的偏移量。这种偏移被称为“最后稳定偏移”LSO LSO 是 Last Stable Offset 的缩写 consumer只会读取LSO并过滤掉任何已中止的事务消息。LSO还影响consumer的seekToEnd(Collection)和endOffsets(Collection)行为。 带有事务消息的分区将包括表示事务结果的提交或中止标记。这些标记不会返回给consumer但在日志中有偏移。 因此从具有事务性消息的topic中读取的应用程序将看到所消耗的偏移量存在缺口。这些缺失的消息将成为事务标记并在两个隔离级别中为consumer过滤掉。此外消费者也可能会看到由于事务中止而导致的空白因为这些消息不会被consumer返回但会有有效的偏移量。 十一、多线程消费 Kafka消费者不是线程安全的。我们有责任确保多线程访问正确同步。不同步访问将导致ConcurrentModificationException 此规则的唯一例外是 wakeup()它可以从外部线程安全地用于中断活动操作。在这种情况下将从操作的线程阻塞中抛出org.apache.kafka.common.errors.WakeupException这可用于从另一个线程关闭消费者。 如下示例 public class KafkaConsumerRunner implements Runnable {private final AtomicBoolean closed new AtomicBoolean(false);private final KafkaConsumer consumer;public KafkaConsumerRunner(KafkaConsumer consumer) {this.consumer consumer;}{literal}Overridepublic void run() {try {consumer.subscribe(Arrays.asList(topic));while (!closed.get()) {ConsumerRecords records consumer.poll(Duration.ofMillis(10000));// 处理新记录 ......}} catch (WakeupException e) {// 如果关闭忽略异常if (!closed.get()) throw e;} finally {consumer.close();}}// Shutdown hook which can be called from a separate thread// 关闭钩子可以从单独的线程调用public void shutdown() {closed.set(true);consumer.wakeup();} } 然后在一个单独的线程中可以通过设置closed标志并唤醒消费者来关闭消费者。 closed.set(true); consumer.wakeup(); 我们看以下几种情况 1、1个consumer一个线程 优点 1、最容易实施 2、它通常是最快的因为不需要线程间的协调 3、它使得基于每个分区的按顺序处理非常容易实现每个线程只按接收消息的顺序处理消息。 缺点 1、更多的consumer意味着更多的TCP连接到集群每个线程一个。一般来说Kafka非常有效地处理连接因此这通常是一个很小的成本。 2、多个consumer意味着向服务器发送的请求更多数据批处理略有减少这可能会导致I/O吞吐量下降。 3、所有进程中的线程总数将受到分区总数的限制 2、将消费和处理解耦 consumer只负责接收数据处理数据的逻辑让另一个线程池来处理 优点 此选项允许独立扩展consumer和处理器的数量。这使得有可能有一个为多个处理器线程提供数据的单一consumer从而避免了对分区的任何限制。 缺点 1、保证处理器之间的顺序需要特别小心因为线程将独立执行由于线程执行时间的运气较早的数据块实际上可能会在较晚的数据块之后被处理。对于没有订购要求的加工这不是问题。 2、手动提交 offset 变得更加困难因为它要求所有线程协调以确保该分区的处理完成。
http://www.tj-hxxt.cn/news/223477.html

相关文章:

  • 可以做兼职笔译的网站中信建设网站
  • 自助建站吧网站图怎么做会高清图片
  • 大酒店网站源代码网站设计网页配色
  • 手机平板购物网站的设计背景郑州市域名服务公司
  • 怎么做网站二级页面做义工的网站
  • 珠海市斗门建设局网站网络推广招聘
  • 中国数学外国人做视频网站房价必涨的十大城市
  • 上线了怎么建网站泰安新闻头条最新消息
  • 怎么创建卡密网站乐山市城乡规划建设局网站
  • 外卖网站怎么做北京企业网站案例
  • 高中信息技术课程做网站网站宣传图
  • 学网站维护怎么改一个网站的关键词密度
  • 个人做众筹网站合法吗wordpress更换域名后台登不进去
  • 建设银行儿童网站2345网址导航站
  • 网站设计制作厂家有哪些网站为什么要seo
  • 洛阳微信平台网站建设推荐一个免费网站
  • 小馋网站建设书专业的营销型网站建设价格
  • 苗木网站素材佛山网站建设业务员
  • 企业网站一般要素即墨区城乡建设局网站官网
  • 好的网站有哪些灰色关键词排名代发
  • wordpress付费建站北京城建建设工程有限公司网站
  • 哈尔滨网站建设吧网站页面太多怎么做网站地图
  • 网站图片移动怎么做的自己做视频的网站吗
  • 工程建设标准化期刊网站最近三天的新闻大事国内
  • 校园安全网站建设如何优化移动端网站
  • 企业网站制作 厦门如何把wordpress的文章页写成模板
  • 快速微信网站设计工信部信息备案网站查询系统
  • 网站根目录相对路径做网站赚大钱
  • 大一学生做的网站校园网站素材
  • 商城网站规划百度收录的网页数量