深圳网站营销seo费用,大淘客网站logo怎么做,qq小程序api,搜索大全引擎入口网站文章目录 Kafka 命令详解及使用示例Kafka 命令详解kafka-topics.sh#xff1a;主题管理创建主题创建带副本的主题修改主题分区数了解分区分布列出主题查看主题详情删除主题 kafka-console-producer.sh#xff1a;消息生产者发送消息到主题带键值对的消息消息生产性能优化带分… 文章目录 Kafka 命令详解及使用示例Kafka 命令详解kafka-topics.sh主题管理创建主题创建带副本的主题修改主题分区数了解分区分布列出主题查看主题详情删除主题 kafka-console-producer.sh消息生产者发送消息到主题带键值对的消息消息生产性能优化带分区键的消息发送 kafka-console-consumer.sh消息消费者消费主题中的消息只读取键值对消息实时消费消息只消费特定分区的消息以 JSON 格式输出消息 kafka-consumer-groups.sh消费者组管理查看消费者组信息查看消费者组的偏移量信息重置消费者组的偏移量 kafka-configs.sh配置管理查看主题配置修改主题配置 kafka-acls.sh访问控制列表管理为用户创建权限删除用户权限 示例总结 Kafka 命令详解及使用示例
Kafka 是一个分布式流处理平台提供了高吞吐量、低延迟的消息系统。Kafka 主要用于消息发布-订阅模式中的消息传输广泛应用于数据管道、日志系统、事件追踪等场景。本文将介绍 Kafka 中常用的命令行工具及其具体使用方式帮助开发者更好地管理和使用 Kafka。 Kafka 命令详解
kafka-topics.sh主题管理
主题Topic是 Kafka 中消息的逻辑分类所有消息都发送到指定的主题中。kafka-topics.sh 用于管理主题包括创建、删除、列出主题等操作。
创建主题
bin/kafka-topics.sh --create --topic my-topic --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1--topic主题名称。--partitions分区数消息将分布在多个分区中。--replication-factor副本因子用于消息的高可用性。
创建带副本的主题
在分布式环境中副本对于 Kafka 来说至关重要它能确保在 Broker 故障时消息不会丢失。创建主题时设置合适的副本数和分区数非常关键。
bin/kafka-topics.sh --create --topic important-topic --partitions 5 --replication-factor 3 --bootstrap-server localhost:9092--partitions 设置为 5意味着主题的数据会被分散到 5 个分区中提升并发处理能力。--replication-factor 设置为 3确保每个分区有 3 个副本在不同的 Broker 上提高容错性。 注意副本数不能超过集群中的 Broker 数量生产环境中一般设置副本数为 3保证高可用性。 修改主题分区数
Kafka 支持在线扩展主题的分区数。可以在不停止服务的情况下动态增加分区数但要注意增加分区会影响数据的顺序性因为 Kafka 不会自动对已存在的数据进行重分配。
bin/kafka-topics.sh --alter --topic important-topic --partitions 10 --bootstrap-server localhost:9092此命令将 important-topic 的分区数从 5 扩展到 10 个。
了解分区分布
通过 --describe 命令可以查看每个分区在哪些 Broker 上存储并了解它们的副本状态。
bin/kafka-topics.sh --describe --topic important-topic --bootstrap-server localhost:9092输出的结果会显示每个分区的副本和首领Leader在哪个 Broker 上。Leader 是处理读写请求的副本其他副本是跟随者用于容错。
列出主题
bin/kafka-topics.sh --list --bootstrap-server localhost:9092查看主题详情
bin/kafka-topics.sh --describe --topic my-topic --bootstrap-server localhost:9092删除主题
bin/kafka-topics.sh --delete --topic my-topic --bootstrap-server localhost:9092kafka-console-producer.sh消息生产者
Kafka 提供了一个控制台生产者工具允许我们从命令行发送消息到指定主题。
发送消息到主题
bin/kafka-console-producer.sh --topic my-topic --bootstrap-server localhost:9092输入消息后按 Enter 发送到 Kafka 主题。
带键值对的消息
bin/kafka-console-producer.sh --topic my-topic --bootstrap-server localhost:9092 --property parse.keytrue --property key.separator:在这里消息的键和值通过冒号分隔例如
key1:value1
key2:value2消息生产性能优化
在高吞吐量场景下可以通过调整生产者配置来提高性能。例如批量发送消息和异步生产可以显著提高效率。
bin/kafka-console-producer.sh --topic fast-topic --bootstrap-server localhost:9092 --producer-property batch.size16384 --producer-property linger.ms5batch.size控制批量消息的大小以字节为单位Kafka 会尝试将消息累积到这个大小后一起发送。linger.ms在批量消息发送前的等待时间可以通过稍微延迟发送消息来增加批量的大小。
此外生产者可以配置为异步发送这样可以减少网络等待时间
--producer-property acks1acks1 表示只等待 Leader 确认即可继续发送消息这种方式可以提高性能但有可能在 Leader 故障时丢失部分消息。
带分区键的消息发送
指定消息发送到特定的分区时可以使用 key 参数这在有状态的消息处理如事务处理场景中非常重要。
bin/kafka-console-producer.sh --topic partitioned-topic --bootstrap-server localhost:9092 --property parse.keytrue --property key.separator:这样每个消息都会根据键key被分配到相同的分区。例如key1:message1 和 key1:message2 会发送到相同的分区。 kafka-console-consumer.sh消息消费者
消费者用于从 Kafka 主题中读取消息。kafka-console-consumer.sh 是 Kafka 提供的命令行消费者工具。
消费主题中的消息
bin/kafka-console-consumer.sh --topic my-topic --from-beginning --bootstrap-server localhost:9092--from-beginning从主题的起始位置读取所有消息。
只读取键值对消息
bin/kafka-console-consumer.sh --topic my-topic --bootstrap-server localhost:9092 --property print.keytrue --property key.separator,这样读取的消息会显示为键和值的格式例如
key1,value1实时消费消息
使用 kafka-console-consumer.sh 来实时消费主题中的消息
bin/kafka-console-consumer.sh --topic fast-topic --bootstrap-server localhost:9092如果要从主题的起始位置读取消息可以添加 --from-beginning 参数。
只消费特定分区的消息
Kafka 支持直接从某个分区中读取消息。在某些场景下如故障恢复或日志分析我们可能只需要处理某个分区的数据
bin/kafka-console-consumer.sh --topic important-topic --bootstrap-server localhost:9092 --partition 0 --offset 10此命令从分区 0 开始读取第 10 条消息。
以 JSON 格式输出消息
Kafka 消费者可以输出 JSON 格式的消息方便后续处理和分析
bin/kafka-console-consumer.sh --topic json-topic --bootstrap-server localhost:9092 --formatter kafka.tools.DefaultMessageFormatter --property print.keytrue --property print.valuetrue --property key.separator, --property value.deserializerorg.apache.kafka.common.serialization.StringDeserializerkafka-consumer-groups.sh消费者组管理
Kafka 的消费者组允许多个消费者一起协同消费消息每个分区的消息只能被一个组内的消费者消费。kafka-consumer-groups.sh 可以用于管理消费者组。
查看消费者组信息
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list查看消费者组的偏移量信息
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group输出信息包括每个分区的已消费消息偏移量以及消费者的状态。
重置消费者组的偏移量
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-group --reset-offsets --to-earliest --execute --topic my-topic--to-earliest将偏移量重置为最早的消息。 kafka-configs.sh配置管理
Kafka 主题和代理的配置可以通过 kafka-configs.sh 进行管理。
查看主题配置
bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name my-topic --describe修改主题配置
bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name my-topic --alter --add-config retention.ms172800000此命令将主题 my-topic 的消息保留时间修改为 2 天单位为毫秒。 kafka-acls.sh访问控制列表管理
Kafka 提供了基于 ACL访问控制列表的权限管理。kafka-acls.sh 用于管理权限。
为用户创建权限
bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --allow-principal User:Alice --operation All --topic my-topic此命令允许用户 Alice 对主题 my-topic 执行所有操作。
删除用户权限
bin/kafka-acls.sh --bootstrap-server localhost:9092 --remove --allow-principal User:Alice --operation All --topic my-topic示例总结
我们通过几个简单的示例介绍了 Kafka 的基本操作
创建主题 my-topic并通过控制台生产者发送消息。使用控制台消费者从该主题中读取消息。管理消费者组的偏移量重置到最早的消息。修改主题的保留时间以及管理用户的权限。
Kafka 提供了丰富的命令行工具用于主题、消费者组、配置、权限等的管理。灵活使用这些命令可以帮助我们高效地维护 Kafka 集群。