当前位置: 首页 > news >正文

浙江建站管理系统价格金塔精神文明建设网站

浙江建站管理系统价格,金塔精神文明建设网站,装酷网装修平台,wordpress+直接连接基于golang多消息队列中间件的封装nsq,rabbitmq,kafka 场景 在创建个人的公共方法库中有这样一个需求#xff0c;就是不同的项目会用到不同的消息队列中间件#xff0c;我的思路把所有的消息队列中间件进行封装一个消息队列接口#xff08;MQer#xff09;有两个方法一个…基于golang多消息队列中间件的封装nsq,rabbitmq,kafka 场景 在创建个人的公共方法库中有这样一个需求就是不同的项目会用到不同的消息队列中间件我的思路把所有的消息队列中间件进行封装一个消息队列接口MQer有两个方法一个生产一个消费那么在实例化对象的时候根据配置文件指定当前项目使用的那个消息队列中间件 接口模型 这个模型的核心思想是消息队列的核心功能生产者生产消息方法和消费者消费消息任何消息队列都必须有这两个功能根据如下代码消息队列中间件是可扩展的只需在实例化消息队列对象那里添加新消息队列的实现 // MQer 消息队列接口 type MQer interface {Producer(topic string, data []byte)Consumer(topic, channel string, ch chan []byte, f func(b []byte)) }// NewMQ 实例化消息队列对象 func NewMQ() MQer {switch conf.Conf.Default.Mq { // mq 设置的类型case nsq:return new(MQNsqService)case rabbit:return new(MQRabbitService)case kafka:return new(MQKafkaService)default:return new(MQNsqService)} }/* 配置文件结构设计mqType: # nsq, rabbit, kafka 这三个值然当然了是可扩展的nsq:producer: consumer: rabbit:addr: user: password: kafka:addr: */各个消息队列的实现 1. 依赖库 nsq : github.com/nsqio/go-nsqrabbitmq : github.com/streadway/amqpkafka : github.com/Shopify/sarama 2. nsq nsq结构体 // MQNsqService NSQ消息队列 type MQNsqService struct { }生产者 // Producer 生产者 func (m *MQNsqService) Producer(topic string, data []byte) {nsqConf : nsq.Config{}client, err : nsq.NewProducer(nsqServer, nsqConf)if err ! nil {log.Error([nsq]无法连接到队列)return}log.DebugF(fmt.Sprintf([生产消息] topic : %s -- %s, topic, string(data)))err client.Publish(topic, data)if err ! nil {log.Error([生产消息] 失败 err.Error())} }消费者 var (nsqServer conf.Conf.Default.Nsq.Producer // nsqServer )// Consumer 消费者 func (m *MQNsqService) Consumer(topic, channel string, ch chan []byte, f func(b []byte)) {mh, err : NewMessageHandler(nsqServer, channel)if err ! nil {log.Error(err)return}go func() {mh.SetMaxInFlight(1000)mh.Registry(topic, ch)}()go func() {for {select {case s : -ch:f(s)}}}()log.DebugF([NSQ] ServerID:%v %v started, channel, topic) }// MessageHandler MessageHandler type MessageHandler struct {msgChan chan *goNsq.Messagestop boolnsqServer stringChannel stringmaxInFlight int }// NewMessageHandler return new MessageHandler func NewMessageHandler(nsqServer string, channel string) (mh *MessageHandler, err error) {if nsqServer {err fmt.Errorf([NSQ] need nsq server)return}mh MessageHandler{msgChan: make(chan *goNsq.Message, 1024),stop: false,nsqServer: nsqServer,Channel: channel,}return }// Registry register nsq topic func (m *MessageHandler) Registry(topic string, ch chan []byte) {config : goNsq.NewConfig()if m.maxInFlight 0 {config.MaxInFlight m.maxInFlight}consumer, err : goNsq.NewConsumer(topic, m.Channel, config)if err ! nil {panic(err)}consumer.SetLogger(nil, 0)consumer.AddHandler(goNsq.HandlerFunc(m.handlerMessage))err consumer.ConnectToNSQLookupd(m.nsqServer)if err ! nil {panic(err)}m.process(ch) } rabbitmq 结构体 // MQRabbitService Rabbit消息队列 type MQRabbitService struct { }生产者 // Producer 生产者 func (m *MQRabbitService) Producer(topic string, data []byte) {mq, err : NewRabbitMQPubSub(topic)if err ! nil {log.Error([rabbit]无法连接到队列)return}//defer mq.Destroy()log.DebugF(fmt.Sprintf([生产消息] topic : %s -- %s, topic, string(data)))err mq.PublishPub(data)if err ! nil {log.Error([生产消息] 失败 err.Error())} }// NewRabbitMQPubSub 订阅模式创建 rabbitMq实例 (目前用的fanout模式) func NewRabbitMQPubSub(exchangeName string) (*RabbitMQ, error) {mq, err : NewRabbitMQ(, exchangeName, , )if mq nil || err ! nil {return nil, err}//获取connectionmq.conn, err amqp.Dial(mq.MqUrl)mq.failOnErr(err, failed to connect mq!)if mq.conn nil || err ! nil {return nil, err}//获取channelmq.channel, err mq.conn.Channel()mq.failOnErr(err, failed to open a channel!)return mq, err }...其余代码见源码: https://github.com/mangenotwork/common/tree/main/mq 消费者 // Consumer 消费者 func (m *MQRabbitService) Consumer(topic, serverId string, ch chan []byte, f func(b []byte)) {mh, err : NewRabbitMQPubSub(topic)if err ! nil {log.Error([rabbit]无法连接到队列)return}msg : mh.RegistryReceiveSub()go func(m -chan amqp.Delivery) {for {select {case s : -m:f(s.Body)}}}(msg)log.DebugF([Rabbit] ServerID:%v %v started, serverId, topic) }// NewRabbitMQPubSub 订阅模式创建 rabbitMq实例 (目前用的fanout模式) func NewRabbitMQPubSub(exchangeName string) (*RabbitMQ, error) {mq, err : NewRabbitMQ(, exchangeName, , )if mq nil || err ! nil {return nil, err}//获取connectionmq.conn, err amqp.Dial(mq.MqUrl)mq.failOnErr(err, failed to connect mq!)if mq.conn nil || err ! nil {return nil, err}//获取channelmq.channel, err mq.conn.Channel()mq.failOnErr(err, failed to open a channel!)return mq, err }... 其余代码见源码: https://github.com/mangenotwork/common/tree/main/mq kafka 结构体 // MQKafkaService Kafka消息队列 type MQKafkaService struct { } 生产者 func (m *MQKafkaService) Producer(topic string, data []byte) {config : sarama.NewConfig()config.Producer.RequiredAcks sarama.WaitForAll // 发送完数据需要leader和follower都确认config.Producer.Partitioner sarama.NewRandomPartitioner //写到随机分区中我们默认设置32个分区config.Producer.Return.Successes true // 成功交付的消息将在success channel返回// 构造一个消息msg : sarama.ProducerMessage{}msg.Topic topicmsg.Value sarama.ByteEncoder(data)// 连接kafkaclient, err : sarama.NewSyncProducer(kafkaServer, config)if err ! nil {log.Error(Producer closed, err:, err)return}defer client.Close()// 发送消息pid, offset, err : client.SendMessage(msg)if err ! nil {log.Error(send msg failed, err:, err)return}log.InfoF(pid:%v offset:%v\n, pid, offset) }消费者 // Consumer 消费者 func (m *MQKafkaService) Consumer(topic, serverId string, ch chan []byte, f func(b []byte)) {var wg sync.WaitGroupconsumer, err : sarama.NewConsumer(kafkaServer, nil)if err ! nil {log.ErrorF(Failed to start consumer: %s, err)return}partitionList, err : consumer.Partitions(task-status-data) // 通过topic获取到所有的分区if err ! nil {log.Error(Failed to get the list of partition: , err)return}log.Info(partitionList)for partition : range partitionList { // 遍历所有的分区pc, err : consumer.ConsumePartition(topic, int32(partition), sarama.OffsetNewest) // 针对每个分区创建一个分区消费者if err ! nil {log.ErrorF(Failed to start consumer for partition %d: %s\n, partition, err)}wg.Add(1)go func(sarama.PartitionConsumer) { // 为每个分区开一个go协程取值for msg : range pc.Messages() { // 阻塞直到有值发送过来然后再继续等待log.DebugF(Partition:%d, Offset:%d, key:%s, value:%s\n, msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))f(msg.Value)}defer pc.AsyncClose()wg.Done()}(pc)}wg.Wait()consumer.Close() }总结 golang的接口是一种抽象类型是对其他类型行为的概括与抽象从语法角度来看接口是一组方法定义的集合文本的封装使用了golang接口这一特性把所有的消息队列中间件抽象为一个MQer拥有生产和消费两个方法具体的各个消息队列中间件去实现这两个方法即可最明显的优点在于扩展性解耦性选择性维护性这几个表象上。 完整代码 https://github.com/mangenotwork/common/tree/main/mq 你的星星是我分享的最大动力 : )
文章转载自:
http://www.morning.llllcc.com.gov.cn.llllcc.com
http://www.morning.dxsyp.cn.gov.cn.dxsyp.cn
http://www.morning.tnbas.com.gov.cn.tnbas.com
http://www.morning.wkmrl.cn.gov.cn.wkmrl.cn
http://www.morning.ryqsq.cn.gov.cn.ryqsq.cn
http://www.morning.xrtsx.cn.gov.cn.xrtsx.cn
http://www.morning.ngdkn.cn.gov.cn.ngdkn.cn
http://www.morning.yrqb.cn.gov.cn.yrqb.cn
http://www.morning.wddmr.cn.gov.cn.wddmr.cn
http://www.morning.tkrdg.cn.gov.cn.tkrdg.cn
http://www.morning.geledi.com.gov.cn.geledi.com
http://www.morning.xbyyd.cn.gov.cn.xbyyd.cn
http://www.morning.rynqh.cn.gov.cn.rynqh.cn
http://www.morning.mttck.cn.gov.cn.mttck.cn
http://www.morning.zrmxp.cn.gov.cn.zrmxp.cn
http://www.morning.zqxhn.cn.gov.cn.zqxhn.cn
http://www.morning.zknjy.cn.gov.cn.zknjy.cn
http://www.morning.zlhbg.cn.gov.cn.zlhbg.cn
http://www.morning.sdkaiyu.com.gov.cn.sdkaiyu.com
http://www.morning.simpliq.cn.gov.cn.simpliq.cn
http://www.morning.sgrwd.cn.gov.cn.sgrwd.cn
http://www.morning.tlnbg.cn.gov.cn.tlnbg.cn
http://www.morning.yxwcj.cn.gov.cn.yxwcj.cn
http://www.morning.elmtw.cn.gov.cn.elmtw.cn
http://www.morning.rcjqgy.com.gov.cn.rcjqgy.com
http://www.morning.trqzk.cn.gov.cn.trqzk.cn
http://www.morning.ykshx.cn.gov.cn.ykshx.cn
http://www.morning.mbfkt.cn.gov.cn.mbfkt.cn
http://www.morning.jcfg.cn.gov.cn.jcfg.cn
http://www.morning.qqzdr.cn.gov.cn.qqzdr.cn
http://www.morning.hsgxj.cn.gov.cn.hsgxj.cn
http://www.morning.fhkr.cn.gov.cn.fhkr.cn
http://www.morning.fgwzl.cn.gov.cn.fgwzl.cn
http://www.morning.yrbp.cn.gov.cn.yrbp.cn
http://www.morning.lxdbn.cn.gov.cn.lxdbn.cn
http://www.morning.xcjbk.cn.gov.cn.xcjbk.cn
http://www.morning.npqps.cn.gov.cn.npqps.cn
http://www.morning.lwdzt.cn.gov.cn.lwdzt.cn
http://www.morning.zdsdn.cn.gov.cn.zdsdn.cn
http://www.morning.mnrqq.cn.gov.cn.mnrqq.cn
http://www.morning.srxhd.cn.gov.cn.srxhd.cn
http://www.morning.pmftz.cn.gov.cn.pmftz.cn
http://www.morning.qpljg.cn.gov.cn.qpljg.cn
http://www.morning.mzpd.cn.gov.cn.mzpd.cn
http://www.morning.hfyll.cn.gov.cn.hfyll.cn
http://www.morning.sqmbb.cn.gov.cn.sqmbb.cn
http://www.morning.wjlnz.cn.gov.cn.wjlnz.cn
http://www.morning.wfhnz.cn.gov.cn.wfhnz.cn
http://www.morning.fbtgp.cn.gov.cn.fbtgp.cn
http://www.morning.jsrnf.cn.gov.cn.jsrnf.cn
http://www.morning.zrgx.cn.gov.cn.zrgx.cn
http://www.morning.pzrrq.cn.gov.cn.pzrrq.cn
http://www.morning.fkfyn.cn.gov.cn.fkfyn.cn
http://www.morning.lxthr.cn.gov.cn.lxthr.cn
http://www.morning.xqndf.cn.gov.cn.xqndf.cn
http://www.morning.xnnpy.cn.gov.cn.xnnpy.cn
http://www.morning.xrct.cn.gov.cn.xrct.cn
http://www.morning.ljdhj.cn.gov.cn.ljdhj.cn
http://www.morning.thwcg.cn.gov.cn.thwcg.cn
http://www.morning.rdnkx.cn.gov.cn.rdnkx.cn
http://www.morning.gwjnm.cn.gov.cn.gwjnm.cn
http://www.morning.xkjqg.cn.gov.cn.xkjqg.cn
http://www.morning.nzmhk.cn.gov.cn.nzmhk.cn
http://www.morning.nhzxr.cn.gov.cn.nhzxr.cn
http://www.morning.jfqqs.cn.gov.cn.jfqqs.cn
http://www.morning.lsmnn.cn.gov.cn.lsmnn.cn
http://www.morning.bqmsm.cn.gov.cn.bqmsm.cn
http://www.morning.mlcwl.cn.gov.cn.mlcwl.cn
http://www.morning.tldfp.cn.gov.cn.tldfp.cn
http://www.morning.hlshn.cn.gov.cn.hlshn.cn
http://www.morning.jlthz.cn.gov.cn.jlthz.cn
http://www.morning.lcwhn.cn.gov.cn.lcwhn.cn
http://www.morning.mldrd.cn.gov.cn.mldrd.cn
http://www.morning.jhrkm.cn.gov.cn.jhrkm.cn
http://www.morning.fydsr.cn.gov.cn.fydsr.cn
http://www.morning.jydhl.cn.gov.cn.jydhl.cn
http://www.morning.ypklb.cn.gov.cn.ypklb.cn
http://www.morning.mkrjf.cn.gov.cn.mkrjf.cn
http://www.morning.xsymm.cn.gov.cn.xsymm.cn
http://www.morning.tsqrc.cn.gov.cn.tsqrc.cn
http://www.tj-hxxt.cn/news/252326.html

相关文章:

  • 徐州祥云做网站联通物联网服务运营平台
  • 石景山网站建设服务怎么做百度推广的代理
  • 网站关键词优化案例门户网站建设 报价
  • 网站设计师证书省级示范校建设网站
  • 旅游网站建设规模绵阳高端网站建设
  • 网站栏目内链怎么做手机开发者模式怎么调成高性能
  • 石狮市建设局网站mysql数据库建设网站
  • 给客户做网站 客户不付尾款godaddy 安装wordpress
  • 松江泗泾附近做网站编程基础知识大全
  • 个人微信小店怎么开通seo行业岗位
  • 龙华企业网站建设公司服装定制店的前景
  • 营销版网站小公司建设网站
  • 哪些网站可以找兼职做室内设计粉红色主题 模板 网站 在线预览
  • 免费织梦网站源码下载wordpress 注销按钮
  • 桐城市住房和城乡建设局网站建设银行北京市财满街分行网站
  • 网站联系方式模板wordpress页面搜索
  • e盘网站建设化妆顺序步骤
  • frontpage制作个人网页教程浙江网站建设自助建站优化
  • 网站企业推广方案软件开发工具的基本功能
  • 建网站在线支付怎么如何做企业网站推广
  • 网站建设申请报告外汇交易平台
  • 药品网站订单源码直接翻译网页的软件
  • 移动网站排名怎么做qq推广赚钱一个2元
  • 网站没有索引量是什么意思网站建设微商城
  • 中国建设工程造价管理协会登录网站广告公司赚钱吗
  • 长沙市公共资源交易中心东莞网站关键词优化公司
  • 网站html标签如何优化山东兴华建设集团网站
  • pc端和移动端的网站区别是什么意思制作开发app的公司
  • 电商培训机构需要什么资质seo公司网站
  • 最简单的网站模板下载平面设计找工作难吗