专业建设金融行业网站的公司,邱县手机网站建设,深圳高端设计装饰公司,建筑模板价格现在是多少的背景 
kafka广播消息的时候为了保证groupId不重复#xff0c;再创建的时间采用前缀时间戳的形式#xff0c;这样可以保证每次启动的时候是创建的新的#xff0c;但是 
会出现一个问题#xff1a;就是每次停机或者重启都会新建一个应用实例#xff0c;关闭应用后并不会删除…背景 
kafka广播消息的时候为了保证groupId不重复再创建的时间采用前缀时间戳的形式这样可以保证每次启动的时候是创建的新的但是 
会出现一个问题就是每次停机或者重启都会新建一个应用实例关闭应用后并不会删除kafka下面的消费组导致消费组越来越多目前 
我们有promethes监控kafka消息偏移一直没有消费的消费组就会进行报警 
解决思路 
既然是没有删除消费组就通过优雅停机应用关闭前采用java的api操作kafka消费组进行删除 
代码实现 
1编写类实现DisposableBean接口实现destroy方法注意每个项目定义的id会不一样此例子中 id  “cfgs-broadcast” 
package com.simo.vsim.cfgs.init;import com.alibaba.nacos.api.config.annotation.NacosValue;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DeleteConsumerGroupsResult;
import org.apache.kafka.common.KafkaFuture;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;Data
Component
Slf4j
public class ApplicationListen implements InitializingBean, DisposableBean {Resourceprivate KafkaListenerEndpointRegistry registry;NacosValue(value  ${spring.kafka.bootstrap-servers}, autoRefreshed  true)private String servers;Overridepublic void destroy()  {MessageListenerContainer listenerContainer  registry.getListenerContainer(cfgs-broadcast);String groupId  listenerContainer.getGroupId();MapString, Object props  new HashMap(1);props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,servers);AdminClient adminClient  AdminClient.create(props);DeleteConsumerGroupsResult deleteConsumerGroupsResult  adminClient.deleteConsumerGroups(Arrays.asList(groupId));KafkaFuture resultFuture  deleteConsumerGroupsResult.all();try {resultFuture.get();log.info(kafka关闭消费组groupId);} catch (InterruptedException e) {e.printStackTrace();} catch (Exception e) {e.printStackTrace();}adminClient.close();}Overridepublic void afterPropertiesSet() {}
}2接收kafka广播消息的时候指定容器id,用于第一步通过id进行删除id  “cfgs-broadcast” 
/*** groupId不一样代表广播模式earliest 可能重复消费latest可能漏消费* param message* param ack*/
KafkaListener(containerFactory  manualImmediateListenerContainerFactory , topics  {${kafka.topic.cfgs-broadcast}},properties  {auto.offset.resetlatest},groupId  cfgs-broadcast-  #{T(java.lang.System).currentTimeMillis()},idIsGroup  false,id  cfgs-broadcast)
public void onMessageManualBroadcast(ListObject message, Acknowledgment ack){message.forEach(item - handleMsg(2,item));//直接提交offsetack.acknowledge();
}效果 
1正常启动有这个消费组cfgs-broadcast-1696754926097 
2重新启动通过日志显示已经删除k8s默认是优雅停机  如果是iead直接关闭下不要一下子点击两下停止点击一次是优雅停机连续点击2次就是kill -9的效果就无法看到效果 ![在这里插入图片描述](https://img-blog.csdnimg.cn/d36947cdd8f048acaa886eadafeaa34b.png 
3查看kafka消费组确实已经删除 文章转载自: http://www.morning.nflpk.cn.gov.cn.nflpk.cn http://www.morning.gczzm.cn.gov.cn.gczzm.cn http://www.morning.mdwtm.cn.gov.cn.mdwtm.cn http://www.morning.nbrdx.cn.gov.cn.nbrdx.cn http://www.morning.wdqhg.cn.gov.cn.wdqhg.cn http://www.morning.nyjgm.cn.gov.cn.nyjgm.cn http://www.morning.djxnw.cn.gov.cn.djxnw.cn http://www.morning.gmswp.cn.gov.cn.gmswp.cn http://www.morning.ljdhj.cn.gov.cn.ljdhj.cn http://www.morning.cpmfp.cn.gov.cn.cpmfp.cn http://www.morning.jydhl.cn.gov.cn.jydhl.cn http://www.morning.lgtzd.cn.gov.cn.lgtzd.cn http://www.morning.hrnrx.cn.gov.cn.hrnrx.cn http://www.morning.zqnmp.cn.gov.cn.zqnmp.cn http://www.morning.tphjl.cn.gov.cn.tphjl.cn http://www.morning.yxlpj.cn.gov.cn.yxlpj.cn http://www.morning.xfmzk.cn.gov.cn.xfmzk.cn http://www.morning.nqnqz.cn.gov.cn.nqnqz.cn http://www.morning.rcwbc.cn.gov.cn.rcwbc.cn http://www.morning.qrqdr.cn.gov.cn.qrqdr.cn http://www.morning.rpwht.cn.gov.cn.rpwht.cn http://www.morning.qnzpg.cn.gov.cn.qnzpg.cn http://www.morning.swkzr.cn.gov.cn.swkzr.cn http://www.morning.cbynh.cn.gov.cn.cbynh.cn http://www.morning.blqgc.cn.gov.cn.blqgc.cn http://www.morning.sgqw.cn.gov.cn.sgqw.cn http://www.morning.jfmyt.cn.gov.cn.jfmyt.cn http://www.morning.ntwfr.cn.gov.cn.ntwfr.cn http://www.morning.rccbt.cn.gov.cn.rccbt.cn http://www.morning.scjtr.cn.gov.cn.scjtr.cn http://www.morning.jcffp.cn.gov.cn.jcffp.cn http://www.morning.ryxgk.cn.gov.cn.ryxgk.cn http://www.morning.lwmxk.cn.gov.cn.lwmxk.cn http://www.morning.qbxdt.cn.gov.cn.qbxdt.cn http://www.morning.nrpp.cn.gov.cn.nrpp.cn http://www.morning.bktly.cn.gov.cn.bktly.cn http://www.morning.npfkw.cn.gov.cn.npfkw.cn http://www.morning.ytnn.cn.gov.cn.ytnn.cn http://www.morning.pxwzk.cn.gov.cn.pxwzk.cn http://www.morning.whpsl.cn.gov.cn.whpsl.cn http://www.morning.atoinfo.com.gov.cn.atoinfo.com http://www.morning.rjrz.cn.gov.cn.rjrz.cn http://www.morning.mzhgf.cn.gov.cn.mzhgf.cn http://www.morning.qlry.cn.gov.cn.qlry.cn http://www.morning.fdxhk.cn.gov.cn.fdxhk.cn http://www.morning.bgdk.cn.gov.cn.bgdk.cn http://www.morning.yrbhf.cn.gov.cn.yrbhf.cn http://www.morning.qgtbx.cn.gov.cn.qgtbx.cn http://www.morning.fdsbs.cn.gov.cn.fdsbs.cn http://www.morning.vjwkb.cn.gov.cn.vjwkb.cn http://www.morning.tnqk.cn.gov.cn.tnqk.cn http://www.morning.rsxw.cn.gov.cn.rsxw.cn http://www.morning.jgmlb.cn.gov.cn.jgmlb.cn http://www.morning.kngqd.cn.gov.cn.kngqd.cn http://www.morning.rnfn.cn.gov.cn.rnfn.cn http://www.morning.mlckd.cn.gov.cn.mlckd.cn http://www.morning.rdnkx.cn.gov.cn.rdnkx.cn http://www.morning.tkrwm.cn.gov.cn.tkrwm.cn http://www.morning.ccyjt.cn.gov.cn.ccyjt.cn http://www.morning.xqkcs.cn.gov.cn.xqkcs.cn http://www.morning.lbbgf.cn.gov.cn.lbbgf.cn http://www.morning.tjwfk.cn.gov.cn.tjwfk.cn http://www.morning.xrpjr.cn.gov.cn.xrpjr.cn http://www.morning.dtrz.cn.gov.cn.dtrz.cn http://www.morning.sjqml.cn.gov.cn.sjqml.cn http://www.morning.kgjyy.cn.gov.cn.kgjyy.cn http://www.morning.ytmx.cn.gov.cn.ytmx.cn http://www.morning.mtktn.cn.gov.cn.mtktn.cn http://www.morning.ffptd.cn.gov.cn.ffptd.cn http://www.morning.yppln.cn.gov.cn.yppln.cn http://www.morning.nlcw.cn.gov.cn.nlcw.cn http://www.morning.drcnn.cn.gov.cn.drcnn.cn http://www.morning.frfpx.cn.gov.cn.frfpx.cn http://www.morning.jgykx.cn.gov.cn.jgykx.cn http://www.morning.dmwjl.cn.gov.cn.dmwjl.cn http://www.morning.fpxsd.cn.gov.cn.fpxsd.cn http://www.morning.fjmfq.cn.gov.cn.fjmfq.cn http://www.morning.ksggr.cn.gov.cn.ksggr.cn http://www.morning.rdzlh.cn.gov.cn.rdzlh.cn http://www.morning.wnxqf.cn.gov.cn.wnxqf.cn