做网站的的步骤怎么写,单页设计思路,做西点的网站,西安市城乡建设管理局网站虚拟主机设计 创建 VirtualHost实现构造⽅法和 getter创建交换机删除交换机创建队列删除队列创建绑定删除绑定发布消息 ★路由规则1) 实现 route ⽅法2) 实现 checkRoutingKeyValid3) 实现 checkBindingKeyValid4) 实现 routeTopic5) 匹配规则测试⽤例6) 测试 Router 订阅消息1… 虚拟主机设计 创建 VirtualHost实现构造⽅法和 getter创建交换机删除交换机创建队列删除队列创建绑定删除绑定发布消息 ★路由规则1) 实现 route ⽅法2) 实现 checkRoutingKeyValid3) 实现 checkBindingKeyValid4) 实现 routeTopic5) 匹配规则测试⽤例6) 测试 Router 订阅消息1) 添加⼀个订阅者2) 创建订阅者管理管理类3) 添加令牌接⼝4) 实现添加订阅者给 MsgQueue 添加⼀个订阅者列表5) 实现扫描线程6) 实现消费消息 ⼩结消息确认测试 VirtualHost 创建 VirtualHost
创建 mqserver.VirtualHost 其中 Router ⽤来定义转发规则, ConsumerManager ⽤来实现消息消费. 这两个内容后续再介绍
实现构造⽅法和 getter 创建交换机
• 此处的 autoDelete, arguments 其实并没有使⽤. 只是先预留出来. (RabbitMQ 是⽀持的) . • 约定, 交换机/队列的名字, 都加上 VirtualHostName 作为前缀. 这样不同 VirtualHost 中就可以存在 同名的交换机或者队列了. • exchangeDeclare 的语义是, 不存在就创建, 存在则直接返回. 因此不叫做 “exchangeCreate”. • 先写硬盘, 后写内存. 因为写硬盘失败概率更⼤. 如果硬盘写失败了, 也就不必写内存了. 删除交换机 创建队列 删除队列 创建绑定 删除绑定 发布消息 ★
• 发布消息其实是把消息发送给指定的 Exchange, 再根据 Exchange 和 Queue 的 Binding 关系, 转发到对应队列中. • 发送消息需要指定 routingKey, 这个值的作⽤和 ExchangeType 是相关的. ◦ Direct: routingKey 就是对应队列的名字. 此时不需要 binding 关系, 也不需要 bindingKey, 就可以直接转发消息. ◦ Fanout: routingKey 不起作⽤, bindingKey 也不起作⽤. 此时消息会转发给绑定到该交换机上的所有队列中. ◦ Topic: routingKey 是⼀个特定的字符串, 会和 bindingKey 进⾏匹配. 如果匹配成功, 则发到对应的队列中. 具体规则后续介绍. • BasicProperties 是消息的元信息. body 是消息本体. // 发送消息到指定的交换机/队列中.public boolean basicPublish(String exchangeName, String routingKey, BasicProperties basicProperties, byte[] body) {try {// 1. 转换交换机的名字exchangeName virtualHostName exchangeName;// 2. 检查 routingKey 是否合法.if (!router.checkRoutingKey(routingKey)) {throw new MqException([VirtualHost] routingKey 非法! routingKey routingKey);}// 3. 查找交换机对象Exchange exchange memoryDataCenter.getExchange(exchangeName);if (exchange null) {throw new MqException([VirtualHost] 交换机不存在! exchangeName exchangeName);}// 4. 判定交换机的类型if (exchange.getType() ExchangeType.DIRECT) {// 按照直接交换机的方式来转发消息// 以 routingKey 作为队列的名字, 直接把消息写入指定的队列中.// 此时, 可以无视绑定关系.String queueName virtualHostName routingKey;// 5. 构造消息对象Message message Message.createMessageWithId(routingKey, basicProperties, body);// 6. 查找该队列名对应的对象MSGQueue queue memoryDataCenter.getQueue(queueName);if (queue null) {throw new MqException([VirtualHost] 队列不存在! queueName queueName);}// 7. 队列存在, 直接给队列中写入消息sendMessage(queue, message);} else {// 按照 fanout 和 topic 的方式来转发.// 5. 找到该交换机关联的所有绑定, 并遍历这些绑定对象ConcurrentHashMapString, Binding bindingsMap memoryDataCenter.getBindings(exchangeName);for (Map.EntryString, Binding entry : bindingsMap.entrySet()) {// 1) 获取到绑定对象, 判定对应的队列是否存在Binding binding entry.getValue();MSGQueue queue memoryDataCenter.getQueue(binding.getQueueName());if (queue null) {// 此处咱们就不抛出异常了. 可能此处有多个这样的队列.// 希望不要因为一个队列的失败, 影响到其他队列的消息的传输.System.out.println([VirtualHost] basicPublish 发送消息时, 发现队列不存在! queueName binding.getQueueName());continue;}// 2) 构造消息对象Message message Message.createMessageWithId(routingKey, basicProperties, body);// 3) 判定这个消息是否能转发给该队列.// 如果是 fanout, 所有绑定的队列都要转发的.// 如果是 topic, 还需要判定下, bindingKey 和 routingKey 是不是匹配.if (!router.route(exchange.getType(), binding, message)) {continue;}// 4) 真正转发消息给队列sendMessage(queue, message);}}return true;} catch (Exception e) {System.out.println([VirtualHost] 消息发送失败!);e.printStackTrace();return false;}}private void sendMessage(MSGQueue queue, Message message) throws IOException, MqException, InterruptedException {// 此处发送消息, 就是把消息写入到 硬盘 和 内存 上.int deliverMode message.getDeliverMode();// deliverMode 为 1 , 不持久化. deliverMode 为 2 表示持久化.if (deliverMode 2) {diskDataCenter.sendMessage(queue, message);}// 写入内存memoryDataCenter.sendMessage(queue, message);// 此处还需要补充一个逻辑, 通知消费者可以消费消息了.consumerManager.notifyConsume(queue.getName());}路由规则
1) 实现 route ⽅法 public boolean route(ExchangeType exchangeType, Binding binding, Message message) throws MqException {// 根据不同的 exchangeType 使用不同的判定转发规则.if (exchangeType ExchangeType.FANOUT) {// 如果是 FANOUT 类型, 则该交换机上绑定的所有队列都需要转发return true;} else if (exchangeType ExchangeType.TOPIC) {// 如果是 TOPIC 主题交换机, 规则就要更复杂一些.return routeTopic(binding, message);} else {// 其他情况是不应该存在的.throw new MqException([Router] 交换机类型非法! exchangeType exchangeType);}}2) 实现 checkRoutingKeyValid public boolean checkBindingKey(String bindingKey) {if (bindingKey.length() 0) {// 空字符串, 也是合法情况. 比如在使用 direct / fanout 交换机的时候, bindingKey 是用不上的.return true;}// 检查字符串中不能存在非法字符for (int i 0; i bindingKey.length(); i) {char ch bindingKey.charAt(i);if (ch A ch Z) {continue;}if (ch a ch z) {continue;}if (ch 0 ch 9) {continue;}if (ch _ || ch . || ch * || ch #) {continue;}return false;}// 检查 * 或者 # 是否是独立的部分.// aaa.*.bbb 合法情况; aaa.a*.bbb 非法情况.String[] words bindingKey.split(\\.);for (String word : words) {// 检查 word 长度 1 并且包含了 * 或者 # , 就是非法的格式了.if (word.length() 1 (word.contains(*) || word.contains(#))) {return false;}}// 约定一下, 通配符之间的相邻关系(人为(俺)约定的).// 为啥这么约定? 因为前三种相邻的时候, 实现匹配的逻辑会非常繁琐, 同时功能性提升不大~~// 1. aaa.#.#.bbb 非法// 2. aaa.#.*.bbb 非法// 3. aaa.*.#.bbb 非法// 4. aaa.*.*.bbb 合法for (int i 0; i words.length - 1; i) {// 连续两个 ##if (words[i].equals(#) words[i 1].equals(#)) {return false;}// # 连着 *if (words[i].equals(#) words[i 1].equals(*)) {return false;}// * 连着 #if (words[i].equals(*) words[i 1].equals(#)) {return false;}}return true;}3) 实现 checkBindingKeyValid // bindingKey 的构造规则:// 1. 数字, 字母, 下划线// 2. 使用 . 分割成若干部分// 3. 允许存在 * 和 # 作为通配符. 但是通配符只能作为独立的分段.public boolean checkBindingKey(String bindingKey) {if (bindingKey.length() 0) {// 空字符串, 也是合法情况. 比如在使用 direct / fanout 交换机的时候, bindingKey 是用不上的.return true;}// 检查字符串中不能存在非法字符for (int i 0; i bindingKey.length(); i) {char ch bindingKey.charAt(i);if (ch A ch Z) {continue;}if (ch a ch z) {continue;}if (ch 0 ch 9) {continue;}if (ch _ || ch . || ch * || ch #) {continue;}return false;}// 检查 * 或者 # 是否是独立的部分.// aaa.*.bbb 合法情况; aaa.a*.bbb 非法情况.String[] words bindingKey.split(\\.);for (String word : words) {// 检查 word 长度 1 并且包含了 * 或者 # , 就是非法的格式了.if (word.length() 1 (word.contains(*) || word.contains(#))) {return false;}}// 约定一下, 通配符之间的相邻关系(人为(俺)约定的).// 为啥这么约定? 因为前三种相邻的时候, 实现匹配的逻辑会非常繁琐, 同时功能性提升不大~~// 1. aaa.#.#.bbb 非法// 2. aaa.#.*.bbb 非法// 3. aaa.*.#.bbb 非法// 4. aaa.*.*.bbb 合法for (int i 0; i words.length - 1; i) {// 连续两个 ##if (words[i].equals(#) words[i 1].equals(#)) {return false;}// # 连着 *if (words[i].equals(#) words[i 1].equals(*)) {return false;}// * 连着 #if (words[i].equals(*) words[i 1].equals(#)) {return false;}}return true;}
4) 实现 routeTopic // [测试用例]// binding key routing key result// aaa aaa true// aaa.bbb aaa.bbb true// aaa.bbb aaa.bbb.ccc false// aaa.bbb aaa.ccc false// aaa.bbb.ccc aaa.bbb.ccc true// aaa.* aaa.bbb true// aaa.*.bbb aaa.bbb.ccc false// *.aaa.bbb aaa.bbb false// # aaa.bbb.ccc true// aaa.# aaa.bbb true// aaa.# aaa.bbb.ccc true// aaa.#.ccc aaa.ccc true// aaa.#.ccc aaa.bbb.ccc true// aaa.#.ccc aaa.aaa.bbb.ccc true// #.ccc ccc true// #.ccc aaa.bbb.ccc trueprivate boolean routeTopic(Binding binding, Message message) {// 先把这两个 key 进行切分String[] bindingTokens binding.getBindingKey().split(\\.);String[] routingTokens message.getRoutingKey().split(\\.);// 引入两个下标, 指向上述两个数组. 初始情况下都为 0int bindingIndex 0;int routingIndex 0;// 此处使用 while 更合适, 每次循环, 下标不一定就是 1, 不适合使用 forwhile (bindingIndex bindingTokens.length routingIndex routingTokens.length) {if (bindingTokens[bindingIndex].equals(*)) {// [情况二] 如果遇到 * , 直接进入下一轮. * 可以匹配到任意一个部分!!bindingIndex;routingIndex;continue;} else if (bindingTokens[bindingIndex].equals(#)) {// 如果遇到 #, 需要先看看有没有下一个位置.bindingIndex;if (bindingIndex bindingTokens.length) {// [情况三] 该 # 后面没东西了, 说明此时一定能匹配成功了!return true;}// [情况四] # 后面还有东西, 拿着这个内容, 去 routingKey 中往后找, 找到对应的位置.// findNextMatch 这个方法用来查找该部分在 routingKey 的位置. 返回该下标. 没找到, 就返回 -1routingIndex findNextMatch(routingTokens, routingIndex, bindingTokens[bindingIndex]);if (routingIndex -1) {// 没找到匹配的结果. 匹配失败return false;}// 找到的匹配的情况, 继续往后匹配.bindingIndex;routingIndex;} else {// [情况一] 如果遇到普通字符串, 要求两边的内容是一样的.if (!bindingTokens[bindingIndex].equals(routingTokens[routingIndex])) {return false;}bindingIndex;routingIndex;}}// [情况五] 判定是否是双方同时到达末尾// 比如 aaa.bbb.ccc 和 aaa.bbb 是要匹配失败的.if (bindingIndex bindingTokens.length routingIndex routingTokens.length) {return true;}return false;}private int findNextMatch(String[] routingTokens, int routingIndex, String bindingToken) {for (int i routingIndex; i routingTokens.length; i) {if (routingTokens[i].equals(bindingToken)) {return i;}}return -1;}5) 匹配规则测试⽤例
// [测试用例]
// binding key routing key result
// aaa aaa true
// aaa.bbb aaa.bbb true
// aaa.bbb aaa.bbb.ccc false
// aaa.bbb aaa.ccc false
// aaa.bbb.ccc aaa.bbb.ccc true
// aaa.* aaa.bbb true
// aaa.*.bbb aaa.bbb.ccc false
// *.aaa.bbb aaa.bbb false
// # aaa.bbb.ccc true
// aaa.# aaa.bbb true
// aaa.# aaa.bbb.ccc true
// aaa.#.ccc aaa.ccc true
// aaa.#.ccc aaa.bbb.ccc true
// aaa.#.ccc aaa.aaa.bbb.ccc true
// #.ccc ccc true
// #.ccc aaa.bbb.ccc true6) 测试 Router
package com.example.mq;import com.example.mq.common.MqException;
import com.example.mq.mqserver.core.Binding;
import com.example.mq.mqserver.core.ExchangeType;
import com.example.mq.mqserver.core.Message;
import com.example.mq.mqserver.core.Router;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;SpringBootTest
public class RouterTests {private Router router new Router();private Binding binding null;private Message message null;BeforeEachpublic void setUp() {binding new Binding();message new Message();}AfterEachpublic void tearDown() {binding null;message null;}// [测试用例]// binding key routing key result// aaa aaa true// aaa.bbb aaa.bbb true// aaa.bbb aaa.bbb.ccc false// aaa.bbb aaa.ccc false// aaa.bbb.ccc aaa.bbb.ccc true// aaa.* aaa.bbb true// aaa.*.bbb aaa.bbb.ccc false// *.aaa.bbb aaa.bbb false// # aaa.bbb.ccc true// aaa.# aaa.bbb true// aaa.# aaa.bbb.ccc true// aaa.#.ccc aaa.ccc true// aaa.#.ccc aaa.bbb.ccc true// aaa.#.ccc aaa.aaa.bbb.ccc true// #.ccc ccc true// #.ccc aaa.bbb.ccc trueTestpublic void test1() throws MqException {binding.setBindingKey(aaa);message.setRoutingKey(aaa);Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));}Testpublic void test2() throws MqException {binding.setBindingKey(aaa.bbb);message.setRoutingKey(aaa.bbb);Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));}Testpublic void test3() throws MqException {binding.setBindingKey(aaa.bbb);message.setRoutingKey(aaa.bbb.ccc);Assertions.assertFalse(router.route(ExchangeType.TOPIC, binding, message));}Testpublic void test4() throws MqException {binding.setBindingKey(aaa.bbb);message.setRoutingKey(aaa.ccc);Assertions.assertFalse(router.route(ExchangeType.TOPIC, binding, message));}Testpublic void test5() throws MqException {binding.setBindingKey(aaa.bbb.ccc);message.setRoutingKey(aaa.bbb.ccc);Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));}Testpublic void test6() throws MqException {binding.setBindingKey(aaa.*);message.setRoutingKey(aaa.bbb);Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));}Testpublic void test7() throws MqException {binding.setBindingKey(aaa.*.bbb);message.setRoutingKey(aaa.bbb.ccc);Assertions.assertFalse(router.route(ExchangeType.TOPIC, binding, message));}Testpublic void test8() throws MqException {binding.setBindingKey(*.aaa.bbb);message.setRoutingKey(aaa.bbb);Assertions.assertFalse(router.route(ExchangeType.TOPIC, binding, message));}Testpublic void test9() throws MqException {binding.setBindingKey(#);message.setRoutingKey(aaa.bbb.ccc);Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));}Testpublic void test10() throws MqException {binding.setBindingKey(aaa.#);message.setRoutingKey(aaa.bbb);Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));}Testpublic void test11() throws MqException {binding.setBindingKey(aaa.#);message.setRoutingKey(aaa.bbb.ccc);Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));}Testpublic void test12() throws MqException {binding.setBindingKey(aaa.#.ccc);message.setRoutingKey(aaa.ccc);Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));}Testpublic void test13() throws MqException {binding.setBindingKey(aaa.#.ccc);message.setRoutingKey(aaa.bbb.ccc);Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));}Testpublic void test14() throws MqException {binding.setBindingKey(aaa.#.ccc);message.setRoutingKey(aaa.aaa.bbb.ccc);Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));}Testpublic void test15() throws MqException {binding.setBindingKey(#.ccc);message.setRoutingKey(ccc);Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));}Testpublic void test16() throws MqException {binding.setBindingKey(#.ccc);message.setRoutingKey(aaa.bbb.ccc);Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));}
}
订阅消息
1) 添加⼀个订阅者 // 订阅消息.// 添加一个队列的订阅者, 当队列收到消息之后, 就要把消息推送给对应的订阅者.// consumerTag: 消费者的身份标识// autoAck: 消息被消费完成后, 应答的方式. 为 true 自动应答. 为 false 手动应答.// consumer: 是一个回调函数. 此处类型设定成函数式接口. 这样后续调用 basicConsume 并且传实参的时候, 就可以写作 lambda 样子了.public boolean basicConsume(String consumerTag, String queueName, boolean autoAck, Consumer consumer) {// 构造一个 ConsumerEnv 对象, 把这个对应的队列找到, 再把这个 Consumer 对象添加到该队列中.queueName virtualHostName queueName;try {consumerManager.addConsumer(consumerTag, queueName, autoAck, consumer);System.out.println([VirtualHost] basicConsume 成功! queueName queueName);return true;} catch (Exception e) {System.out.println([VirtualHost] basicConsume 失败! queueName queueName);e.printStackTrace();return false;}}Consumer 相当于⼀个回调函数. 放到 common.Consumer 中
FunctionalInterface
public interface Consumer {// Delivery 的意思是 投递, 这个方法预期是在每次服务器收到消息之后, 来调用.// 通过这个方法把消息推送给对应的消费者.// (注意! 这里的方法名和参数, 也都是参考 RabbitMQ 展开的)void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException;
}
2) 创建订阅者管理管理类 • parent ⽤来记录虚拟主机. • 使⽤⼀个阻塞队列⽤来触发消息消费. 称为令牌队列. 每次有消息过来了, 都往队列中放⼀个令牌(也就是队列名), 然后消费者再去消费对应队列的消息. • 使⽤⼀个线程池⽤来执⾏消息回调. 这样令牌队列的设定避免搞出来太多线程. 否则就需要给每个队列都安排⼀个单独的线程了, 如果队列很多则开销就⽐较⼤了 3) 添加令牌接⼝ // 这个方法的调用时机就是发送消息的时候.public void notifyConsume(String queueName) throws InterruptedException {tokenQueue.put(queueName);}4) 实现添加订阅者
• 新来订阅者的时候, 需要先消费掉之前积压的消息. • consumeMessage 真正的消息消费操作, ⼀会再实现. public void addConsumer(String consumerTag, String queueName, boolean autoAck, Consumer consumer) throws MqException {// 找到对应的队列.MSGQueue queue parent.getMemoryDataCenter().getQueue(queueName);if (queue null) {throw new MqException([ConsumerManager] 队列不存在! queueName queueName);}ConsumerEnv consumerEnv new ConsumerEnv(consumerTag, queueName, autoAck, consumer);synchronized (queue) {queue.addConsumerEnv(consumerEnv);// 如果当前队列中已经有了一些消息了, 需要立即就消费掉.int n parent.getMemoryDataCenter().getMessageCount(queueName);for (int i 0; i n; i) {// 这个方法调用一次就消费一条消息.consumeMessage(queue);}}}创建 ConsumerEnv , 这个类表⽰⼀个订阅者的执⾏环境
public class ConsumerEnv {private String consumerTag;private String queueName;private boolean autoAck;// 通过这个回调来处理收到的消息.private Consumer consumer;public ConsumerEnv(String consumerTag, String queueName, boolean autoAck, Consumer consumer) {this.consumerTag consumerTag;this.queueName queueName;this.autoAck autoAck;this.consumer consumer;}
给 MsgQueue 添加⼀个订阅者列表 此处的 chooseConsumer 是实现⼀个轮询效果. 如果⼀个队列有多个订阅者, 将会按照轮询的⽅式轮 流拿到消息
5) 实现扫描线程
在 ConsumerManager 中创建⼀个线程, 不停的尝试扫描令牌队列. 如果拿到了令牌, 就真正触发消费消息操作 public ConsumerManager(VirtualHost p) {parent p;scannerThread new Thread(() - {while (true) {try {// 1. 拿到令牌String queueName tokenQueue.take();// 2. 根据令牌, 找到队列MSGQueue queue parent.getMemoryDataCenter().getQueue(queueName);if (queue null) {throw new MqException([ConsumerManager] 取令牌后发现, 该队列名不存在! queueName queueName);}// 3. 从这个队列中消费一个消息.synchronized (queue) {consumeMessage(queue);}} catch (InterruptedException | MqException e) {e.printStackTrace();}}});// 把线程设为后台线程.scannerThread.setDaemon(true);scannerThread.start();}6) 实现消费消息 private void consumeMessage(MSGQueue queue) {// 1. 按照轮询的方式, 找个消费者出来.ConsumerEnv luckyDog queue.chooseConsumer();if (luckyDog null) {// 当前队列没有消费者, 暂时不消费. 等后面有消费者出现再说.return;}// 2. 从队列中取出一个消息Message message parent.getMemoryDataCenter().pollMessage(queue.getName());if (message null) {// 当前队列中还没有消息, 也不需要消费.return;}// 3. 把消息带入到消费者的回调方法中, 丢给线程池执行.workerPool.submit(() - {try {// 1. 把消息放到待确认的集合中. 这个操作势必在执行回调之前.parent.getMemoryDataCenter().addMessageWaitAck(queue.getName(), message);// 2. 真正执行回调操作luckyDog.getConsumer().handleDelivery(luckyDog.getConsumerTag(), message.getBasicProperties(),message.getBody());// 3. 如果当前是 自动应答 , 就可以直接把消息删除了.// 如果当前是 手动应答 , 则先不处理, 交给后续消费者调用 basicAck 方法来处理.if (luckyDog.isAutoAck()) {// 1) 删除硬盘上的消息if (message.getDeliverMode() 2) {parent.getDiskDataCenter().deleteMessage(queue, message);}// 2) 删除上面的待确认集合中的消息parent.getMemoryDataCenter().removeMessageWaitAck(queue.getName(), message.getMessageId());// 3) 删除内存中消息中心里的消息parent.getMemoryDataCenter().removeMessage(message.getMessageId());System.out.println([ConsumerManager] 消息被成功消费! queueName queue.getName());}} catch (Exception e) {e.printStackTrace();}});}注意: ⼀个队列可能有 N 个消费者, 此处应该按照轮询的⽅式挑⼀个消费者进⾏消费.
⼩结
⼀. 消费消息的两种典型情况
订阅者已经存在了, 才发送消息 这种直接获取队列的订阅者, 从中按照轮询的⽅式挑⼀个消费者来调⽤回调即可.消息先发送到队列了, 订阅者还没到. 此时当订阅者到达, 就快速把指定队列中的消息全都消费掉. ⼆. 关于消息不丢失的论证 每个消息在从内存队列中出队列时, 都会先进⼊ 待确认 中. • 如果 autoAck 为 true 消息被消费完毕后(执⾏完消息回调之后), 再执⾏清除⼯作. 分别清除硬盘数据, 待确认队列, 消息中⼼. • 如果 autoAck 为 false 在回调内部, 进⾏清除⼯作. 分别清除硬盘数据, 待确认队列, 消息中⼼.执⾏消息回调的时候抛出异常 此时消息仍然处在待确认队列中. 此时可以⽤⼀个线程扫描待确认队列, 如果发现队列中的消息超时未确认, 则放⼊死信队列. 死信队列咱们此处暂不实现.执⾏消息回调的时候服务器宕机 内存所有数据都没了, 但是消息在硬盘上仍然存在. 会在服务下次启动的时候, 加载回内存. 重新被消费到.
消息确认 测试 VirtualHost
package com.example.mq;import com.example.mq.common.Consumer;
import com.example.mq.mqserver.VirtualHost;
import com.example.mq.mqserver.core.BasicProperties;
import com.example.mq.mqserver.core.ExchangeType;
import org.apache.tomcat.util.http.fileupload.FileUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.test.context.SpringBootTest;import java.io.File;
import java.io.IOException;SpringBootTest
public class VirtualHostTests {private VirtualHost virtualHost null;BeforeEachpublic void setUp() {MqApplication.context SpringApplication.run(MqApplication.class);virtualHost new VirtualHost(default);}AfterEachpublic void tearDown() throws IOException {MqApplication.context.close();virtualHost null;// 把硬盘的目录删除掉File dataDir new File(./data);FileUtils.deleteDirectory(dataDir);}Testpublic void testExchangeDeclare() {boolean ok virtualHost.exchangeDeclare(testExchange, ExchangeType.DIRECT,true, false, null);Assertions.assertTrue(ok);}Testpublic void testExchangeDelete() {boolean ok virtualHost.exchangeDeclare(testExchange, ExchangeType.DIRECT,true, false, null);Assertions.assertTrue(ok);ok virtualHost.exchangeDelete(testExchange);Assertions.assertTrue(ok);}Testpublic void testQueueDeclare() {boolean ok virtualHost.queueDeclare(testQueue, true,false, false, null);Assertions.assertTrue(ok);}Testpublic void testQueueDelete() {boolean ok virtualHost.queueDeclare(testQueue, true,false, false, null);Assertions.assertTrue(ok);ok virtualHost.queueDelete(testQueue);Assertions.assertTrue(ok);}Testpublic void testQueueBind() {boolean ok virtualHost.queueDeclare(testQueue, true,false, false, null);Assertions.assertTrue(ok);ok virtualHost.exchangeDeclare(testExchange, ExchangeType.DIRECT,true, false, null);Assertions.assertTrue(ok);ok virtualHost.queueBind(testQueue, testExchange, testBindingKey);Assertions.assertTrue(ok);}Testpublic void testQueueUnbind() {boolean ok virtualHost.queueDeclare(testQueue, true,false, false, null);Assertions.assertTrue(ok);ok virtualHost.exchangeDeclare(testExchange, ExchangeType.DIRECT,true, false, null);Assertions.assertTrue(ok);ok virtualHost.queueBind(testQueue, testExchange, testBindingKey);Assertions.assertTrue(ok);ok virtualHost.queueUnbind(testQueue, testExchange);Assertions.assertTrue(ok);}Testpublic void testBasicPublish() {boolean ok virtualHost.queueDeclare(testQueue, true,false, false, null);Assertions.assertTrue(ok);ok virtualHost.exchangeDeclare(testExchange, ExchangeType.DIRECT,true, false, null);Assertions.assertTrue(ok);ok virtualHost.basicPublish(testExchange, testQueue, null,hello.getBytes());Assertions.assertTrue(ok);}// 先订阅队列, 后发送消息Testpublic void testBasicConsume1() throws InterruptedException {boolean ok virtualHost.queueDeclare(testQueue, true,false, false, null);Assertions.assertTrue(ok);ok virtualHost.exchangeDeclare(testExchange, ExchangeType.DIRECT,true, false, null);Assertions.assertTrue(ok);// 先订阅队列ok virtualHost.basicConsume(testConsumerTag, testQueue, true, new Consumer() {Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {try {// 消费者自身设定的回调方法.System.out.println(messageId basicProperties.getMessageId());System.out.println(body new String(body, 0, body.length));Assertions.assertEquals(testQueue, basicProperties.getRoutingKey());Assertions.assertEquals(1, basicProperties.getDeliverMode());Assertions.assertArrayEquals(hello.getBytes(), body);} catch (Error e) {// 断言如果失败, 抛出的是 Error, 而不是 Exception!e.printStackTrace();System.out.println(error);}}});Assertions.assertTrue(ok);Thread.sleep(500);// 再发送消息ok virtualHost.basicPublish(testExchange, testQueue, null,hello.getBytes());Assertions.assertTrue(ok);}// 先发送消息, 后订阅队列.Testpublic void testBasicConsume2() throws InterruptedException {boolean ok virtualHost.queueDeclare(testQueue, true,false, false, null);Assertions.assertTrue(ok);ok virtualHost.exchangeDeclare(testExchange, ExchangeType.DIRECT,true, false, null);Assertions.assertTrue(ok);// 先发送消息ok virtualHost.basicPublish(testExchange, testQueue, null,hello.getBytes());Assertions.assertTrue(ok);// 再订阅队列ok virtualHost.basicConsume(testConsumerTag, testQueue, true, new Consumer() {Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {// 消费者自身设定的回调方法.System.out.println(messageId basicProperties.getMessageId());System.out.println(body new String(body, 0, body.length));Assertions.assertEquals(testQueue, basicProperties.getRoutingKey());Assertions.assertEquals(1, basicProperties.getDeliverMode());Assertions.assertArrayEquals(hello.getBytes(), body);}});Assertions.assertTrue(ok);Thread.sleep(500);}Testpublic void testBasicConsumeFanout() throws InterruptedException {boolean ok virtualHost.exchangeDeclare(testExchange, ExchangeType.FANOUT, false, false, null);Assertions.assertTrue(ok);ok virtualHost.queueDeclare(testQueue1, false, false, false, null);Assertions.assertTrue(ok);ok virtualHost.queueBind(testQueue1, testExchange, );Assertions.assertTrue(ok);ok virtualHost.queueDeclare(testQueue2, false, false, false, null);Assertions.assertTrue(ok);ok virtualHost.queueBind(testQueue2, testExchange, );Assertions.assertTrue(ok);// 往交换机中发布一个消息ok virtualHost.basicPublish(testExchange, , null, hello.getBytes());Assertions.assertTrue(ok);Thread.sleep(500);// 两个消费者订阅上述的两个队列.ok virtualHost.basicConsume(testConsumer1, testQueue1, true, new Consumer() {Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {System.out.println(consumerTag consumerTag);System.out.println(messageId basicProperties.getMessageId());Assertions.assertArrayEquals(hello.getBytes(), body);}});Assertions.assertTrue(ok);ok virtualHost.basicConsume(testConsumer2, testQueue2, true, new Consumer() {Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {System.out.println(consumerTag consumerTag);System.out.println(messageId basicProperties.getMessageId());Assertions.assertArrayEquals(hello.getBytes(), body);}});Assertions.assertTrue(ok);Thread.sleep(500);}Testpublic void testBasicConsumeTopic() throws InterruptedException {boolean ok virtualHost.exchangeDeclare(testExchange, ExchangeType.TOPIC, false, false, null);Assertions.assertTrue(ok);ok virtualHost.queueDeclare(testQueue, false, false, false, null);Assertions.assertTrue(ok);ok virtualHost.queueBind(testQueue, testExchange, aaa.*.bbb);Assertions.assertTrue(ok);ok virtualHost.basicPublish(testExchange, aaa.ccc.bbb, null, hello.getBytes());Assertions.assertTrue(ok);ok virtualHost.basicConsume(testConsumer, testQueue, true, new Consumer() {Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {System.out.println(consumerTag consumerTag);System.out.println(messageId basicProperties.getMessageId());Assertions.assertArrayEquals(hello.getBytes(), body);}});Assertions.assertTrue(ok);Thread.sleep(500);}Testpublic void testBasicAck() throws InterruptedException {boolean ok virtualHost.queueDeclare(testQueue, true,false, false, null);Assertions.assertTrue(ok);ok virtualHost.exchangeDeclare(testExchange, ExchangeType.DIRECT,true, false, null);Assertions.assertTrue(ok);// 先发送消息ok virtualHost.basicPublish(testExchange, testQueue, null,hello.getBytes());Assertions.assertTrue(ok);// 再订阅队列 [要改的地方, 把 autoAck 改成 false]ok virtualHost.basicConsume(testConsumerTag, testQueue, false, new Consumer() {Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {// 消费者自身设定的回调方法.System.out.println(messageId basicProperties.getMessageId());System.out.println(body new String(body, 0, body.length));Assertions.assertEquals(testQueue, basicProperties.getRoutingKey());Assertions.assertEquals(1, basicProperties.getDeliverMode());Assertions.assertArrayEquals(hello.getBytes(), body);// [要改的地方, 新增手动调用 basicAck]boolean ok virtualHost.basicAck(testQueue, basicProperties.getMessageId());Assertions.assertTrue(ok);}});Assertions.assertTrue(ok);Thread.sleep(500);}
} 文章转载自: http://www.morning.clgbb.cn.gov.cn.clgbb.cn http://www.morning.mzpd.cn.gov.cn.mzpd.cn http://www.morning.fnzbx.cn.gov.cn.fnzbx.cn http://www.morning.fdsbs.cn.gov.cn.fdsbs.cn http://www.morning.yrycb.cn.gov.cn.yrycb.cn http://www.morning.qnwyf.cn.gov.cn.qnwyf.cn http://www.morning.mmtbn.cn.gov.cn.mmtbn.cn http://www.morning.dxqfh.cn.gov.cn.dxqfh.cn http://www.morning.jcjgh.cn.gov.cn.jcjgh.cn http://www.morning.krjrb.cn.gov.cn.krjrb.cn http://www.morning.jzxqj.cn.gov.cn.jzxqj.cn http://www.morning.qmbtn.cn.gov.cn.qmbtn.cn http://www.morning.cpfbg.cn.gov.cn.cpfbg.cn http://www.morning.dddcfr.cn.gov.cn.dddcfr.cn http://www.morning.bzfwn.cn.gov.cn.bzfwn.cn http://www.morning.qmbtn.cn.gov.cn.qmbtn.cn http://www.morning.dangaw.com.gov.cn.dangaw.com http://www.morning.mjdbd.cn.gov.cn.mjdbd.cn http://www.morning.nwrzf.cn.gov.cn.nwrzf.cn http://www.morning.zcqgf.cn.gov.cn.zcqgf.cn http://www.morning.cwrpd.cn.gov.cn.cwrpd.cn http://www.morning.nmqdk.cn.gov.cn.nmqdk.cn http://www.morning.pslzp.cn.gov.cn.pslzp.cn http://www.morning.fgxr.cn.gov.cn.fgxr.cn http://www.morning.sqqkr.cn.gov.cn.sqqkr.cn http://www.morning.cyysq.cn.gov.cn.cyysq.cn http://www.morning.kgrwh.cn.gov.cn.kgrwh.cn http://www.morning.srnth.cn.gov.cn.srnth.cn http://www.morning.kynf.cn.gov.cn.kynf.cn http://www.morning.kwdfn.cn.gov.cn.kwdfn.cn http://www.morning.jlboyuan.cn.gov.cn.jlboyuan.cn http://www.morning.ttshf.cn.gov.cn.ttshf.cn http://www.morning.ghryk.cn.gov.cn.ghryk.cn http://www.morning.rsqpc.cn.gov.cn.rsqpc.cn http://www.morning.zljqb.cn.gov.cn.zljqb.cn http://www.morning.ntkpc.cn.gov.cn.ntkpc.cn http://www.morning.qftzk.cn.gov.cn.qftzk.cn http://www.morning.zlkps.cn.gov.cn.zlkps.cn http://www.morning.cryb.cn.gov.cn.cryb.cn http://www.morning.lwtfr.cn.gov.cn.lwtfr.cn http://www.morning.rsqpc.cn.gov.cn.rsqpc.cn http://www.morning.zmyhn.cn.gov.cn.zmyhn.cn http://www.morning.gqmhq.cn.gov.cn.gqmhq.cn http://www.morning.svrud.cn.gov.cn.svrud.cn http://www.morning.ccpnz.cn.gov.cn.ccpnz.cn http://www.morning.ymtbr.cn.gov.cn.ymtbr.cn http://www.morning.sqqhd.cn.gov.cn.sqqhd.cn http://www.morning.ryfpx.cn.gov.cn.ryfpx.cn http://www.morning.fwwkr.cn.gov.cn.fwwkr.cn http://www.morning.jzklb.cn.gov.cn.jzklb.cn http://www.morning.xkbdx.cn.gov.cn.xkbdx.cn http://www.morning.cpqqf.cn.gov.cn.cpqqf.cn http://www.morning.dfkby.cn.gov.cn.dfkby.cn http://www.morning.wnjsp.cn.gov.cn.wnjsp.cn http://www.morning.xqnzn.cn.gov.cn.xqnzn.cn http://www.morning.wwgpy.cn.gov.cn.wwgpy.cn http://www.morning.bzkgn.cn.gov.cn.bzkgn.cn http://www.morning.nzklw.cn.gov.cn.nzklw.cn http://www.morning.nhdw.cn.gov.cn.nhdw.cn http://www.morning.ptmch.com.gov.cn.ptmch.com http://www.morning.lmjtp.cn.gov.cn.lmjtp.cn http://www.morning.phgz.cn.gov.cn.phgz.cn http://www.morning.qypjk.cn.gov.cn.qypjk.cn http://www.morning.ppqzb.cn.gov.cn.ppqzb.cn http://www.morning.ntzbr.cn.gov.cn.ntzbr.cn http://www.morning.tknqr.cn.gov.cn.tknqr.cn http://www.morning.hdzty.cn.gov.cn.hdzty.cn http://www.morning.zkqsc.cn.gov.cn.zkqsc.cn http://www.morning.dkzwx.cn.gov.cn.dkzwx.cn http://www.morning.ncqzb.cn.gov.cn.ncqzb.cn http://www.morning.mbbgk.com.gov.cn.mbbgk.com http://www.morning.xcdph.cn.gov.cn.xcdph.cn http://www.morning.swimstaracademy.cn.gov.cn.swimstaracademy.cn http://www.morning.trplf.cn.gov.cn.trplf.cn http://www.morning.gwtbn.cn.gov.cn.gwtbn.cn http://www.morning.qzqjz.cn.gov.cn.qzqjz.cn http://www.morning.ptwzy.cn.gov.cn.ptwzy.cn http://www.morning.rjqtq.cn.gov.cn.rjqtq.cn http://www.morning.zlxrg.cn.gov.cn.zlxrg.cn http://www.morning.kxqwg.cn.gov.cn.kxqwg.cn