当前位置: 首页 > news >正文

电线电缆做销售哪个网站好培训学校怎么招生

电线电缆做销售哪个网站好,培训学校怎么招生,山东省工程建设信息网官网,南昌做网站建设公司这是本人学习的总结,主要学习资料如下 马士兵教育rocketMq官方文档 目录 1、分布式事务的难题2、解决方式2.1、半事务消息和事务回查2.2、代码样例2.2.1、TransactionListener2.2.2、TransactionMQProducer2.2.3、MessageListenerConcurrently2.2.4、流程图 1、分布…

这是本人学习的总结,主要学习资料如下

  • 马士兵教育
  • rocketMq官方文档

目录

  • 1、分布式事务的难题
  • 2、解决方式
    • 2.1、半事务消息和事务回查
    • 2.2、代码样例
      • 2.2.1、TransactionListener
      • 2.2.2、TransactionMQProducer
      • 2.2.3、MessageListenerConcurrently
      • 2.2.4、流程图


1、分布式事务的难题

现有两个系统,A向B转钱。A系统扣钱和B系统加钱就应该属于同一个事务,任何一个失败都要回滚。两个系统之间唯一的通信方式就是RocketMQ
请添加图片描述

以最朴素的想法,现在就有两个实现分布式事务的方案。但这两个都有比较大的不可靠性。

  • A系统先扣钱再发送MQ:这样的弊端是无法确定消息有没有发送到MQ,或者消息有没有被MQ保存。总之这做法缺少一些回查的机制。
  • A系统先发送MQ再扣钱:这样的弊端是发送消息后,A系统可能出现错误回滚。而B收到了消息就正常消费,完全不知道A那边出了问题。

2、解决方式

2.1、半事务消息和事务回查

  • 半事务消息:半事务消息是指向RocketMQ发送一条消息,但这个消息只存放在CommitLog中,并不在ConsumeQueue展示。也就是说该消息被RocketMQ接收了,但是消费者却无法消费到这条消息。
  • 事务回查:在半事务消息发送成功后。A系统执行事务,如果成功则MQ将消息变成正常消息,失败则不发送消息。这里如果业务太复杂还不能确定事务是否完成的话,还可以发送UNKNOWN给MQ,这样MQ就会有定时器去检查事务是否完成。
    RocketMQ会向生产者询问是否可以把半事务变成正常的消息让消费者可以消费到。在这篇文章的例子就是询问A系统扣款有没有扣成功。如果成功了那就让B系统消费消息。

请添加图片描述

所以呢,通过半事务消息事务回查就能保证A系统和发送消息具有事务,即扣款失败则不发送消息,扣款成功则发送消息。所以半事务消息至少保证了生产者和MQ之间的原子性。MQ和消费者之间的原子性需要另外处理。

消费者需要保证幂等性,失败后重试,即使称为死信后也特殊处理等操作来保证事务。这个例子中B系统成功加钱的话那交易结束,如果尝试多次后还是失败,那就需要一个机制来通知A系统,让他把扣掉的钱加回去。

2.2、代码样例

2.2.1、TransactionListener

一个接口规范,我们需要实现这个接口来定义本地事务和事务回查。

就是本地事务具体执行,成功后怎么办,失败了怎么办。定时的事务回查如何检查事务有没有完成。这些东西都要定义在TransactionListener的实现中。


TransactionListener transactionListener = new TransactionListener() {@Overridepublic LocalTransactionState executeLocalTransaction(Message message, Object o) {// 执行本地事务,A扣100块// 如果成功// return LocalTransactionState.COMMIT_MESSAGE;// 如果失败// return LocalTransactionState.ROLLBACK_MESSAGE;//或者业务比较复杂,不想在这个阶段就关闭事务,可以返回Unknown,之后就需要MQ定时事务回查return LocalTransactionState.UNKNOW;}@Override// 事务回查,默认一分钟一次public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {System.out.println("事务回查, " + new SimpleDateFormat("yyyyMMdd, HH:mm:ss").format(new Date()));// 如果成功// return LocalTransactionState.COMMIT_MESSAGE;// 如果失败// return LocalTransactionState.ROLLBACK_MESSAGE;// 业务比较长,还不确定成功或失败,返回unknown,下次再查return LocalTransactionState.UNKNOW;}};

2.2.2、TransactionMQProducer

半事务消息的生产者,在DefaultMQProducer的基础上新增了一个重要的参数,类型是ExecutorService。这个线程池是用来生产线程去完成事务回查。

但是事务回查的逻辑不需要定义在线程的run()方法中,这一部分放在TransactionListener中。

 TransactionMQProducer producer = new TransactionMQProducer("transaction_producer");producer.setNamesrvAddr("localhost:9876");// build a thread pool used to for MQ to call back to check transactionExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.MINUTES, new ArrayBlockingQueue<>(10), (r) -> {Thread thread = new Thread(r);thread.setName("client-transaction-msg-check-thread");return thread;});producer.setExecutorService(executorService);producer.setTransactionListener(transactionListener);producer.start();try{Message msg = new Message("transaction_producer", null, "A give B 100 dollar".getBytes());SendResult sendResult = producer.sendMessageInTransaction(msg, null);}catch(Exception e) {// rollbackSystem.out.println("rollback");}producer.shutdown();

2.2.3、MessageListenerConcurrently

消费者部分就比较简单,只要listener是MessageListenerConcurrently就好。

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("transaction_consumer");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("TransactionalTopic", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {try{for(MessageExt msg: list) {// simulate DB actionSystem.out.println("update B where transactionId" + msg.getTransactionId());System.out.println("Success consume msg: " + msg.getMsgId());}} catch (Exception e) {e.printStackTrace();System.out.println("Failed to consume meg, try more times");// means that failed to consume this msg. In next time will still consume this msg.return ConsumeConcurrentlyStatus.RECONSUME_LATER;}// means that success to consume this msg. In the next time will consume next msg.return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}
});consumer.start();
while(true){
}

2.2.4、流程图

请添加图片描述

http://www.tj-hxxt.cn/news/110276.html

相关文章:

  • 合肥长丰路网站建设石家庄最新消息今天
  • 网站策划的最终体现是什么成都seo优化外包公司
  • 专门做自助游攻略的网站是哪个山东一级造价师
  • 网站开发年收入在百度怎么发广告做宣传
  • 网站建设投标文件站长之家seo信息
  • 在线视频网站开发互联网站
  • 施工企业安全培训西安关键词优化软件
  • 网络科技公司有什么职位seo外链怎么发
  • 产地证哪个网站做快链友情链接平台
  • 设计云网站建设百度发作品入口在哪里
  • 动态网站制作基础建议免费推广广告链接
  • php做网站csdn百度搜索引擎优化方案
  • 新闻聚合网站怎么做外贸企业网站设计公司
  • 做期货都看那些网站湖北搜索引擎优化
  • 网站流量少怎么做网站增加外链的方法有哪些
  • 哪些网站可以上传自己做的视频网络平台推广方案
  • 重庆营销型网站开发价格宁波seo推广服务
  • 网站建设 网址导航2023年7 8月十大新闻
  • wordpress新建模板网站的seo是什么意思
  • 黔南州住房和城乡建设局网站网络营销的概念
  • 高端大气网络设计建设公司网站织梦模板百度渠道开户
  • 广州专业做网站建设有免费做网站的吗
  • 做网站付钱方式上海做网站优化
  • 北京专业网络直播制作成都网站优化及推广
  • 泰兴网站推广推广搜索怎么选关键词
  • 网络浏览器厦门seo结算
  • 怎么做别人可以上的网站网站运营管理
  • 淘客采集网站怎么做的昆山seo网站优化软件
  • 个人主页网站制作app推广的常用方法
  • 如何做班级网站网址链接