当前位置: 首页 > news >正文

网站上挂百度广告联盟需要申请icp经营许可证吗最好用的搜索引擎排名

网站上挂百度广告联盟需要申请icp经营许可证吗,最好用的搜索引擎排名,做外贸那里发广告网站,广州市地铁站地图目录 一、引入依赖二、添加Kafka配置三、创建 Kafka 消费者(一)Kafka生产的消息是JSON 字符串1、方式一2、方式二:需要直接访问消息元数据 (二)Kafka生产的消息是对象Order 四、创建 启动类五、配置 Kafka 生产者&…

目录

  • 一、引入依赖
  • 二、添加Kafka配置
  • 三、创建 Kafka 消费者
    • (一)Kafka生产的消息是JSON 字符串
      • 1、方式一
      • 2、方式二:需要直接访问消息元数据
    • (二)Kafka生产的消息是对象Order
  • 四、创建 启动类
  • 五、配置 Kafka 生产者(可选)
    • (一)消息类型为json串
    • (二)消息类型为对象Order
  • 六、启动 Kafka 服务
  • 七、测试 Kafka 消费者
  • 九、测试和调试
  • 十、 结语

一、引入依赖

你需要在 pom.xml 中添加 spring-kafka 相关依赖:

<dependencies><!-- Spring Boot Web --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- Spring Kafka --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><!-- Spring Boot Starter for Logging (optional but useful for debugging) --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-logging</artifactId></dependency><!-- Spring Boot Starter for Testing --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency>
</dependencies>

二、添加Kafka配置

在 application.yml 或 application.properties 文件中配置 Kafka 连接属性:

  • application.yml 示例:
spring:kafka:bootstrap-servers: localhost:9092  # Kafka服务器地址consumer:group-id: my-consumer-group   # 消费者组IDauto-offset-reset: earliest   # 消费者从头开始读取(如果没有已提交的偏移量)key-deserializer: org.apache.kafka.common.serialization.StringDeserializer  # 设置key的反序列化器value-deserializer: org.apache.kafka.common.serialization.StringDeserializer  # 设置value的反序列化器为字符串listener:missing-topics-fatal: false    # 如果主题不存在,不抛出致命错误
  • application.properties 示例:
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=my-consumer-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.listener.missing-topics-fatal=false
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer  # 设置key的反序列化器
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer  # 设置value的反序列化器为字符串
  • 注意:spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer # 设置key的反序列化器
    spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer # 设置value的反序列化器为字符串
    以上配置说明Kafka生产的数据是json字符串,那么消费接收的数据默认也是json字符串,如果接收消息想用对象接受,需要自定义序列化器,比如以下配置
spring:kafka:producer:key-serializer: org.apache.kafka.common.serialization.StringSerializer  # 对 Key 使用 StringSerializervalue-serializer: org.springframework.kafka.support.serializer.ErrorHandlingSerializer  # 对 Value 使用 ErrorHandlingSerializerproperties:spring.json.value.default.type: com.example.Order  # 默认的 JSON 反序列化目标类型为 Order

三、创建 Kafka 消费者

创建一个 Kafka 消费者类来处理消息。你可以使用 @KafkaListener 注解来监听 Kafka 中的消息

(一)Kafka生产的消息是JSON 字符串

1、方式一

  • 如果消息是 JSON 字符串,你可以使用 StringDeserializer 获取消息后,再使用 ObjectMapper 将其转换为
    Java 对象(如 Order)。
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.stereotype.Service;@Service
@EnableKafka  // 启用 Kafka 消费者
public class KafkaConsumer {private final ObjectMapper objectMapper = new ObjectMapper();// 监听 Kafka 中的 order-topic 主题@KafkaListener(topics = "order-topic", groupId = "order-consumer-group")public void consumeOrder(String message) {try {// 将 JSON 字符串反序列化为 Order 对象Order order = objectMapper.readValue(message, Order.class);System.out.println("Received order: " + order);} catch (Exception e) {e.printStackTrace();}}}

说明:

  • @KafkaListener(topics = “my-topic”, groupId = “my-consumer-group”):
    topics 表示监听的 Kafka 主题,groupId 表示消费者所属的消费者组。
  • listen(String message): 该方法会被调用来处理收到的每条消息。在此示例中,我们打印出消息内容。

2、方式二:需要直接访问消息元数据

  • 可以通过 ConsumerRecord 来接收 Kafka 消息。这种方式适用于需要直接访问消息元数据(如
    topic、partition、offset)的场景,也适合手动管理消息消费和偏移量提交的情况。
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;@Service
public class KafkaConsumer {// 监听 Kafka 中的 order-topic 主题@KafkaListener(topics = "order-topic", groupId = "order-consumer-group")public void consumeOrder(ConsumerRecord<String, String> record) {// 获取消息的详细信息String key = record.key();           // 获取消息的 keyString value = record.value();       // 获取消息的 valueString topic = record.topic();       // 获取消息的 topicint partition = record.partition(); // 获取消息的分区long offset = record.offset();      // 获取消息的偏移量long timestamp = record.timestamp(); // 获取消息的时间戳// 处理消息(这里我们只是打印消息)System.out.println("Consumed record: ");System.out.println("Key: " + key);System.out.println("Value: " + value);System.out.println("Topic: " + topic);System.out.println("Partition: " + partition);System.out.println("Offset: " + offset);System.out.println("Timestamp: " + timestamp);}
}

(二)Kafka生产的消息是对象Order

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;@Service
public class KafkaConsumer {// 监听 Kafka 中的 order-topic 主题@KafkaListener(topics = "order-topic", groupId = "order-consumer-group")public void consumeOrder(ConsumerRecord<String, Order> record) {// 获取消息的详细信息String key = record.key();           // 获取消息的 keyOrder value = record.value();       // 获取消息的 valueString topic = record.topic();       // 获取消息的 topicint partition = record.partition(); // 获取消息的分区long offset = record.offset();      // 获取消息的偏移量long timestamp = record.timestamp(); // 获取消息的时间戳// 处理消息(这里我们只是打印消息)System.out.println("Consumed record: ");System.out.println("Key: " + key);System.out.println("Value: " + value);System.out.println("Topic: " + topic);System.out.println("Partition: " + partition);System.out.println("Offset: " + offset);System.out.println("Timestamp: " + timestamp);}
}

四、创建 启动类

确保你的 Spring Boot 启动类正确配置了 Spring Boot 应用程序启动。

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class KafkaConsumerApplication {public static void main(String[] args) {SpringApplication.run(KafkaConsumerApplication.class, args);}}

五、配置 Kafka 生产者(可选)

(一)消息类型为json串

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.stereotype.Service;
import com.fasterxml.jackson.databind.ObjectMapper;@Service
@EnableKafka
public class KafkaProducer {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;  // 发送的是 String 类型消息private ObjectMapper objectMapper = new ObjectMapper();  // Jackson ObjectMapper 用于序列化// 发送订单到 Kafkapublic void sendOrder(String topic, Order order) {try {// 将 Order 对象转换为 JSON 字符串String orderJson = objectMapper.writeValueAsString(order);// 发送 JSON 字符串到 KafkakafkaTemplate.send(topic, orderJson);  // 发送字符串消息System.out.println("Order JSON sent to Kafka: " + orderJson);} catch (Exception e) {e.printStackTrace();}}
}

(二)消息类型为对象Order

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.stereotype.Service;@Service
@EnableKafka
public class KafkaProducer {@Autowiredprivate KafkaTemplate<String, Order> kafkaTemplate;// 发送订单到 Kafkapublic void sendOrder(String topic, Order order) {kafkaTemplate.send(topic, order);  // 发送订单对象,Spring Kafka 会自动将 Order 转换为 JSON}
}

六、启动 Kafka 服务

启动 Kafka 服务

bin/kafka-server-start.sh config/server.properties

七、测试 Kafka 消费者

你可以通过向 Kafka 发送消息来测试消费者是否工作正常。假设你已经在 Kafka 中创建了一个名为 my-topic 的主题,可以使用 KafkaProducer 来发送消息:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
public class KafkaController {@Autowiredprivate KafkaProducer kafkaProducer;@GetMapping("/sendOrder")public String sendOrder() {Order order = new Order();order.setOrderId(1L);order.setUserId(123L);order.setProduct("Laptop");order.setQuantity(2);order.setStatus("Created");kafkaProducer.sendOrder("order-topic", order);return "Order sent!";}
}

当你访问 /sendOrder端点时,KafkaProducer 会将消息发送到 Kafka,KafkaConsumer 会接收到这条消息并打印出来。

九、测试和调试

你可以通过查看 Kafka 消费者日志,确保消息已经被成功消费。你还可以使用 KafkaTemplate 发送消息,并确保 Kafka 生产者和消费者之间的连接正常。

十、 结语

至此,你已经在 Spring Boot 中成功配置并实现了 Kafka 消费者和生产者。你可以根据需要扩展功能,例如处理更复杂的消息类型、批量消费等。

http://www.tj-hxxt.cn/news/10515.html

相关文章:

  • 国外最火的网站指数型基金怎么买
  • 一家做特卖的网站叫什么时候外包seo服务口碑好
  • 做钓鱼网站论坛网站关键词怎么优化到首页
  • 网站 商城 app 建设品牌营销方案
  • 手机自助建站系统品牌推广方式有哪些
  • 龙华网站建设招聘西安优化外包
  • seo北京网站推广今日郑州头条最新新闻
  • 青岛网站建设 新视点seo文章代写平台
  • 网站建设新闻资讯如何做市场推广方案
  • 佛山如何网站建设在哪里做网络营销具有哪些特点
  • 广州市广园路建设公司网站企业文化内容范本
  • 私人承接做网站多少钱智能建站系统
  • 成都品牌营销策划有限公司网站优化网站
  • 烟台建设用地规划查询网站营销策略ppt
  • 曹县商城网站建设推广优化排名
  • 品牌的网站建设百度seo排名优化提高流量
  • 李氏牛仔网站建设风济宁百度推广电话
  • 网站首页html代码在哪最新百度新闻
  • 周口学做网站宁波营销型网站建设优化建站
  • 上海电子商务网站开发网络推广营销网站建设专家
  • 免费生成手机网站自己怎么开电商平台
  • 深圳网站开发搜行者seo无锡今日头条新闻
  • 免费网站服务器安全综合性b2b电子商务平台网站
  • 校园网站建设方案书最新国内新闻10条
  • 怎么做美瞳网站网络推广公司运营
  • 韩国做美食网站网址链接
  • 数据库网站制作合肥关键词排名优化
  • 维护一个网站全自动推广软件
  • 手机网站建设域名空间网站搜索排名优化
  • 企业网站建设用什么廊坊关键词排名优化