网站文字广告代码,皖icp备 网站建设,互联网商业计划书模板范文,泉州建设局网站目录 Kafka生产者——向Kafka写入数据生产者概览创建Kafka生产者bootstrap.serverskey.serializervalue.serializer 发送消息到Kafka同步发送消息 Kafka生产者——向Kafka写入数据
不管是把Kafka作为消息队列、消息总线还是数据存储平台#xff0c;总是需要一个可以往Kafka写… 目录 Kafka生产者——向Kafka写入数据生产者概览创建Kafka生产者bootstrap.serverskey.serializervalue.serializer 发送消息到Kafka同步发送消息 Kafka生产者——向Kafka写入数据
不管是把Kafka作为消息队列、消息总线还是数据存储平台总是需要一个可以往Kafka写入数据的生产者、一个可以从Kafka读取数据的消费者或者一个兼具两种角色的应用程序。
生产者概览
一个应用程序会在很多情况下向Kafka写入消息记录用户的活动用于审计和分析、记录指标、记录日志、记录从智能家电收集到的信息、与其他应用程序进行异步通信、缓冲即将写入数据库的数据等等。不同的应用场景直接影响如何使用和配置生产者API。尽管生产者API使用起来很简单但消息的发送过程还是有点儿复杂。下图展示了向Kafka发送消息的主要步骤 先从创建一个ProducerRecord对象开始其中需要包含目标主题和要发送的内容。另外还可以指定键、分区、时间戳或标头。在发送ProducerRecord对象时生产者需要先把键和值对象序列化成字节数组这样才能在网络上传输。
接下来如果没有显式地指定分区那么数据将被传给分区器。分区器通常会基于ProducerRecord对象的键选择一个分区。选好分区以后生产者就知道该往哪个主题和分区发送这条消息了。紧接着该消息会被添加到一个消息批次里这个批次里的所有消息都将被发送给同一个主题和分区。有一个独立的线程负责把这些消息批次发送给目标broker。
broker在收到这些消息时会返回一个响应。如果消息写入成功就返回一个RecordMetaData对象其中包含了主题和分区信息以及消息在分区中的偏移量。如果消息写入失败则会返回一个错误。生产者在收到错误之后会尝试重新发送消息重试几次之后如果还是失败则会放弃重试并返回错误信息。
创建Kafka生产者
要向Kafka写入消息首先需要创建一个生产者对象并设置一些属性。Kafka生产者有3个必须设置的属性。
bootstrap.servers
broker的地址。可以由多个host:port组成生产者用它们来建立初始的Kafka集群连接。它不需要包含所有的broker地址因为生产者在建立初始连接之后可以从给定的broker那里找到其他broker的信息。不过还是建议至少提供两个broker地址因为一旦其中一个停机则生产者仍然可以连接到集群。
key.serializer
一个类名用来序列化消息的键。broker希望接收到的消息的键和值都是字节数组。生产者可以把任意Java对象作为键和值发送给broker但它需要知道如何把这些Java对象转换成字节数组。key.serializer必须被设置为一个实现了org.apache.kafka.common.serialization.Serializer接口的类生产者会用这个类把键序列化成字节数组。Kafka客户端默认提供了ByteArraySerializer、StringSerializer和IntegerSerializer等如果你只使用常见的几种Java对象类型就没有必要实现自己的序列化器。需要注意的是必须设置key.serializer这个属性尽管你可能只需要将值发送给Kafka。如果只需要发送值则可以将Void作为键的类型然后将这个属性设置为VoidSerializer。
value.serializer
一个类名用来序列化消息的值。与设置key.serializer属性一样需要将value.serializer设置成可以序列化消息值对象的类。
发送消息到Kafka
同步发送消息
同步发送消息很简单当Kafka返回错误或重试次数达到上限时生产者可以捕获到异常。这里需要考虑性能问题。根据Kafka集群繁忙程度的不同broker可能需要2毫秒或更长的时间来响应请求。如果采用同步发送方式那么发送线程在这段时间内就只能等待什么也不做甚至都不发送其他消息这将导致糟糕的性能。因此同步发送方式通常不会被用在生产环境中。
KafkaProducer一般会出现两种错误。一种是可重试错误这种错误可以通过重发消息来解决。例如对于连接错误只要再次建立连接就可以解决。对于“not leader for partition”非分区首领错误只要重新为分区选举首领就可以解决此时元数据也会被刷新。可以通过配置启用KafkaProducer的自动重试机制。如果在多次重试后仍无法解决问题则应用程序会收到重试异常。另一种错误则无法通过重试解决比如“Message size too large”消息太大。对于这种错误KafkaProducer不会进行任何重试而会立即抛出异常。