网站语言那种好,公司部门名称及部门职能,宿主选择 网站建设,九一赣州人才网招聘任务队列/工作队列 在上一个教程中#xff0c;我们编写程序从命名的队列发送和接收消息。在这一节中#xff0c;我们将创建一个工作队列#xff0c;该队列将用于在多个工人之间分配耗时的任务。 
工作队列#xff08;又称任务队列#xff09;的主要思想是避免立即执行某些…任务队列/工作队列 在上一个教程中我们编写程序从命名的队列发送和接收消息。在这一节中我们将创建一个工作队列该队列将用于在多个工人之间分配耗时的任务。 
工作队列又称任务队列的主要思想是避免立即执行某些资源密集型任务并且不得不等待这些任务完成。相反我们安排任务异步地同时或在当前任务之后完成。我们将任务封装为消息并将其发送到队列在后台运行的工作进程将取出消息并最终执行任务。当你运行多个工作进程时任务将在他们之间共享。 
这个概念在Web应用中特别有用因为在Web应用中不可能在较短的HTTP请求窗口内处理复杂的任务译注例如注册时发送邮件或短信验证码等场景。 
准备工作 
在本教程的上一部分我们发送了一条包含“ Hello World”的消息。现在我们将发送代表复杂任务的字符串。我们没有实际的任务例如调整图像大小或渲染pdf文件所以我们通过借助time.Sleep函数模拟一些比较耗时的任务。我们会将一些包含.的字符串封装为消息发送到队列中其中每有一个.就表示需要耗费1秒钟的工作例如hello...表示一个将花费三秒钟的假任务。 
我们将稍微修改上一个示例中的send.go代码以允许从命令行发送任意消息。该程序会将任务安排到我们的工作队列中因此我们将其命名为new_task.go 
body : bodyFrom(os.Args)  // 从参数中获取要发送的消息正文
err  ch.Publish(,           // exchangeq.Name,       // routing keyfalse,        // mandatoryfalse,amqp.Publishing {DeliveryMode: amqp.Persistent,ContentType:  text/plain,Body:         []byte(body),})
failOnError(err, Failed to publish a message)
log.Printf( [x] Sent %s, body)下面是bodyFrom函数 
func bodyFrom(args []string) string {var s stringif (len(args)  2) || os.Args[1]   {s  hello} else {s  strings.Join(args[1:],  )}return s
}我们以前的receive.go程序也需要进行一些更改它需要为消息正文中出现的每个.伪造一秒钟的工作。它将从队列中弹出消息并执行任务因此我们将其称为worker.go 
msgs, err : ch.Consume(q.Name, // queue,     // consumertrue,   // auto-ackfalse,  // exclusivefalse,  // no-localfalse,  // no-waitnil,    // args
)
failOnError(err, Failed to register a consumer)forever : make(chan bool)go func() {for d : range msgs {log.Printf(Received a message: %s, d.Body)dot_count : bytes.Count(d.Body, []byte(.))  // 数一下有几个.t : time.Duration(dot_count)time.Sleep(t * time.Second)  // 模拟耗时的任务log.Printf(Done)}
}()log.Printf( [*] Waiting for messages. To exit press CTRLC)
-forever请注意我们的假任务模拟执行时间。 
然后我们就可以打开两个终端分别执行new_task.go和worker.go了。 
# shell 1
go run worker.go# shell 2
go run new_task.go循环调度 
使用任务队列的优点之一是能够轻松并行化工作。如果我们的工作正在积压我们可以增加更多的工人这样就可以轻松扩展。 
首先让我们尝试同时运行两个worker.go脚本。它们都将从队列中获取消息但是究竟是怎样呢让我们来看看。 
你需要打开三个控制台。其中两个将运行worker.go脚本。这些控制台将成为我们的两个消费者——C1和C2。 
# shell 1
go run worker.go
#  [*] Waiting for messages. To exit press CTRLC# shell 2
go run worker.go
#  [*] Waiting for messages. To exit press CTRLC在第三个控制台中我们将发布新任务。启动消费者之后你可以发布一些消息 
# shell 3
go run new_task.go msg1.
go run new_task.go msg2..
go run new_task.go msg3...
go run new_task.go msg4....
go run new_task.go msg5.....然后我们在shell1和 shell2 两个窗口看到如下输出结果了 
# shell 1
go run worker.go
#  [*] Waiting for messages. To exit press CTRLC
#  [x] Received a message: msg1.
#  [x] Received a message: msg3...
#  [x] Received a message: msg5.....# shell 2
go run worker.go
#  [*] Waiting for messages. To exit press CTRLC
#  [x] Received a message: msg2..
#  [x] Received a message: msg4....默认情况下RabbitMQ将按顺序将每个消息发送给下一个消费者。平均而言每个消费者都会收到相同数量的消息。这种分发消息的方式称为轮询。使用三个或者更多worker试一下。 
消息确认 
work 完成任务可能需要耗费几秒钟如果一个worker在任务执行过程中宕机了该怎么办呢我们当前的代码中RabbitMQ一旦向消费者传递了一条消息便立即将其标记为删除。在这种情况下如果你终止一个worker那么你就可能会丢失这个任务我们还将丢失所有已经交付给这个worker的尚未处理的消息。 
我们不想丢失任何任务如果一个worker意外宕机了那么我们希望将任务交付给其他worker来处理。 
为了确保消息永不丢失RabbitMQ支持 消息确认。消费者发送回一个确认acknowledgement以告知RabbitMQ已经接收处理了特定的消息并且RabbitMQ可以自由删除它。 
如果使用者在不发送确认的情况下死亡其通道已关闭连接已关闭或TCP连接丢失RabbitMQ将了解消息未完全处理并将对其重新排队。如果同时有其他消费者在线它将很快将其重新分发给另一个消费者。这样您可以确保即使工人偶尔死亡也不会丢失任何消息。 
没有任何消息超时RabbitMQ将在消费者死亡时重新传递消息。即使处理一条消息需要很长时间也没关系。 
在本教程中我们将使用手动消息确认方法是为“auto-ack”参数传递一个false然后在完成任务后使用d.Ack(false)从worker发送一个正确的确认这将确认一次传递。 
msgs, err : ch.Consume(q.Name, // queue,     // consumerfalse,  // 注意这里传false,关闭自动消息确认false,  // exclusivefalse,  // no-localfalse,  // no-waitnil,    // args
)
if err ! nil {fmt.Printf(ch.Consume failed, err:%v\n, err)return
}// 开启循环不断地消费消息
forever : make(chan bool)
go func() {for d : range msgs {log.Printf(Received a message: %s, d.Body)dotCount : bytes.Count(d.Body, []byte(.))t : time.Duration(dotCount)time.Sleep(t * time.Second)log.Printf(Done)d.Ack(false) // 手动传递消息确认}
}()使用这段代码我们可以确保即使你在处理消息时使用CTRLC杀死一个worker也不会丢失任何内容。在worker死后不久所有未确认的消息都将被重新发送。 
消息确认必须在接收消息的同一通道Channel上发送。尝试使用不同的通道Channel进行消息确认将导致通道级协议异常。 
忘记确认 
忘记确认是一个常见的错误。这是一个简单的错误但后果是严重的。当你的客户机退出时消息将被重新传递这看起来像随机重新传递但是RabbitMQ将消耗越来越多的内存因为它无法释放任何未确认的消息。 
为了调试这种错误可以使用rabbitmqctl打印messages_unacknowledged字段 sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged在Windows平台去掉sudobashrabbitmqctl.bat list_queues name messages_ready messages_unacknowledged### 消息持久化我们已经学会了如何确保即使消费者死亡任务也不会丢失。但是如果RabbitMQ服务器停止运行我们的任务仍然会丢失。当RabbitMQ退出或崩溃时它将忘记队列和消息除非您告诉它不要这样做。要确保消息不会丢失需要做两件事我们需要将队列和消息都标记为持久的。首先我们需要确保队列能够在RabbitMQ节点重新启动后继续运行。为此我们需要声明它是持久的go
q, err : ch.QueueDeclare(hello, // nametrue,    // 声明为持久队列false,   // delete when unusedfalse,   // exclusivefalse,   // no-waitnil,     // arguments
)虽然这个命令本身是正确的但它在我们当前的设置中不起作用。这是因为我们已经定义了一个名为hello的队列它不是持久的。RabbitMQ不允许你使用不同的参数重新定义现有队列并将向任何尝试重新定义的程序返回错误。但是有一个快速的解决方法——让我们声明一个具有不同名称的队列例如task_queue 
q, err : ch.QueueDeclare(task_queue, // nametrue,         // 声明为持久队列false,        // delete when unusedfalse,        // exclusivefalse,        // no-waitnil,          // arguments
)这种持久的选项更改需要同时应用于生产者代码和消费者代码。 
在这一点上我们确信即使RabbitMQ重新启动任务队列队列也不会丢失。现在我们需要将消息标记为持久的——通过使用amqp.Publishing中的持久性选项amqp.Persistent。 
err  ch.Publish(,     // exchangeq.Name, // routing keyfalse,  // 立即false,  // 强制amqp.Publishing{DeliveryMode: amqp.Persistent, // 持久交付模式瞬态/持久ContentType:  text/plain,Body:         []byte(body),})有关消息持久性的说明 将消息标记为持久性并不能完全保证消息不会丢失。尽管它告诉RabbitMQ将消息保存到磁盘上但是RabbitMQ接受了一条消息并且还没有保存它时仍然有一个很短的时间窗口。而且RabbitMQ并不是对每个消息都执行fsync(2)——它可能只是保存到缓存中而不是真正写入磁盘。持久性保证不是很强但是对于我们的简单任务队列来说已经足够了。如果您需要更强有力的担保那么您可以使用publisher confirms。 公平分发 
你可能已经注意到调度仍然不能完全按照我们的要求工作。例如在一个有两个worker的情况下当所有的奇数消息都是重消息而偶数消息都是轻消息时一个worker将持续忙碌而另一个worker几乎不做任何工作。嗯RabbitMQ对此一无所知仍然会均匀地发送消息。 
这是因为RabbitMQ只是在消息进入队列时发送消息。它不考虑消费者未确认消息的数量。只是盲目地向消费者发送信息。  为了避免这种情况我们可以将预取计数设置为1。这告诉RabbitMQ不要一次向一个worker发出多个消息。或者换句话说在处理并确认前一条消息之前不要向worker发送新消息。相反它将把它发送给下一个不忙的worker。 
err  ch.Qos(1,     // prefetch count0,     // prefetch sizefalse, // global
)关于队列大小的说明 如果所有的worker都很忙你的queue随时可能会满。你会想继续关注这一点也许需要增加更多的worker或者有一些其他的策略。 完整的代码实例 
我们的new_task.go的最终代码代入如下 
package mainimport (github.com/streadway/amqplogosstrings
)func failOnErrorNew(err error, msg string) {if err ! nil {log.Fatalf(%s: %s, msg, err)}
}
func bodyFrom(args []string) string {var s stringif len(args)  2 || os.Args[1]   {s  hello} else {s  strings.Join(args[1:], )}return s
}func main() {//建立连接conn, err : amqp.Dial(amqp://licong:1234568.130.85.112:5672/)failOnErrorNew(err, Failed to connect to RabbitMQ)defer conn.Close()//获取channelch, err : conn.Channel()failOnErrorNew(err, Failed to open a channel)defer ch.Close()q, err : ch.QueueDeclare(task_queue, //nametrue,         //durablefalse,        //delete when unusedfalse,        //exclusivefalse,        //no-waitnil,          //arguments)failOnErrorNew(err, Failed to declare a queue )body : bodyFrom(os.Args)err  ch.Publish(,     // exchangeq.Name, // routing keyfalse,  // mandatoryfalse,amqp.Publishing{DeliveryMode: amqp.Persistent, //表示将消息设置为持久模式确保在 RabbitMQ 重启后消息能够被恢复。ContentType:  text/plain,Body:         []byte(body),})failOnErrorNew(err, Failed to publish a message)log.Printf( [x] Sent %s, body)} 
work.go内容如下 
package mainimport (bytesgithub.com/streadway/amqplogtime
)func failOnErrorWork(err error, msg string) {if err ! nil {log.Fatalf(%s: %s, msg, err)}
}
func main() {//建立连接conn, err : amqp.Dial(amqp://licong:1234568.130.85.112:5672/)failOnErrorWork(err, Failed to connect to RabbitMQ)defer conn.Close()//获取channelch, err : conn.Channel()failOnErrorWork(err, Failed to open a channel)defer ch.Close()//声明队列q, err : ch.QueueDeclare(task_queue, //nametrue,         //durablefalse,        //delete when unusedfalse,        //exclusivefalse,        //no-waitnil,          //argument)failOnErrorWork(err, Failed to declare a queue)err  ch.Qos(1,     //prefetch count 消费者一次能获取的最大未确认消息数量0,     //prefetch size 消费者一次能获取的最大未确认消息的总大小以字节为单位false, //global  是否将预取配置应用于所有通道。如果为 true则应用于所有通道如果为 false则仅应用于当前通道。)//获取接收消息的Delivery通道msgs, err : ch.Consume(q.Name, //queue,     //consumerfalse,  //注意这里传false,关闭自动消息确认false,  //exclusivefalse,  //no-localfalse,  //no-waitnil,    //args)failOnErrorWork(err, Failed to register a consumer)forever : make(chan bool)go func() {for d : range msgs {log.Printf(Reveived a message:%s, d.Body)dot_count : bytes.Count(d.Body, []byte(.)) //数一下有几个t : time.Duration(dot_count)time.Sleep(t * time.Second)log.Printf(Done)d.Ack(false) //手动传递消息确认}}()log.Printf([*] Waiting for messages. To exit press CTRLC)-forever
} 
源自https://www.rabbitmq.com/getstarted.html 文章转载自: http://www.morning.nyzmm.cn.gov.cn.nyzmm.cn http://www.morning.zbhfs.cn.gov.cn.zbhfs.cn http://www.morning.fbzyc.cn.gov.cn.fbzyc.cn http://www.morning.pqsys.cn.gov.cn.pqsys.cn http://www.morning.jydky.cn.gov.cn.jydky.cn http://www.morning.rfmzc.cn.gov.cn.rfmzc.cn http://www.morning.tgydf.cn.gov.cn.tgydf.cn http://www.morning.mfjfh.cn.gov.cn.mfjfh.cn http://www.morning.fstdf.cn.gov.cn.fstdf.cn http://www.morning.mcmpq.cn.gov.cn.mcmpq.cn http://www.morning.tpdg.cn.gov.cn.tpdg.cn http://www.morning.pffqh.cn.gov.cn.pffqh.cn http://www.morning.jwmws.cn.gov.cn.jwmws.cn http://www.morning.spwln.cn.gov.cn.spwln.cn http://www.morning.lmpfk.cn.gov.cn.lmpfk.cn http://www.morning.bpmth.cn.gov.cn.bpmth.cn http://www.morning.xfncq.cn.gov.cn.xfncq.cn http://www.morning.bzsqr.cn.gov.cn.bzsqr.cn http://www.morning.xgmf.cn.gov.cn.xgmf.cn http://www.morning.zbqsg.cn.gov.cn.zbqsg.cn http://www.morning.pxlql.cn.gov.cn.pxlql.cn http://www.morning.lgtcg.cn.gov.cn.lgtcg.cn http://www.morning.blfgh.cn.gov.cn.blfgh.cn http://www.morning.cjxqx.cn.gov.cn.cjxqx.cn http://www.morning.lhqw.cn.gov.cn.lhqw.cn http://www.morning.xkgyh.cn.gov.cn.xkgyh.cn http://www.morning.gmplp.cn.gov.cn.gmplp.cn http://www.morning.bnbtp.cn.gov.cn.bnbtp.cn http://www.morning.tgtrk.cn.gov.cn.tgtrk.cn http://www.morning.ckhpg.cn.gov.cn.ckhpg.cn http://www.morning.lstmq.cn.gov.cn.lstmq.cn http://www.morning.nmfml.cn.gov.cn.nmfml.cn http://www.morning.bxrlt.cn.gov.cn.bxrlt.cn http://www.morning.dgmjm.cn.gov.cn.dgmjm.cn http://www.morning.frllr.cn.gov.cn.frllr.cn http://www.morning.kwblwbl.cn.gov.cn.kwblwbl.cn http://www.morning.hlfnh.cn.gov.cn.hlfnh.cn http://www.morning.yfcbf.cn.gov.cn.yfcbf.cn http://www.morning.skql.cn.gov.cn.skql.cn http://www.morning.fnlnp.cn.gov.cn.fnlnp.cn http://www.morning.nrddx.com.gov.cn.nrddx.com http://www.morning.xnflx.cn.gov.cn.xnflx.cn http://www.morning.zlbjx.cn.gov.cn.zlbjx.cn http://www.morning.thpns.cn.gov.cn.thpns.cn http://www.morning.lmxzw.cn.gov.cn.lmxzw.cn http://www.morning.lhytw.cn.gov.cn.lhytw.cn http://www.morning.jkfyt.cn.gov.cn.jkfyt.cn http://www.morning.yfmxn.cn.gov.cn.yfmxn.cn http://www.morning.lmhwm.cn.gov.cn.lmhwm.cn http://www.morning.jzccn.cn.gov.cn.jzccn.cn http://www.morning.lqrpk.cn.gov.cn.lqrpk.cn http://www.morning.fhqdb.cn.gov.cn.fhqdb.cn http://www.morning.clzly.cn.gov.cn.clzly.cn http://www.morning.myhpj.cn.gov.cn.myhpj.cn http://www.morning.yunease.com.gov.cn.yunease.com http://www.morning.fsfz.cn.gov.cn.fsfz.cn http://www.morning.qwbht.cn.gov.cn.qwbht.cn http://www.morning.cpwmj.cn.gov.cn.cpwmj.cn http://www.morning.mdpkf.cn.gov.cn.mdpkf.cn http://www.morning.hwsgk.cn.gov.cn.hwsgk.cn http://www.morning.pgkpt.cn.gov.cn.pgkpt.cn http://www.morning.gwmjy.cn.gov.cn.gwmjy.cn http://www.morning.kghhl.cn.gov.cn.kghhl.cn http://www.morning.rlhgx.cn.gov.cn.rlhgx.cn http://www.morning.cjsrg.cn.gov.cn.cjsrg.cn http://www.morning.tlpsd.cn.gov.cn.tlpsd.cn http://www.morning.sfrw.cn.gov.cn.sfrw.cn http://www.morning.hymmq.cn.gov.cn.hymmq.cn http://www.morning.qlhwy.cn.gov.cn.qlhwy.cn http://www.morning.zcwzl.cn.gov.cn.zcwzl.cn http://www.morning.vuref.cn.gov.cn.vuref.cn http://www.morning.ltkms.cn.gov.cn.ltkms.cn http://www.morning.rmyqj.cn.gov.cn.rmyqj.cn http://www.morning.wxccm.cn.gov.cn.wxccm.cn http://www.morning.wqbrg.cn.gov.cn.wqbrg.cn http://www.morning.ftsmg.com.gov.cn.ftsmg.com http://www.morning.fpngg.cn.gov.cn.fpngg.cn http://www.morning.rbnnq.cn.gov.cn.rbnnq.cn http://www.morning.kgtyj.cn.gov.cn.kgtyj.cn http://www.morning.sbrjj.cn.gov.cn.sbrjj.cn