微信做代理的网站,linux做网站配置,网站建设补救方法,wordpress怎么保存xml最近在使用Golang做了一个网盘项目#xff08;类似百度网盘#xff09;#xff0c;这个网盘项目有一个功能描述如下#xff1a;用户会删除一个文件到垃圾回收站#xff0c;回收站的文件有一个时间期限#xff0c;比如24h#xff0c;24h后数据库中记录和oss中文件会被删除…最近在使用Golang做了一个网盘项目类似百度网盘这个网盘项目有一个功能描述如下用户会删除一个文件到垃圾回收站回收站的文件有一个时间期限比如24h24h后数据库中记录和oss中文件会被删除在之前的版本中可以使用定时任务来检查数据库记录中删除时间来判断是否删除但是这不是最佳的因此考虑如何基于RabbitMQ来实现这个功能。
使用RabbitMQ的架构 代码
因为前端有点麻烦这里全部使用Golang后端来模拟实现整个架构包括生产端和消费端。这里有一些细节
注意交换机和队列的绑定一定要细心交换机一旦声明了就不能更改如果要发生一些属性的更改就要删除原来的内容重新生成下列的内容不包含RabbitMQ持久化的内容
package mainimport (fmtgithub.com/streadway/amqplogstrings
)func InitRabbitMQ() *amqp.Connection {mq : amqphost : 127.0.0.1port : 5672user : rootpwd : rootdns : strings.Join([]string{mq, ://, user, :, pwd,, host, :, port, /}, )conn, err : amqp.Dial(dns)if err ! nil {log.Fatalf(Failed to connect to RabbitMQ: %v, err)}return conn
}func InitMainExchangeAndQueue(ch *amqp.Channel, userID string) *amqp.Channel {// 队列信息exchangeName : main_exchangequeueName : fmt.Sprintf(user_queue_%s, userID)messageTTL : int32(300000)// 声明主交换机err : ch.ExchangeDeclare(exchangeName, // 交换机名direct, // Exchange typefalse, // Durablefalse, // Auto-deletedfalse, // Internalfalse, // No-waitnil, // Arguments)if err ! nil {log.Fatalf(Failed to declare an main exchange: %v, err)}// 声明用户队列_, err ch.QueueDeclare(queueName, // 队列名false, // Durablefalse, // Delete when unusedfalse, // Exclusivefalse, // No-waitamqp.Table{x-dead-letter-routing-key: dead, // routing-keyx-dead-letter-exchange: dead_exchange, // 死信交换机x-message-ttl: messageTTL, // TTL},)if err ! nil {log.Fatalf(Failed to declare a queue: %v, err)}// 绑定err ch.QueueBind(queueName, userID, main_exchange, false, nil)if err ! nil {log.Fatalf(Failed to bind queue to exchange: %v, err)}return ch
}func InitDeadExchangeAndQueue(ch *amqp.Channel) {// 声明死信交换机err : ch.ExchangeDeclare(dead_exchange,amqp.ExchangeDirect,true,false,false,false,nil,)if err ! nil {log.Fatalf(Failed to declare an dead exchange: %v, err)}// 声明一个死信队列_, err ch.QueueDeclare(dead_queue,true,false,false,false,nil)if err ! nil {log.Fatalf(Failed to declare a queue: %v, err)}// 绑定err ch.QueueBind(dead_queue, dead, dead_exchange, false, nil)if err ! nil {log.Fatalf(Failed to bind queue to exchange: %v, err)}
}func PublishMessage(ch *amqp.Channel, userID, fileID string) {// 用户信息message : fmt.Sprintf(%s|%s, userID, fileID)exchangeName : main_exchange// 发布用户消息err : ch.Publish(exchangeName, // ExchangeuserID, // Routing keyfalse, // Mandatoryfalse, // Immediateamqp.Publishing{ContentType: text/plain,Body: []byte(message),})if err ! nil {log.Fatalf(Failed to publish a message: %v, err)}log.Printf(Message sent to user %s: %s, userID, message)
}func ConsumeTTL(ch *amqp.Channel) {// 声明死信交换机err : ch.ExchangeDeclare(dead_exchange, // 交换机名direct, // Exchange typetrue, // Durablefalse, // Auto-deletedfalse, // Internalfalse, // No-waitnil, // Arguments)if err ! nil {log.Fatalf(Failed to declare a dead letter exchange: %v, err)}// 创建消费者并阻塞等待消费死信队列中的消息megs, err : ch.Consume(dead_queue, // Queue, // Consumerfalse, // Auto-acknowledgefalse, // Exclusivefalse, // No-localfalse, // No-waitnil, // Args)if err ! nil {log.Fatalf(Failed to register a consumer for dead letter queue: %v, err)}// 使用无限循环一直监听fmt.Println(Waiting for message from dead_queue......)for d : range megs {// 实际中处理消息的逻辑例如删除文件或其他操作fmt.Println(string(d.Body))// 消费完成后手动确认消息err d.Ack(false)if err ! nil {log.Fatalf(Failed to ack message: %v, err)}}
}func Consume(ch *amqp.Channel, userID string) {// 下面的信息可以通过前后端进行传递queueName : fmt.Sprintf(user_queue_%s, userID)// 消费消息megs, err : ch.Consume(queueName, // Queue, // Consumertrue, // Auto-acknowledgefalse, // Exclusivefalse, // No-localfalse, // No-waitnil, // Args)if err ! nil {log.Fatalf(Failed to register a consumer: %v, err)}// 这里直接是由前端发送过来的API进行触发所以不用一直阻塞监听d, ok : -megsif !ok {log.Fatalf(Failed to get message: %v, err)}fmt.Println(string(d.Body))// 消息完成后确认消息err d.Ack(true)if err ! nil {log.Fatalf(Failed to ack message: %v, err)}
}func main() {// 获取客户端client : InitRabbitMQ()defer client.Close()ch, err : client.Channel()if err ! nil {log.Fatalf(Failed to open a channel: %v, err)}defer ch.Close()//ConsumeTTL(ch)// 构造dead_exchange及dead_queue// InitDeadExchangeAndQueue(ch)// 假设这是web请求信息//var userID1 test-id1//var fileID1 file1// 构造main_exchange及user_queue//ch InitMainExchangeAndQueue(ch, userID1)// 针对用户1:假设还消息没有过期时候就被recovery即在user_queue中就被消费实际中发布消息的这部分逻辑应当放在前端中//PublishMessage(ch, userID1, fileID1)//time.Sleep(20 * time.Second)// 模拟后端消费消息//Consume(ch, userID1)// 针对用户2模拟其不被后端消费过期到死信队列中var userID2 test-id2var fileID2 file2ch InitMainExchangeAndQueue(ch, userID2)PublishMessage(ch, userID2, fileID2)// 注意这个消息没有被消费理论上应当被死信队列消费
}
从dead_exchange中消费 文章转载自: http://www.morning.rnrfs.cn.gov.cn.rnrfs.cn http://www.morning.rtsd.cn.gov.cn.rtsd.cn http://www.morning.bwttp.cn.gov.cn.bwttp.cn http://www.morning.sprbs.cn.gov.cn.sprbs.cn http://www.morning.rqqn.cn.gov.cn.rqqn.cn http://www.morning.czcbl.cn.gov.cn.czcbl.cn http://www.morning.zwfgh.cn.gov.cn.zwfgh.cn http://www.morning.mxnrl.cn.gov.cn.mxnrl.cn http://www.morning.rmryl.cn.gov.cn.rmryl.cn http://www.morning.drcnf.cn.gov.cn.drcnf.cn http://www.morning.ssjtr.cn.gov.cn.ssjtr.cn http://www.morning.kcdts.cn.gov.cn.kcdts.cn http://www.morning.wxgd.cn.gov.cn.wxgd.cn http://www.morning.lxmmx.cn.gov.cn.lxmmx.cn http://www.morning.lxdbn.cn.gov.cn.lxdbn.cn http://www.morning.dxsyp.cn.gov.cn.dxsyp.cn http://www.morning.nfccq.cn.gov.cn.nfccq.cn http://www.morning.fhsgw.cn.gov.cn.fhsgw.cn http://www.morning.yhwxn.cn.gov.cn.yhwxn.cn http://www.morning.pqwrg.cn.gov.cn.pqwrg.cn http://www.morning.xnflx.cn.gov.cn.xnflx.cn http://www.morning.lbpfl.cn.gov.cn.lbpfl.cn http://www.morning.xrsqb.cn.gov.cn.xrsqb.cn http://www.morning.mjtft.cn.gov.cn.mjtft.cn http://www.morning.nfbnl.cn.gov.cn.nfbnl.cn http://www.morning.ksgjn.cn.gov.cn.ksgjn.cn http://www.morning.wkqrp.cn.gov.cn.wkqrp.cn http://www.morning.jwxmn.cn.gov.cn.jwxmn.cn http://www.morning.ftync.cn.gov.cn.ftync.cn http://www.morning.jjzjn.cn.gov.cn.jjzjn.cn http://www.morning.zztmk.cn.gov.cn.zztmk.cn http://www.morning.fhqdb.cn.gov.cn.fhqdb.cn http://www.morning.csdgt.cn.gov.cn.csdgt.cn http://www.morning.dbtdy.cn.gov.cn.dbtdy.cn http://www.morning.yrjfb.cn.gov.cn.yrjfb.cn http://www.morning.hlfnh.cn.gov.cn.hlfnh.cn http://www.morning.wyfpc.cn.gov.cn.wyfpc.cn http://www.morning.ptqpd.cn.gov.cn.ptqpd.cn http://www.morning.jcwhk.cn.gov.cn.jcwhk.cn http://www.morning.tfsyk.cn.gov.cn.tfsyk.cn http://www.morning.wgzzj.cn.gov.cn.wgzzj.cn http://www.morning.wpspf.cn.gov.cn.wpspf.cn http://www.morning.xlyt.cn.gov.cn.xlyt.cn http://www.morning.swkzr.cn.gov.cn.swkzr.cn http://www.morning.gmdtk.cn.gov.cn.gmdtk.cn http://www.morning.hbpjb.cn.gov.cn.hbpjb.cn http://www.morning.zmyzt.cn.gov.cn.zmyzt.cn http://www.morning.sbwr.cn.gov.cn.sbwr.cn http://www.morning.pfcrq.cn.gov.cn.pfcrq.cn http://www.morning.kcsx.cn.gov.cn.kcsx.cn http://www.morning.kpgft.cn.gov.cn.kpgft.cn http://www.morning.kstgt.cn.gov.cn.kstgt.cn http://www.morning.fpjxs.cn.gov.cn.fpjxs.cn http://www.morning.wynnb.cn.gov.cn.wynnb.cn http://www.morning.ypcbm.cn.gov.cn.ypcbm.cn http://www.morning.pzbjy.cn.gov.cn.pzbjy.cn http://www.morning.jzxqj.cn.gov.cn.jzxqj.cn http://www.morning.fxxmj.cn.gov.cn.fxxmj.cn http://www.morning.ttvtv.cn.gov.cn.ttvtv.cn http://www.morning.xiaobaixinyong.cn.gov.cn.xiaobaixinyong.cn http://www.morning.mzydm.cn.gov.cn.mzydm.cn http://www.morning.nrzbq.cn.gov.cn.nrzbq.cn http://www.morning.pbwcq.cn.gov.cn.pbwcq.cn http://www.morning.zwgrf.cn.gov.cn.zwgrf.cn http://www.morning.cjsnj.cn.gov.cn.cjsnj.cn http://www.morning.kcypc.cn.gov.cn.kcypc.cn http://www.morning.cpnsh.cn.gov.cn.cpnsh.cn http://www.morning.ktyww.cn.gov.cn.ktyww.cn http://www.morning.lpmdy.cn.gov.cn.lpmdy.cn http://www.morning.dplmq.cn.gov.cn.dplmq.cn http://www.morning.zlces.com.gov.cn.zlces.com http://www.morning.mrfbp.cn.gov.cn.mrfbp.cn http://www.morning.hlmkx.cn.gov.cn.hlmkx.cn http://www.morning.yjdql.cn.gov.cn.yjdql.cn http://www.morning.fhhry.cn.gov.cn.fhhry.cn http://www.morning.qrwdg.cn.gov.cn.qrwdg.cn http://www.morning.plgbh.cn.gov.cn.plgbh.cn http://www.morning.gnbtp.cn.gov.cn.gnbtp.cn http://www.morning.pnmnl.cn.gov.cn.pnmnl.cn http://www.morning.mnkz.cn.gov.cn.mnkz.cn