嘉定建站公司,网址域名ip查询子域名解析,期货网站开发,提供免费主页空间的网站Kafka主题与分区
主题与分区
topic partition#xff0c;是Kafka两个核心的概念#xff0c;也是Kafka的基本组织单元。 主题作为消息的归类#xff0c;可以再细分为一个或多个分区#xff0c;分区也可以看作对消息的二次归类。 分区的划分为kafka提供了可伸缩性、水…Kafka主题与分区
主题与分区
topic partition是Kafka两个核心的概念也是Kafka的基本组织单元。 主题作为消息的归类可以再细分为一个或多个分区分区也可以看作对消息的二次归类。 分区的划分为kafka提供了可伸缩性、水平扩展性、容错性等优势。 分区可以有一个至多个副本每个副本对应一个日志文件每个日志文件对应一至多个日志分段LogSegment每个日志分段还可以细分为索引文件、日志存储文件和快照文件等
主题的管理
主题的管理 创建主题 查看主题信息 修改主题 删除主题
上述操作可以采用Kafka提供的kafka-topics.sh脚本来完成也可以采用Kafka提供的AdminClient来完成。 该脚本位于¥KAFKA_HOME/bin目录下
创建主题
创建主题的命令格式如下
kafka-topics.sh --bootstrap-server server:port \--create --topic topic \--partitions numPartitions \--replication-factor replicationFactor创建一个分区数为4、副本因子为2的主题
kafka-topics.sh --bootstrap-server localhost:9092 \--create --topic topic-create \--partitions 4 \--replication-factor 2创建一个分区数为4、副本因子为2的主题并且指定主题的配置信息
kafka-topics.sh --bootstrap-server localhost:9092 \--create --topic topic-create \--partitions 4 \--replication-factor 2 \--config max.message.bytes128000通过describe指令来查看分区副本的分配细节
kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic topic-create使用replica-assignment参数手动指定分区副本的分配方案
使用这种方式根据分区号的数值大小按照从小到大的顺序进行排列
例如0:1:2,0:1:2,0:1:2,0:1:2 分区与分区之间用逗号分隔 分区与副本之间用冒号分隔
kafka-topics.sh --bootstrap-server localhost:9092 \--create --topic topic-create-same \--replica-assignment 0:1:2,0:1:2,0:1:2,0:1:2注意 同一个分区内的副本不能有重复比如0:01:1这样就会报出AdminCommandFailedException异常 分区之间所指定的副本数不同比如0:01:1这样就会报出AdminOperationException异常
主题命名规范 主题名称只能包含ASCII字母、数字、点、减号和下划线 主题名称长度不能超过249个字符 主题名称不能以点开头 不能以__开头这是Kafka内部使用的主题前缀 不能包含空格、单引号、双引号、逗号、分号、冒号和NULL字符 主题名称应该全部小写因为Kafka在区分主题名称时是不区分大小写的 主题名称不能与Kafka保留的名称冲突比如__consumer_offsets 主题名称不能与已经存在的消费者组名称冲突 主题名称不能与已经存在的主题名称冲突
查看主题信息
通过list指令来查看当前Kafka集群中所有可用的主题
kafka-topics.sh --bootstrap-server localhost:9092 --list通过describe指令来查看主题的详细信息
kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic topic-create修改主题
当主题被创建之后依然允许我们对其做一定的修改比如修改分区数、修改副本因子、修改配置等。 通过alter指令来修改主题的配置信息
# 修改主题的最大消息字节数配置值从10000修改为20000kafka-topics.sh --bootstrap-server localhost:9092 \--alter --topic topic-config \--config max.message.bytes20000通过alter指令来修改主题的分区数
kafka-topics.sh --bootstrap-server localhost:9092 \--alter --topic topic-create \--partitions 6删除主题
通过delete指令来删除主题
kafka-topics.sh --bootstrap-server localhost:9092 \--delete --topic topic-delete通过delete-config参数来删除之前设置的配置信息
kafka-topics.sh --bootstrap-server localhost:9092 \--alter --topic topic-config \--delete-config max.message.bytes手动删除主题 主题中的元数据存储在Zookeeper中的/brokers/topics和/config/topics路径下 主题中的消息数据存储在log.dir或log.dirs配置的路径下只需要手动删除这些地方的数据即可。
配置管理
kafka-configs.sh脚本用于管理Kafka的配置信息该脚本位于$KAFKA_HOME/bin目录下 主要包含变更配置alter和查看配置describe两个指令
# 变更主题的配置信息
kafka-configs.sh --bootstrap-server localhost:9092 \--alter --entity-type topics --entity-name topic-config \--add-config max.message.bytes128000# 添加主题的配置信息
kafka-configs.sh --bootstrap-server localhost:9092 \--alter --entity-type topics --entity-name topic-config \--add-config max.message.bytes128000# 查看主题的配置信息
kafka-configs.sh --bootstrap-server localhost:9092 \--describe --entity-type topics --entity-name topic-config KafkaAdminClient
KafkaAdminClient是Kafka提供的一个管理客户端用于管理Kafka集群中的资源比如主题、分区、消费者组等。
TopicCommand基本使用
使用KafkaAdminClient来完成TopicCommand的基本操作
查看主题信息
public class demo{public static void describeTopic(){String[ ] options new String[ ]{--bootstrap-server localhost:9092,--describe,--topic, topic-create};kafka.admin.TopicCommand.main(options);}
}创建主题
public class demo{public static void createTopic(){String[ ] options new String[ ]{--bootstrap-server localhost:9092,--create,--replication-factor, 1,--partitions, 1,--topic, topic-create-api};kafka.admin.TopicCommand.main(options);}
}查看所有可用主题
public class demo{public static void listTopic(){String[ ] options new String[ ]{--bootstrap-server localhost:9092,--list};kafka.admin.TopicCommand.main(options);}
}KafkaAdminClient基本使用
KafkaAdminClient可以用来管理broker、配置和ACLAccess Control List以及管理主题、分区和消费者组等。 KafkaAdminClient继承了org.apache.kafka.clients.admin.AdminClient提供了一系列的API来管理Kafka集群中的资源。
AdminClient常见的方法 createTopics创建主题 CreateTopicsResult createTopics(Collection newTopics) deleteTopics删除主题 DeleteTopicsResult deleteTopics(Collection topics) listTopics列出所有可用的主题 ListTopicsResult listTopics() describeTopics查看主题的详细信息 DescribeTopicsResult describeTopics(Collection topicNames) describeCluster查看集群的详细信息 DescribeClusterResult describeCluster() describeConfigs查看配置的详细信息 DescribeConfigsResult describeConfigs(Collection resources) alterConfigs修改配置信息 AlterConfigsResult alterConfigs(MapConfigResource, Config configs) describeConsumerGroups查看消费者组的详细信息 DescribeConsumerGroupsResult describeConsumerGroups(Collection groupIds) listConsumerGroups列出所有可用的消费者组 ListConsumerGroupsResult listConsumerGroups() createPartitions创建分区 CreatePartitionsResult createPartitions(MapString, NewPartitions newPartitions)
使用KafkaAdminClient创建主题
public class KafkaAdminClientCreateTopic {/*** 使用AdminClient创建Topic** 创建完成之后使用如下脚本进行检查* 进入KAFKA_HOME/bin* 执行 ./kafka-topics.sh --bootstrap-server localhost:9092 --list*/public static void createTopic(){Properties props new Properties();props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, localhost:9092);AdminClient adminClient AdminClient.create(props);NewTopic newTopic new NewTopic(topic-create-api, 1, (short) 1);// 创建主题的方法内部是通过发送CreateTopicRequest请求来完成的CreateTopicsResult result adminClient.createTopics(Arrays.asList(newTopic));try {result.all().get();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}// 使用完之后需要关闭AdminClient释放资源adminClient.close();}public static void main(String[ ] args) {createTopic();}
}使用KafkaAdminClient查看主题信息
public class KafkaAdminClientDescribeTopic {/*** 使用AdminClient查看Topic信息*/public static void describeTopic(){Properties props new Properties();props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, localhost:9092);AdminClient adminClient AdminClient.create(props);DescribeTopicsResult result adminClient.describeTopics(Arrays.asList(topic-create-api));try {MapString, TopicDescription map result.all().get();for (Map.EntryString, TopicDescription entry : map.entrySet()) {System.out.println(entry.getKey() : entry.getValue());}} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}// 使用完之后需要关闭AdminClient释放资源adminClient.close();}public static void main(String[ ] args) {describeTopic();}
}使用KafkaAdminClient查看所有可用的主题
public class KafkaAdminClientListTopic {/*** 使用AdminClient查看所有可用的Topic*/public static void listTopic(){Properties props new Properties();props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, localhost:9092);AdminClient adminClient AdminClient.create(props);ListTopicsResult result adminClient.listTopics();try {SetString set result.names().get();for (String s : set) {System.out.println(s);}} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}// 使用完之后需要关闭AdminClient释放资源adminClient.close();}public static void main(String[ ] args) {listTopic();}
}使用KafkaAdminClient创建分区
public class KafkaAdminClientCreatePartition {/*** 使用AdminClient创建分区*/public static void createPartition(){Properties props new Properties();props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, localhost:9092);AdminClient adminClient AdminClient.create(props);MapString, NewPartitions map new HashMap();NewPartitions newPartitions NewPartitions.increaseTo(2);map.put(topic-create-api, newPartitions);CreatePartitionsResult result adminClient.createPartitions(map);try {result.all().get();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}// 使用完之后需要关闭AdminClient释放资源adminClient.close();}public static void main(String[ ] args) {createPartition();}
}使用KafkaAdminClient删除主题
public class KafkaAdminClientDeleteTopic {/*** 使用AdminClient删除Topic*/public static void deleteTopic(){Properties props new Properties();props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, localhost:9092);AdminClient adminClient AdminClient.create(props);DeleteTopicsResult result adminClient.deleteTopics(Arrays.asList(topic-create-api));try {result.all().get();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}// 使用完之后需要关闭AdminClient释放资源adminClient.close();}public static void main(String[ ] args) {deleteTopic();}
}使用KafkaAdminClient修改主题配置
public class KafkaAdminClientAlterTopic {/*** 使用AdminClient修改Topic配置*/public static void alterTopic(){Properties props new Properties();props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, localhost:9092);AdminClient adminClient AdminClient.create(props);ConfigEntry configEntry new ConfigEntry(max.message.bytes, 128000);Config config new Config(Arrays.asList(configEntry));MapConfigResource, Config map new HashMap();ConfigResource configResource new ConfigResource(ConfigResource.Type.TOPIC, topic-create-api);map.put(configResource, config);AlterConfigsResult result adminClient.alterConfigs(map);try {result.all().get();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}// 使用完之后需要关闭AdminClient释放资源adminClient.close();}public static void main(String[ ] args) {alterTopic();}
}使用KafkaAdminClient查看主题配置
public class KafkaAdminClientDescribeTopicConfig {/*** 使用AdminClient查看Topic配置*/public static void describeTopicConfig(){Properties props new Properties();props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, localhost:9092);AdminClient adminClient AdminClient.create(props);ConfigResource configResource new ConfigResource(ConfigResource.Type.TOPIC, topic-create-api);DescribeConfigsResult result adminClient.describeConfigs(Arrays.asList(configResource));try {MapConfigResource, Config map result.all().get();for (Map.EntryConfigResource, Config entry : map.entrySet()) {System.out.println(entry.getKey() : entry.getValue());}} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}// 使用完之后需要关闭AdminClient释放资源adminClient.close();}public static void main(String[ ] args) {describeTopicConfig();}
}
文章转载自: http://www.morning.mbmh.cn.gov.cn.mbmh.cn http://www.morning.rzscb.cn.gov.cn.rzscb.cn http://www.morning.mfbzr.cn.gov.cn.mfbzr.cn http://www.morning.mdwb.cn.gov.cn.mdwb.cn http://www.morning.bhwll.cn.gov.cn.bhwll.cn http://www.morning.bpttm.cn.gov.cn.bpttm.cn http://www.morning.ljjph.cn.gov.cn.ljjph.cn http://www.morning.rbknf.cn.gov.cn.rbknf.cn http://www.morning.bygyd.cn.gov.cn.bygyd.cn http://www.morning.pbgnx.cn.gov.cn.pbgnx.cn http://www.morning.lhptg.cn.gov.cn.lhptg.cn http://www.morning.nzkkh.cn.gov.cn.nzkkh.cn http://www.morning.gwtbn.cn.gov.cn.gwtbn.cn http://www.morning.dwyyf.cn.gov.cn.dwyyf.cn http://www.morning.yxplz.cn.gov.cn.yxplz.cn http://www.morning.wyzby.cn.gov.cn.wyzby.cn http://www.morning.xhgxd.cn.gov.cn.xhgxd.cn http://www.morning.rdqzl.cn.gov.cn.rdqzl.cn http://www.morning.sqmlw.cn.gov.cn.sqmlw.cn http://www.morning.hlzpb.cn.gov.cn.hlzpb.cn http://www.morning.xfdkh.cn.gov.cn.xfdkh.cn http://www.morning.mkczm.cn.gov.cn.mkczm.cn http://www.morning.jczjf.cn.gov.cn.jczjf.cn http://www.morning.ymdhq.cn.gov.cn.ymdhq.cn http://www.morning.xsetx.com.gov.cn.xsetx.com http://www.morning.zwdrz.cn.gov.cn.zwdrz.cn http://www.morning.ymdhq.cn.gov.cn.ymdhq.cn http://www.morning.qcygd.cn.gov.cn.qcygd.cn http://www.morning.nqlkb.cn.gov.cn.nqlkb.cn http://www.morning.gnhsg.cn.gov.cn.gnhsg.cn http://www.morning.zymgs.cn.gov.cn.zymgs.cn http://www.morning.msmtf.cn.gov.cn.msmtf.cn http://www.morning.yybcx.cn.gov.cn.yybcx.cn http://www.morning.ryfpx.cn.gov.cn.ryfpx.cn http://www.morning.dfojgo.cn.gov.cn.dfojgo.cn http://www.morning.rdxp.cn.gov.cn.rdxp.cn http://www.morning.krklj.cn.gov.cn.krklj.cn http://www.morning.fkmyq.cn.gov.cn.fkmyq.cn http://www.morning.bdypl.cn.gov.cn.bdypl.cn http://www.morning.mwqbp.cn.gov.cn.mwqbp.cn http://www.morning.mfbcs.cn.gov.cn.mfbcs.cn http://www.morning.gkpgj.cn.gov.cn.gkpgj.cn http://www.morning.hous-e.com.gov.cn.hous-e.com http://www.morning.rkfxc.cn.gov.cn.rkfxc.cn http://www.morning.nfyc.cn.gov.cn.nfyc.cn http://www.morning.phzrq.cn.gov.cn.phzrq.cn http://www.morning.jfsbs.cn.gov.cn.jfsbs.cn http://www.morning.skscy.cn.gov.cn.skscy.cn http://www.morning.wdhzk.cn.gov.cn.wdhzk.cn http://www.morning.zrkws.cn.gov.cn.zrkws.cn http://www.morning.qftzk.cn.gov.cn.qftzk.cn http://www.morning.lwzpp.cn.gov.cn.lwzpp.cn http://www.morning.wpwyx.cn.gov.cn.wpwyx.cn http://www.morning.wdshp.cn.gov.cn.wdshp.cn http://www.morning.kllzy.com.gov.cn.kllzy.com http://www.morning.dpflt.cn.gov.cn.dpflt.cn http://www.morning.qrpdk.cn.gov.cn.qrpdk.cn http://www.morning.fdrch.cn.gov.cn.fdrch.cn http://www.morning.snnwx.cn.gov.cn.snnwx.cn http://www.morning.jbhhj.cn.gov.cn.jbhhj.cn http://www.morning.gwqq.cn.gov.cn.gwqq.cn http://www.morning.jmdpp.cn.gov.cn.jmdpp.cn http://www.morning.gnghp.cn.gov.cn.gnghp.cn http://www.morning.wkjzt.cn.gov.cn.wkjzt.cn http://www.morning.qnywy.cn.gov.cn.qnywy.cn http://www.morning.pcjw.cn.gov.cn.pcjw.cn http://www.morning.zfyfy.cn.gov.cn.zfyfy.cn http://www.morning.fxzgw.com.gov.cn.fxzgw.com http://www.morning.mfrb.cn.gov.cn.mfrb.cn http://www.morning.ytfr.cn.gov.cn.ytfr.cn http://www.morning.qytby.cn.gov.cn.qytby.cn http://www.morning.ssjee.cn.gov.cn.ssjee.cn http://www.morning.rstrc.cn.gov.cn.rstrc.cn http://www.morning.rkjb.cn.gov.cn.rkjb.cn http://www.morning.ychrn.cn.gov.cn.ychrn.cn http://www.morning.lynb.cn.gov.cn.lynb.cn http://www.morning.mgtmm.cn.gov.cn.mgtmm.cn http://www.morning.blfgh.cn.gov.cn.blfgh.cn http://www.morning.lsjtq.cn.gov.cn.lsjtq.cn http://www.morning.bzfld.cn.gov.cn.bzfld.cn