asp net做网站视频,镇江网站设计,关于网站建设论文,创建好网站如何把浏览器连接RabbitMQ
连接方式一#xff1a; 也可以选择使用URI的方式来实现
连接方式二#xff1a; Connection接口被用来创建一个Channel#xff0c;在创建之后#xff0c;Channel可以用来发送或者接收消息。
Channel channel conn.createChannel();使用交换器和队列
声明…连接RabbitMQ
连接方式一 也可以选择使用URI的方式来实现
连接方式二 Connection接口被用来创建一个Channel在创建之后Channel可以用来发送或者接收消息。
Channel channel conn.createChannel();使用交换器和队列
声明一个交换器和队列
channel.exchangeDeclare(exchangeName,direct,true);
String queueName channel.queueDeclare().getQueue();
channel.queueBind(queueName,exchangeName,routingKey);上面创建了一个持久化的、非自动删除的、绑定类型为direct的交换器同时也创建了一个非持久化的、排他的、自动删除的队列队列名称由RabbitMQ自动生成这里的交换器和队列都没有设置特殊的参数。
上面声明的队列具备如下特性只对当前应用中同一个Connection层面可用同一个Connection的不同Channel可共用并且也会在应用连接断开时自动删除。
如果要在应用中共享一个队列可做如下声明
channel.exchangeDeclare(exchangeName,direct,true);
channel.queueDeclare(queueName,true,false,false,null);
channel.queueBind(queueName,exchangeName,routingKey);这里的队列被声明为持久化的、非排他的、非自动删除的而且也被分配给另一个确定的已知名称。
exchangeDeclare方法详解
exchangeDeclare有多个重载方法这些重载方法都是由下面这个方法中缺省的某些参数构成的。
Exchange.DeclareOK exchangeDeclare(String exchange,String type,
boolean durable,boolean autoDelete,boolean internal,MapString,Object arguments)throws Exception;exchange: 交换器名称type: 交换器类型常见的有fanout、direct、topicdurable: 是否持久化durable设置true表示持久化持久化后可以将交换器存盘。autoDelete是否自动删除表示一旦这个交换器上没有任何绑定即没有任何队列与其相连RabbitMQ会自动删除这个交换机。这对于临时的、生命周期与某些特定队列或应用程序密切相关的交换机非常有用。internal 如果为true表明是内置的交换器客户端程序无法直接发送消息到这个交换器只能通过交换器路由到交换器这种方式。argument其他一些结构化参数比如alternate-exchange。
此外其他声明交换机的方法
exchangeDeclareNoWait 表示在声明exchange时候不需要服务器返回。这可能会导致一种情况在声明完一个交换器之后实际服务器还并未完成交换器的创建客户端紧接着使用这个交换器必然会发生异常。exchangeDeclarePassive可以用来检测交换机是否存在如果存在则正常返回如果不存在则抛出异常。
queueDeclare方法详解
queueDeclare方法只有如下两个重载方法
1. Queue.DeclareOK queueDeclare() throws IOException;
2. Queue.DeclareOK queueDeclare(String queue,boolean durable,
boolean exclusive,boolean autoDelete,MapString,Object arguments)
throws IOException;不带任何参数的queueDeclare方法默认创建一个由RabbitMQ 自动生成名称 的、排他的、自动删除、非持久化 的队列。
方法参数详解
queue队列名称durable设置是否持久化持久化的队列会存盘在服务器重启的时候保证不丢失消息。exclusive排他队列autoDelete自动删除当队列中没有任何活跃的消费者RabbitMQ会在一段时间后自动删除该队列。当一个队列删除时其上持久化的消息也会随之被删除。arguments 设置队列的其他参数 排他队列的特点和行为 独占性一旦某个连接声明了一个排他队列任何其他连接都无法访问或者声明同名的队列。自动删除当声明排他队列的连接关闭时RabbitMQ会自动删除这个队列即使队列被声明为持久化的。 同一连接通道共享虽然排他队列对其他连接不可见但同一连接内的不同通道可以共享访问这个排他队列。排他队列常用于那些希望队列仅被当前线程或应用实例使用的场景。 注意生产者和消费者都能够使用queueDeclare声明一个队列如果消费者在同一个信道上订阅了另一个队列就无法在声明队列了一个消费者只能订阅一个队列。必须先取消订阅然后将channel设置为“传输模式”之后才能声明队列。
交换机持久化和队列持久化的区别
交换机持久化主要关注的是交换机的配置和元数据的长期存储确保重启后配置不变。队列持久化关注队列自身及其消息的长期存储需要结合消息的持久化设置来防止消息丢失。
queueBind方法详解
queueBind方法如下
1. Queue.BindOK queueBind(String queue,String exchange,String routingKey)
throws IOException;2. Queue.BindOK queueBind(String queue,String exchange,String routingKey,
MapString,Object arguments) throws IOException;3. void queueBindNoWait(String queue,String exchange,String routingKey,
MapString,Object arguments) throws IOException;参数详解
queue队列名称exchange交换机名称routingKey用来绑定队列和交换机的路由键argument定义绑定的一些参数
exchangeBind方法详解
exchangeBind方法如下 1. Exchange.BindOK exchangeBind(String destination,String source,String routingKey)throws IOException;2. Exchange.BindOK exchangeBind(String destination,String source,String routingKey,
MapString,Object arguments) throws IOException;3. Exchange.BindOK exchangeBindNoWait(String destination,String source,
String routingKey,MapString,Object arguments) throws IOException;绑定以后消息从source交换机转发到destination交换机。某种程度上可以将destination交换机看作一个队列。
发送消息
如果要发送一个消息可以使用Channel类的basicPublish方法。
示例
byte[] messageBodyBytes Hello,World!.getBytes();
channel.basicPublish(exchangeName,routingKey,null,messageBodyBytes);对于basicPublish而言有几个重载方法
1. void basicPublish(String exchange,String routingKey,
BasicProperties props,byte[] body) throws IOException;2.void basicPublish(String exchange,String routingKey,boolean mandatory,
BasicProperties props,byte[] body) throws IOException;3. void basicPublish(String exchange,String routingKey,boolean mandatory,
boolean immediate,BasicProperties props,byte[] body) throws IOException;exchange 交换机的名称指明消息需要发送到哪个交换机中如果设置为空则消息会被发送到RabbitMQ默认的交换机中。routingKey路由键交换机根据路由键将消息存储到相应的队列中。props消息的基本属性集包括contentType、deliveryMode等byte[] body: 真正要发送的消息内容
消费消息
消费模式分为两种Push模式和Pull模式。推模式采用Basic.Consume进行消费拉模式采用Basic.Get进行消费。
推模式
当调用Consumer相关API方法时不同的订阅采用不同的消费者标签consumerTag来区分彼此在同一个Channel中的消费者也需要通过唯一的消费者标签以作区分关键消费代码如下所示 对于消费者来说显示的设置autoAck为false接收消息之后进行显示ack操作这个设置是非常必要的。可以防止消息不必要的丢失。
核心方法
String basicConsume(String queue,boolean autoack,String consumerTag,
boolean noLocal,boolean exclusive,MapString,Object arguments,Consumer callback)
throws IOException;queue: 队列的名称autoAck: 设置是否自动确认consumerTag消费者标签用来区分多个消费者noLocal设置true表示不能将一个Connection中生产者发送的消息发送给这个Connection中的消费者。exclusive设置是否排他确保该队列仅对创建他的消费者可见。arguments设置消费者的其他参数callback设置消费者的回调函数。
每个Channel都拥有自己独立的线程最常用的做法是一个Channel对应一个消费者也就意味着消费者彼此之间没有关联也可以在Channel中维持多个消费者但是如果Channel中一个消费者一直在运行那其他消费者的callback会被耽搁。
拉模式
通过channel.basicGet方法可以单条获取消息其返回值是GetResponse。核心方法如下
GetResponse basicGet(String queue,boolean autoAck) throws IOException;如果autoAck为false那么同样需要调用channel.basicAck来确认消息已经被成功接受。
GetResponse response channel.basicGet(QueueName,false);System.out.println(new String(response.getBody()));channel.basicAck(response.getEnvelope().getDeliveryTag(),false);注意Basic.Consume将信道Channel置为接收模式直到取消队列的订阅为止接收模式期间RabbitMQ会不断将消息推送给消费者推送消息的个数受到Basic.QoS的限制。如果只想从队列里获取单条消息而不是持续订阅建议使用Basic.Get进行消费。但是不能将Basic.Get放在一个循环里代替Basic.Consume这样做会严重影响MQ性能。
消费端的确认和拒绝
消息确认
RabbitMQ为了保证消息从队列可靠的到达消费者提供了消息确认机制。消费者在订阅队列时可以指定autoAck参数当autoAck等于false时RabbitMQ会等待消费者显示的回复确认信号后才从内存或者磁盘中移除消息即使消息配置了持久化在被ack以后仍然要被删除。当autoAck等于true时RabbitMQ会自动把发送出去的消息设置为确认然后从内存中删除而不管消费者是否真正消费这些消息。
把消息确认设置为false消费者就有足够的时间处理消息不用担心处理过程中消费者进程挂掉后消息丢失的问题因为RabbitMQ会一直等待持有消息直到消费者显示调用Basic.Ack命令为止。
当autoAck参数置为false对于MQ服务端而言队列中的消息分成了两个部分
等待投递给消费者的消息已经投递给消费者但是还没有收到消费者确认信号的消息如果一直没有收到确认信号消费此消息的消费者已经断开连接则MQ会重新安排该消息进入队列等待投递给下一个消费者。
MQ不会为未确认的消息设置过期时间他判断此消息是否需要重新投递给消费者的唯一依据是消费该消息的消费者连接是否已经断开。如此设计是因为RabbitMQ允许消费者消费一条消息的时间可以很久很久。
消息拒绝
Channel类中的basicReject方法定义如下
void basicReject(long deliveryTag,boolean requeue) throws IOException;其中deliveryTag可以看作消息的编号如果requeue为true则RabbitMQ会重新将这条消息存入队列以便发送给下一个订阅的消费者。
如果想要批量拒绝消息可以使用Basic.Nack这个命令。
void basicNack(long deliveryTag,boolean multiple,boolean requeue) throws IOException;multiple参数为false表示只拒绝编号为deliveryTag的这条消息如果设置为true表示拒绝编号deliveryTag之前所有未被消费者确认的消息。
关闭连接
在应用程序使用完毕之后需要关闭连接释放资源
channel.close();
connection.close();AMQP协议中connection和channel采用同样的方式来管理网络失败、内部错误和显示关闭连接。connection和channel所具备的生命周期如下
Open开启状态代表当前对象可以使用Closing正在关闭状态当前对象被显示通知调用关闭方法这样就产生了一个关闭请求让其内部对象进行相应的操作并等待这些关闭操作的完成。Closed已经关闭当前对象已经接收到所有内部对象以完成关闭动作的通知并且其也关闭了自身。
与关闭操作相关的方法
addShutdownListener(ShutdownListener listener): 当connection或者channel状态转变为closed的时候调用ShutdownListener而如果将一个ShutdownListener注册到一个已经处于Closed状态的对象特指Connection或者Channel对象会立刻调用ShutdownListener。removeShutdownListener(ShutdownListener listener)getCloseReason: 获取connection或者channel关闭原因isOpen:检测当前对象是否开启close(int closeCode,String closeMessage):显示通知连接执行关闭操作。
代码清单 当触发ShutdownListener时候可以获取到ShutdownSingalException这个信号包含了关闭的原因。
ShutdownSingalException 提供多个方法来分析关闭原因isHardError方法可以知道是Connection错误还是Channel错误getReason可以获取Cause相关的信息。