有没有外包活的网站,棋牌小程序制作开发,抖音怎么开通小程序推广,手机网站应该怎么做目录 一、事务基础概念二、分布式事务概念什么是分布式事务分布式事务场景CAP定理CAP理论理解CAPCAP的应用 BASE定理强一致性和最终一致性BASE理论 分布式事务分类刚性事务柔性事务 三、分布式事务解决方案方案汇总XA规范方案1#xff1a;2PC第一阶段#xff1a;准备阶段第二… 目录 一、事务基础概念二、分布式事务概念什么是分布式事务分布式事务场景CAP定理CAP理论理解CAPCAP的应用 BASE定理强一致性和最终一致性BASE理论 分布式事务分类刚性事务柔性事务 三、分布式事务解决方案方案汇总XA规范方案12PC第一阶段准备阶段第二阶段提交阶段优缺点 方案23PC阶段一CanCommit 准备阶段阶段二PreCommit阶段三doCommit阶段2PC和3PC的区别优缺点 补偿型事务方案3TCCTry(尝试)Confirm(确认)Cancel(取消)TCC事务模型的要求优缺点TCC与2PC对比 方案4SAGA概念命令协调事件编排异常恢复命令VS事件Saga和TCC对比 通知型事务方案5本地消息表操作流程优缺点 方案6MQ消息事务操作流程基于 RocketMQ实现MQ异步确保型事务优缺点MQ事务消息 VS 本地消息表 方案7最大努力通知方案选择 四、分布式事务最佳实践-Seata简介常见术语项目准备数据库准备库存服务账户服务订单服务业务服务测试 Seata配置下载修改配置文件nacos配置数据库建表启动seata-server Seata使用XA模式原理代码优缺点 Seata使用AT模式原理问题代码优缺点踩坑 Seata使用TCC模式原理问题代码优缺点 Seata使用SAGA模式概述原理优缺点 模式选择 一、事务基础概念
什么是事务
事务是并发控制的单位是用户定义的一个操作序列。
事务特性
原子性(Atomicity) 事务是数据库的逻辑工作单位事务中包括的诸操作要么全做要么全不做。一致性(Consistency) 事务执行的结果必须是使数据库从一个一致性状态变到另一个一致性状态。一致性与原子性是密切相关的。隔离性(Isolation) 一个事务的执行不能被其他事务干扰。持续性/永久性(Durability) 一个事务一旦提交它对数据库中数据的改变就应该是永久性的。
数据库事务在实现时会将一次事务的所有操作全部纳入到一个不可分割的执行单元该执行单元的所有操作要么都成功要么都失败只要其中任一操作执行失败都将导致整个事务的回滚。
MySQL的本地事务实现
大多数场景下我们的应用都只需要操作单一的数据库这种情况下的事务称之为本地事务(Local Transaction)。本地事务的ACID特性是数据库直接提供支持。为了达成本地事务MySQL做了很多的工作比如回滚日志重做日志MVCC读写锁等。
以MySQL 的InnoDB InnoDB 是 MySQL 的一个存储引擎为例介绍一下单一数据库的事务实现原理。
通过数据库锁的机制保障事务的隔离性通过 Redo Log重做日志来保障事务的持久性通过 Undo Log 撤销日志来保障事务的原子性通过 Undo Log 撤销日志来保障事务的一致性
Undo Log 如何保障事务的原子性呢
具体的方式为在操作任何数据之前首先将数据备份到一个地方这个存储数据备份的地方称为 Undo Log然后进行数据的修改。如果出现了错误或者用户执行了 Rollback 语句系统可以利用 Undo Log 中的备份将数据恢复到事务开始之前的状态。
Redo Log如何保障事务的持久性呢
具体的方式为Redo Log 记录的是新数据的备份和 Undo Log 相反。在事务提交前只要将 Redo Log 持久化即可不需要将数据持久化。当系统崩溃时虽然数据没有持久化但是 Redo Log 已经持久化。系统可以根据 Redo Log 的内容将所有数据恢复到崩溃之前的状态。
二、分布式事务概念
什么是分布式事务
分布式事务是针对分布式系统而言。分布式事务需要保证分布式系统中的数据一致性保证数据在子系统中始终保持一致避免业务出现问题。分布式系统中对数要么一起成功要么一起失败必须是一个整体性的事务。
分布式事务指事务的参与者、支持事务的服务器、资源服务器以及事务管理器分别位于不同的分布式系统的不同节点之上。
简单的说在分布式系统上一次大的操作由不同的小操作组成这些小的操作分布在不同的服务节点上且属于不同的应用分布式事务需要保证这些小操作要么全部成功要么全部失败。
举个例子在电商网站中用户对商品进行下单需要在订单表中创建一条订单数据同时需要在库存表中修改当前商品的剩余库存数量两步操作一个添加一个修改我们一定要保证这两步操作一定同时操作成功或失败否则业务就会出现问题。
任何事务机制在实现时都应该考虑事务的ACID特性包括本地事务、分布式事务。对于分布式事务而言即使不能都很好的满足也要考虑支持到什么程度。
分布式事务场景
跨库事务
跨库事务指的是一个应用某个功能需要操作多个库不同的库中存储不同的业务数据。笔者见过一个相对比较复杂的业务一个业务中同时操作了9个库。
下图演示了一个服务同时操作2个库的情况 分库分表
通常一个库数据量比较大或者预期未来的数据量比较大都会进行水平拆分也就是分库分表。
如下图将数据库B拆分成了2个库 对于分库分表的情况一般开发人员都会使用一些数据库中间件来降低sql操作的复杂性。
微服务化
微服务架构是目前一个比较一个比较火的概念。例如上面笔者提到的一个案例某个应用同时操作了9个库这样的应用业务逻辑必然非常复杂对于开发人员是极大的挑战应该拆分成不同的独立服务以简化业务逻辑。拆分后独立服务之间通过RPC框架来进行远程调用实现彼此的通信。下图演示了一个3个服务之间彼此调用的架构 Service A完成某个功能需要直接操作数据库同时需要调用Service B和Service C而Service B又同时操作了2个数据库Service C也操作了一个库。需要保证这些跨服务的对多个数据库的操作要不都成功要不都失败实际上这可能是最典型的分布式事务场景。
分布式事务实现方案必须要考虑性能的问题如果为了严格保证ACID特性导致性能严重下降那么对于一些要求快速响应的业务是无法接受的。
CAP定理
CAP 是 Consistency、Availability、Partition tolerance 三个单词的缩写分别表示一致性、可用性、分区容忍性。
CAP理论
CAP定理是由加州大学伯克利分校Eric Brewer教授提出来的他指出WEB服务无法同时满足一下3个属性
一致性(Consistency) 客户端知道一系列的操作都会同时发生(生效)可用性(Availability) 每个操作都必须以可预期的响应结束分区容错性(Partition tolerance) 即使出现单个组件无法可用操作依然可以完成
具体地讲在分布式系统中一个Web应用至多只能同时支持上面的两个属性。因此设计人员必须在一致性与可用性之间做出选择。 理解CAP
下面为了方便对CAP理论的理解我们结合电商系统中的一些业务场景来理解CAP。
如下图是商品信息管理的执行流程 C - Consistency
一致性是指写操作后的读操作可以读取到最新的数据状态当数据分布在多个节点上从任意结点读取到的数据都是最新的状态。
上图中商品信息的读写要满足一致性就是要实现如下目标
商品服务写入主数据库成功则向从数据库查询新数据也成功。 商品服务写入主数据库失败则向从数据库查询新数据也失败。
如何实现一致性
写入主数据库后要将数据同步到从数据库。写入主数据库后在向从数据库同步期间要将从数据库锁定待同步完成后再释放锁以免在新数据写入成功后向从数据库查询到旧的数据。
分布式系统一致性的特点
由于存在数据同步的过程写操作的响应会有一定的延迟。 为了保证数据一致性会对资源暂时锁定待数据同步完成释放锁定资源。 如果请求数据同步失败的结点则会返回错误信息一定不会返回旧数据。
A - Availability
可用性是指任何事务操作都可以得到响应结果且不会出现响应超时或响应错误。
上图中商品信息读取满足可用性就是要实现如下目标
从数据库接收到数据查询的请求则立即能够响应数据查询结果从数据库不允许出现响应超时或响应错误。
如何实现可用性
由于要保证从数据库的可用性不可将从数据库中的资源进行锁定即时数据还没有同步过来从数据库也要返回要查询的数据哪怕是旧数据如果连旧数据也没有则可以按照约定返回一个默认信息但不能返回错误或响应超时
分布式系统可用性的特点
所有请求都有响应且不会出现响应超时或响应错误。
P - Partition tolerance
通常分布式系统的各各结点部署在不同的子网这就是网络分区不可避免的会出现由于网络问题而导致结点之间通信失败此时仍可对外提供服务这叫分区容忍性。
上图中商品信息读写满足分区容忍性就是要实现如下目标
主数据库向从数据库同步数据失败不影响读写操作。其一个结点挂掉不影响另一个结点对外提供服务。
如何实现分区容忍性
尽量使用异步取代同步操作例如使用异步方式将数据从主数据库同步到从数据这样结点之间能有效的实现松耦合。添加从数据库结点其中一个从结点挂掉其它从结点提供服务。
分布式分区容忍性的特点
分区容忍性分是布式系统具备的基本能力。
CAP的应用
目前很多大型网站及应用都是分布式部署的分布式场景中的数据一致性问题一直是一个比较重要的话题。
基于 CAP理论很多系统在设计之初就要对这三者做出取舍任何一个分布式系统都无法同时满足一致性Consistency、可用性Availability和分区容错性Partition tolerance最多只能同时满足两项。在互联网领域的绝大多数的场景中都需要牺牲强一致性来换取系统的高可用性系统往往只需要保证最终一致性。
在生产中对分布式事务处理时要根据需求来确定满足 CAP 的以下哪两个方面 AP 放弃一致性追求分区容忍性和可用性。这是很多分布式系统设计时的选择。 例如上边的商品管理完全可以实现 AP前提是只要用户可以接受所查询到的数据在一定时间内不是最新的即可。 通常实现 AP 都会保证最终一致性后面将的 BASE 理论就是根据 AP 来扩展的一些业务场景比如订单退款今日退款成功明日账户到账只要用户可以接受在一定的时间内到账即可。 CP 放弃可用性追求一致性和分区容错性zookeeper 其实就是追求的强一致又比如跨行转账一次转账请求要等待双方银行系统都完成整个事务才算完成。 CA 放弃分区容忍性即不进行分区不考虑由于网络不通或结点挂掉的问题则可以实现一致性和可用性。那么系统将不是一个标准的分布式系统最常用的关系型数据就满足了 CA。
对于分布式系统而言分区容错性是一个最基本的要求因此基本上我们在设计分布式系统的时候只能从一致性C和可用性A之间进行取舍。
BASE定理
强一致性和最终一致性
CAP 理论告诉我们一个分布式系统最多只能同时满足一致性Consistency、可用性Availability和分区容忍性Partition tolerance这三项中的两项其中AP在实际应用中较多AP 即舍弃一致性保证可用性和分区容忍性但是在实际生产中很多场景都要实现一致性比如前边我们举的例子主数据库向从数据库同步数据即使不要一致性但是最终也要将数据同步成功来保证数据一致这种一致性和 CAP 中的一致性不同CAP 中的一致性要求 在任何时间查询每个结点数据都必须一致它强调的是强一致性但是最终一致性是允许可以在一段时间内每个结点的数据不一致但是经过一段时间每个结点的数据必须一致它强调的是最终数据的一致性。
BASE理论
BASE是Basically Available(基本可用、**Soft state(软状态和Eventually consistent(最终一致性**三个短语的简写。
BASE是对CAP中一致性和可用性权衡的结果其来源于对大规模互联网系统分布式实践的总结是基于CAP定理逐步演化而来的其核心思想是即使无法做到强一致性但每个应用都可以根据自身的业务特点采用适当的方法来使系统达到最终一致性。
Basically Available基本可用
基本可用是指分布式系统在出现不可预知的故障的时候允许损失部分可用性但不等于系统不可用。
响应时间上的损失当出现故障时响应时间增加
功能上的损失 当流量高峰期时屏蔽一些功能的使用以保证系统稳定性服务降级
Soft state软状态
指允许系统中的数据存在中间状态并认为该中间状态的存在不会影响系统的整体可用性。
与硬状态相对即是指允许系统中的数据存在中间状态并认为该中间状态的存在不会影响系统的整体可用性即允许系统在不同节点的数据副本之间进行数据同步的过程存在延时。
Eventually consistent最终一致性
强调系统中所有的数据副本在经过一段时间的同步后最终能够达到一个一致的状态。其本质是需要系统保证最终数据能够达到一致而不需要实时保证系统数据的强一致性。
分布式事务分类
刚性事务
刚性事务指的是要使分布式事务达到像本地式事务一样具备数据强一致性从CAP来看就是说要达到CP状态。
通常无业务改造强一致性原生支持回滚/隔离性低并发适合短事务。
刚性事务XA 协议2PC、JTA、JTS、3PC但由于同步阻塞处理效率低不适合大型网站分布式场景。
柔性事务
柔性事务指的是不要求强一致性而是要求最终一致性允许有中间状态也就是Base理论换句话说就是AP状态。
与刚性事务相比柔性事务的特点为有业务改造最终一致性实现补偿接口实现资源锁定接口高并发适合长事务。
柔性事务分为
补偿型异步确保型最大努力通知型。
柔型事务TCC/FMT、Saga状态机模式、Aop模式、本地事务消息、消息事务半消息
三、分布式事务解决方案
方案汇总 很明显可以看出分布式事务后续演变成2条路径
CP(一致性 分区)
放弃可用性,保证数据强一致性.
**经典方案: 12PC 23PC **
AP(可用性 分区)
暂时放弃一致性,保证可用,后续通过某种手段(比如: MQ/程序补偿)打到最终一致性性.
经典方案: 1本地消息表 2MQ消息事务 3TCC 4SAGA
XA规范
XA 规范 是 X/Open 组织定义的分布式事务处理DTPDistributed Transaction Processing标准。
XA 规范 描述了全局的事务管理器与局部的资源管理器之间的接口。 XA规范 的目的是允许的多个资源如数据库应用服务器消息队列等在同一事务中访问这样可以使 ACID 属性跨越应用程序而保持有效。
XA 规范 使用两阶段提交2PCTwo-Phase Commit协议来保证所有资源同时提交或回滚任何特定的事务。
DTP标准中包含有几个角色
AP(Application Program) : 既应用程序可以理解为使用DTP分布式事务的程序。RM(Resource Manager) : 即资源管理器可以理解为事务的参与者一般情况下是指一个数据库实例通过资源管理器对该数据库进行控制资源管理器控制着分支事务。TM(Transaction Manager) : 事务管理器负责协调和管理事务事务管理器控制着全局事务管理事务生命周期并协调各个RM。全局事务是指分布式事务处理环境中需要操作多个数据库共同完成一个工作这个工作即是一个全局事务。 XA则规范了TM与RM之间的通信接口在TM与多个RM之间形成一个双向通信桥梁从而在多个数据库资源下保证ACID四个特性。目前知名的数据库如Oracle, DB2,mysql等都是实现了XA接口的都可以作为RM。
XA是数据库的分布式事务强一致性在整个过程中数据一张锁住状态即从prepare到commit、rollback的整个过程中TM一直把持折数据库的锁如果有其他人要修改数据库的该条数据就必须等待锁的释放存在长事务风险。
XA的主要限制
必须要拿到所有数据源而且数据源还要支持XA协议。目前MySQL中只有InnoDB存储引擎支持XA协议。性能比较差要把所有涉及到的数据都要锁定是强一致性的会产生长事务。
方案12PC
2PCTwo-phase commit protocol中文叫二阶段提交。 二阶段提交是一种强一致性设计2PC 引入一个事务协调者(TM)的角色来协调管理各参与者也可称之为各本地资源RM的提交和回滚二阶段分别指的是准备和提交两个阶段。
第一阶段准备阶段
准备阶段事务协调者™会给各事务参与者(RM)发送准备命令(prepare)参与者准备成功后返回(ready) 协调者向所有参与者发送事务操作指令参与者执行除了事务提交外所有操作如参与者执行成功给协调者反馈执行成功否则反馈中止表示事务失败
第二阶段提交阶段
协调者收到各个参与者的准备消息后根据反馈情况通知各个参与者commit提交或者rollback回滚
1commit提交
当第一阶段所有参与者都反馈成功时协调者发起正式提交事务的请求当所有参与者都回复提交成功时则意味着完成事务。 协调者节点向所有参与者发出正式提交的 commit 请求。收到协调者的 commit 请求后参与者正式执行事务提交操作并释放在整个事务期间内占用的资源。参与者完成事务提交后向协调者节点发送已提交消息。协调者节点收到所有参与者节点反馈的已提交消息后完成事务。
2rollback回滚
如果任意一个参与者节点在第一阶段返回的消息为中止(或者异常)或者协调者节点在第一阶段的询问超时无法获取到全部参数者反馈那么这个事务将会被回滚。 协调者向所有参与者发出 rollback 回滚操作的请求 参与者执行事务回滚并释放在整个事务期间内占用的资源 参与者在完成事务回滚之后向协调者发送回滚完成的反馈消息 协调者收到所有参与者反馈的消息后取消事务
优缺点
缺点 性能问题执行过程中所有参与节点都是事务阻塞性的当参与者占有公共资源时其他第三方节点访问公共资源就不得不处于阻塞状态为了数据的一致性而牺牲了可用性对性能影响较大不适合高并发高性能场景 可靠性问题2PC非常依赖协调者当协调者发生故障时尤其是第二阶段那么所有的参与者就会都处于锁定事务资源的状态中而无法继续完成事务操作 数据一致性问题在阶段二中当协调者向参与者发送commit请求之后发生了局部网络异常或者在发送commit请求过程中协调者发生了故障这会导致只有一部分参与者接受到了commit请求。而在这部分参与者接到commit请求之后就会执行commit操作。但是其他部分未接到commit请求的机器则无法执行事务提交。于是整个分布式系统便出现了数据不一致性的现象。 二阶段无法解决的问题协调者在发出 commit 消息之后宕机而唯一接收到这条消息的参与者同时也宕机了那么即使协调者通过选举协议产生了新的协调者这条事务的状态也是不确定的没人知道事务是否被已经提交。
优点
尽量保证了数据的强一致适合对数据强一致要求很高的关键领域。
方案23PC
3PC三阶段提交协议是二阶段提交协议的改进版本以解决2PC存在的缺陷问题, 具体改进如下: 在协调者和参与者中都引入超时机制 引入确认机制当所有参与者能正常工作才执行事务
所以3PC分为3个阶段CanCommit 准备阶段、PreCommit 预提交阶段、DoCommit 提交阶段。 阶段一CanCommit 准备阶段
协调者向参与者发送 canCommit 请求参与者如果可以提交就返回Yes响应否则返回No响应具体流程如下
事务询问协调者向所有参与者发出包含事务内容的 canCommit 请求询问是否可以提交事务并等待所有参与者答复。响应反馈参与者收到 canCommit 请求后如果认为可以执行事务操作则反馈 yes 并进入预备状态否则反馈 no。
阶段二PreCommit
协调者根据参与者的反应情况来决定是否可以进行事务的 PreCommit 操作。根据响应情况有以下两种可能
执行事务返回都是yes
所有参与者向协调者发送了Yes响应将会执行执行事务
协调者向参与者发送 PreCommit 请求并进入准备阶段参与者接收到 PreCommit 请求后会执行本地事务操作但不提交事务参与者成功的执行了事务操作后返回ACK响应同时开始等待最终指令。 中断事务返回存在no
如果存在一个参与者向协调者发送了No响应或者等待超时之后协调者都没有接到参与者的响应那么就执行事务的中断
协调者向所有参与者发送 abort 请求。参与者收到来自协调者的 abort 请求之后或超时之后仍未收到协调者的请求执行事务的中断。 阶段三doCommit阶段
该阶段进行真正的事务提交也存在2种情况
提交事务返回都是yes
第二阶段的preCommit 请求所有参与者向协调者发送了Yes响应将会提交事务
协调接收到所有参与者发送的ACK响应那么他将从预提交状态进入到提交状态并向所有参与者发送 doCommit 请求参与者接收到doCommit请求之后执行正式的事务提交并在完成事务提交之后释放所有事务资源事务提交完之后参与者向协调者发送ack响应。协调者接收到所有参与者的ack响应之后完成事务。 中断事务返回存在no
如果存在一个参与者向协调者发送了No响应或者等待超时之后协调者都没有接到参与者的响应那么就执行事务的中断
协调者处向所有参与者发出 abort 请求参与者接收到abort请求之后马上回滚事务释放所有的事务资源。参与者完成事务回滚之后向协调者反馈ACK消息协调者接收到参与者反馈的ACK消息之后执行事务的中断。 2PC和3PC的区别
三阶段提交协议在协调者和参与者中都引入 超时机制并且把两阶段提交协议的第一个阶段拆分成了两步询问然后再锁资源最后真正提交。 在doCommit阶段如果参与者无法及时接收到来自协调者的doCommit或者abort请求时会在等待超时之后继续进行事务的提交。
相对于2PC3PC主要解决的单点故障问题并减少阻塞 因为一旦参与者无法及时收到来自协调者的信息之后他会默认执行commit。而不会一直持有事务资源并处于阻塞状态。
但是这种机制也会导致数据一致性问题因为由于网络原因协调者发送的abort响应没有及时被参与者接收到那么参与者在等待超时之后执行了commit操作。这样就和其他接到abort命令并执行回滚的参与者之间存在数据不一致的情况。
优缺点
缺点
数据不一致问题依然存在当在参与者收到 preCommit 请求后等待 doCommit 指令时此时如果协调者请求中断事务而协调者无法与参与者正常通信会导致参与者继续提交事务造成数据不一致。
优点
相比二阶段提交三阶段提交降低了阻塞范围在等待超时后协调者或参与者会中断事务。避免了协调者单点问题阶段 3 中协调者出现问题时参与者会继续提交事务。
补偿型事务
补偿模式使用一个额外的协调服务来协调各个需要保证一致性的业务服务协调服务按顺序调用各个业务微服务如果某个业务服务调用异常包括业务异常和技术异常就取消之前所有已经调用成功的业务服务。
方案3TCC
TCCTry Confirm Cancel方案是一种应用层面侵入业务的两阶段提交。是目前最火的一种分布式事务方案其核心思想是针对每个操作都要注册一个与其对应的确认和补偿撤销操作。
TCC 模型认为对于业务系统中一个特定的业务逻辑其对外提供服务时必须接受一些不确定性即对业务逻辑初步操作的调用仅是一个临时性操作调用它的主业务服务保留了后续的取消权。如果主业务服务认为全局事务应该回滚它会要求取消之前的临时性操作这就对应从业务服务的取消操作。而当主业务服务认为全局事务应该提交时它会放弃之前临时性操作的取消权这对应从业务服务的确认操作。每一个初步操作最终都会被确认或取消。
一个完整的 TCC 业务由一个主业务服务和若干个从业务服务组成主业务服务发起并完成整个业务活动TCC 模式要求从服务提供三个接口Try、Confirm、Cancel。
TCC 分布式事务模型包括三部分
主业务服务主业务服务为整个业务活动的发起方服务的编排者负责发起并完成整个业务活动。从业务服务从业务服务是整个业务活动的参与方负责提供 TCC 业务操作实现初步操作(Try)、确认操作(Confirm)、取消操作(Cancel)三个接口供主业务服务调用。业务活动管理器业务活动管理器管理控制整个业务活动包括记录维护 TCC 全局事务的事务状态和每个从业务服务的子事务状态并在业务活动提交时调用所有从业务服务的 Confirm 操作在业务活动取消时调用所有从业务服务的 Cancel 操作。 TCC 提出了一种新的事务模型基于业务层面的事务定义锁粒度完全由业务自己控制目的是解决复杂业务中跨表跨库等大颗粒度资源锁定的问题。
相对于 XA 等传统模型其特征在于它不依赖资源管理器(RM)对分布式事务的支持而是通过对业务逻辑的分解来实现分布式事务。
TCC 中会添加事务日志如果 Confirm 或者 Cancel 阶段出错则会进行重试所以这两个阶段需要支持幂等如果重试失败则需要人工介入进行恢复和处理等。
Try(尝试)
这个过程并未执行业务只是完成所有业务的一致性检查并预留好执行所需的全部资源。
以电商中订单系统为例用户下单创建订单扣库存扣款流程。假设 库存总数10购买2账户余额1000扣款200 Confirm(确认)
确认执行业务操作不做任何业务检查 只使用Try阶段预留的业务资源。通常情况下采用TCC则认为 Confirm阶段是不会出错的。即只要Try成功Confirm一定成功。若Confirm阶段真的出错了需引入重试机制或人工处理。 Cancel(取消)
取消Try阶段预留的业务资源。通常情况下采用TCC则认为Cancel阶段也是一定成功的。若Cancel阶段真的出错了需引入重试机制或人工处理。 TCC事务模型的要求
可查询操作服务操作具有全局唯一的标识操作唯一的确定的时间。幂等操作重复调用多次产生的业务结果与调用一次产生的结果相同。一是通过业务操作实现幂等性二是系统缓存所有请求与处理的结果最后是检测到重复请求之后自动返回之前的处理结果。TCC操作Try阶段尝试执行业务完成所有业务的检查实现一致性预留必须的业务资源实现准隔离性。Confirm阶段真正的去执行业务不做任何检查仅适用Try阶段预留的业务资源Confirm操作还要满足幂等性。Cancel阶段取消执行业务释放Try阶段预留的业务资源Cancel操作要满足幂等性。TCC与2PC(两阶段提交)协议的区别TCC位于业务服务层而不是资源层TCC没有单独准备阶段Try操作兼备资源操作与准备的能力TCC中Try操作可以灵活的选择业务资源锁定粒度。TCC的开发成本比2PC高。实际上TCC也属于两阶段操作但是TCC不等同于2PC操作。可补偿操作Do阶段真正的执行业务处理业务处理结果外部可见。Compensate阶段抵消或者部分撤销正向业务操作的业务结果补偿操作满足幂等性。约束补偿操作在业务上可行由于业务执行结果未隔离或者补偿不完整带来的风险与成本可控。实际上TCC的Confirm和Cancel操作可以看做是补偿操作。
优缺点
优点
性能提升具体业务来实现控制资源锁的粒度变小不会锁定整个资源。数据最终一致性基于 Confirm 和 Cancel 的幂等性保证事务最终完成确认或者取消保证数据的一致性。可靠性解决了 XA 协议的协调者单点故障问题由主业务方发起并控制整个业务活动业务活动管理器也变成多点引入集群
缺点
TCC 的 Try、Confirm 和 Cancel 操作功能要按具体业务来实现业务耦合度较高提高了开发成本。
TCC与2PC对比
TCC与XA两阶段提交有着异曲同工之妙下图列出了二者之间的对比 在阶段1
在XA中各个RM准备提交各自的事务分支事实上就是准备提交资源的更新操作(insert、delete、update等)而在TCC中是主业务活动请求(try)各个从业务服务预留资源。
在阶段2
XA根据第一阶段每个RM是否都prepare成功判断是要提交还是回滚。如果都prepare成功那么就commit每个事务分支反之则rollback每个事务分支。TCC中如果在第一阶段所有业务资源都预留成功那么confirm各个从业务服务否则取消(cancel)所有从业务服务的资源预留请求。
方案4SAGA
概念
Saga是分布式事务领域最有名气的解决方案之一最初出现在1987年Hector Garcaa-Molrna Kenneth Salem发表的论文SAGAS里。
Saga是由一系列的本地事务构成。每一个本地事务在更新完数据库之后会发布一条消息或者一个事件来触发Saga中的下一个本地事务的执行。如果一个本地事务因为某些业务规则无法满足而失败Saga会执行在这个失败的事务之前成功提交的所有事务的补偿操作。
Saga的实现有很多种方式其中最流行的两种方式是 命令协调Order Orchestrator这种方式的工作形式就像一只乐队由一个指挥家协调中心来协调大家的工作。协调中心来告诉Saga的参与方应该执行哪一个本地事务。 事件编排Event Choreographyo这种方式没有协调中心整个模式的工作方式就像舞蹈一样各个舞蹈演员按照预先编排的动作和走位各自表演最终形成一只舞蹈。处于当前Saga下的各个服务会产生某类事件或者监听其它服务产生的事件并决定是否需要针对监听到的事件做出响应。
命令协调
中央协调器Orchestrator简称 OSO以命令/回复的方式与每项服务进行通信全权负责告诉每个参与者该做什么以及什么时候该做什么。 主业务接口发起事务业务开启订单事务Saga协调器库存服务请求扣减库存库存服务操作后回复处理结果。Saga协调器账户服务请求扣减余额账户服务操作后回复处理结果。处理结果。Saga协调器订单服务请求创建订单订单服务操作后回复主业务逻辑接收并处理Saga协调器事务处理结果回复。
中央协调器 OSO 必须事先知道执行整个事务所需的流程如果有任何失败它还负责通过向每个参与者发送命令来撤销之前的操作来协调分布式的回滚基于中央协调器协调一切时回滚要容易得多因为协调器默认是执行正向流程回滚时只要执行反向流程即可。
执行顺序 A–B–C 回滚顺序 C–B—A
事件编排
在基于事件的方式中第一个服务执行完本地事务之后会产生一个事件。其它服务会监听这个事件触发该服务本地事务的执行并产生新的事件。当最后一个服务执行本地事务并且不发布任何事件时意味着分布式事务结束或者它发布的事件没有被任何 Saga 参与者听到都意味着事务结束。 主业务接口发布下单事件。库存服务监听下单事件扣减库存并发布库存已扣减事件。账户服务监听已扣减库存事件扣减余额并发已扣减余额事件。订单服务监听已扣减余额事件创建订单并发布下单成功事件。主业务逻辑监听下单成功事件后执行后续处理。
异常恢复
前面讲到saga模式在本地事务因为某些业务规则无法满足而失败Saga会执行在这个失败的事务之前成功提交的所有事务的补偿操作。
上面意思可以理解为saga模式下每个事务参与者提供一对接口一个做正常事务操作一个做异常事务回滚操作。比如支付与退款扣款与回补等。 saga支持事务恢复策略
向后恢复(backward recovery)
当执行事务失败时补偿所有已完成的事务是“一退到底”的方式这种做法的效果是撤销掉之前所有成功的子事务使得整个 Saga 的执行结果撤销。 从上图可知事务执行到了支付事务T3但是失败了因此事务回滚需要从C3,C2,C1依次进行回滚补偿对应的执行顺序为T1,T2,T3,C3,C2,C1。
向前恢复(forward recovery)
对于执行不通过的事务会尝试重试事务这里有一个假设就是每个子事务最终都会成功这种方式适用于必须要成功的场景事务失败了重试不需要补偿。 命令VS事件
命令协调设计
优点
服务之间关系简单避免服务间循环依赖因为 Saga 协调器会调用 Saga 参与者但参与者不会调用协调器。程序开发简单只需要执行命令/回复(其实回复消息也是一种事件消息)降低参与者的复杂性。易维护扩展在添加新步骤时事务复杂性保持线性回滚更容易管理更容易实施和测试。
缺点
中央协调器处理逻辑容易变得庞大复杂导致难以维护。存在协调器单点故障风险。
事件编排设计
优点
避免中央协调器单点故障风险。当涉及的步骤较少服务开发简单容易实现。
缺点
服务之间存在循环依赖的风险。当涉及的步骤较多服务间关系混乱难以追踪调测。
命令协调方式与事件编排方式2者怎么选择
系统复杂性如果系统的业务逻辑复杂事务需要严格控制和编排命令方式可以提供更好的可见性和可控性。系统扩展性如果系统需要频繁扩展和修改需要一定的灵活性事件方式可以提供解耦和扩展性更好的架构。性能需求如果需要更好的性能和可伸缩性并行执行事务的各个步骤事件方式更适合。异步需求如果系统需要异步处理和解耦事件方式提供了更好的可行性。
Saga和TCC对比
Saga和TCC都是补偿型事务他们的区别为
劣势
都无法保证隔离性
优势
都在一阶段提交本地事务无锁高性能都是事件驱动模式参与者可异步执行高吞吐Saga 对业务侵入较小只需要提供一个逆向操作的Cancel即可而TCC需要对业务进行全局性的流程改造
通知型事务
通知型事务的主流实现是通过MQ消息队列来通知其他事务参与者自己事务的执行状态引入MQ组件有效的将事务参与者进行解耦各参与者都可以异步执行所以通知型事务又被称为异步事务。
通知型事务主要适用于那些需要异步更新数据并且对数据的实时性要求较低的场景主要包含:
异步确保型事务主要适用于内部系统的数据最终一致性保障因为内部相对比较可控如订单和购物车、收货与清算、支付与结算等等场景最大努力通知主要用于外部系统因为外部的网络环境更加复杂和不可信所以只能尽最大努力去通知实现数据最终一致性比如充值平台与运营商、支付对接等等跨网络系统级别对接
方案5本地消息表
操作流程
本地消息表的核心思路就是将分布式事务拆分成本地事务进行处理在该方案中主要有两种角色事务主动方和事务被动方。事务主动发起方需要额外新建事务消息表并在本地事务中完成业务处理和记录事务消息并轮询事务消息表的数据发送事务消息事务被动方基于消息中间件消费事务消息表中的事务。 操作步骤
发生分布式事务操作时 事务主动方在DB1中的操作业务表 记录事务信息在消息表中状态为未处理事务主动方向消息中间件推送一个事务操作消息并通知事务被动方处理事务消息。事务被动方监控消息中间件读取事务消息完成DB2中业务操作往消息中间件返回ack事务主动方监控消息中间件读取事务消息更新消息表状态为已处理
异常情况处理
当1处理出错事务主动方在本地事务中直接回滚就行。
当2处理出错由于DB1中还是保存事务消息可以设置轮询逻辑将消息重新推送给消息中间件在通知事务被动方。
当3处理出错重复获取消息重复执行即可。
如果是业务上处理失败事务被动方可以发消息给事务主动方回滚事务
如果事务被动方已经消费了消息事务主动方需要回滚事务的话需要发消息通知事务主动方进行回滚事务。
优缺点
优点
从应用设计开发的角度实现了消息数据的可靠性消息数据的可靠性不依赖于消息中间件弱化了对 MQ 中间件特性的依赖。方案轻量容易实现。
缺点
与具体的业务场景绑定耦合性强不可公用消息数据与业务数据同库占用业务系统资源业务系统在使用关系型数据库的情况下消息服务性能会受到关系型数据库并发性能的局限
方案6MQ消息事务
操作流程
基于MQ的事务消息方案主要依靠MQ的半消息机制来实现投递消息和参与者自身本地事务的一致性保障。半消息机制实现原理其实借鉴的2PC的思路是二阶段提交的广义拓展。 半消息在原有队列消息执行后的逻辑如果后面的本地逻辑出错则不发送该消息如果通过则告知MQ发送 流程 事务发起方首先发送半消息到MQMQ通知发送方消息发送成功在发送半消息成功后执行本地事务根据本地事务执行结果返回commit或者是rollback如果消息是rollback, MQ将丢弃该消息不投递如果是commitMQ将会消息发送给消息订阅方订阅方根据消息执行本地事务订阅方执行本地事务成功后再从MQ中将该消息标记为已消费如果执行本地事务过程中执行端挂掉或者超时MQ服务器端将不停的询问producer来获取事务状态Consumer端的消费成功机制有MQ保证
可以看到该事务形态过程简单性能消耗小发起方与跟随方之间的流量峰谷可以使用队列填平同时业务开发工作量也基本与单机事务没有差别都不需要编写反向的业务逻辑过程
因此基于消息队列实现的事务是我们除了单机事务外最优先考虑使用的形态。
基于 RocketMQ实现MQ异步确保型事务
有一些第三方的MQ是支持事务消息的这些消息队列支持半消息机制比如RocketMQActiveMQ。但是有一些常用的MQ也不支持事务消息比如 RabbitMQ 和 Kafka 都不支持。
以阿里的 RocketMQ 中间件为例其思路大致为
1.producer(本例中指A系统)发送半消息到broker这个半消息不是说消息内容不完整 它包含完整的消息内容 在producer端和普通消息的发送逻辑一致
2.broker存储半消息半消息存储逻辑与普通消息一致只是属性有所不同topic是固定的RMQ_SYS_TRANS_HALF_TOPICqueueId也是固定为0这个tiopic中的消息对消费者是不可见的所以里面的消息永远不会被消费。这就保证了在半消息提交成功之前消费者是消费不到这个半消息的
3.broker端半消息存储成功并返回后A系统执行本地事务并根据本地事务的执行结果来决定半消息的提交状态为提交或者回滚
4.A系统发送结束半消息的请求并带上提交状态(提交 or 回滚)
5.broker端收到请求后首先从RMQ_SYS_TRANS_HALF_TOPIC的queue中查出该消息设置为完成状态。如果消息状态为提交则把半消息从RMQ_SYS_TRANS_HALF_TOPIC队列中复制到这个消息原始topic的queue中去(之后这条消息就能被正常消费了)如果消息状态为回滚则什么也不做。
6.producer发送的半消息结束请求是 oneway 的也就是发送后就不管了只靠这个是无法保证半消息一定被提交的rocketMq提供了一个兜底方案这个方案叫消息反查机制Broker启动时会启动一个TransactionalMessageCheckService 任务该任务会定时从半消息队列中读出所有超时未完成的半消息针对每条未完成的消息Broker会给对应的Producer发送一个消息反查请求根据反查结果来决定这个半消息是需要提交还是回滚或者后面继续来反查
7.consumer(本例中指B系统)消费消息执行本地数据变更(至于B是否能消费成功消费失败是否重试这属于正常消息消费需要考虑的问题) 在rocketMq中不论是producer收到broker存储半消息成功返回后执行本地事务还是broker向producer反查消息状态都是通过回调机制完成。
优缺点
优点
消息数据独立存储 降低业务系统与消息系统之间的耦合吞吐量大于使用本地消息表方案
缺点
一次消息发送需要两次网络请求(half 消息 commit/rollback 消息) 。业务处理服务需要实现消息状态回查接口。
MQ事务消息 VS 本地消息表
二者的共性 1、 事务消息都依赖MQ进行事务通知所以都是异步的。 2、 事务消息在投递方都是存在重复投递的可能需要有配套的机制去降低重复投递率实现更友好的消息投递去重。 3、 事务消息的消费方因为投递重复的无法避免因此需要进行消费去重设计或者服务幂等设计。
二者的区别
MQ事务消息
需要MQ支持半消息机制或者类似特性在重复投递上具有比较好的去重处理具有比较大的业务侵入性需要业务方进行改造提供对应的本地操作成功的回查功能
DB本地消息表
使用了数据库来存储事务消息降低了对MQ的要求但是增加了存储成本事务消息使用了异步投递增大了消息重复投递的可能性
方案7最大努力通知
最大努力通知也称为定期校对是对MQ事务方案的进一步优化。它在事务主动方增加了消息校对的接口如果事务被动方没有接收到主动方发送的消息此时可以调用事务主动方提供的消息校对的接口主动获取。 在可靠消息事务中事务主动方需要将消息发送出去并且让接收方成功接收消息这种可靠性发送是由事务主动方保证的但是最大努力通知事务主动方仅仅是尽最大努力重试轮询…将事务发送给事务接收方所以存在事务被动方接收不到消息的情况此时需要事务被动方主动调用事务主动方的消息校对接口查询业务消息并消费这种通知的可靠性是由事务被动方保证的。
方案选择
属性2PC/3PCTCCSaga本地消息表尽最大努力通知(MQ)事务一致性强弱弱弱弱复杂性中高中低低业务侵入性小大小中中使用局限性大大中小中性能低中高高高维护成本低高中低中
2PC/3PC依赖于数据库能够很好的提供强一致性和强事务性但延迟比较高比较适合传统的单体应用在同一个方法中存在跨库操作的情况不适合高并发和高性能要求的场景。TCC适用于执行时间确定且较短实时性要求高对数据一致性要求高比如互联网金融企业最核心的三个服务交易、支付、账务。本地消息表/MQ 事务适用于事务中参与方支持操作幂等对一致性要求不高业务上能容忍数据不一致到一个人工检查周期事务涉及的参与方、参与环节较少业务上有对账/校验系统兜底 性能高。Saga 事务由于 Saga 事务不能保证隔离性需要在业务层控制并发适合于业务场景事务并发操作同一资源较少的情况。Saga 由于缺少预提交动作导致补偿动作的实现比较麻烦例如业务是发送短信补偿动作则得再发送一次短信说明撤销用户体验比较差。所以Saga 事务较适用于补偿动作容易处理的场景
四、分布式事务最佳实践-Seata
简介
官网http://seata.io/zh-cn/
源码https://github.com/seata/seata
Seata 是一款开源的分布式事务解决方案致力于提供高性能和简单易用的分布式事务服务。Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式为用户打造一站式的分布式解决方案。 常见术语
TC (Transaction Coordinator) - 事务协调者
维护全局和分支事务的状态驱动全局事务提交或回滚。
TM (Transaction Manager) - 事务管理器
定义全局事务的范围开始全局事务、提交或回滚全局事务。
RM (Resource Manager) - 资源管理器
管理分支事务处理的资源与TC交谈以注册分支事务和报告分支事务的状态并驱动分支事务提交或回滚。
项目准备
需求用户下单扣款扣库存。 根据上面分析项目设计出3个微服务
业务服务business-service
订单服务order-service
账户服务account-service
库存服务stock-service
代码如下 数据库准备
创建3个数据库与3张表
seata-account
CREATE TABLE t_account
(id int(11) NOT NULL AUTO_INCREMENT,user_id varchar(255) DEFAULT NULL,money int(11) DEFAULT 0,PRIMARY KEY (id)
) ENGINEInnoDB DEFAULT CHARSETutf8;INSERT INTO t_account VALUES (1, U100000, 900);seata-order
CREATE TABLE t_order
(id int(11) NOT NULL AUTO_INCREMENT,user_id varchar(255) DEFAULT NULL,commodity_code varchar(255) DEFAULT NULL,count int(11) DEFAULT 0,money int(11) DEFAULT 0,PRIMARY KEY (id)
) ENGINEInnoDB DEFAULT CHARSETutf8;seata-stock
CREATE TABLE t_stock
(id int(11) NOT NULL AUTO_INCREMENT,commodity_code varchar(255) DEFAULT NULL,count int(11) DEFAULT 0,PRIMARY KEY (id),UNIQUE KEY (commodity_code)
) ENGINEInnoDB DEFAULT CHARSETutf8;INSERT INTO t_stock VALUES (1, C100000, 10);库存服务
stock-service
依赖
dependencies!-- bootstrap 启动器 --dependencygroupIdorg.springframework.cloud/groupIdartifactIdspring-cloud-starter-bootstrap/artifactId/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependencydependencygroupIdmysql/groupIdartifactIdmysql-connector-java/artifactId/dependencydependencygroupIdcom.alibaba/groupIdartifactIdfastjson/artifactIdversion1.2.56/version/dependencydependencygroupIdcom.baomidou/groupIdartifactIdmybatis-plus-boot-starter/artifactIdversion3.4.2/version/dependency!--nacos客户端--dependencygroupIdcom.alibaba.cloud/groupIdartifactIdspring-cloud-starter-alibaba-nacos-discovery/artifactId/dependency!--fegin组件--dependencygroupIdorg.springframework.cloud/groupIdartifactIdspring-cloud-starter-openfeign/artifactId/dependency!--sentinel组件--dependencygroupIdcom.alibaba.cloud/groupIdartifactIdspring-cloud-starter-alibaba-sentinel/artifactId/dependencydependencygroupIdcom.alibaba.cloud/groupIdartifactIdspring-cloud-starter-alibaba-nacos-config/artifactId/dependencydependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactId/dependencydependencygroupIdorg.springframework.cloud/groupIdartifactIdspring-cloud-loadbalancer/artifactId/dependency
/dependencies配置文件
# Tomcat
server:port: 8083
# Spring
spring:application:# 应用名称name: stock-serviceprofiles:# 环境配置active: devcloud:nacos:discovery:# 服务注册地址server-addr: 127.0.0.1:8848datasource:driver-class-name: com.mysql.cj.jdbc.Driverurl: jdbc:mysql:///seata-stock?serverTimezoneUTCuseUnicodetruecharacterEncodingutf-8useSSLtrueusername: rootpassword: 123456
mybatis-plus:configuration:log-impl: org.apache.ibatis.logging.stdout.StdOutImpldomain
package cn.wolfcode.tx.stock.domain;import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;Data
TableName(t_stock)
public class Stock {TableId(type IdType.AUTO)private Integer id;private String commodityCode;private Integer count;
}
mapper
package cn.wolfcode.tx.stock.mapper;import cn.wolfcode.tx.stock.domain.Stock;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;public interface StockMapper extends BaseMapperStock {
}
service
package cn.wolfcode.tx.stock.service;import cn.wolfcode.tx.stock.domain.Stock;
import com.baomidou.mybatisplus.extension.service.IService;public interface IStockService extends IServiceStock {/*** 扣库存* param commodityCode* param count*/void deduct(String commodityCode, int count);
}
service.impl
package cn.wolfcode.tx.stock.service.impl;import cn.wolfcode.tx.stock.domain.Stock;
import cn.wolfcode.tx.stock.mapper.StockMapper;
import cn.wolfcode.tx.stock.service.IStockService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import io.seata.core.context.RootContext;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;Service
public class StockServiceImpl extends ServiceImplStockMapper, Stock implements IStockService {OverrideTransactionalpublic void deduct(String commodityCode, int count) {Stock one lambdaQuery().eq(Stock::getCommodityCode, commodityCode).one();if(one ! null one.getCount() count){throw new RuntimeException(Not Enough Count ...);}lambdaUpdate().setSql(count count- count).eq(Stock::getCommodityCode, commodityCode).update();}
}
controller
package cn.wolfcode.tx.stock.controller;import cn.wolfcode.tx.stock.service.IStockService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;RestController
RequestMapping(stocks)
public class StockController {Autowiredprivate IStockService StockService;Autowiredprivate IStockService stockService;GetMapping(value /deduct)public String deduct(String commodityCode, int count) {try {stockService.deduct(commodityCode, count);} catch (Exception exx) {exx.printStackTrace();return FAIL;}return SUCCESS;}
}
启动类
package cn.wolfcode.tx;import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.openfeign.EnableFeignClients;SpringBootApplication
EnableDiscoveryClient
EnableFeignClients
MapperScan(cn.wolfcode.tx.stock.mapper)
public class StockApplication {public static void main(String[] args) {SpringApplication.run(StockApplication.class, args);}
}
账户服务
account-service
依赖
dependencies!-- bootstrap 启动器 --dependencygroupIdorg.springframework.cloud/groupIdartifactIdspring-cloud-starter-bootstrap/artifactId/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependencydependencygroupIdmysql/groupIdartifactIdmysql-connector-java/artifactId/dependencydependencygroupIdcom.alibaba/groupIdartifactIdfastjson/artifactIdversion1.2.56/version/dependencydependencygroupIdcom.baomidou/groupIdartifactIdmybatis-plus-boot-starter/artifactIdversion3.4.2/version/dependency!--nacos客户端--dependencygroupIdcom.alibaba.cloud/groupIdartifactIdspring-cloud-starter-alibaba-nacos-discovery/artifactId/dependency!--fegin组件--dependencygroupIdorg.springframework.cloud/groupIdartifactIdspring-cloud-starter-openfeign/artifactId/dependency!--sentinel组件--dependencygroupIdcom.alibaba.cloud/groupIdartifactIdspring-cloud-starter-alibaba-sentinel/artifactId/dependencydependencygroupIdcom.alibaba.cloud/groupIdartifactIdspring-cloud-starter-alibaba-nacos-config/artifactId/dependencydependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactId/dependency
/dependencies配置文件
# Tomcat
server:port: 8081
# Spring
spring:application:# 应用名称name: account-serviceprofiles:# 环境配置active: devcloud:nacos:discovery:# 服务注册地址server-addr: 127.0.0.1:8848datasource:driver-class-name: com.mysql.cj.jdbc.Driverurl: jdbc:mysql:///seata-account?serverTimezoneUTCuseUnicodetruecharacterEncodingutf-8useSSLtrueusername: rootpassword: 123456
mybatis-plus:configuration:log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
domain
package cn.wolfcode.tx.account.domain;import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;Data
TableName(t_account)
public class Account {TableId(type IdType.AUTO)private Integer id;private String userId;private int money;
}
mapper
package cn.wolfcode.tx.account.mapper;import cn.wolfcode.tx.account.domain.Account;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;public interface AccountMapper extends BaseMapperAccount {
}
service
package cn.wolfcode.tx.account.service;import cn.wolfcode.tx.account.domain.Account;
import com.baomidou.mybatisplus.extension.service.IService;public interface IAccountService extends IServiceAccount {/*** 账户扣款* param userId* param money* return*/void reduce(String userId, int money);
}
service.impl
package cn.wolfcode.tx.account.service.impl;import cn.wolfcode.tx.account.domain.Account;
import cn.wolfcode.tx.account.mapper.AccountMapper;
import cn.wolfcode.tx.account.service.IAccountService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import io.seata.core.context.RootContext;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;Service
public class AccountServiceImpl extends ServiceImplAccountMapper, Account implements IAccountService {OverrideTransactionalpublic void reduce(String userId, int money) {Account one lambdaQuery().eq(Account::getUserId, userId).one();if(one ! null one.getMoney() money){throw new RuntimeException(Not Enough Money ...);}lambdaUpdate().setSql(money money - money).eq(Account::getUserId, userId).update();}
}
controller
package cn.wolfcode.tx.account.controller;import cn.wolfcode.tx.account.domain.Account;
import cn.wolfcode.tx.account.service.IAccountService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;RestController
RequestMapping(accounts)
public class AccountController {Autowiredprivate IAccountService accountService;GetMapping(value /reduce)public String reduce(String userId, int money) {try {accountService.reduce(userId, money);} catch (Exception exx) {exx.printStackTrace();return FAIL;}return SUCCESS;}}
启动类
package cn.wolfcode.tx;import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.openfeign.EnableFeignClients;SpringBootApplication
MapperScan(cn.wolfcode.tx.account.mapper)
EnableDiscoveryClient
EnableFeignClients
public class AccountApplication {public static void main(String[] args) {SpringApplication.run(AccountApplication.class, args);}
}
订单服务
order-service
依赖
dependencies!-- bootstrap 启动器 --dependencygroupIdorg.springframework.cloud/groupIdartifactIdspring-cloud-starter-bootstrap/artifactId/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependencydependencygroupIdmysql/groupIdartifactIdmysql-connector-java/artifactId/dependencydependencygroupIdcom.alibaba/groupIdartifactIdfastjson/artifactIdversion1.2.56/version/dependencydependencygroupIdcom.baomidou/groupIdartifactIdmybatis-plus-boot-starter/artifactIdversion3.4.2/version/dependency!--nacos客户端--dependencygroupIdcom.alibaba.cloud/groupIdartifactIdspring-cloud-starter-alibaba-nacos-discovery/artifactId/dependency!--fegin组件--dependencygroupIdorg.springframework.cloud/groupIdartifactIdspring-cloud-starter-openfeign/artifactId/dependency!--sentinel组件--dependencygroupIdcom.alibaba.cloud/groupIdartifactIdspring-cloud-starter-alibaba-sentinel/artifactId/dependencydependencygroupIdcom.alibaba.cloud/groupIdartifactIdspring-cloud-starter-alibaba-nacos-config/artifactId/dependencydependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactId/dependencydependencygroupIdorg.springframework.cloud/groupIdartifactIdspring-cloud-loadbalancer/artifactId/dependency
/dependencies配置文件
# Tomcat
server:port: 8082
# Spring
spring:application:# 应用名称name: order-serviceprofiles:# 环境配置active: devcloud:nacos:discovery:# 服务注册地址server-addr: 127.0.0.1:8848datasource:driver-class-name: com.mysql.cj.jdbc.Driverurl: jdbc:mysql:///seata-order?serverTimezoneUTCuseUnicodetruecharacterEncodingutf-8useSSLtrueusername: rootpassword: 123456mybatis-plus:configuration:log-impl: org.apache.ibatis.logging.stdout.StdOutImpldomain
package cn.wolfcode.tx.order.domain;import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;Data
TableName(t_order)
public class Order {TableId(type IdType.AUTO)private Integer id;private String userId;private String commodityCode;private Integer count;private Integer money;
}
mapper
package cn.wolfcode.tx.order.mapper;import cn.wolfcode.tx.order.domain.Order;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;public interface OrderMapper extends BaseMapperOrder {
}
service
package cn.wolfcode.tx.order.service;import cn.wolfcode.tx.order.domain.Order;
import com.baomidou.mybatisplus.extension.service.IService;public interface IOrderService extends IServiceOrder {/*** 创建订单*/void create(String userId, String commodityCode, int orderCount);
}
service.impl
package cn.wolfcode.tx.order.service.impl;import cn.wolfcode.tx.order.domain.Order;
import cn.wolfcode.tx.order.feign.AccountFeignClient;
import cn.wolfcode.tx.order.mapper.OrderMapper;
import cn.wolfcode.tx.order.service.IOrderService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import io.seata.core.context.RootContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;Service
Transactional
public class OrderServiceImpl extends ServiceImplOrderMapper, Order implements IOrderService {Autowiredprivate AccountFeignClient accountFeignClient;OverrideTransactionalpublic void create(String userId, String commodityCode, int count) {// 定单总价 订购数量(count) * 商品单价(100)int orderMoney count * 100;// 生成订单Order order new Order();order.setCount(count);order.setCommodityCode(commodityCode);order.setUserId(userId);order.setMoney(orderMoney);super.save(order);// 调用账户余额扣减String result accountFeignClient.reduce(userId, orderMoney);if (!SUCCESS.equals(result)) {throw new RuntimeException(Failed to call Account Service. );}}
}
controller
package cn.wolfcode.tx.order.controller;import cn.wolfcode.tx.order.domain.Order;
import cn.wolfcode.tx.order.service.IOrderService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;RestController
RequestMapping(orders)
public class OrderController {Autowiredprivate IOrderService orderService;GetMapping(value /create)public String create(String userId, String commodityCode, int orderCount) {try {orderService.create(userId, commodityCode, orderCount);} catch (Exception exx) {exx.printStackTrace();return FAIL;}return SUCCESS;}
}
Feign接口
package cn.wolfcode.tx.order.feign;import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;FeignClient(name account-service)
public interface AccountFeignClient {GetMapping(/accounts/reduce)String reduce(RequestParam(userId) String userId, RequestParam(money) int money);
}
启动类
package cn.wolfcode.tx;import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.openfeign.EnableFeignClients;SpringBootApplication
MapperScan(cn.wolfcode.tx.order.mapper)
EnableDiscoveryClient
EnableFeignClients
public class OrderApplication {public static void main(String[] args) {SpringApplication.run(OrderApplication.class, args);}
}
业务服务
business-service
依赖
dependencies!-- bootstrap 启动器 --dependencygroupIdorg.springframework.cloud/groupIdartifactIdspring-cloud-starter-bootstrap/artifactId/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependencydependencygroupIdcom.alibaba/groupIdartifactIdfastjson/artifactIdversion1.2.56/version/dependency!--nacos客户端--dependencygroupIdcom.alibaba.cloud/groupIdartifactIdspring-cloud-starter-alibaba-nacos-discovery/artifactId/dependency!--fegin组件--dependencygroupIdorg.springframework.cloud/groupIdartifactIdspring-cloud-starter-openfeign/artifactId/dependency!--sentinel组件--dependencygroupIdcom.alibaba.cloud/groupIdartifactIdspring-cloud-starter-alibaba-sentinel/artifactId/dependencydependencygroupIdcom.alibaba.cloud/groupIdartifactIdspring-cloud-starter-alibaba-nacos-config/artifactId/dependencydependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactId/dependencydependencygroupIdorg.springframework.cloud/groupIdartifactIdspring-cloud-loadbalancer/artifactId/dependency
/dependencies配置文件
# Tomcat
server:port: 8088
# Spring
spring:application:# 应用名称name: business-serviceprofiles:# 环境配置active: devcloud:nacos:discovery:# 服务注册地址server-addr: 127.0.0.1:8848
测试数据
package cn.wolfcode.tx.business;public class TestDatas {public static final String USER_ID U100000;public static final String COMMODITY_CODE C100000;
}
service
package cn.wolfcode.tx.business.service;public interface IBusinessService{void purchase(String userId, String commodityCode, int orderCount, boolean rollback);
}
service.impl
package cn.wolfcode.tx.business.service.impl;import cn.wolfcode.tx.business.feign.OrderFeignClient;
import cn.wolfcode.tx.business.feign.StockFeignClient;
import cn.wolfcode.tx.business.service.IBusinessService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;Service
public class BusinessServiceImpl implements IBusinessService {private static final Logger LOGGER LoggerFactory.getLogger(BusinessServiceImpl.class);Autowiredprivate StockFeignClient stockFeignClient;Autowiredprivate OrderFeignClient orderFeignClient;Overridepublic void purchase(String userId, String commodityCode, int orderCount, boolean rollback) {String result stockFeignClient.deduct(commodityCode, orderCount);if (!SUCCESS.equals(result)) {throw new RuntimeException(库存服务调用失败,事务回滚!);}result orderFeignClient.create(userId, commodityCode, orderCount);if (!SUCCESS.equals(result)) {throw new RuntimeException(订单服务调用失败,事务回滚!);}if (rollback) {throw new RuntimeException(Force rollback ... );}}
}
feign接口
package cn.wolfcode.tx.business.feign;import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;FeignClient(name order-service)
public interface OrderFeignClient {GetMapping(/orders/create)String create(RequestParam(userId) String userId, RequestParam(commodityCode) String commodityCode,RequestParam(orderCount) int orderCount);}
package cn.wolfcode.tx.business.feign;import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;FeignClient(name stock-service)
public interface StockFeignClient {GetMapping(/stocks/deduct)String deduct(RequestParam(commodityCode) String commodityCode, RequestParam(count) int count);}
controller
package cn.wolfcode.tx.business.controller;import cn.wolfcode.tx.business.TestDatas;
import cn.wolfcode.tx.business.service.IBusinessService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;RestController
RequestMapping(businesses)
public class BusinessController {Autowiredprivate IBusinessService businessService;GetMapping(value /purchase)public String purchase(Boolean rollback, Integer count) {int orderCount 10;if (count ! null) {orderCount count;}try {businessService.purchase(TestDatas.USER_ID, TestDatas.COMMODITY_CODE, orderCount,rollback null ? false : rollback.booleanValue());} catch (Exception exx) {return Purchase Failed: exx.getMessage();}return SUCCESS;}
}
启动类
package cn.wolfcode.tx;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.openfeign.EnableFeignClients;SpringBootApplication
EnableDiscoveryClient
EnableFeignClients
public class BusinessApplication {public static void main(String[] args) {SpringApplication.run(BusinessApplication.class, args);}
}测试
启动nacos, 启动4个服务,
访问: http://localhost:8088/businesses/purchase?rollbackfalsecount10
Seata配置
下载
官网https://github.com/seata/seata/releases/tag/v1.7.0
修改配置文件
1.5.0之前的版本配置文件是有多个的都位于conf文件夹下如file.conf,registry,conf等。在1.5.0版本之后都整合到一个配置文件里了即application.yml。以下配置项请按照自己版本查找修改。
以seata-1.7.0为例打开conf/application.yml进行修改重点修改nacos部分配置。
其中的application.example.yml 各种注册中心配置中心配置方式默认是配置本地这里以配置在nacos为例子。
server:port: 7091spring:application:name: seata-serverlogging:config: classpath:logback-spring.xmlfile:path: ${user.home}/logs/seataextend:logstash-appender:destination: 127.0.0.1:4560kafka-appender:bootstrap-servers: 127.0.0.1:9092topic: logback_to_logstashconsole:user:username: seatapassword: seata
seata:config:# support: nacos, consul, apollo, zk, etcd3type: nacosnacos:server-addr: 127.0.0.1:8848namespace:group: SEATA_GROUPusername:password:context-path:##if use MSE Nacos with auth, mutex with username/password attribute#access-key:#secret-key:data-id: seataServer.propertiesregistry:# support: nacos, eureka, redis, zk, consul, etcd3, sofatype: nacosnacos:application: seata-serverserver-addr: 127.0.0.1:8848group: SEATA_GROUPnamespace:cluster: defaultusername:password:context-path:##if use MSE Nacos with auth, mutex with username/password attribute#access-key:#secret-key:store:# support: file 、 db 、 redismode: file
# server:
# service-port: 8091 #If not configured, the default is ${server.port} 1000security:secretKey: SeataSecretKey0c382ef121d778043159209298fd40bf3850a017tokenValidityInMilliseconds: 1800000ignore:urls: /,/**/*.css,/**/*.js,/**/*.html,/**/*.map,/**/*.svg,/**/*.png,/**/*.jpeg,/**/*.ico,/api/v1/auth/login修改成功后意味着seata将从nacos获取配置信息同时注册自身服务到nacos中心。
nacos配置
上面配置项中有一项seata.config.data-idseataServer.properties意思为要读nacos上的seataServer.properties配置文件接下来去Nacos创建该配置文件注意Group与第2步中的保持一致这里是SEATA_GROUP。
配置内容从seata-server-1.7.0/seata/script/config-center/config.txt粘贴修改而来这里只使用对我们有用的配置主要是数据库配置信息。
#Transaction storage configuration, only for the server.
store.modedb
store.lock.modedb
store.session.modedb#These configurations are required if the store mode is db.
store.db.datasourcedruid
store.db.dbTypemysql
store.db.driverClassNamecom.mysql.cj.jdbc.Driver
store.db.urljdbc:mysql://127.0.0.1:3306/seata?useSSLfalseuseUnicodetruerewriteBatchedStatementstrue
store.db.userroot
store.db.passwordadmin
store.db.minConn5
store.db.maxConn30
store.db.globalTableglobal_table
store.db.branchTablebranch_table
store.db.distributedLockTabledistributed_lock
store.db.queryLimit100
store.db.lockTablelock_table
store.db.maxWait5000数据库建表
在seata数据库内执行seata-server-1.7.0/seata/script/server/db目录下的sql脚本根据数据库类型创建服务端所需的表。此处选择mysql
-- -------------------------------- The script used when storeMode is db --------------------------------
-- the table to store GlobalSession data
CREATE TABLE IF NOT EXISTS global_table
(xid VARCHAR(128) NOT NULL,transaction_id BIGINT,status TINYINT NOT NULL,application_id VARCHAR(32),transaction_service_group VARCHAR(32),transaction_name VARCHAR(128),timeout INT,begin_time BIGINT,application_data VARCHAR(2000),gmt_create DATETIME,gmt_modified DATETIME,PRIMARY KEY (xid),KEY idx_status_gmt_modified (status , gmt_modified),KEY idx_transaction_id (transaction_id)
) ENGINE InnoDBDEFAULT CHARSET utf8mb4;-- the table to store BranchSession data
CREATE TABLE IF NOT EXISTS branch_table
(branch_id BIGINT NOT NULL,xid VARCHAR(128) NOT NULL,transaction_id BIGINT,resource_group_id VARCHAR(32),resource_id VARCHAR(256),branch_type VARCHAR(8),status TINYINT,client_id VARCHAR(64),application_data VARCHAR(2000),gmt_create DATETIME(6),gmt_modified DATETIME(6),PRIMARY KEY (branch_id),KEY idx_xid (xid)
) ENGINE InnoDBDEFAULT CHARSET utf8mb4;-- the table to store lock data
CREATE TABLE IF NOT EXISTS lock_table
(row_key VARCHAR(128) NOT NULL,xid VARCHAR(128),transaction_id BIGINT,branch_id BIGINT NOT NULL,resource_id VARCHAR(256),table_name VARCHAR(32),pk VARCHAR(36),status TINYINT NOT NULL DEFAULT 0 COMMENT 0:locked ,1:rollbacking,gmt_create DATETIME,gmt_modified DATETIME,PRIMARY KEY (row_key),KEY idx_status (status),KEY idx_branch_id (branch_id),KEY idx_xid (xid)
) ENGINE InnoDBDEFAULT CHARSET utf8mb4;CREATE TABLE IF NOT EXISTS distributed_lock
(lock_key CHAR(20) NOT NULL,lock_value VARCHAR(20) NOT NULL,expire BIGINT,primary key (lock_key)
) ENGINE InnoDBDEFAULT CHARSET utf8mb4;INSERT INTO distributed_lock (lock_key, lock_value, expire) VALUES (AsyncCommitting, , 0);
INSERT INTO distributed_lock (lock_key, lock_value, expire) VALUES (RetryCommitting, , 0);
INSERT INTO distributed_lock (lock_key, lock_value, expire) VALUES (RetryRollbacking, , 0);
INSERT INTO distributed_lock (lock_key, lock_value, expire) VALUES (TxTimeoutCheck, , 0);启动seata-server
运行bin下的bat脚本启动服务。
访问http://127.0.0.1:7091
默认账号与密码都是seata
Seata使用XA模式
原理
seata 的XA模式相对于2PC做了一些调整。首先seata 的XA模式增加TC-事务协调者这个角色用来维护全局和分支事务的状态驱动全局事务提交或回滚。TC的作用相当于实际执行TM指令相当于一个”秘书“。
流程如下 第一阶段
1注册全局事务
2调用RM事务接口注册分支事务
3执行RM事务操作不提交
4往TC报告事务状态
第二阶段
1所有RM执行完本地事务TM发起全局事务提交/回滚
2TC检查所有RM事务状态yes or no?
全部yes通知所有RM提交事务
存在no通知所有RM回滚事务
代码
项目集成seata
依赖
所有微服务导入seata依赖
!-- 注意一定要引入对版本要引入spring-cloud版本seata而不是springboot版本的seata--
dependencygroupIdcom.alibaba.cloud/groupIdartifactIdspring-cloud-starter-alibaba-seata/artifactId!-- 排除掉springcloud默认的seata版本以免版本不一致出现问题--exclusionsexclusiongroupIdio.seata/groupIdartifactIdseata-spring-boot-starter/artifactId/exclusionexclusiongroupIdio.seata/groupIdartifactIdseata-all/artifactId/exclusion/exclusions
/dependency
dependencygroupIdio.seata/groupIdartifactIdseata-spring-boot-starter/artifactIdversion1.7.0/version
/dependency配置文件
在application.yml文件中配置 每个微服务都要
#seata客户端配置
seata:enabled: trueapplication-id: seata_txtx-service-group: seata_tx_groupservice:vgroup-mapping:seata_tx_group: defaultregistry:type: nacosnacos:application: seata-serverserver-addr: 127.0.0.1:8848namespace:group: SEATA_GROUPdata-source-proxy-mode: XA其中seata_tx_group为我们自定义的事务组名字随便起但是下面service.vgroup-mapping下一定要有一个对应这个名字的映射映射到defaultseata默认的集群名称。 nacos方面我们仅配置注册项即registry下的配置配置内容与服务端保持一致。
配置全局事务
在business-service服务的purchase 方法中加上全局事务标签GlobalTransactional
Override
GlobalTransactional
public void purchase(String userId, String commodityCode, int orderCount, boolean rollback) {String result stockFeignClient.deduct(commodityCode, orderCount);if (!SUCCESS.equals(result)) {throw new RuntimeException(库存服务调用失败,事务回滚!);}result orderFeignClient.create(userId, commodityCode, orderCount);if (!SUCCESS.equals(result)) {throw new RuntimeException(订单服务调用失败,事务回滚!);}if (rollback) {throw new RuntimeException(Force rollback ... );}
}测试
正常http://localhost:8088/businesses/purchase?rollbackfalsecount2
超库存http://localhost:8088/businesses/purchase?rollbackfalsecount12
超余额http://localhost:8088/businesses/purchase?rollbackfalsecount8
优缺点
优点
事务强一致性满足ACID原则常用的数据库都支持实现简单并且没有代码侵入
缺点
第一阶段锁定数据库资源等待二阶段结束才释放锁定资源过长性能较差依赖关系型数据库的实现事务
Seata使用AT模式
原理
AT是seata-1.7.0默认的模式。
AT模式同样是分阶段提交事务模式操作起来算是XA模式的优化版。XA模式在第一阶段存在锁定资源的操作时间长之后会影响性能。
AT模式在第一阶段直接提交事务弥补了XA模式中资源锁定周期过长缺陷。 操作流程:
第一阶段
1注册全局事务
2调用RM事务接口注册分支事务
3执行RM事务操作并提交记录undo log日志快照
4往TC报告事务状态
第二阶段
1所有RM执行完本地事务TM发起全局事务提交/回滚
2TC检查所有RM事务状态yes or no?
全部yes通知所有RM提交事务删除undo log日志快照
存在no通知所有RM回滚事务恢复undo log日志快照
XA vs AT
XA模式一阶段不提交事务锁定资源 AT模式一阶段直接提交不锁定资源XA模式依赖数据库实现回滚 AT利用数据快照实现数据回顾XA模式强一致性AT模式最终一致(一阶段提交此时有事务查询就存在不一致)
问题
AT模式因为在全局事务中第一阶段就提交了事务释放资源。如果这个时另外RM/外部事务(非RM)操作相同资源可能存在读写隔离问题(更新丢失问题)。
问题出现原理 读写隔离问题-2个seata事务解决方案
加全局锁 1个seata事务 非seata事务解 代码
配置seata-AT相关快照/全局锁/快照表数据库
配置数据库
TM数据库-seata中
源sql seata-server-1.7.0/script/server/db 中
各个RM数据库
添加undo_log表 源sql https://seata.io/zh-cn/docs/dev/mode/at-mode.html
CREATE TABLE undo_log (id bigint(20) NOT NULL AUTO_INCREMENT,branch_id bigint(20) NOT NULL,xid varchar(100) NOT NULL,context varchar(128) NOT NULL,rollback_info longblob NOT NULL,log_status int(11) NOT NULL,log_created datetime NOT NULL,log_modified datetime NOT NULL,ext varchar(100) DEFAULT NULL,PRIMARY KEY (id),UNIQUE KEY ux_undo_log (xid,branch_id)
) ENGINEInnoDB AUTO_INCREMENT1 DEFAULT CHARSETutf8;
配置文件
在application.yml文件中配置把模式XT改为AT即可 每个微服务都要
#seata客户端配置
seata:enabled: trueapplication-id: seata_txtx-service-group: seata_tx_groupservice:vgroup-mapping:seata_tx_group: defaultregistry:type: nacosnacos:application: seata-serverserver-addr: 127.0.0.1:8848namespace:group: SEATA_GROUPdata-source-proxy-mode: AT测试
正常http://localhost:8088/businesses/purchase?rollbackfalsecount2
超库存http://localhost:8088/businesses/purchase?rollbackfalsecount12
超余额http://localhost:8088/businesses/purchase?rollbackfalsecount8
优缺点
优点
一阶段完成直接提交事务释放资源性能较好利用全局锁实现读写隔离没有代码侵入框架自动完成回滚与提交
缺点
两阶段之间存在数据不一致情况只能保证最终一致框架的快照功能影响性能但比XA模式要好很多
踩坑
测试时遇到错误如下
java.sql.SQLException: io.seata.core.exception.RmTransactionException: branch register failed, xid: 192.168.7.91:8091:2072106933610864919, errMsg: TransactionException[branch register request failed. xid192.168.7.91:8091:2072106933610864919, msgUnknown column status in field list] 后来发现用了原来的旧版本的lock_table表少了字段status建议从官网找到对应版本的最新的sql。重新使用新的执行如下
CREATE TABLE IF NOT EXISTS lock_table
(row_key VARCHAR(128) NOT NULL,xid VARCHAR(128),transaction_id BIGINT,branch_id BIGINT NOT NULL,resource_id VARCHAR(256),table_name VARCHAR(32),pk VARCHAR(36),status TINYINT NOT NULL DEFAULT 0 COMMENT 0:locked ,1:rollbacking,gmt_create DATETIME,gmt_modified DATETIME,PRIMARY KEY (row_key),KEY idx_status (status),KEY idx_branch_id (branch_id),KEY idx_xid_and_branch_id (xid , branch_id)
) ENGINE InnoDBDEFAULT CHARSET utf8mb4;Seata使用TCC模式
原理
TCC模式的seata版实现。TCC模式与AT模式非常相似每阶段都是独立事务不同的TCC通过人工编码来实现数据恢复。
Try资源的检测和预留Confirm完成资源操作业务要求Try 成功Confirm 一定要能成功Cancel预留资源释放可以理解为try的反向操作 操作流程:
1注册全局事务
2调用RM事务接口注册分支事务
3执行RM事务try接口检查资源预留资源
4往TC报告事务状态
5所有RM执行完本地事务TM发起全局事务提交/回滚
2TC检查所有RM事务状态yes or no?
全部yes通知所有RM 执行confirm接口提交事务
存在no通知所有RM 执行cancel接口回滚事务
TM 在发起全局事务时生成全局事务记录全局事务 ID 贯穿整个分布式事务调用链条。
案例演示 问题
TCC模式中在执行Try执行Confirm执行Cancel 过程中会出现意外情况导致TCC模式经典问题空回滚业务悬挂重试幂等问题。
空回滚
当某个分支事务try阶段阻塞时可能导致全局事务超时而触发二阶段的cancel操作RM在没有执行try操作就执行cancel操作此时cancel无数据回滚这就是空回滚。 业务悬挂
当发生的空回滚之后当阻塞的Try正常了RM先执行空回滚(cancel)后又收到Try操作指令执行业务操作并冻结资源。但是事务已经结束不会再有confirm 或cancel了那刚执行try操作冻结资源就被悬挂起来了。这就是业务悬挂
重试幂等
因为网络抖动等原因TC下达的Confirm/Cancel 指令可能出现延时发送失败等问题此时TC会启用重试机制到时RM可能收到多个confirm或cancel指令这就要求confirm接口或者cancel接口需要能够保证幂等性。 幂等性多次执行结果都一样 解决
上面空回滚/业务悬挂问题解决一般都一起实现引入事务状态控制表
表字段 xid冻结数据事务状态(try、confirm/cancel)
以RM: account-service 中用户账户余额为例子。 try1在状态表中记录冻结金额与事务状态为try2扣减账户余额
confirm1根据xid删除状态表中冻结记录
cancel1修改状态表冻结金额为0事务状态改为cancel 2恢复账户扣减
如何判断是否为空回滚在cancel中根据xid查询状态表如果不存在说明try执行需要空回滚
如果避免业务悬挂try业务中根据xid查询状态表如果已经存在说明已经执行过cancel已经执行过拒绝执行try业务。
重试幂等需要引入唯一标识比如第一次操作成功留下唯一标识下次来识别这个标识。
代码
在AT模式基础上做代码TCC改造就行。
account-service服务
新增IAccountTCCService接口
package cn.wolfcode.tx.account.service;import io.seata.rm.tcc.api.BusinessActionContext;
import io.seata.rm.tcc.api.BusinessActionContextParameter;
import io.seata.rm.tcc.api.LocalTCC;
import io.seata.rm.tcc.api.TwoPhaseBusinessAction;/*** TCC 二阶段提交业务接口*/
LocalTCC
public interface IAccountTCCService {/*** try-预扣款*/TwoPhaseBusinessAction(nametryReduce, commitMethod confirm, rollbackMethod cancel)void tryReduce(BusinessActionContextParameter(paramName userId) String userId,BusinessActionContextParameter(paramName money) int money);/*** confirm-提交* param ctx* return*/boolean confirm(BusinessActionContext ctx);/*** cancel-回滚* param ctx* return*/boolean cancel(BusinessActionContext ctx);
}IAccountTCCService实现类AccountTCCServiceImpl
package cn.wolfcode.tx.account.service.impl;import cn.wolfcode.tx.account.domain.Account;
import cn.wolfcode.tx.account.mapper.AccountMapper;
import cn.wolfcode.tx.account.service.IAccountTCCService;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import io.seata.rm.tcc.api.BusinessActionContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;Service
public class AccountTCCServiceImpl implements IAccountTCCService {Autowiredprivate AccountMapper accountMapper;Overridepublic void tryReduce(String userId, int money) {System.err.println(-----------tryReduce-------------);Account one accountMapper.selectOne(new LambdaQueryWrapperAccount().eq(Account::getUserId, userId));if(one ! null one.getMoney() money){throw new RuntimeException(Not Enough Money ...);}LambdaUpdateWrapperAccount wrapper new LambdaUpdateWrapper();wrapper.setSql(money money - money);wrapper.eq(Account::getUserId, userId);accountMapper.update(null, wrapper);}Overridepublic boolean confirm(BusinessActionContext ctx) {System.err.println(-----------confirm-------------);return true;}Overridepublic boolean cancel(BusinessActionContext ctx) {System.err.println(-----------cancel-------------);return true;}
}controller改动把IAccountService换成IAccountTCCService
package cn.wolfcode.tx.account.controller;import cn.wolfcode.tx.account.service.IAccountTCCService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;RestController
RequestMapping(accounts)
public class AccountController {// Autowired
// private IAccountService accountService;Autowiredprivate IAccountTCCService accountTCCService;GetMapping(value /reduce)public String reduce(String userId, int money) {try {accountTCCService.tryReduce(userId, money);} catch (Exception exx) {exx.printStackTrace();return FAIL;}return SUCCESS;}}order-service服务
新增IOrderTCCService接口
package cn.wolfcode.tx.order.service;import cn.wolfcode.tx.order.domain.Order;
import com.baomidou.mybatisplus.extension.service.IService;
import io.seata.rm.tcc.api.BusinessActionContext;
import io.seata.rm.tcc.api.BusinessActionContextParameter;
import io.seata.rm.tcc.api.LocalTCC;
import io.seata.rm.tcc.api.TwoPhaseBusinessAction;/*** TCC 二阶段提交业务接口*/
LocalTCC
public interface IOrderTCCService {/*** try-预扣款*/TwoPhaseBusinessAction(nametryCreate, commitMethod confirm, rollbackMethod cancel)void tryCreate(BusinessActionContextParameter(paramName userId) String userId,BusinessActionContextParameter(paramName commodityCode) String commodityCode,BusinessActionContextParameter(paramName orderCount) int orderCount);/*** confirm-提交* param ctx* return*/boolean confirm(BusinessActionContext ctx);/*** cancel-回滚* param ctx* return*/boolean cancel(BusinessActionContext ctx);}IOrderTCCService实现类OrderTCCServiceImpl
package cn.wolfcode.tx.order.service.impl;import cn.wolfcode.tx.order.domain.Order;
import cn.wolfcode.tx.order.feign.AccountFeignClient;
import cn.wolfcode.tx.order.mapper.OrderMapper;
import cn.wolfcode.tx.order.service.IOrderTCCService;
import io.seata.rm.tcc.api.BusinessActionContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;Service
public class OrderTCCServiceImpl implements IOrderTCCService {Autowiredprivate AccountFeignClient accountFeignClient;Autowiredprivate OrderMapper orderMapper;Overridepublic void tryCreate(String userId, String commodityCode, int count) {System.err.println(---------tryCreate-----------);// 定单总价 订购数量(count) * 商品单价(100)int orderMoney count * 100;// 生成订单Order order new Order();order.setCount(count);order.setCommodityCode(commodityCode);order.setUserId(userId);order.setMoney(orderMoney);orderMapper.insert(order);// 调用账户余额扣减String result accountFeignClient.reduce(userId, orderMoney);if (!SUCCESS.equals(result)) {throw new RuntimeException(Failed to call Account Service. );}}Overridepublic boolean confirm(BusinessActionContext ctx) {System.err.println(---------confirm-----------);return true;}Overridepublic boolean cancel(BusinessActionContext ctx) {System.err.println(---------cancel-----------);return true;}
}controller改动
package cn.wolfcode.tx.order.controller;import cn.wolfcode.tx.order.service.IOrderTCCService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;RestController
RequestMapping(orders)
public class OrderController {Autowiredprivate IOrderTCCService orderTCCService;GetMapping(value /create)public String create(String userId, String commodityCode, int orderCount) {try {orderTCCService.tryCreate(userId, commodityCode, orderCount);} catch (Exception exx) {exx.printStackTrace();return FAIL;}return SUCCESS;}
}stock-service服务
新增IStockTCCService接口
package cn.wolfcode.tx.stock.service;import io.seata.rm.tcc.api.BusinessActionContext;
import io.seata.rm.tcc.api.BusinessActionContextParameter;
import io.seata.rm.tcc.api.LocalTCC;
import io.seata.rm.tcc.api.TwoPhaseBusinessAction;/*** TCC 二阶段提交业务接口*/
LocalTCC
public interface IStockTCCService {/*** try-预扣款*/TwoPhaseBusinessAction(nametryDeduct, commitMethod confirm, rollbackMethod cancel)void tryDeduct(BusinessActionContextParameter(paramName commodityCode) String commodityCode,BusinessActionContextParameter(paramName count) int count);/*** confirm-提交* param ctx* return*/boolean confirm(BusinessActionContext ctx);/*** cancel-回滚* param ctx* return*/boolean cancel(BusinessActionContext ctx);
}IStockTCCService实现类StockTCCServiceImpl
package cn.wolfcode.tx.stock.service.impl;import cn.wolfcode.tx.stock.domain.Stock;
import cn.wolfcode.tx.stock.mapper.StockMapper;
import cn.wolfcode.tx.stock.service.IStockTCCService;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import io.seata.rm.tcc.api.BusinessActionContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;Service
public class StockTCCServiceImpl implements IStockTCCService {Autowiredprivate StockMapper stockMapper;Overridepublic void tryDeduct(String commodityCode, int count) {System.err.println(---------tryDeduct-----------);Stock one stockMapper.selectOne(new LambdaQueryWrapperStock().eq(Stock::getCommodityCode, commodityCode));if(one ! null one.getCount() count){throw new RuntimeException(Not Enough Count ...);}stockMapper.update(null, new LambdaUpdateWrapperStock().setSql(count count- count).eq(Stock::getCommodityCode, commodityCode));}Overridepublic boolean confirm(BusinessActionContext ctx) {System.err.println(---------confirm-----------);return true;}Overridepublic boolean cancel(BusinessActionContext ctx) {System.err.println(---------cancel-----------);return true;}
}controller改动
package cn.wolfcode.tx.stock.controller;import cn.wolfcode.tx.stock.service.IStockService;
import cn.wolfcode.tx.stock.service.IStockTCCService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;RestController
RequestMapping(stocks)
public class StockController {Autowiredprivate IStockTCCService stockTCCService;GetMapping(value /deduct)public String deduct(String commodityCode, int count) {try {stockTCCService.tryDeduct(commodityCode, count);} catch (Exception exx) {exx.printStackTrace();return FAIL;}return SUCCESS;}
}上面操作在理想情况下是没有问题的但是一旦出现需要回滚操作就出问题了无法进行数据回补。此时就需要使用到事务状态表实现数据回补同时实现空回滚避免业务悬挂。
account-service
在seata-account 新增事务状态表
CREATE TABLE t_account_tx (id bigint NOT NULL AUTO_INCREMENT COMMENT 主键,tx_id varchar(100) NOT NULL COMMENT 事务id,freeze_money int DEFAULT NULL COMMENT 冻结金额,state int DEFAULT NULL COMMENT 状态 0try 1confirm 2cancel,PRIMARY KEY (id)
) ENGINEInnoDB DEFAULT CHARSETutf8;新增domainAccountTX
package cn.wolfcode.tx.account.domain;import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;Data
TableName(t_account_tx)
public class AccountTX {public static final int STATE_TRY 0;public static final int STATE_CONFIRM 1;public static final int STATE_CANCEL 2;TableId(type IdType.AUTO)private Integer id;private String txId;private int freezeMoney;private int state STATE_TRY;
}新增mapperAccountTXMapper
package cn.wolfcode.tx.account.mapper;import cn.wolfcode.tx.account.domain.AccountTX;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Mapper;Mapper
public interface AccountTXMapper extends BaseMapperAccountTX {
}修改AccountTCCServiceImpl
package cn.wolfcode.tx.account.service.impl;import cn.wolfcode.tx.account.domain.Account;
import cn.wolfcode.tx.account.domain.AccountTX;
import cn.wolfcode.tx.account.mapper.AccountMapper;
import cn.wolfcode.tx.account.mapper.AccountTXMapper;
import cn.wolfcode.tx.account.service.IAccountTCCService;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import io.seata.core.context.RootContext;
import io.seata.rm.tcc.api.BusinessActionContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;Service
public class AccountTCCServiceImpl implements IAccountTCCService {Autowiredprivate AccountMapper accountMapper;Autowiredprivate AccountTXMapper accountTXMapper;Overridepublic void tryReduce(String userId, int money) {System.err.println(-----------tryReduce------------- RootContext.getXID());//业务悬挂AccountTX accountTX accountTXMapper.selectOne(new LambdaQueryWrapperAccountTX().eq(AccountTX::getTxId, RootContext.getXID()));if (accountTX ! null){//存在说明已经canel执行过类拒绝服务return;}Account one accountMapper.selectOne(new LambdaQueryWrapperAccount().eq(Account::getUserId, userId));if(one ! null one.getMoney() money){throw new RuntimeException(Not Enough Money ...);}LambdaUpdateWrapperAccount wrapper new LambdaUpdateWrapper();wrapper.setSql(money money - money);wrapper.eq(Account::getUserId, userId);accountMapper.update(null, wrapper);AccountTX tx new AccountTX();tx.setFreezeMoney(money);tx.setTxId(RootContext.getXID());tx.setState(AccountTX.STATE_TRY);accountTXMapper.insert(tx);}Overridepublic boolean confirm(BusinessActionContext ctx) {System.err.println(-----------confirm-------------);//删除记录int ret accountTXMapper.delete(new LambdaQueryWrapperAccountTX().eq(AccountTX::getTxId, ctx.getXid()));return ret 1;}Overridepublic boolean cancel(BusinessActionContext ctx) {System.err.println(-----------cancel-------------);String userId ctx.getActionContext(userId).toString();String money ctx.getActionContext(money).toString();AccountTX accountTX accountTXMapper.selectOne(new LambdaQueryWrapperAccountTX().eq(AccountTX::getTxId, ctx.getXid()));if (accountTX null){//为空 空回滚accountTX new AccountTX();accountTX.setTxId(ctx.getXid());accountTX.setState(AccountTX.STATE_CANCEL);if(money ! null){accountTX.setFreezeMoney(Integer.parseInt(money));}accountTXMapper.insert(accountTX);return true;}//幂等处理if(accountTX.getState() AccountTX.STATE_CANCEL){return true;}//恢复余额accountMapper.update(null, new LambdaUpdateWrapperAccount().setSql(money money money).eq(Account::getUserId, userId));accountTX.setFreezeMoney(0);accountTX.setState(AccountTX.STATE_CANCEL);int ret accountTXMapper.updateById(accountTX);return ret 1;}
}order-service
在seata-order 新增事务状态表
CREATE TABLE t_order_tx (id bigint NOT NULL AUTO_INCREMENT COMMENT 主键,tx_id varchar(100) NOT NULL COMMENT 事务id,state int DEFAULT NULL COMMENT 状态 0try 1confirm 2cancel,PRIMARY KEY (id)
) ENGINEInnoDB DEFAULT CHARSETutf8;新增domainOrderTX
package cn.wolfcode.tx.order.domain;import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;Data
TableName(t_order_tx)
public class OrderTX {public static final int STATE_TRY 0;public static final int STATE_CONFIRM 1;public static final int STATE_CANCEL 2;TableId(type IdType.AUTO)private Integer id;private String txId;private int state STATE_TRY;
}
新增mapperOrderTXMapper
package cn.wolfcode.tx.order.mapper;import cn.wolfcode.tx.order.domain.OrderTX;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Mapper;Mapper
public interface OrderTXMapper extends BaseMapperOrderTX {
}修改OrderTCCServiceImpl
package cn.wolfcode.tx.order.service.impl;import cn.wolfcode.tx.order.domain.Order;
import cn.wolfcode.tx.order.domain.OrderTX;
import cn.wolfcode.tx.order.feign.AccountFeignClient;
import cn.wolfcode.tx.order.mapper.OrderMapper;
import cn.wolfcode.tx.order.mapper.OrderTXMapper;
import cn.wolfcode.tx.order.service.IOrderService;
import cn.wolfcode.tx.order.service.IOrderTCCService;
import com.alibaba.nacos.shaded.org.checkerframework.checker.units.qual.A;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import io.seata.core.context.RootContext;
import io.seata.rm.tcc.api.BusinessActionContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;Service
public class OrderTCCServiceImpl implements IOrderTCCService {Autowiredprivate AccountFeignClient accountFeignClient;Autowiredprivate OrderMapper orderMapper;Autowiredprivate OrderTXMapper orderTXMapper;Overridepublic void tryCreate(String userId, String commodityCode, int count) {System.err.println(---------tryCreate-----------);//业务悬挂OrderTX orderTX orderTXMapper.selectOne(new LambdaQueryWrapperOrderTX().eq(OrderTX::getTxId, RootContext.getXID()));if (orderTX ! null){//存在说明已经canel执行过类拒绝服务return;}// 定单总价 订购数量(count) * 商品单价(100)int orderMoney count * 100;// 生成订单Order order new Order();order.setCount(count);order.setCommodityCode(commodityCode);order.setUserId(userId);order.setMoney(orderMoney);orderMapper.insert(order);OrderTX tx new OrderTX();tx.setTxId(RootContext.getXID());tx.setState(OrderTX.STATE_TRY);orderTXMapper.insert(tx);// 调用账户余额扣减String result accountFeignClient.reduce(userId, orderMoney);if (!SUCCESS.equals(result)) {throw new RuntimeException(Failed to call Account Service. );}}Overridepublic boolean confirm(BusinessActionContext ctx) {System.err.println(---------confirm-----------);//删除记录int ret orderTXMapper.delete(new LambdaQueryWrapperOrderTX().eq(OrderTX::getTxId, ctx.getXid()));return ret 1;}Overridepublic boolean cancel(BusinessActionContext ctx) {System.err.println(---------cancel----------- );String userId ctx.getActionContext(userId).toString();String commodityCode ctx.getActionContext(commodityCode).toString();OrderTX orderTX orderTXMapper.selectOne(new LambdaQueryWrapperOrderTX().eq(OrderTX::getTxId, ctx.getXid()));if (orderTX null){//为空 空回滚orderTX new OrderTX();orderTX.setTxId(ctx.getXid());orderTX.setState(OrderTX.STATE_CANCEL);orderTXMapper.insert(orderTX);return true;}//幂等处理if(orderTX.getState() OrderTX.STATE_CANCEL){return true;}//恢复余额orderMapper.delete(new LambdaQueryWrapperOrder().eq(Order::getUserId, userId).eq(Order::getCommodityCode, commodityCode));orderTX.setState(OrderTX.STATE_CANCEL);int ret orderTXMapper.updateById(orderTX);return ret 1;}
}
stock-service
在seata-stock 新增事务状态表
CREATE TABLE t_stock_tx (id bigint NOT NULL AUTO_INCREMENT COMMENT 主键,tx_id varchar(100) NOT NULL COMMENT 事务id,count int DEFAULT NULL COMMENT 冻结库存,state int DEFAULT NULL COMMENT 状态 0try 1confirm 2cancel,PRIMARY KEY (id)
) ENGINEInnoDB DEFAULT CHARSETutf8;新增domainStockTX
package cn.wolfcode.tx.stock.domain;import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;Data
TableName(t_stock_tx)
public class StockTX {public static final int STATE_TRY 0;public static final int STATE_CONFIRM 1;public static final int STATE_CANCEL 2;TableId(type IdType.AUTO)private Integer id;private String txId;private int count;private int state STATE_TRY;
}
新增mapperStockTXMapper
package cn.wolfcode.tx.stock.mapper;import cn.wolfcode.tx.stock.domain.StockTX;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Mapper;Mapper
public interface StockTXMapper extends BaseMapperStockTX {
}修改StockTCCServiceImpl
package cn.wolfcode.tx.stock.service.impl;import cn.wolfcode.tx.stock.domain.Stock;
import cn.wolfcode.tx.stock.domain.StockTX;
import cn.wolfcode.tx.stock.mapper.StockMapper;
import cn.wolfcode.tx.stock.mapper.StockTXMapper;
import cn.wolfcode.tx.stock.service.IStockService;
import cn.wolfcode.tx.stock.service.IStockTCCService;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import io.seata.core.context.RootContext;
import io.seata.rm.tcc.api.BusinessActionContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;Service
public class StockTCCServiceImpl implements IStockTCCService {Autowiredprivate StockMapper stockMapper;Autowiredprivate StockTXMapper stockTXMapper;Overridepublic void tryDeduct(String commodityCode, int count) {System.err.println(---------tryDeduct-----------);//业务悬挂StockTX stockTX stockTXMapper.selectOne(new LambdaQueryWrapperStockTX().eq(StockTX::getTxId, RootContext.getXID()));if (stockTX ! null){//存在说明已经canel执行过类拒绝服务return;}Stock one stockMapper.selectOne(new LambdaQueryWrapperStock().eq(Stock::getCommodityCode, commodityCode));if(one ! null one.getCount() count){throw new RuntimeException(Not Enough Count ...);}stockMapper.update(null, new LambdaUpdateWrapperStock().setSql(count count- count).eq(Stock::getCommodityCode, commodityCode));StockTX tx new StockTX();tx.setCount(count);tx.setTxId(RootContext.getXID());tx.setState(StockTX.STATE_TRY);stockTXMapper.insert(tx);}Overridepublic boolean confirm(BusinessActionContext ctx) {System.err.println(---------confirm-----------);//删除记录int ret stockTXMapper.delete(new LambdaQueryWrapperStockTX().eq(StockTX::getTxId, ctx.getXid()));return ret 1;}Overridepublic boolean cancel(BusinessActionContext ctx) {System.err.println(---------cancel-----------);String count ctx.getActionContext(count).toString();String commodityCode ctx.getActionContext(commodityCode).toString();StockTX stockTX stockTXMapper.selectOne(new LambdaQueryWrapperStockTX().eq(StockTX::getTxId, ctx.getXid()));if (stockTX null){//为空 空回滚stockTX new StockTX();stockTX.setTxId(ctx.getXid());stockTX.setState(StockTX.STATE_CANCEL);if(count ! null){stockTX.setCount(Integer.parseInt(count));}stockTXMapper.insert(stockTX);return true;}//幂等处理if(stockTX.getState() StockTX.STATE_CANCEL){return true;}//恢复余额stockMapper.update(null, new LambdaUpdateWrapperStock().setSql(count count count).eq(Stock::getCommodityCode, commodityCode));stockTX.setCount(0);stockTX.setState(StockTX.STATE_CANCEL);int ret stockTXMapper.updateById(stockTX);return ret 1;}
}
测试
正常http://localhost:8088/businesses/purchase?rollbackfalsecount2
超库存http://localhost:8088/businesses/purchase?rollbackfalsecount12
超余额http://localhost:8088/businesses/purchase?rollbackfalsecount8
优缺点
优点
一阶段完成直接提交事务释放数据库资源性能好相比AT模型无需生成快照无需使用全局锁性能最强不依赖数据库事务而是依赖补偿操作可以用于非事务性数据库
缺点
代码侵入需要认为编写tryconfirm和cancel接口麻烦没提交/回滚事务前数据是不一致的事务属于最终一致需要考虑confirm 和cancel失败情况要做好幂等处理
Seata使用SAGA模式
概述
Saga模式是SEATA提供的长事务解决方案在Saga模式中业务流程中每个参与者都提交本地事务当出现某一个参与者失败则补偿前面已经成功的参与者一阶段正向服务和二阶段补偿服务都由业务开发实现。
简单理解
saga模式也分为2个阶段
一阶段 直接提交本地事务(所有RM)
二阶段一阶段成功了啥都不做如果存在某个RM本地事务失败则编写补偿业务(反向操作)来实现回滚
原理 左边是所有参与者事务右边是补偿反向操作
正常执行顺序 T1–T2–T3–TN
需要回滚执行顺序T1–T2–T3–TN—回滚—TN—T3—T2—T1
优缺点
优点
事务参与者可以居于事件驱动实现异步调用吞吐量高一阶段直接提交事务无锁性能好不用编写TCC中的三哥阶段实现简单
缺点
一阶段到二阶段时间不定时效性差没有锁没有事务隔离会有脏写可能
模式选择
XAATTCCSAGA一致性强一致弱一致弱一致最终一致隔离性完全隔离基于全局锁基于资源预留隔离无隔离代码侵入无无有编写3个接口有编写状态机和补偿业务性能差好非常好非常好场景对一致性、隔离性有高要求的业务居于关系型数据库的大部分分布式事务场景都可以对性能要求较高的事务有非关系型数据参与的事务业务流程长业务流程多参与者包含其他公司或者遗留系统服务无法提供TCC模式要求的是3个接口
参考
https://blog.csdn.net/crazymakercircle/article/details/109459593?spm1001.2014.3001.5502 https://baijiahao.baidu.com/s?id1717325036148461851wfrspiderforpc