怎么查看vps网站服务器时间,二级分销模式图,网件路由器管理地址,外包网接单目录 一、Kafka 单机环境部署1. 环境准备2. 安装 Java3. 安装 ZooKeeper3.1 下载并解压 ZooKeeper3.2 配置 ZooKeeper3.3 启动 ZooKeeper3.4 验证 ZooKeeper 是否正常运行 4. 安装 Kafka4.1 下载并解压 Kafka4.2 配置 Kafka4.3 创建日志目录4.4 启动 Kafka Broker4.5 验证 Kafk… 目录 一、Kafka 单机环境部署1. 环境准备2. 安装 Java3. 安装 ZooKeeper3.1 下载并解压 ZooKeeper3.2 配置 ZooKeeper3.3 启动 ZooKeeper3.4 验证 ZooKeeper 是否正常运行 4. 安装 Kafka4.1 下载并解压 Kafka4.2 配置 Kafka4.3 创建日志目录4.4 启动 Kafka Broker4.5 验证 Kafka 是否正常运行 5. Kafka 单机部署的注意事项 二、Kafka 集群环境部署1. 环境准备2. 安装 ZooKeeper 集群2.1 配置 ZooKeeper 节点 ID2.2 启动 ZooKeeper 集群 3. 安装 Kafka 集群3.1 配置 Kafka Broker3.2 启动 Kafka Broker 4. 验证 Kafka 集群状态4.1 创建 Topic4.2 验证 Topic 5. Kafka 集群部署的注意事项 三、Kafka 使用案例生产者和消费者1. 使用 Java 实现 Kafka 生产者和消费者1.1 添加依赖1.2 编写 Kafka 生产者1.3 编写 Kafka 消费者1.4 运行 Java 程序 2. 使用 Python 实现 Kafka 生产者和消费者2.1 安装 Kafka 库2.2 编写 Kafka 生产者2.3 编写 Kafka 消费者2.4 运行 Python 程序 3. 注意事项 总结部署过程中的注意事项 下面是 Apache Kafka 单机和集群环境部署的详细教程包括部署过程中的注意事项以及一个使用案例。Apache Kafka 是一个分布式流处理平台广泛用于实时数据处理、日志收集、消息队列等场景。 一、Kafka 单机环境部署
1. 环境准备
操作系统Linux推荐 Ubuntu 20.04 或 CentOS 7JavaKafka 需要 Java 环境推荐使用 OpenJDK 8 或 11。ZooKeeperKafka 依赖 ZooKeeper 进行分布式协调。
2. 安装 Java
在 Ubuntu 中
sudo apt update
sudo apt install openjdk-11-jdk在 CentOS 中
sudo yum install java-11-openjdk验证 Java 安装
java -version3. 安装 ZooKeeper
Kafka 使用 ZooKeeper 进行节点管理和协调需要先安装并启动 ZooKeeper。
3.1 下载并解压 ZooKeeper
wget https://downloads.apache.org/zookeeper/zookeeper-3.8.2/apache-zookeeper-3.8.2-bin.tar.gz
tar -xzvf apache-zookeeper-3.8.2-bin.tar.gz
mv apache-zookeeper-3.8.2-bin /usr/local/zookeeper3.2 配置 ZooKeeper 创建数据目录 mkdir -p /var/lib/zookeeper复制配置文件 cp /usr/local/zookeeper/conf/zoo_sample.cfg /usr/local/zookeeper/conf/zoo.cfg编辑配置文件 /usr/local/zookeeper/conf/zoo.cfg dataDir/var/lib/zookeeper
clientPort21813.3 启动 ZooKeeper
/usr/local/zookeeper/bin/zkServer.sh start3.4 验证 ZooKeeper 是否正常运行
/usr/local/zookeeper/bin/zkCli.sh -server localhost:2181在连接成功后输入 ls /若返回空列表[]则说明连接成功。
4. 安装 Kafka
4.1 下载并解压 Kafka
访问 Kafka 官网 下载最新版本的 Kafka。
wget https://downloads.apache.org/kafka/3.5.0/kafka_2.12-3.5.0.tgz
tar -xzvf kafka_2.12-3.5.0.tgz
mv kafka_2.12-3.5.0 /usr/local/kafka4.2 配置 Kafka
编辑 Kafka 的配置文件 /usr/local/kafka/config/server.properties
# Kafka Broker ID唯一标识符
broker.id0# 监听的接口和端口
listenersPLAINTEXT://:9092# 日志文件存储路径
log.dirs/var/lib/kafka-logs# Zookeeper 连接地址
zookeeper.connectlocalhost:21814.3 创建日志目录
mkdir -p /var/lib/kafka-logs4.4 启动 Kafka Broker
/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties4.5 验证 Kafka 是否正常运行
创建一个测试 Topic
/usr/local/kafka/bin/kafka-topics.sh --create --topic test-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1列出 Topic
/usr/local/kafka/bin/kafka-topics.sh --list --bootstrap-server localhost:9092你应该看到 test-topic 在列出的 Topic 中。
5. Kafka 单机部署的注意事项
ZooKeeper确保 ZooKeeper 正常运行并且 zookeeper.connect 地址配置正确。内存和存储为 Kafka 分配足够的内存和存储空间尤其是在高负载场景下。日志文件定期检查和清理 Kafka 日志文件以防止磁盘占满。监听地址如果需要远程访问确保 listeners 配置了正确的监听地址。防火墙设置确保防火墙开放了 Kafka 和 ZooKeeper 使用的端口默认 9092 和 2181。 二、Kafka 集群环境部署
Kafka 集群由多个 Kafka Broker 组成能够提供高可用性和水平扩展。
1. 环境准备
多台服务器至少 3 台3 个 Kafka Broker 和 3 个 ZooKeeper 实例操作系统Linux推荐 Ubuntu 20.04 或 CentOS 7Java在所有节点上安装 Java
2. 安装 ZooKeeper 集群
在每台服务器上按照单机部署的步骤安装 ZooKeeper并进行以下配置
2.1 配置 ZooKeeper 节点 ID
编辑每个节点的 zoo.cfg 文件添加如下配置
server.1zookeeper1:2888:3888
server.2zookeeper2:2888:3888
server.3zookeeper3:2888:3888在每台服务器上创建 myid 文件用于标识节点
echo 1 /var/lib/zookeeper/myid # 在 zookeeper1 上
echo 2 /var/lib/zookeeper/myid # 在 zookeeper2 上
echo 3 /var/lib/zookeeper/myid # 在 zookeeper3 上2.2 启动 ZooKeeper 集群
在每台服务器上启动 ZooKeeper
/usr/local/zookeeper/bin/zkServer.sh start3. 安装 Kafka 集群
在每台服务器上按照单机部署的步骤安装 Kafka并进行以下配置
3.1 配置 Kafka Broker
编辑每个节点的 server.properties 文件添加如下配置
broker.id0 # 每个 Broker 唯一 ID
listenersPLAINTEXT://:9092
log.dirs/var/lib/kafka-logs
zookeeper.connectzookeeper1:2181,zookeeper2:2181,zookeeper3:21813.2 启动 Kafka Broker
在每台服务器上启动 Kafka Broker
/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties4. 验证 Kafka 集群状态
4.1 创建 Topic
在任一 Kafka Broker 上执行以下命令
/usr/local/kafka/bin/kafka-topics.sh --create --topic test-topic --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --partitions 3 --replication-factor 34.2 验证 Topic
列出集群中的 Topic
/usr/local/kafka/bin/kafka-topics.sh --list --bootstrap-server kafka1:9092查看 Topic 详细信息
/usr/local/kafka/bin/kafka-topics.sh --describe --topic test-topic --bootstrap-server kafka1:90925. Kafka 集群部署的注意事项
ZooKeeper 集群确保每个节点配置了正确的 myid并且所有节点可以互相通信。Kafka Broker 配置每个 Broker 必须有唯一的 broker.id。分区和副本根据实际需求配置合适的分区数和副本数以提高数据可靠性和吞吐量。监控和报警使用 Kafka Manager 或其他监控工具监控集群状态及时处理故障。网络配置确保各节点之间的网络连接正常并且防火墙开放了必要端口。资源规划为 Kafka 和 ZooKeeper 分配足够的 CPU、内存和磁盘资源。 三、Kafka 使用案例生产者和消费者
1. 使用 Java 实现 Kafka 生产者和消费者
1.1 添加依赖
在 Maven 项目中添加 Kafka 的依赖
dependencygroupIdorg.apache.kafka/groupIdartifactIdkafka-clients/artifactIdversion3.5.0/version
/dependency1.2 编写 Kafka 生产者
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;public class SimpleProducer {public static void main(String[] args) {// Kafka 生产者配置Properties props new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, localhost:9092);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer);// 创建生产者ProducerString, String producer new KafkaProducer(props);// 发送消息for (int i 0; i 10; i) {ProducerRecordString, String record new ProducerRecord(test-topic, Integer.toString(i), Message i);producer.send(record);}// 关闭生产者producer.close();}
}1.3 编写 Kafka 消费者
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;import java.util.Collections;
import java.util.Properties;public class SimpleConsumer {public static void main(String[] args) {// Kafka 消费者配置Properties props new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, localhost:9092);props.put(ConsumerConfig.GROUP_ID_CONFIG, test-group);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer);// 创建消费者KafkaConsumerString, String consumer new KafkaConsumer(props);// 订阅主题consumer.subscribe(Collections.singletonList(test-topic));// 轮询消息while (true) {ConsumerRecordsString, String records consumer.poll(100);for (ConsumerRecordString, String record : records) {System.out.printf(Offset %d, Key %s, Value %s%n, record.offset(), record.key(), record.value());}}}
}1.4 运行 Java 程序
编译并运行生产者
mvn compile
mvn exec:java -Dexec.mainClassSimpleProducer编译并运行消费者
mvn exec:java -Dexec.mainClassSimpleConsumer2. 使用 Python 实现 Kafka 生产者和消费者
2.1 安装 Kafka 库
pip install kafka-python2.2 编写 Kafka 生产者
from kafka import KafkaProducer# 创建 Kafka 生产者
producer KafkaProducer(bootstrap_serverslocalhost:9092)# 发送消息
for i in range(10):producer.send(test-topic, keybytes(str(i), encodingutf-8), valuebytes(fMessage {i}, encodingutf-8))# 关闭生产者
producer.close()2.3 编写 Kafka 消费者
from kafka import KafkaConsumer# 创建 Kafka 消费者
consumer KafkaConsumer(test-topic,bootstrap_serverslocalhost:9092,group_idtest-group,auto_offset_resetearliest
)# 轮询消息
for message in consumer:print(fOffset {message.offset}, Key {message.key.decode()}, Value {message.value.decode()})2.4 运行 Python 程序
运行生产者
python kafka_producer.py运行消费者
python kafka_consumer.py3. 注意事项
生产者和消费者配置合理配置 bootstrap.servers、key.serializer、value.serializer、group.id 等参数。分区策略在生产者中使用自定义分区策略可以提高吞吐量和负载均衡。消费组多个消费者实例可以组成一个消费组以提高处理能力。容错机制在实际应用中需要考虑重试、错误处理和幂等性等问题。 总结
通过以上步骤我们成功部署了 Kafka 单机和集群环境并实现了一个简单的生产者和消费者应用。Kafka 提供了高吞吐量、低延迟的消息传递能力适合用于实时流处理和数据管道。
部署过程中的注意事项
Java 版本确保安装了正确版本的 Java。ZooKeeper 集群确保 ZooKeeper 集群稳定运行并配置正确。网络配置各节点之间的网络连接需要稳定端口要开放。资源配置根据业务需求配置合适的内存、CPU 和磁盘资源。数据安全启用 Kafka 的 SSL/TLS 和 SASL 认证机制确保数据安全传输。监控和管理使用 Kafka Manager、Prometheus 等工具监控集群状态及时处理异常。日志管理定期检查和清理 Kafka 的日志以防止磁盘空间不足。 通过合理的配置和优化Kafka 可以为应用程序提供可靠的消息传递和流处理服务是构建实时数据管道和事件驱动架构的重要组件。
文章转载自: http://www.morning.nswcw.cn.gov.cn.nswcw.cn http://www.morning.ldcrh.cn.gov.cn.ldcrh.cn http://www.morning.drggr.cn.gov.cn.drggr.cn http://www.morning.mrpqg.cn.gov.cn.mrpqg.cn http://www.morning.fmry.cn.gov.cn.fmry.cn http://www.morning.hhqjf.cn.gov.cn.hhqjf.cn http://www.morning.sbczr.cn.gov.cn.sbczr.cn http://www.morning.skfkx.cn.gov.cn.skfkx.cn http://www.morning.xmrmk.cn.gov.cn.xmrmk.cn http://www.morning.svrud.cn.gov.cn.svrud.cn http://www.morning.jfqpc.cn.gov.cn.jfqpc.cn http://www.morning.hxsdh.cn.gov.cn.hxsdh.cn http://www.morning.bpmfr.cn.gov.cn.bpmfr.cn http://www.morning.fkfyn.cn.gov.cn.fkfyn.cn http://www.morning.ypnxq.cn.gov.cn.ypnxq.cn http://www.morning.bmnm.cn.gov.cn.bmnm.cn http://www.morning.wzjhl.cn.gov.cn.wzjhl.cn http://www.morning.ypktc.cn.gov.cn.ypktc.cn http://www.morning.xknmn.cn.gov.cn.xknmn.cn http://www.morning.lcmhq.cn.gov.cn.lcmhq.cn http://www.morning.ghjln.cn.gov.cn.ghjln.cn http://www.morning.mjdbd.cn.gov.cn.mjdbd.cn http://www.morning.gblrn.cn.gov.cn.gblrn.cn http://www.morning.hsxkq.cn.gov.cn.hsxkq.cn http://www.morning.alwpc.cn.gov.cn.alwpc.cn http://www.morning.bpmtz.cn.gov.cn.bpmtz.cn http://www.morning.rgsgk.cn.gov.cn.rgsgk.cn http://www.morning.sqdjn.cn.gov.cn.sqdjn.cn http://www.morning.bnmfq.cn.gov.cn.bnmfq.cn http://www.morning.xxwl1.com.gov.cn.xxwl1.com http://www.morning.jqcrf.cn.gov.cn.jqcrf.cn http://www.morning.skrrq.cn.gov.cn.skrrq.cn http://www.morning.bhznl.cn.gov.cn.bhznl.cn http://www.morning.fnxzk.cn.gov.cn.fnxzk.cn http://www.morning.wklhn.cn.gov.cn.wklhn.cn http://www.morning.cmzcp.cn.gov.cn.cmzcp.cn http://www.morning.bprsd.cn.gov.cn.bprsd.cn http://www.morning.npkrm.cn.gov.cn.npkrm.cn http://www.morning.knqzd.cn.gov.cn.knqzd.cn http://www.morning.yrccw.cn.gov.cn.yrccw.cn http://www.morning.hxmqb.cn.gov.cn.hxmqb.cn http://www.morning.bpmdr.cn.gov.cn.bpmdr.cn http://www.morning.bgrsr.cn.gov.cn.bgrsr.cn http://www.morning.tfsyk.cn.gov.cn.tfsyk.cn http://www.morning.lktjj.cn.gov.cn.lktjj.cn http://www.morning.qwbtr.cn.gov.cn.qwbtr.cn http://www.morning.mygbt.cn.gov.cn.mygbt.cn http://www.morning.zdfrg.cn.gov.cn.zdfrg.cn http://www.morning.khyqt.cn.gov.cn.khyqt.cn http://www.morning.gcspr.cn.gov.cn.gcspr.cn http://www.morning.lnrr.cn.gov.cn.lnrr.cn http://www.morning.kjyhh.cn.gov.cn.kjyhh.cn http://www.morning.jmwrj.cn.gov.cn.jmwrj.cn http://www.morning.jpjpb.cn.gov.cn.jpjpb.cn http://www.morning.hympq.cn.gov.cn.hympq.cn http://www.morning.tckxl.cn.gov.cn.tckxl.cn http://www.morning.rcqyk.cn.gov.cn.rcqyk.cn http://www.morning.mqbsm.cn.gov.cn.mqbsm.cn http://www.morning.clndl.cn.gov.cn.clndl.cn http://www.morning.mrpqg.cn.gov.cn.mrpqg.cn http://www.morning.snktp.cn.gov.cn.snktp.cn http://www.morning.hphfy.cn.gov.cn.hphfy.cn http://www.morning.ldsgm.cn.gov.cn.ldsgm.cn http://www.morning.bfhrj.cn.gov.cn.bfhrj.cn http://www.morning.kdnbf.cn.gov.cn.kdnbf.cn http://www.morning.sbrjj.cn.gov.cn.sbrjj.cn http://www.morning.kflpf.cn.gov.cn.kflpf.cn http://www.morning.jcwt.cn.gov.cn.jcwt.cn http://www.morning.llgpk.cn.gov.cn.llgpk.cn http://www.morning.tpchy.cn.gov.cn.tpchy.cn http://www.morning.knlbg.cn.gov.cn.knlbg.cn http://www.morning.fwcnx.cn.gov.cn.fwcnx.cn http://www.morning.ddjp.cn.gov.cn.ddjp.cn http://www.morning.zyffq.cn.gov.cn.zyffq.cn http://www.morning.zdbfl.cn.gov.cn.zdbfl.cn http://www.morning.pkggl.cn.gov.cn.pkggl.cn http://www.morning.qpnmd.cn.gov.cn.qpnmd.cn http://www.morning.yhxhq.cn.gov.cn.yhxhq.cn http://www.morning.ttvtv.cn.gov.cn.ttvtv.cn http://www.morning.rkkh.cn.gov.cn.rkkh.cn