校园网站建设与应用,武冈网站建设,如室室内设计官网,小程序公司十大排名#x1f407;明明跟你说过#xff1a;个人主页 
#x1f3c5;个人专栏#xff1a;《大数据前沿#xff1a;技术与应用并进》#x1f3c5; 
#x1f516;行路有良友#xff0c;便是天堂#x1f516; 
目录 
一、引言 
1、Kafka简介 
2、Kafka核心优势 
二、环境准备 
1… 
明明跟你说过个人主页 
个人专栏《大数据前沿技术与应用并进》 
行路有良友便是天堂 
目录 
一、引言 
1、Kafka简介 
2、Kafka核心优势 
二、环境准备 
1、服务器 
2、服务器环境初始化 
三、安装zookeeper 
1、上传tar包 
2、编辑配置文件 
3、创建数据目录 
4、安装JAVA 
5、启动zookeeper 
四、Kafka集群搭建 
1、上传tar包 
2、编辑配置文件 
3、创建数据目录 
4、启动kafka 
5、查看端口状态 
五、测试  
1、创建Topic 
2、生产消息  
3、消费消息  一、引言 
1、Kafka简介 
Apache Kafka 是一个开源的分布式流处理平台它最初由 LinkedIn 开发后来成为 Apache 软件基金会的顶级项目。Kafka 设计用于处理实时数据流提供了一种高效、可扩展、持久化的方式来进行数据发布和订阅。它通常被描述为一种分布式发布-订阅消息队列但它实际上超越了传统消息队列的概念。 2、Kafka核心优势 
1. 高吞吐量 
Kafka 能够处理海量数据支持每秒数十万条消息的读写操作即使在大规模部署中也能保持高性能。通过高效的文件系统设计和内存管理机制Kafka 能够在处理大量数据的同时保持低延迟。 
2. 持久性和可靠性 
Kafka 将数据存储在磁盘上并支持数据复制replication确保即使在节点故障的情况下也能保证数据的可靠性和持久性。数据以追加的方式写入日志文件减少了磁盘的随机写操作提高了写入速度和数据完整性。 
3. 可扩展性 
Kafka 具有良好的水平扩展能力可以通过增加更多的节点来提升系统的处理能力和存储容量。分布式架构使得 Kafka 能够轻松地在多台服务器上部署并且能够动态扩展和收缩集群大小。 
4. 灵活的发布-订阅模型 
Kafka 支持发布-订阅模式允许多个消费者订阅同一个主题并且消费者可以独立消费消息。消费者可以控制自己的消费进度不会影响其他消费者的状态实现了消息消费的解耦。 二、环境准备 
1、服务器 
准备3台或者5台Linux服务器用来组建高可用集群这里使用3台Centos 7.9来进行搭建大家也可以使用其他的Linux发行版本 
配置如下 2、服务器环境初始化 
3台机器都要执行 
关闭Selinux 
vi /etc/selinux/config#修改成如下
SELINUXdisabled 
之后重启服务器 
reboot 
关闭并禁用防火墙 
[rootkafka1 ~]# systemctl stop firewalld  systemctl disable firewalld 
修改 /etc/hosts 
vi /etc/hosts# 添加以下内容
192.168.40.100  kafka1
192.168.40.101  kafka2
192.168.40.102  kafka3 
修改镜像源  
curl -o /etc/yum.repos.d/CentOS-Base.repo https://mirrors.aliyun.com/repo/Centos-7.repo 
三、安装zookeeper 为什么安装Kafka时要先安装zookeeper  ZooKeeper 是一个分布式的协调服务它为分布式应用程序提供了一套完整的协调服务功能包括命名服务、配置管理、集群管理和同步等。Kafka 利用 ZooKeeper 来管理其集群中的多个组件确保系统的稳定性和一致性。 1、上传tar包 
apache-zookeeper-3.8.0-bin.tar.gz 
tar包可以去官网进行下载 Apache ZooKeeperhttps://zookeeper.apache.org/releases.html#download 
解压tar包至 /opt 下 
tar zxvf apache-zookeeper-3.8.0-bin.tar.gz -C /opt 
2、编辑配置文件 
vim /opt/apache-zookeeper-3.8.0-bin/conf/zoo.cfg# 输入如下内容
tickTime2000
initLimit10
syncLimit5
dataDir/opt/zookeeper/zkData
dataLogDir/opt/zookeeper/zkLog
clientPort2181
server.1kafka1:2188:3888
server.2kafka2:2188:3888
server.3kafka3:2188:3888
4lw.commands.whitelist* tickTime2000 
tickTime 定义了 ZooKeeper 服务器之间的心跳间隔时间毫秒。它是 ZooKeeper 中最基本的单位时间。默认值通常是 2000 毫秒即 2 秒。 
initLimit10 
initLimit 定义了初始同步阶段的最大超时时间心跳次数。这意味着在初始同步阶段跟随者follower必须在 initLimit * tickTime 毫秒内完成与领导者leader的同步。例如这里设置为 20 秒10 * 2000 毫秒。 
syncLimit5 
syncLimit 定义了在领导者和跟随者之间发送消息的最大超时时间心跳次数。这意味着在同步阶段跟随者必须在 syncLimit * tickTime 毫秒内响应领导者的请求。例如这里设置为 10 秒5 * 2000 毫秒。 
dataDir/opt/zookeeper/zkData 
dataDir 指定 ZooKeeper 服务器用来存储快照snapshot的目录。 
dataLogDir/opt/zookeeper/zkLog 
dataLogDir 指定 ZooKeeper 服务器用来存储事务日志transaction logs的目录。这是从 ZooKeeper 3.4.6 开始引入的一个配置项使得日志和数据可以分开存储。 
clientPort2181 
clientPort 指定客户端连接到 ZooKeeper 服务器的端口默认为 2181。 
server.1ka1:2188:3888 
server.N 表示第 N 台服务器的信息格式为 hostname:peerPort:leaderPort。peerPort 是服务器之间通信的端口leaderPort 是选举领导者时使用的端口。 
4lw.commands.whitelist* 
4lw.commands.whitelist 指定客户端可以执行的命令白名单。* 表示允许所有命令。  
3、创建数据目录 mkdir -p /opt/zookeeper/zkData 
mkdir -p /opt/zookeeper/zkLog 
创建集群ID文件 
在3台机器上分别执行 [rootkafka1 bin]# echo 1   /opt/zookeeper/zkData/myid
[rootkafka2 bin]# echo 2   /opt/zookeeper/zkData/myid
[rootkafka3 bin]# echo 3   /opt/zookeeper/zkData/myid 
4、安装JAVA 
yum install -y java-1.8.0-openjdk-devel 
5、启动zookeeper 
cd /opt/apache-zookeeper-3.8.0-bin/bin/ 
./zkServer.sh start ../conf/zoo.cfg 查看状态 
[rootkafka1 bin]# ps -aux | grep zook 四、Kafka集群搭建 
1、上传tar包 
资源包大家可以到官网下载 
https://kafka.apache.org/ 
解压至指定目录 
[rootkafka1 ~]# tar zxvf kafka_2.13-3.1.0.tgz -C /opt/
[rootkafka2 ~]# tar zxvf kafka_2.13-3.1.0.tgz -C /opt/
[rootkafka3 ~]# tar zxvf kafka_2.13-3.1.0.tgz -C /opt/ 
2、编辑配置文件 
在kafka1上执行 
[rootkafka1 config]# vim /opt/kafka_2.13-3.1.0/config/server.properties#输入如下内容
broker.id0
listenersPLAINTEXT://kafka1:9092
num.network.threads3
num.io.threads8
socket.send.buffer.bytes102400
socket.receive.buffer.bytes102400
socket.request.max.bytes104857600
log.dirs/opt/kafka-logs
num.partitions5
default.replication.factor2
num.recovery.threads.per.data.dir1
offsets.topic.replication.factor1
transaction.state.log.replication.factor1
transaction.state.log.min.isr1
log.retention.hours168
log.segment.bytes1073741824
log.retention.check.interval.ms300000
zookeeper.connectkafka1:2181,kafka2:2181,kafka3:2181/kafka
zookeeper.connection.timeout.ms18000
zookeeper.connection.timeout.ms18000
group.initial.rebalance.delay.ms0 在kafka2上执行 
[rootkafka2 config]# vim /opt/kafka_2.13-3.1.0/config/server.properties#输入如下内容
broker.id1
listenersPLAINTEXT://kafka2:9092
num.network.threads3
num.io.threads8
socket.send.buffer.bytes102400
socket.receive.buffer.bytes102400
socket.request.max.bytes104857600
log.dirs/opt/kafka-logs
num.partitions5
default.replication.factor2
num.recovery.threads.per.data.dir1
offsets.topic.replication.factor1
transaction.state.log.replication.factor1
transaction.state.log.min.isr1
log.retention.hours168
log.segment.bytes1073741824
log.retention.check.interval.ms300000
zookeeper.connectkafka1:2181,kafka2:2181,kafka3:2181/kafka
zookeeper.connection.timeout.ms18000
zookeeper.connection.timeout.ms18000
group.initial.rebalance.delay.ms0 在kafka3上执行 
[rootkafka3 config]# vim /opt/kafka_2.13-3.1.0/config/server.properties#输入如下内容
broker.id2
listenersPLAINTEXT://kafka3:9092
num.network.threads3
num.io.threads8
socket.send.buffer.bytes102400
socket.receive.buffer.bytes102400
socket.request.max.bytes104857600
log.dirs/opt/kafka-logs
num.partitions5
default.replication.factor2
num.recovery.threads.per.data.dir1
offsets.topic.replication.factor1
transaction.state.log.replication.factor1
transaction.state.log.min.isr1
log.retention.hours168
log.segment.bytes1073741824
log.retention.check.interval.ms300000
zookeeper.connectkafka1:2181,kafka2:2181,kafka3:2181/kafka
zookeeper.connection.timeout.ms18000
zookeeper.connection.timeout.ms18000
group.initial.rebalance.delay.ms0 broker.id0 
这是 Kafka broker 的唯一标识符。每个 broker 必须有唯一的 ID。这里的值为 0意味着这是一个集群中的单个 broker 或者是第一个 broker。 
listenersPLAINTEXT://kafka1:9092 
定义了 broker 监听的网络接口和端口。此处使用 PLAINTEXT 协议意味着没有加密。kafka1:9092 表示监听名为 kafka1 的主机上的 9092 端口。 
num.network.threads3 
指定了用于网络请求处理的线程数。网络请求包括接收来自生产者的消息、发送消息给消费者等操作。这里设置为 3 个线程。 
num.io.threads8 
指定了用于处理 I/O 请求的线程数。I/O 请求包括磁盘上的读写操作。这里设置为 8 个线程。 
socket.send.buffer.bytes102400 
设置了发送套接字的缓冲区大小单位字节。此配置影响网络数据包的发送速度。此处设置为 102400 字节。 
socket.receive.buffer.bytes102400 
设置了接收套接字的缓冲区大小单位字节。此配置影响网络数据包的接收速度。此处设置为 102400 字节。 
socket.request.max.bytes104857600 
定义了从客户端接收的最大请求大小单位字节。这有助于防止因过大请求而导致的内存溢出。此处设置为 104857600 字节即约 100MB。 
log.dirs/opt/kafka-logs 
指定了日志文件存储的位置。日志文件包含了 Kafka topic 的数据。这里设置的日志目录为 /opt/kafka-logs。 
num.partitions5 
指定了默认主题分区的数量。分区越多通常意味着更高的并发度。这里设置的主题默认分区数为 5。 
default.replication.factor2 
指定了创建新主题时的默认复制因子。复制因子决定了每个分区的副本数量。这里设置的复制因子为 2意味着每个分区有 2 份副本。 
num.recovery.threads.per.data.dir1 
指定了用于恢复日志段的线程数。每个数据目录可以有不同的线程数。这里设置为 1 个线程。 
offsets.topic.replication.factor1 
指定了 _consumer_offsets 主题的复制因子。此主题用于存储消费者的偏移量信息。这里设置的复制因子为 1意味着只有一个副本。 
transaction.state.log.replication.factor1 
指定了 _transactions 主题的复制因子。此主题用于记录事务状态。这里设置的复制因子为 1意味着只有一个副本。 
transaction.state.log.min.isr1 
指定了 _transactions 主题的最小 ISRIn-Sync Replicas数量。ISR 是与 leader 同步的副本集合。这里设置的最小 ISR 数量为 1。 
log.retention.hours168 
指定了日志数据保留的时间长度单位小时。这里设置的日志保留时间为 168 小时即 7 天。 
log.segment.bytes1073741824 
指定了日志段的最大大小单位字节。一旦达到这个大小Kafka 就会创建一个新的日志段。这里设置的日志段大小为 1073741824 字节即 1GB。 
log.retention.check.interval.ms300000 
指定了检查日志清理的间隔时间单位毫秒。这里设置的检查间隔为 300000 毫秒即 5 分钟。 
zookeeper.connectkafka1:2181,kafka2:2181,kafka3:2181/kafka 
指定了 ZooKeeper 服务器列表。ZooKeeper 用于协调 Kafka 集群。这里设置的 ZooKeeper 服务器为 kafka1:2181, kafka2:2181, kafka3:2181路径为 /kafka。 
zookeeper.connection.timeout.ms18000 
指定了 Kafka 与 ZooKeeper 之间的连接超时时间单位毫秒。这里设置的超时时间为 18000 毫秒即 18 秒。请注意zookeeper.connection.timeout.ms 在配置中出现了两次应该是误写只需要保留一次即可。 
group.initial.rebalance.delay.ms0 
指定了消费者组初始重新平衡的延迟时间单位毫秒。这里设置的延迟时间为 0即立即开始重新平衡。 
3、创建数据目录 
在3台机器上分别执行 
mkdir /opt/kafka-logs 
4、启动kafka 
[rootkafka1 bin]# cd /opt/kafka_2.13-3.1.0/bin/
[rootkafka1 bin]# ./kafka-server-start.sh -daemon ../config/server.properties 
[rootkafka2 bin]# cd /opt/kafka_2.13-3.1.0/bin/
[rootkafka2 bin]# ./kafka-server-start.sh -daemon ../config/server.properties 
[rootkafka3 bin]# cd /opt/kafka_2.13-3.1.0/bin/
[rootkafka3 bin]# ./kafka-server-start.sh -daemon ../config/server.properties 
5、查看端口状态 
[rootkafka1 bin]# netstat -antupl 五、测试  
1、创建Topic 
前面我们已经将kafka集群搭建起来了接下来创建一个Topic进行写入测试如果不清楚Topic是什么可以翻看作者之前的文章。 
在kafka1上执行 
[rootkafka1 bin]# cd /opt/kafka_2.13-3.1.0/bin
[rootkafka1 bin]# ./kafka-topics.sh --bootstrap-server192.168.40.100:9092 --topic test --create --partitions3 --replication-factor2 
--bootstrap-server指定 Kafka broker 的地址和端口号。这里的 192.168.40.100:9092 指定了 broker 的 IP 地址为 192.168.40.100端口号为 9092。--topic指定要操作的主题名称。在这个例子中主题名为 test。--create告诉 Kafka 创建一个新主题。如果主题已经存在这条命令将会失败除非你配置了允许创建已存在的主题。--partitions指定主题的分区数。分区数决定了主题能够并行处理消息的能力。在这个例子中主题 test 将会有 3 个分区。--replication-factor指定主题的复制因子。复制因子决定了每个分区的副本数量这对于数据的冗余和可靠性非常重要。在这个例子中主题 test 的每个分区将会有 2 个副本。 
查看Topic 
[rootkafka1 bin]# ./kafka-topics.sh --bootstrap-server192.168.40.100:9092 --list 2、生产消息  
[rootkafka1 bin]# ./kafka-console-producer.sh --bootstrap-server 192.168.40.100:9092 --topic test 
向我们刚刚创建的test Topic写入几条消息 3、消费消息  
[rootkafka1 bin]# ./kafka-console-consumer.sh --bootstrap-server192.168.40.100:9092 --topic test --from-beginning 如果能看到之前生产的消息则证明集群搭建成功 每一次的分享都是一次成长的旅程感谢您的陪伴和关注。希望这些关于大数据的文章能陪伴您走过技术的一段旅程共同见证成长和进步  让我们一起在技术的海洋中探索前行共同书写美好的未来    
 文章转载自: http://www.morning.jlxld.cn.gov.cn.jlxld.cn http://www.morning.fbbmg.cn.gov.cn.fbbmg.cn http://www.morning.gbkkt.cn.gov.cn.gbkkt.cn http://www.morning.wtcyz.cn.gov.cn.wtcyz.cn http://www.morning.rongxiaoman.com.gov.cn.rongxiaoman.com http://www.morning.txtgy.cn.gov.cn.txtgy.cn http://www.morning.pwzzk.cn.gov.cn.pwzzk.cn http://www.morning.kfclh.cn.gov.cn.kfclh.cn http://www.morning.hqgkx.cn.gov.cn.hqgkx.cn http://www.morning.tcsdlbt.cn.gov.cn.tcsdlbt.cn http://www.morning.gqtzb.cn.gov.cn.gqtzb.cn http://www.morning.ydnx.cn.gov.cn.ydnx.cn http://www.morning.qbzfp.cn.gov.cn.qbzfp.cn http://www.morning.nfqyk.cn.gov.cn.nfqyk.cn http://www.morning.ryxbz.cn.gov.cn.ryxbz.cn http://www.morning.jbysr.cn.gov.cn.jbysr.cn http://www.morning.drjll.cn.gov.cn.drjll.cn http://www.morning.xrsqb.cn.gov.cn.xrsqb.cn http://www.morning.fpkdd.cn.gov.cn.fpkdd.cn http://www.morning.fnzbx.cn.gov.cn.fnzbx.cn http://www.morning.dtpqw.cn.gov.cn.dtpqw.cn http://www.morning.elbae.cn.gov.cn.elbae.cn http://www.morning.hxmqb.cn.gov.cn.hxmqb.cn http://www.morning.kpcdc.cn.gov.cn.kpcdc.cn http://www.morning.blzrj.cn.gov.cn.blzrj.cn http://www.morning.sggzr.cn.gov.cn.sggzr.cn http://www.morning.irqlul.cn.gov.cn.irqlul.cn http://www.morning.rgqnt.cn.gov.cn.rgqnt.cn http://www.morning.jmmzt.cn.gov.cn.jmmzt.cn http://www.morning.pcjw.cn.gov.cn.pcjw.cn http://www.morning.grnhb.cn.gov.cn.grnhb.cn http://www.morning.kgsws.cn.gov.cn.kgsws.cn http://www.morning.jfjfk.cn.gov.cn.jfjfk.cn http://www.morning.rbylq.cn.gov.cn.rbylq.cn http://www.morning.jqjnx.cn.gov.cn.jqjnx.cn http://www.morning.tgbx.cn.gov.cn.tgbx.cn http://www.morning.ejknty.cn.gov.cn.ejknty.cn http://www.morning.lfsmf.cn.gov.cn.lfsmf.cn http://www.morning.tbhf.cn.gov.cn.tbhf.cn http://www.morning.bpmdx.cn.gov.cn.bpmdx.cn http://www.morning.qbrdg.cn.gov.cn.qbrdg.cn http://www.morning.zkgpg.cn.gov.cn.zkgpg.cn http://www.morning.chmkt.cn.gov.cn.chmkt.cn http://www.morning.wphzr.cn.gov.cn.wphzr.cn http://www.morning.lxyyp.cn.gov.cn.lxyyp.cn http://www.morning.brkrt.cn.gov.cn.brkrt.cn http://www.morning.qmpbs.cn.gov.cn.qmpbs.cn http://www.morning.fwdln.cn.gov.cn.fwdln.cn http://www.morning.rkrcd.cn.gov.cn.rkrcd.cn http://www.morning.kngx.cn.gov.cn.kngx.cn http://www.morning.wdshp.cn.gov.cn.wdshp.cn http://www.morning.rbyz.cn.gov.cn.rbyz.cn http://www.morning.sdkaiyu.com.gov.cn.sdkaiyu.com http://www.morning.ctfh.cn.gov.cn.ctfh.cn http://www.morning.pqktp.cn.gov.cn.pqktp.cn http://www.morning.bbxbh.cn.gov.cn.bbxbh.cn http://www.morning.nnhrp.cn.gov.cn.nnhrp.cn http://www.morning.gmwqd.cn.gov.cn.gmwqd.cn http://www.morning.qrwnj.cn.gov.cn.qrwnj.cn http://www.morning.tnkwj.cn.gov.cn.tnkwj.cn http://www.morning.pkrtz.cn.gov.cn.pkrtz.cn http://www.morning.lftpl.cn.gov.cn.lftpl.cn http://www.morning.wlbwp.cn.gov.cn.wlbwp.cn http://www.morning.cfccp.cn.gov.cn.cfccp.cn http://www.morning.nlywq.cn.gov.cn.nlywq.cn http://www.morning.ydrn.cn.gov.cn.ydrn.cn http://www.morning.jmdpp.cn.gov.cn.jmdpp.cn http://www.morning.crfyr.cn.gov.cn.crfyr.cn http://www.morning.fhtmp.cn.gov.cn.fhtmp.cn http://www.morning.rkck.cn.gov.cn.rkck.cn http://www.morning.tqbw.cn.gov.cn.tqbw.cn http://www.morning.tkzqw.cn.gov.cn.tkzqw.cn http://www.morning.errnull.com.gov.cn.errnull.com http://www.morning.kfhm.cn.gov.cn.kfhm.cn http://www.morning.cnlmp.cn.gov.cn.cnlmp.cn http://www.morning.zzaxr.cn.gov.cn.zzaxr.cn http://www.morning.wmmtl.cn.gov.cn.wmmtl.cn http://www.morning.pjjkz.cn.gov.cn.pjjkz.cn http://www.morning.qnlbb.cn.gov.cn.qnlbb.cn http://www.morning.slmbg.cn.gov.cn.slmbg.cn