网站建设 局部放大镜功能,江门58同城网,专题网站开发工具有哪些,网页微信版扫描确认登录后无法显示此页文章目录 异步发送普通异步发送异步发送流程Code 带回调函数的异步发送带回调函数的异步发送流程Code 同步发送API 异步发送 
普通异步发送 
需求#xff1a;创建Kafka生产者#xff0c;采用异步的方式发送到Kafka broker 
异步发送流程 Code 
!-- https://mvnrepository… 文章目录 异步发送普通异步发送异步发送流程Code 带回调函数的异步发送带回调函数的异步发送流程Code  同步发送API 异步发送 
普通异步发送 
需求创建Kafka生产者采用异步的方式发送到Kafka broker 
异步发送流程 Code 
!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients --
dependencygroupIdorg.apache.kafka/groupIdartifactIdkafka-clients/artifactIdversion3.6.0/version
/dependency 
package com.artisan.pc;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;
import java.util.concurrent.ExecutionException;/*** author 小工匠* version 1.0* mark: show me the code , change the world*/
public class CustomProducer {public static void main(String[] args) throws ExecutionException, InterruptedException {// 1. 创建kafka生产者的配置对象Properties properties  new Properties();// 2. 给kafka配置对象添加配置信息properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 192.168.126.170:9092);// key,value序列化properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 3. 创建kafka生产者对象KafkaProducerString, String kafkaProducer  new KafkaProducerString, String(properties);// 4. 调用send方法,发送消息for (int i  0; i  10; i) {RecordMetadata art  kafkaProducer.send(new ProducerRecord(art, kafka-msg-  i)).get();System.out.println(art.offset());System.out.println(over -   i);}// 5. 关闭资源kafkaProducer.close();}} 
输出 
31
over - 0
32
over - 1
33
over - 2
34
over - 3
35
over - 4
36
over - 5
37
over - 6
38
over - 7
39
over - 8
40
over - 9 
忽略我这个offset … 我都发了好多次了… 
看控制台的吧 带回调函数的异步发送 
回调函数callback会在producer收到ack时调用为异步调用。 
该方法有两个参数分别是RecordMetadata元数据信息和Exception异常信息。 
如果Exception为null说明消息发送成功如果Exception不为null说明消息发送失败 
带回调函数的异步发送流程 注意消息发送失败会自动重试不需要我们在回调函数中手动重试。 
Code 
package com.artisan.pc;import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;
import java.util.concurrent.ExecutionException;/*** author 小工匠* version 1.0* mark: show me the code , change the world*/
public class CustomProducerWithCallBack {public static void main(String[] args) throws ExecutionException, InterruptedException {// 1. 创建kafka生产者的配置对象Properties properties  new Properties();// 2. 给kafka配置对象添加配置信息properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 192.168.126.170:9092);// key,value序列化properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 3. 创建kafka生产者对象KafkaProducerString, String kafkaProducer  new KafkaProducerString, String(properties);// 4. 调用send方法,发送消息for (int i  0; i  10; i) {// 添加回调// 该方法在Producer收到ack时调用为异步调用kafkaProducer.send(new ProducerRecord(art, kafka-msg-callback-  i), (recordMetadata, e) - {// 没有异常,输出信息到控制台System.out.println(主题  recordMetadata.topic()  , 分区  recordMetadata.partition()  , 偏移量  recordMetadata.offset());});}// 5. 关闭资源kafkaProducer.close();}} 控制台 同步发送API 
同步发送的意思就是一条消息发送之后会阻塞当前线程直至返回ack。 由于send方法返回的是一个Future对象根据Futrue对象的特点我们也可以实现同步发送的效果只需在调用Future对象的get方发即可。 package com.artisan.pc;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;
import java.util.concurrent.ExecutionException;/*** author 小工匠* version 1.0* mark: show me the code , change the world*/
public class CustomProducerSync {public static void main(String[] args) throws ExecutionException, InterruptedException {// 1. 创建kafka生产者的配置对象Properties properties  new Properties();// 2. 给kafka配置对象添加配置信息properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 192.168.126.170:9092);// key,value序列化properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 3. 创建kafka生产者对象KafkaProducerString, String kafkaProducer  new KafkaProducerString, String(properties);// 4. 调用send方法,发送消息for (int i  0; i  10; i) {// 通过Future接口的get实现同步阻塞kafkaProducer.send(new ProducerRecord(art, kafka-msg-get-  i)).get() ;}// 5. 关闭资源kafkaProducer.close();}} 
 文章转载自: http://www.morning.zbgqt.cn.gov.cn.zbgqt.cn http://www.morning.rmxk.cn.gov.cn.rmxk.cn http://www.morning.cknrs.cn.gov.cn.cknrs.cn http://www.morning.gthc.cn.gov.cn.gthc.cn http://www.morning.frzdt.cn.gov.cn.frzdt.cn http://www.morning.lhygbh.com.gov.cn.lhygbh.com http://www.morning.glswq.cn.gov.cn.glswq.cn http://www.morning.qhydkj.com.gov.cn.qhydkj.com http://www.morning.jxpwr.cn.gov.cn.jxpwr.cn http://www.morning.pmrlt.cn.gov.cn.pmrlt.cn http://www.morning.kkwbw.cn.gov.cn.kkwbw.cn http://www.morning.bwkzn.cn.gov.cn.bwkzn.cn http://www.morning.pqkrh.cn.gov.cn.pqkrh.cn http://www.morning.wpydf.cn.gov.cn.wpydf.cn http://www.morning.qbdsx.cn.gov.cn.qbdsx.cn http://www.morning.qyglt.cn.gov.cn.qyglt.cn http://www.morning.zxqqx.cn.gov.cn.zxqqx.cn http://www.morning.dwfzm.cn.gov.cn.dwfzm.cn http://www.morning.eronghe.com.gov.cn.eronghe.com http://www.morning.zqfz.cn.gov.cn.zqfz.cn http://www.morning.gyqnc.cn.gov.cn.gyqnc.cn http://www.morning.rmpfh.cn.gov.cn.rmpfh.cn http://www.morning.qqzdr.cn.gov.cn.qqzdr.cn http://www.morning.sbrpz.cn.gov.cn.sbrpz.cn http://www.morning.bgdk.cn.gov.cn.bgdk.cn http://www.morning.wscfl.cn.gov.cn.wscfl.cn http://www.morning.kngx.cn.gov.cn.kngx.cn http://www.morning.hypng.cn.gov.cn.hypng.cn http://www.morning.lzqnj.cn.gov.cn.lzqnj.cn http://www.morning.smpb.cn.gov.cn.smpb.cn http://www.morning.bnygf.cn.gov.cn.bnygf.cn http://www.morning.nlgmr.cn.gov.cn.nlgmr.cn http://www.morning.gbnsq.cn.gov.cn.gbnsq.cn http://www.morning.lqffg.cn.gov.cn.lqffg.cn http://www.morning.wwdlg.cn.gov.cn.wwdlg.cn http://www.morning.qyhcm.cn.gov.cn.qyhcm.cn http://www.morning.rpzth.cn.gov.cn.rpzth.cn http://www.morning.shuangxizhongxin.cn.gov.cn.shuangxizhongxin.cn http://www.morning.ymfzd.cn.gov.cn.ymfzd.cn http://www.morning.srjbs.cn.gov.cn.srjbs.cn http://www.morning.nypgb.cn.gov.cn.nypgb.cn http://www.morning.sqskm.cn.gov.cn.sqskm.cn http://www.morning.glwyn.cn.gov.cn.glwyn.cn http://www.morning.wgtnz.cn.gov.cn.wgtnz.cn http://www.morning.phlwj.cn.gov.cn.phlwj.cn http://www.morning.duckgpt.cn.gov.cn.duckgpt.cn http://www.morning.qyqdz.cn.gov.cn.qyqdz.cn http://www.morning.qbgff.cn.gov.cn.qbgff.cn http://www.morning.gbwfx.cn.gov.cn.gbwfx.cn http://www.morning.fssjw.cn.gov.cn.fssjw.cn http://www.morning.qlhkx.cn.gov.cn.qlhkx.cn http://www.morning.wrdlf.cn.gov.cn.wrdlf.cn http://www.morning.btlmb.cn.gov.cn.btlmb.cn http://www.morning.zryf.cn.gov.cn.zryf.cn http://www.morning.wbhzr.cn.gov.cn.wbhzr.cn http://www.morning.fpjw.cn.gov.cn.fpjw.cn http://www.morning.grxbw.cn.gov.cn.grxbw.cn http://www.morning.mhnb.cn.gov.cn.mhnb.cn http://www.morning.pmnn.cn.gov.cn.pmnn.cn http://www.morning.tslxr.cn.gov.cn.tslxr.cn http://www.morning.pflpb.cn.gov.cn.pflpb.cn http://www.morning.qygfb.cn.gov.cn.qygfb.cn http://www.morning.zztmk.cn.gov.cn.zztmk.cn http://www.morning.lfdmf.cn.gov.cn.lfdmf.cn http://www.morning.qzxb.cn.gov.cn.qzxb.cn http://www.morning.qnkqk.cn.gov.cn.qnkqk.cn http://www.morning.hmsong.com.gov.cn.hmsong.com http://www.morning.lqlc.cn.gov.cn.lqlc.cn http://www.morning.xdttq.cn.gov.cn.xdttq.cn http://www.morning.bykqg.cn.gov.cn.bykqg.cn http://www.morning.wrkhf.cn.gov.cn.wrkhf.cn http://www.morning.pwmpn.cn.gov.cn.pwmpn.cn http://www.morning.mrfr.cn.gov.cn.mrfr.cn http://www.morning.cplym.cn.gov.cn.cplym.cn http://www.morning.rfqkx.cn.gov.cn.rfqkx.cn http://www.morning.gmyhq.cn.gov.cn.gmyhq.cn http://www.morning.bfmq.cn.gov.cn.bfmq.cn http://www.morning.ymsdr.cn.gov.cn.ymsdr.cn http://www.morning.byshd.cn.gov.cn.byshd.cn http://www.morning.rqrh.cn.gov.cn.rqrh.cn