网站建设的公司有哪些,重庆最近的新闻大事10条,wordpress给会员发信,做视频网站需要什么证书✨ 在在线视频平台中引入 Kafka 消息队列解耦热点操作的实践
#x1f4cc; 背景简介
在我们构建 NovaTube 在线视频分享平台过程中#xff0c;点赞、评论、弹幕、播放记录等用户操作频繁#xff0c;尤其在用户量上升后#xff0c;出现了以下问题#xff1a;
点赞请求峰…✨ 在在线视频平台中引入 Kafka 消息队列解耦热点操作的实践 背景简介
在我们构建 NovaTube 在线视频分享平台过程中点赞、评论、弹幕、播放记录等用户操作频繁尤其在用户量上升后出现了以下问题
点赞请求峰值达到上千 QPS数据库写入压力骤增弹幕写入请求频繁偶发出现主从同步延迟播放记录写入直接影响视频播放体验用户评论操作影响接口整体响应时间
这些操作共同的特点是
写操作频繁实时性要求不高可异步处理
于是我们决定引入 Kafka 消息队列对这些热点写操作进行解耦和异步化处理优化系统性能、增强可扩展性与容错能力。 技术目标
降低数据库压力防止写操作击穿提高接口响应速度提升用户体验实现异步解耦、模块职责清晰为后续功能扩展预留处理链路如点赞反作弊、弹幕审核 ️ 系统架构设计
以下是 NovaTube 在点赞场景下的 Kafka 解耦架构图
[Client]↓ 点赞请求
[网关层] [Sentinel限流]↓
[主站服务 Controller]↓
KafkaTemplate.send() 发送到 topic: like-events↓
[Kafka Broker]↓
[点赞消费者服务]↓
MySQL/Redis 数据持久化 缓存更新说明
点赞请求不再直接写库而是快速封装为事件消息发送至 Kafka消费者服务异步消费集中处理点赞写入逻辑可通过 Redis 实时展示点赞数定时同步至数据库 实现过程详解
1. 点赞事件模型定义
Data
public class LikeEvent {private Long userId;private String videoId;private Long timestamp;
}2. 生产者发送点赞消息
RestController
public class LikeController {Autowiredprivate KafkaTemplateString, String kafkaTemplate;PostMapping(/like)public ResponseEntity? like(RequestParam String videoId, HttpServletRequest request) {Long userId getUserIdFromRequest(request);LikeEvent event new LikeEvent(userId, videoId, System.currentTimeMillis());kafkaTemplate.send(like-events, JSON.toJSONString(event));return ResponseEntity.ok(点赞成功);}
}特点
非阻塞、快速响应即使 Kafka 临时阻塞也可降级处理本地缓存/失败重试 3. 消费者异步写入数据库
Component
public class LikeConsumer {Autowiredprivate LikeService likeService;KafkaListener(topics like-events, groupId nova-like-group)public void consumeLikeEvent(String message) {LikeEvent event JSON.parseObject(message, LikeEvent.class);likeService.saveUserLike(event.getUserId(), event.getVideoId());}
}4. 点赞写入逻辑
Service
public class LikeService {Autowiredprivate LikeRepository likeRepository;public void saveUserLike(Long userId, String videoId) {if (!likeRepository.existsByUserIdAndVideoId(userId, videoId)) {likeRepository.save(new UserLike(userId, videoId));}}
}可进一步优化
增加幂等校验Redis缓存点赞状态批量消费与批量入库 异步解耦的优势
优势说明⚡ 提升响应速度请求快速返回主链路无阻塞 解耦架构点赞、弹幕、评论等模块独立演化 削峰填谷高并发场景下缓冲请求防止写库瓶颈 容错增强Kafka 持久化保障消息不丢失可重试 可扩展性好易于接入日志分析、反作弊、通知等模块 可扩展的场景
除了点赞我们还将 Kafka 运用到以下场景中
功能模块Kafka Topic说明弹幕发送danmu-events异步写入 敏感词审核评论发布comment-events异步发布 通知推送播放记录play-record-events异步记录观看历史减少实时写库压力 Kafka 运维与监控建议
使用 Kafka Manager / Confluent Control Center 管理 Topic、分区、副本接入 Prometheus Grafana 实时监控消费速率、堆积量、延迟等指标设置 消费者组分区均衡策略保证高可用引入 死信队列DLQ 处理消费异常数据 总结
引入 Kafka 解耦点赞等热点操作是 NovaTube 性能优化的重要一环。
它实现了
架构上的解耦与职责划分用户体验上的提升系统性能上的增强业务扩展上的灵活性
在现代高并发系统中这是极具实用价值的一种架构模式适用于所有频繁、可延迟、可容错的非核心路径操作。 附简历中的一句话 基于 Kafka 构建点赞、评论、弹幕等热点操作的异步处理链路实现系统解耦与削峰填谷提升响应性能与高并发下的稳定性。 Kafka 在视频平台的多场景应用实践弹幕、评论与播放记录异步解耦 作者NovaTube 后端开发者 时间2025 年 6 月 标签Kafka、异步架构、弹幕、评论、播放记录、解耦设计 引言
在 NovaTube 视频平台中我们不仅为视频提供播放服务还支持用户实时发送弹幕、评论互动、记录观看历史。这些功能虽然核心链路简单但在高并发场景下却会对系统产生明显负载
弹幕发送高并发、高实时性、海量数据涌入评论发布有写库操作、需要审核、还可能触发通知等播放记录请求频繁、通常伴随每次播放/跳转、写入延迟可接受
为了应对这些压力我们选择将这三类操作使用 Kafka 进行异步解耦处理目标是主流程快响应、后台异步处理、系统更加稳定与可扩展。 核心设计原则
场景是否频繁实时性要求可异步化备注弹幕发送非常频繁中等✅需要实时展示但写入可延迟评论发布频繁一般✅可异步入库后台审核通知播放记录高频低✅无需强一致性延迟可接受 ✅ 这些操作天然适合用 Kafka 异步处理具有良好的“削峰填谷”特性。 架构概览图 ┌────────────┐│ 客户端 │└────┬───────┘│┌──────────▼──────────┐│ SpringBoot 控制层 │└──────────┬──────────┘▼┌─────────────────────┐│ KafkaTemplate 发送 │└─────────────────────┘▼┌────────────┐│ Kafka Broker│└────┬───────┘▼┌──────────────────────────────┐│ 弹幕/评论/播放记录消费者服务 │└────┬──────────────┬──────────┘▼ ▼持久化入库 审核/通知等1️⃣ 弹幕发送异步化处理 背景
用户每秒可发起大量弹幕请求若每次弹幕都同步写库可能对主库造成写入压力弹幕在页面实时展示但持久化延迟可接受 实现步骤
Controller 层快速响应
PostMapping(/danmu/send)
public ResponseEntity? sendDanmu(RequestBody DanmuRequest req) {DanmuEvent event new DanmuEvent(req.getUserId(), req.getVideoId(), req.getContent());kafkaTemplate.send(danmu-events, JSON.toJSONString(event));return ResponseEntity.ok(发送成功);
}消费者异步写入 敏感词处理
KafkaListener(topics danmu-events, groupId danmu-consumers)
public void consumeDanmu(String msg) {DanmuEvent event JSON.parseObject(msg, DanmuEvent.class);if (!containsSensitiveWord(event.getContent())) {danmuRepository.save(event.toEntity());} else {log.warn(屏蔽敏感弹幕{}, event);}
}2️⃣ 评论发布异步处理链路 背景
用户发布评论时既要写数据库还可能通知作者、审核内容多个任务耦合在主流程影响响应速度 技术设计
请求快速入队
PostMapping(/comment/publish)
public ResponseEntity? publish(RequestBody CommentRequest req) {CommentEvent event new CommentEvent(req.getUserId(), req.getVideoId(), req.getContent());kafkaTemplate.send(comment-events, JSON.toJSONString(event));return ResponseEntity.ok(评论成功);
}消费者链路拆分
KafkaListener(topics comment-events, groupId comment-group)
public void consumeComment(String msg) {CommentEvent event JSON.parseObject(msg, CommentEvent.class);commentRepository.save(event.toEntity()); // 持久化auditService.asyncAudit(event); // 审核notifyService.notifyAuthor(event); // 通知作者
}好处写入、审核、通知等子任务可独立扩展互不影响。 3️⃣ 播放记录异步收集 背景
用户每播放/拖动一次客户端都会上报播放信息高并发环境下若同步写库会带来极大压力播放记录用于推荐和用户历史回顾非强实时 方案设计
PostMapping(/video/record)
public ResponseEntity? recordPlay(RequestBody PlayLogRequest req) {PlayRecordEvent event new PlayRecordEvent(req.getUserId(), req.getVideoId(), req.getTimestamp());kafkaTemplate.send(play-record-events, JSON.toJSONString(event));return ResponseEntity.ok().build();
}异步消费者持久化或写入 Elasticsearch
KafkaListener(topics play-record-events, groupId record-group)
public void consumePlayRecord(String msg) {PlayRecordEvent event JSON.parseObject(msg, PlayRecordEvent.class);playRecordService.saveRecord(event);
}可根据需要定向入库至 MySQL、Elasticsearch、ClickHouse 等。 ✅ 技术优势总结
优势说明⚡ 响应更快控制层快速返回异步处理耗时任务 系统解耦各模块职责清晰便于扩展、测试、部署 削峰填谷高并发写操作统一缓冲防止系统过载 易于链路拓展后续可接入 AI 审核、内容推荐、数据分析等服务 提高容错性消息存储于 Kafka失败可重试 简历中的一句话描述 利用 Kafka 构建弹幕发送、评论发布、播放记录等高频操作的异步处理链路实现写操作解耦、响应提速与高并发场景下系统稳定性提升。 可选优化方向
支持批量消费入库Kafka Topic 按用户/视频维度分区提升吞吐接入死信队列DLQ容错异常事件消息格式标准化 schema 管理如 Avro 总结
本篇博客分享了如何利用 Kafka 在一个中大型视频平台中实现多类高频操作的异步解耦帮助系统在高并发下保持高可用与高性能。这一架构思想同样适用于电商下单、支付、社交私信、点赞、游戏日志、成就等场景。 非常好以下是关于 Kafka 在 Spring Boot 项目中的结构位置、如何调用和使用 的详细讲解包括 组件职责划分、调用流程、配置方法以及关键代码解析。本讲解既适合入门者了解项目结构也适合用于博客/面试准备。 Kafka 在 Spring Boot 项目中的结构与使用详解含代码
一、Kafka 在项目中的整体结构位置
在一个典型的 Spring Boot 项目中Kafka 主要扮演 消息解耦中间件 的角色负责“接收请求 - 异步发送消息 - 消费处理任务”的链路。结构如下 src└── main├── controller // 控制层接收请求调用KafkaProducer├── service // 业务层处理逻辑或封装消费者回调├── kafka│ ├── KafkaProducer // 封装 KafkaTemplate 发送逻辑│ ├── KafkaConsumer // 监听 Kafka 消费消息│ └── KafkaConfig // Kafka 连接配置├── dto // 数据传输对象Event/Request└── repository // 数据持久化二、调用流程图以评论发布为例
用户请求↓
Controller - /comment/publish↓
KafkaProducer.send(comment-events, msg)↓
Kafka Broker 消息队列↓
KafkaListener(topiccomment-events) - KafkaConsumer↓
处理逻辑如持久化、通知等三、Kafka 的使用方式 关键代码
1️⃣ Kafka 配置KafkaConfig.java
Configuration
public class KafkaConfig {Value(${spring.kafka.bootstrap-servers})private String bootstrapServers;Beanpublic ProducerFactoryString, String producerFactory() {MapString, Object config new HashMap();config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return new DefaultKafkaProducerFactory(config);}Beanpublic KafkaTemplateString, String kafkaTemplate() {return new KafkaTemplate(producerFactory());}
}application.yml 配置
spring:kafka:bootstrap-servers: localhost:9092consumer:group-id: my-consumer-groupauto-offset-reset: earliestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer2️⃣ Kafka 生产者封装KafkaProducer.java
Component
public class KafkaProducer {Autowiredprivate KafkaTemplateString, String kafkaTemplate;public void send(String topic, String message) {kafkaTemplate.send(topic, message);}
}3️⃣ 控制器调用 KafkaCommentController.java
RestController
RequestMapping(/comment)
public class CommentController {Autowiredprivate KafkaProducer kafkaProducer;PostMapping(/publish)public ResponseEntity? publishComment(RequestBody CommentRequest req) {CommentEvent event new CommentEvent(req.getUserId(), req.getVideoId(), req.getContent());String json JSON.toJSONString(event);kafkaProducer.send(comment-events, json); // 调用 Kafkareturn ResponseEntity.ok(评论成功正在后台处理);}
}4️⃣ 消费者监听处理KafkaCommentConsumer.java
Component
public class KafkaCommentConsumer {Autowiredprivate CommentService commentService;KafkaListener(topics comment-events, groupId comment-group)public void consumeComment(String message) {CommentEvent event JSON.parseObject(message, CommentEvent.class);commentService.processComment(event); // 处理评论逻辑写库、审核、通知}
}5️⃣ 业务服务层CommentService.java
Service
public class CommentService {Autowiredprivate CommentRepository commentRepository;public void processComment(CommentEvent event) {Comment comment new Comment();comment.setUserId(event.getUserId());comment.setVideoId(event.getVideoId());comment.setContent(event.getContent());comment.setCreateTime(LocalDateTime.now());commentRepository.save(comment);}
}四、完整流程回顾图解
POST /comment/publish↓
[Controller] 收到请求↓
KafkaProducer.send(comment-events, json)↓
Kafka 中转存储解耦主流程↓
KafkaConsumer 监听到消息↓
调用 CommentService.processComment()↓
MySQL 写入评论表五、总结Kafka 在 Spring Boot 中的定位
模块作用KafkaTemplate生产者 API负责消息发送KafkaListener消费者注解监听 Kafka TopicKafkaConfig统一配置 Kafka 客户端参数Producer 封装类隔离业务与发送逻辑Consumer 封装类解耦消费逻辑便于扩展与重试DTO/Event标准化传输格式如 JSON 面试/简历描述模板 在 Spring Boot 项目中集成 Kafka用于实现评论、弹幕、播放记录等模块的异步消息处理。封装 Kafka 生产者与消费者逻辑通过 KafkaTemplate 发送消息、KafkaListener 异步消费解耦主流程、提升系统吞吐能力。 Kafka Broker 是什么、消息队列是怎么体现的 Kafka 调用的整体逻辑流程 一、Kafka Broker 是什么
Kafka Broker 是 Kafka 的核心组件之一它是 Kafka 服务端的实例负责
接收消息Producer 发来存储消息分布式日志持久化投递消息给消费者Consumer
一个 Kafka 集群由多个 Broker 组成每个 Broker 负责维护一部分数据即部分 Topic 的分区。 可以简单理解一个 Broker 就是一个 Kafka 节点它是 Kafka 消息的存储与转发中心。
例如
Producer ----- Kafka Broker(1) ---- Consumer↑存储的是某个 Topic 的某个分区二、Kafka 的“消息队列”是如何体现的
Kafka 并不是传统意义上“链表结构”的消息队列而是基于 “Topic Partition Offset” 组成的 持久化分布式消息队列。
关键概念
概念含义Topic类似一个消息主题比如 comment-eventsPartition每个 Topic 可以被划分成多个分区每个分区是一个有序队列Offset消息在分区中的位置编号偏移量 队列特性的体现方式
Kafka 的每个 分区本质上就是一个追加写的消息队列消费时是按照 offset 顺序读取的。每个消费者可以通过 offset 控制从哪个位置消费。消息存储在磁盘上可以 重复消费具备 高可用、持久化、可回溯 的能力。 三、Kafka 的完整调用流程是怎样的
以“用户发表评论”场景为例Kafka 在系统中经历以下流程 1️⃣ Producer 发送消息
kafkaTemplate.send(comment-events, messageJson);将业务消息如评论发送给 Kafka 的 某个 Topic。Kafka 客户端选择 Topic 的某个 Partition将消息写入。 2️⃣ Kafka Broker 接收并存储
Kafka Broker 收到消息写入对应的 Topic 分区文件中顺序追加。数据以高效二进制日志文件形式存储支持高并发写入。 3️⃣ Consumer 拉取消息
KafkaListener(topics comment-events)
public void consume(String message) { ... }消费者通过轮询方式从指定 Topic 的分区中拉取消息。Kafka 保证每个消费者组可以独立消费互不影响。消费者提交 offsetKafka 不会自动删除消息支持回放。 图解整个调用过程
[业务代码]↓ 调用 KafkaProducer.send()
Producer─────────────→ Kafka Broker↑分区日志队列消息队列本质↓
KafkaListener 拉取消息
Consumer↓
持久化 or 业务处理总结通俗理解 Kafka 的机制
概念解释Kafka BrokerKafka 的核心服务节点负责接收/存储/分发消息消息队列特性Topic → Partition → Offset 结构形成有序、可回溯、高并发的消息通道调用流程发送Producer→ 存储Broker→ 消费Consumer技术优势高性能、可持久化、异步解耦、支持海量并发与分布式部署 ✅ Kafka Broker 在代码中的体现位置
虽然 Kafka Broker 是运行在服务器端的后台服务不是直接在代码中“写出来”的一个类或对象但在代码中配置连接 Kafka Broker 的地址就等于是告诉应用程序 “Kafka Broker 在这里所有消息收发请连接这个地址。” 所以Broker 在代码中的体现主要是它的地址体现在配置文件里。 1. 在 application.yml 或 application.properties 中体现 Broker 地址
spring:kafka:bootstrap-servers: localhost:9092这句配置就告诉了 Kafka 客户端Producer、Consumer Kafka Broker 正在 localhost:9092 上运行。 所有消息都要发送/接收给这个地址的 Kafka 服务。 如果是连接一个集群可能会是
spring:kafka:bootstrap-servers: broker1:9092,broker2:9092,broker3:90922. 它在 KafkaTemplate 和 KafkaListener 背后被使用
当用如下代码发送消息
kafkaTemplate.send(comment-events, your message);或者监听消费
KafkaListener(topics comment-events)
public void consume(String msg) {...
}这些 Kafka 客户端组件背后Spring Boot 会使用在配置文件中定义的 bootstrap-servers自动构建连接到 Kafka Broker 的连接客户端。 示例代码对应关系总结
Kafka 功能代码体现实际作用Broker 地址配置bootstrap-servers: localhost:9092Kafka 客户端通过这个地址连接 BrokerProducer 发送消息kafkaTemplate.send(...)使用配置好的 Kafka 客户端发送消息到 BrokerConsumer 消费消息KafkaListener(...)Spring Boot 通过配置的地址监听 Kafka Broker 的消息 总结一句话
Kafka Broker 本身不在业务代码中显式出现它通过配置文件中的 bootstrap-servers 地址体现Producer 和 Consumer 都基于这个地址与 Kafka Broker 建立连接、收发消息。