当前位置: 首页 > news >正文

html网站模板源码wordpress下载破解

html网站模板源码,wordpress下载破解,企业代码,宝山做网站公司Kafka 是一款高吞吐量、低延迟的分布式消息系统。本文将详细介绍如何在 Spring Boot 项目中使用 Kafka 进行消息接收与消费#xff0c;并结合幂等和重试机制#xff0c;确保消息消费的可靠性和系统的扩展性。我们将以电商交易系统为案例进行深入解析。 1. 系统架构概览 在电…Kafka 是一款高吞吐量、低延迟的分布式消息系统。本文将详细介绍如何在 Spring Boot 项目中使用 Kafka 进行消息接收与消费并结合幂等和重试机制确保消息消费的可靠性和系统的扩展性。我们将以电商交易系统为案例进行深入解析。 1. 系统架构概览 在电商系统中Kafka 常用于订单状态变更、库存变化等事件的异步处理。 ---------------- Kafka ---------------- | 订单服务 | --- Produce --- | 消费服务 | | (Order Service)| Topic | (Consumer Service)| ---------------- ----------------| |MySQL MySQL主要流程 订单服务接收用户订单请求后异步将订单信息发送到 Kafka。消费服务从 Kafka 中消费订单信息更新库存、生成发货信息等操作。数据库使用 MySQL 存储订单和库存数据并通过 MyBatis 实现持久化操作。 2. Kafka 的基础介绍 Kafka 是一种基于发布-订阅模式的消息系统支持高吞吐、分区与复制等机制具备容错和可扩展的特点。它的主要组成部分有 Producer生产者向 Kafka 的 Topic 发送消息。Consumer消费者从 Kafka 的 Topic 读取消息。Broker代理Kafka 的服务器集群。Topic主题消息的分类单位。Partition分区用于分布式处理消息。 3. 项目环境搭建 3.1 Maven 依赖 在 Spring Boot 项目中我们通过 spring-kafka 提供对 Kafka 的集成。还需要引入 MyBatis 和 MySQL 相关依赖。 dependencies!-- Spring Boot Kafka --dependencygroupIdorg.springframework.kafka/groupIdartifactIdspring-kafka/artifactId/dependency!-- MySQL Driver --dependencygroupIdmysql/groupIdartifactIdmysql-connector-java/artifactId/dependency!-- MyBatis --dependencygroupIdorg.mybatis.spring.boot/groupIdartifactIdmybatis-spring-boot-starter/artifactIdversion2.2.0/version/dependency!-- Spring Boot Starter Web --dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependency!-- Lombok (可选用于简化代码) --dependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactIdoptionaltrue/optional/dependency /dependencies3.2 数据库表结构设计 为实现电商系统的消息接收与消费以下是两个主要数据库表订单表和消费记录表。 订单表orders存储订单的基础信息。消费记录表message_consume_record记录消费过的消息用于幂等校验。 CREATE TABLE orders (id BIGINT AUTO_INCREMENT PRIMARY KEY,order_no VARCHAR(64) NOT NULL,user_id BIGINT NOT NULL,total_price DECIMAL(10, 2) NOT NULL,status INT NOT NULL,created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP );CREATE TABLE message_consume_record (id BIGINT AUTO_INCREMENT PRIMARY KEY,message_key VARCHAR(64) NOT NULL UNIQUE,consumed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP );4. Kafka 消息生产与接收实现 4.1 生产者配置 在 Spring Boot 中我们可以通过 KafkaTemplate 发送消息。首先在 application.yml 中配置 Kafka 的基础信息 spring:kafka:bootstrap-servers: localhost:9092producer:retries: 3key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerbootstrap-servers: 这是 Kafka 服务的地址Kafka 集群通常由多个 Broker 组成每个 Broker 提供消息的存储与转发功能。这里指定了本地的 Kafka 服务器localhost:9092如果有多个 Broker可以用逗号分隔例如localhost:9092,localhost:9093。 retries: 当消息发送失败时生产者将重试发送的次数。这里配置了 3 次重试。这在网络不稳定或 Kafka 节点暂时不可用时非常有用可以有效提高消息发送成功率。 key-serializer: 生产者发送的消息可以有一个键值对。key-serializer 用于将消息的键序列化为字节数组。这里使用了 StringSerializer表示消息的键是字符串形式序列化为字节后发送。 value-serializer: 类似于键value-serializer 用于将消息的值序列化为字节数组。配置 StringSerializer 表示消息内容是字符串。 4.2 消息生产示例 Service public class OrderProducer {Autowiredprivate KafkaTemplateString, String kafkaTemplate;public void sendOrderMessage(String orderId) {kafkaTemplate.send(order-topic, orderId);} }在 OrderService 中用户提交订单后可以将订单 ID 发送至 Kafka Service public class OrderService {Autowiredprivate OrderProducer orderProducer;public void createOrder(OrderDTO order) {// 保存订单逻辑...orderProducer.sendOrderMessage(order.getOrderId());} }5. 消息消费实现 5.1 消费者配置 在消费者中我们需要定义 KafkaListener 注解监听 Kafka 主题并从中接收消息。 spring:kafka:consumer:group-id: order-groupauto-offset-reset: earliestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializergroup-id: 消费者组 ID。Kafka 允许多个消费者组监听同一个 Topic每个消费者组可以独立消费消息。此处配置 order-group意味着该消费者属于订单消费逻辑的消费者组。 auto-offset-reset: 指定消费者在没有初始偏移量offset或当前偏移量无效的情况下从哪里开始读取消息。earliest 表示从最早的可用消息开始消费这对于新启动的消费者非常有用能够确保读取历史数据。 key-deserializer: 将接收到的消息键从字节数组反序列化为 Java 对象。这里配置 StringDeserializer表示键是字符串。 value-deserializer: 类似于键的反序列化value-deserializer 用于将消息内容反序列化为 Java 对象。配置 StringDeserializer表示消息内容是字符串。 5.2 消息消费示例 Service public class OrderConsumer {Autowiredprivate OrderService orderService;KafkaListener(topics order-topic, groupId order-group)public void consumeOrder(String orderId) {orderService.processOrder(orderId);} }在 OrderService 中处理接收到的订单消息 Service public class OrderService {Autowiredprivate OrderMapper orderMapper;Transactionalpublic void processOrder(String orderId) {// 根据订单 ID 更新订单状态、库存等操作Order order orderMapper.findById(orderId);// 更新订单逻辑...} }6. 幂等性保证 Kafka 的消息消费可能会因为网络问题或其他故障导致重复消费因此在消费消息时需要考虑幂等性。我们可以通过在数据库中存储每个消息的唯一标识来实现幂等。 6.1 幂等校验实现 在消费消息时首先检查该消息是否已经被消费过 Service public class OrderConsumer {Autowiredprivate MessageConsumeRecordMapper consumeRecordMapper;KafkaListener(topics order-topic, groupId order-group)public void consumeOrder(String orderId) {if (consumeRecordMapper.existsByMessageKey(orderId)) {// 如果已经处理过该消息直接返回return;}// 处理订单orderService.processOrder(orderId);// 记录已处理消息consumeRecordMapper.insertConsumeRecord(orderId);} }MessageConsumeRecordMapper 接口用于操作消费记录表 Mapper public interface MessageConsumeRecordMapper {boolean existsByMessageKey(String messageKey);void insertConsumeRecord(String messageKey); }通过这种方式我们确保了每条消息只被消费一次避免重复处理订单数据。 7. 重试机制实现 为了保证消息的可靠消费可能会需要对消费失败的消息进行重试。Kafka 提供了自动重试机制但在多次重试失败后仍然可能需要手动处理。因此我们可以通过将消费失败的消息保存至数据库并定期进行重试的方式实现可靠的消息处理。 7.1 消费失败记录表设计 CREATE TABLE failed_message (id BIGINT AUTO_INCREMENT PRIMARY KEY,message_key VARCHAR(64) NOT NULL,payload TEXT NOT NULL,failed_reason TEXT,retry_count INT DEFAULT 0,created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP );7.2 重试机制实现 在消费消息失败时将消息记录到失败表中并定期进行重试。 Service public class OrderConsumer {Autowiredprivate FailedMessageMapper failedMessageMapper;KafkaListener(topics order-topic, groupId order-group)public void consumeOrder(String orderId) {try {orderService.processOrder(orderId);} catch (Exception e) {failedMessageMapper.insertFailedMessage(orderId, e.getMessage());}} } 通过定时任务或手动触发定期查询失败的消息并重新消费 Service public class FailedMessageRetryService {Autowiredprivate FailedMessageMapper failedMessageMapper;Scheduled(fixedDelay 60000) // 每分钟重试一次public void retryFailedMessages() {ListFailedMessage failedMessages failedMessageMapper.findAll();for (FailedMessage message : failedMessages) {try {orderService.processOrder(message.getPayload());failedMessageMapper.deleteById(message.getId());} catch (Exception e) {failedMessageMapper.incrementRetryCount(message.getId());}}} }8. 扩展性设计 为了使系统具备良好的扩展性我们需要考虑以下几个方面 8.1 支持多种消息格式 除了支持 Kafka 消息我们可以通过设计合理的接口结构扩展系统支持其他消息队列或 HTTP 请求的接入。例如通过创建统一的 MessageConsumer 接口任何类型的消息都可以实现消费逻辑。 public interface MessageConsumer {void consume(String payload); }Service public class KafkaOrderConsumer implements MessageConsumer {Overridepublic void consume(String payload) {// Kafka 消息消费逻辑} }通过这种设计可以轻松添加新的消息类型或处理逻辑而不需要修改现有代码。 8.2 动态配置 为了增强系统的灵活性系统可以支持通过数据库或配置文件动态调整消息消费逻辑。例如可以在配置文件中定义不同业务的消费逻辑 message-consumers:order:type: kafkatopic: order-topicuser:type: httpurl: http://example.com/user/message通过读取这些配置系统可以动态选择不同的消费逻辑从而增强扩展性。 9. 性能优化 9.1 异步消费 为了提高消费速度可以将消息的处理逻辑放入线程池中异步执行从而避免阻塞 Kafka 消费的主线程。 Async public void processOrderAsync(String orderId) {orderService.processOrder(orderId); }9.2 批量消费 Kafka 支持批量消费消息这样可以减少 Kafka 客户端与 Broker 之间的交互次数提升性能。在 Spring Boot 中可以通过配置 max.poll.records 参数控制每次批量消费的消息数量。 spring:kafka:consumer:max-poll-records: 5009.3 分区与并行消费 通过为 Kafka 的 Topic 配置多个分区并为消费者组中的消费者分配不同的分区可以实现并行消费从而提升系统的消费能力。 spring:kafka:consumer:concurrency: 310. Kafka 防止 MQ 队列堆积太多导致内存溢出问题 在实际的生产环境中当消费速度低于消息的生产速度时Kafka 消费者端的消息队列可能会出现堆积。如果消息堆积时间过长会导致 Kafka 中的分区文件过大甚至在消费者端可能造成内存溢出。因此我们需要在架构设计中考虑如何有效防止消息堆积的问题。 以下是一些常见的应对策略 10.1 提高消费速度 当 Kafka 的消费速度低于生产速度时最直接的应对措施就是提升消费的速度 并行消费通过配置 Kafka 消费者的 concurrency 参数来增加消费者实例的数量。Kafka 使用分区来进行负载均衡分区的数量决定了并发消费的能力。因此增加分区数可以提升消费者的并发处理能力。 spring:kafka:consumer:concurrency: 3 # 配置多个消费者进行并行处理批量消费通过 max.poll.records 参数配置每次拉取的消息数量。增加批量消费可以减少 Kafka 消费者与 Broker 之间的交互从而提升性能。 spring:kafka:consumer:max-poll-records: 500 # 每次批量拉取 500 条消息10.2 优化消息处理逻辑 在消费端消息的处理速度是决定 Kafka 消费效率的关键。因此需要对消费逻辑进行优化 异步处理在消息处理完成后再返回响应可能导致整个消费过程变慢。可以通过使用异步任务处理消息内容从而避免阻塞 Kafka 消费的主线程。可以结合 Spring 的 Async 注解实现异步处理。 Async public void processOrderAsync(String orderId) {// 异步处理订单消息orderService.processOrder(orderId); }缩短消息处理时间简化业务逻辑避免冗长的处理流程。使用缓存等方式减少对数据库的频繁访问降低 I/O 操作带来的性能开销。 10.3 调整 Kafka 生产者端的速率 生产者端的消息发送速率直接影响消息的堆积情况。当消费端无法跟上生产端的速度时适当限制生产者的消息发送速率是一个有效的策略 限流机制在生产者端通过限流策略控制每秒钟向 Kafka 发送的消息数量确保消费者有足够的时间处理消息。例如可以使用 RateLimiter 实现限流。 RateLimiter rateLimiter RateLimiter.create(1000); // 每秒最多发送 1000 条消息public void sendMessage(String topic, String message) {rateLimiter.acquire(); // 获取许可kafkaTemplate.send(topic, message); }分布式限流如果消息的生产端部署在多个节点上可以使用 Redis 等工具实现分布式限流。 10.4 设置合适的消费位移提交策略 Kafka 消费者有两种提交消费位移的方式自动提交和手动提交。默认情况下Kafka 会每隔一段时间自动提交消费位移。如果消费端发生异常未能处理的消息在下次重新拉取时会再次被消费。为了避免消息重复消费我们可以将消费位移的提交改为手动提交确保消息处理完后再提交位移。 spring:kafka:consumer:enable-auto-commit: false # 关闭自动提交位移手动提交消费位移 try {// 消费处理消息processMessage(record);// 手动提交位移acknowledgment.acknowledge(); } catch (Exception e) {// 处理异常 }10.5 配置 Kafka 消息保留策略 如果消息堆积严重可以通过 Kafka 的 retention.ms 参数设置消息的存储时间确保超过存储时间的消息自动删除防止 Kafka 分区文件无限制增长。 log.retention.ms604800000 # 配置 Kafka 日志文件的保留时间单位为毫秒这里设置为 7 天此外可以通过配置 log.retention.bytes 来限制 Kafka 每个分区的日志文件大小确保超出大小限制后自动删除最早的消息。 log.retention.bytes1073741824 # 配置 Kafka 分区日志文件的最大大小单位为字节这里设置为 1 GB10.6 使用 Kafka 消息压缩 对于大数据量的消息可以启用 Kafka 消息压缩功能减少消息的占用空间从而提升生产和消费的效率。Kafka 支持多种压缩算法包括 GZIP、LZ4 和 SNAPPY。 spring:kafka:producer:compression-type: gzip # 启用 GZIP 压缩压缩不仅可以减少网络传输的数据量还可以降低 Kafka Broker 和消费端的存储压力从而减少消息堆积的可能性。
文章转载自:
http://www.morning.fesiy.com.gov.cn.fesiy.com
http://www.morning.kjyqr.cn.gov.cn.kjyqr.cn
http://www.morning.zqmdn.cn.gov.cn.zqmdn.cn
http://www.morning.jwdys.cn.gov.cn.jwdys.cn
http://www.morning.bwqcx.cn.gov.cn.bwqcx.cn
http://www.morning.mxnfh.cn.gov.cn.mxnfh.cn
http://www.morning.brwp.cn.gov.cn.brwp.cn
http://www.morning.cttgj.cn.gov.cn.cttgj.cn
http://www.morning.hclplus.com.gov.cn.hclplus.com
http://www.morning.hqlnp.cn.gov.cn.hqlnp.cn
http://www.morning.pdghl.cn.gov.cn.pdghl.cn
http://www.morning.bpttm.cn.gov.cn.bpttm.cn
http://www.morning.kzhxy.cn.gov.cn.kzhxy.cn
http://www.morning.ljzss.cn.gov.cn.ljzss.cn
http://www.morning.wdpbq.cn.gov.cn.wdpbq.cn
http://www.morning.qmtzq.cn.gov.cn.qmtzq.cn
http://www.morning.qsyyp.cn.gov.cn.qsyyp.cn
http://www.morning.sfqtf.cn.gov.cn.sfqtf.cn
http://www.morning.rwmq.cn.gov.cn.rwmq.cn
http://www.morning.lmqw.cn.gov.cn.lmqw.cn
http://www.morning.kdxzy.cn.gov.cn.kdxzy.cn
http://www.morning.nshhf.cn.gov.cn.nshhf.cn
http://www.morning.bwttj.cn.gov.cn.bwttj.cn
http://www.morning.zwmjq.cn.gov.cn.zwmjq.cn
http://www.morning.fbtgp.cn.gov.cn.fbtgp.cn
http://www.morning.pttrs.cn.gov.cn.pttrs.cn
http://www.morning.lgmty.cn.gov.cn.lgmty.cn
http://www.morning.wftrs.cn.gov.cn.wftrs.cn
http://www.morning.xckqs.cn.gov.cn.xckqs.cn
http://www.morning.chgmm.cn.gov.cn.chgmm.cn
http://www.morning.rgrys.cn.gov.cn.rgrys.cn
http://www.morning.kltmt.cn.gov.cn.kltmt.cn
http://www.morning.jqmqf.cn.gov.cn.jqmqf.cn
http://www.morning.pqkyx.cn.gov.cn.pqkyx.cn
http://www.morning.pzwfw.cn.gov.cn.pzwfw.cn
http://www.morning.tgqzp.cn.gov.cn.tgqzp.cn
http://www.morning.bkkgt.cn.gov.cn.bkkgt.cn
http://www.morning.llxyf.cn.gov.cn.llxyf.cn
http://www.morning.bwxph.cn.gov.cn.bwxph.cn
http://www.morning.ypklb.cn.gov.cn.ypklb.cn
http://www.morning.weitao0415.cn.gov.cn.weitao0415.cn
http://www.morning.nwpnj.cn.gov.cn.nwpnj.cn
http://www.morning.hjrjy.cn.gov.cn.hjrjy.cn
http://www.morning.dbqcw.com.gov.cn.dbqcw.com
http://www.morning.lsnbx.cn.gov.cn.lsnbx.cn
http://www.morning.htbgz.cn.gov.cn.htbgz.cn
http://www.morning.jhxtm.cn.gov.cn.jhxtm.cn
http://www.morning.btqqh.cn.gov.cn.btqqh.cn
http://www.morning.sqgqh.cn.gov.cn.sqgqh.cn
http://www.morning.langlaitech.cn.gov.cn.langlaitech.cn
http://www.morning.zxznh.cn.gov.cn.zxznh.cn
http://www.morning.nwfpl.cn.gov.cn.nwfpl.cn
http://www.morning.gtjkh.cn.gov.cn.gtjkh.cn
http://www.morning.yqlrq.cn.gov.cn.yqlrq.cn
http://www.morning.rfgc.cn.gov.cn.rfgc.cn
http://www.morning.rfmzc.cn.gov.cn.rfmzc.cn
http://www.morning.xlmpj.cn.gov.cn.xlmpj.cn
http://www.morning.btns.cn.gov.cn.btns.cn
http://www.morning.yrdn.cn.gov.cn.yrdn.cn
http://www.morning.fnpmf.cn.gov.cn.fnpmf.cn
http://www.morning.dmrjx.cn.gov.cn.dmrjx.cn
http://www.morning.jqwpw.cn.gov.cn.jqwpw.cn
http://www.morning.rnzwh.cn.gov.cn.rnzwh.cn
http://www.morning.prjty.cn.gov.cn.prjty.cn
http://www.morning.wcqxj.cn.gov.cn.wcqxj.cn
http://www.morning.dongyinet.cn.gov.cn.dongyinet.cn
http://www.morning.fynkt.cn.gov.cn.fynkt.cn
http://www.morning.nylbb.cn.gov.cn.nylbb.cn
http://www.morning.wnywk.cn.gov.cn.wnywk.cn
http://www.morning.wpspf.cn.gov.cn.wpspf.cn
http://www.morning.nhdw.cn.gov.cn.nhdw.cn
http://www.morning.tmbfz.cn.gov.cn.tmbfz.cn
http://www.morning.ryspp.cn.gov.cn.ryspp.cn
http://www.morning.dgng.cn.gov.cn.dgng.cn
http://www.morning.saastob.com.gov.cn.saastob.com
http://www.morning.rpjr.cn.gov.cn.rpjr.cn
http://www.morning.ypcbm.cn.gov.cn.ypcbm.cn
http://www.morning.ddjp.cn.gov.cn.ddjp.cn
http://www.morning.rhpy.cn.gov.cn.rhpy.cn
http://www.morning.nrbcx.cn.gov.cn.nrbcx.cn
http://www.tj-hxxt.cn/news/249058.html

相关文章:

  • 网站维护中一般要多长时间wordpress图片自动加广告
  • 企业品牌网站建设公司厦门中小企业建网站补助
  • tklink的登录做网站2022新闻热点事件及评论
  • 博物馆门户网站建设目标个人网站支付解决方案
  • 泰州企业建站系统郴州网站排名优化
  • 2022百度搜索风云榜如何做网站seo韩小培
  • 做淘宝客没有网站怎么做wordpress 兔
  • 龙华网站网页设计在百度做推广需要网站
  • 信息服务平台网站名称软件开发平台合同
  • 云南网站建设找天软专注营销型网站建设公司 做网站
  • 长沙建站公司哪有网站定制营销
  • 做视频网站需要多大带宽常见的cms系统
  • 厦门建设工程招标中心的网站wordpress 新建模板
  • 建设销售型网站信阳建设监理协会网站
  • 织梦网站产品河南省新闻最新消息
  • 企业建设网站的目标网站设计怎么做
  • 网站首页制作教程视频多语言网站系统
  • 江苏省城乡与建设厅网站织梦网站首页
  • 软件网站开发公司名字山东省工程建设招标信息网站
  • 国外的ui设计思想网站烟台企业做网站
  • 汕头中文建站模板淮北网站建设公司
  • 十堰做网站的深圳防疫今天最新规定
  • wordpress更新的文章编辑器不好用上海seo
  • 做网站p图工具郑州seo公司哪家好
  • wordpress多站点命名石家庄logo标志设计
  • 空间建设网站房山营销型网站建设
  • 建一个多用户团购网站需要多少钱pc手机模板网站建设
  • 做哪些网站流量最大最有创意的logo设计
  • 广州门户网站开发做网站的公司有
  • 微盟属于营销型手机网站wordpress 坐标