北京建设监理协会官方网站,响应式网站 产品轮播代码,宜兴宜兴建设局网站,网站开发费用无形资产本篇主要介绍一下Elasticsearch的并发控制和乐观锁的实现原理#xff0c;列举常见的电商场景#xff0c;关系型数据库的并发控制、ES的并发控制实践。
并发场景
不论是关系型数据库的应用#xff0c;还是使用Elasticsearch做搜索加速的场景#xff0c;只要有数据更新列举常见的电商场景关系型数据库的并发控制、ES的并发控制实践。
并发场景
不论是关系型数据库的应用还是使用Elasticsearch做搜索加速的场景只要有数据更新并发控制是永恒的话题。
当我们使用ES更新document的时候先读取原始文档做修改然后把document重新索引如果有多人同时在做相同的操作不做并发控制的话就极有可能会发生修改丢失的。可能有些场景丢失一两条数据不要紧比如文章阅读数量统计评论数量统计但有些场景对数据严谨性要求极高丢失一条可能会导致很严重的生产问题比如电商系统中商品的库存数量丢失一次更新可能会导致超卖的现象。
我们还是以电商系统的下单环节举例某商品库存100个两个用户下单购买都包含这件商品常规下单扣库存的实现步骤 客户端完成订单数据校验准备执行下单事务。 客户端从ES中获取商品的库存数量。 客户端提交订单事务并将库存数量扣减。 客户端将更新后的库存数量写回到ES。
示例流程图如下 如果没有并发控制这件商品的库存就会更新成99实际正确的值是98这样就会导致超卖现象。假定http-1比http-2先一步执行出现这个问题的原因是http-2在获取库存数据时http-1还未完成下单扣减库存后更新到ES的环节导致http-2获取的数据已经是过期数据后续的更新肯定也是错的。
上述的场景如果更新操作越是频繁并发数越多读取到更新这一段的耗时越长数据出错的概率就越大。
常用的锁方案
并发控制尤为重要有两种通用的方案可以确保数据在并发更新时的正确性。
悲观并发控制
悲观锁的含义我认为每次更新都有冲突的可能并发更新这种操作特别不靠谱我只相信只有严格按我定义的粒度进行串行更新才是最安全的一个线程更新时其他的线程等着前一个线程更新完成后下一个线程再上。
关系型数据库中广泛使用该方案常见的表锁、行锁、读锁、写锁依赖redis或memcache等实现的分布式锁都属于悲观锁的范畴。明显的特征是后续的线程会被挂起等待性能一般来说比较低不过自行实现的分布式锁粒度可以自行控制按行记录、按客户、按业务类型等在数据正确性与并发性能方面也能找到很好的折衷点。
乐观并发控制
乐观锁的含义我认为冲突不经常发生我想提高并发的性能如果真有冲突被冲突的线程重新再尝试几次就好了。
在使用关系型数据库的应用也经常会自行实现乐观锁的方案有性能优势方案实现也不难还是挺吸引人的。
Elasticsearch默认使用的是乐观锁方案前面介绍的_version字段记录的就是每次更新的版本号只有拿到最新版本号的更新操作才能更新成功其他拿到过期数据的更新失败由客户端程序决定失败后的处理方案一般是重试。
ES的乐观锁方案
我们还是以上面的案例为背景若http-2向ES提交更新数据时ES会判断提交过来的版本号与当前document版本号document版本号单调递增如果提交过来的版本号比document版本号小则说明是过期数据更新请求将提示错误过程图如下 使用外部_version实战乐观锁控制效果
虽然ElasticSearch的最新本版已经不再支持直接使用version字段实现乐观锁但仍然允许利用外部版本号version实现乐观锁。
所谓外部版本号version意思就是version字段的值不是由ElasticSearch自动生成的而是在创建或修改文档时由应用程序在请求参数中明确指定的。
例如在生产实践中我们通常使用关系型数据库作为主数据源将数据从DB同步到ElasticSearch以提供数据搜索服务。而DB中的每行记录都会有update_time字段表示数据最后被更新的时间戳。我们可以利用这个时间戳作为外部版本号以确保从DB同步数据给ElasticSearch的数据一致性。
删除之前的索引执行如下命令重新创建一个新的文档 PUT /book_store/_doc/97876381260?version1638430097013version_typeexternal
{title: 零基础学Java,ISBN: 97876381260,price: 66.88, stock: 100
}
version_type参数的值是external明确指定使用外部版本号作为文档version字段的值。
version参数的值是笔者写这篇文章时的时间戳会作为文档的version字段的值。
该命令与普通的Index API类似也是“先尝试创建一个新的文档如果对应的文档已存在就更新那个文档”。但区别是当且仅当request中的version参数的值大于该文档中version字段的值时才会更新成功否则就会报错表示并发冲突。
ElasticSearch收到该请求后返回如下结果可以看到文档version字段的值就是创建文档时提供的外部版本号。 {_index : book_store,_type : _doc,_id : 97876381260,_version : 1638430097013,result : created,_shards : {total : 2,successful : 2,failed : 0},_seq_no : 0,_primary_term : 1
}
再执行一次与上面完全相同的命令ElasticSearch会返回如下错误结果 {error : {root_cause : [{type : version_conflict_engine_exception,reason : [97876381260]: version conflict, current version [1638430097013] is higher or equal to the one provided [1638430097013],index_uuid : NUfx6zIrQFGWcgh5NNwP4g,shard : 0,index : book_store}],type : version_conflict_engine_exception,reason : [97876381260]: version conflict, current version [1638430097013] is higher or equal to the one provided [1638430097013],index_uuid : NUfx6zIrQFGWcgh5NNwP4g,shard : 0,index : book_store},status : 409
}
从错误原因可以看出由于提供的版本号参数不大于文档中的版本号所以导致了并发冲突的异常。
如果我们要更新这个文档则必须用一个新的外部版本号且这个外部版本号必须大于文档当前的version字段的值。
使用了一个新的时间戳构造文档更新请求 PUT /book_store/_doc/97876381260?version1638435446074version_typeexternal
{doc: {stock: 99}
}
ElasticSearch收到该请求后返回如下结果更新成功且文档version字段的值已经被更新成新的外部版本号了。
{_index : book_store,_type : _doc,_id : 97876381260,_version : 1638435446074,result : updated,_shards : {total : 2,successful : 2,failed : 0},_seq_no : 1,_primary_term : 1
} _seq_no _primary_term
_seq_no 和 _primary_term 是用来并发控制和 _version不同_version属于当前文档而 _seq_no属于整个index。
_seq_no _primary_term _seq_no索引级别的版本号索引中所有文档共享一个 _seq_no 。 _primary_termprimary_term是一个整数每当Primary Shard发生重新分配时比如节点重启Primary选举或重新分配等primary_term会递增1。主要作用是用来恢复数据时处理当多个文档的_seq_no 一样时的冲突避免 Primary Shard 上的数据写入被覆盖。
if_seq_no if_primary_term 在Elasticsearch中if_seq_no 和 if_primary_term 是用于乐观锁并发控制的参数用于确保对文档的操作不会与其他操作产生冲突。
if_seq_no 参数用于指定期望的文档序列号seq_no而 if_primary_term 参数用于指定期望的 primary term。这两个参数一起作为条件如果提供的条件与实际存储的文档序列号和主要项匹配则操作成功执行否则操作将失败并返回版本冲突的错误。
假设我们有一个名为 my_index 的索引其中包含 _id 为 1 的文档。当前文档的 seq_no 是 10primary_term 是 1。
示例 1更新文档
PUT my_index/_doc/1?if_seq_no10if_primary_term1
{foo: bar
}
输出
{_index: my_index,_type: _doc,_id: 1,_version: 11,result: updated,_shards: {total: 2,successful: 1,failed: 0}
}
在这个示例中通过提供正确的 if_seq_no 和 if_primary_term 条件操作成功地更新了文档并返回了更新后的版本号 _version。
示例 2更新文档但条件不匹配
PUT my_index/_doc/1?if_seq_no11if_primary_term1
{foo: bar
}
输出
{error: {root_cause: [{type: version_conflict_engine_exception,reason: [1]: version conflict, current version [11], provided version [11],index_uuid: xxxxxxxxxxxxx,shard: 0,index: my_index}],type: version_conflict_engine_exception,reason: [1]: version conflict, current version [11], provided version [11],index_uuid: xxxxxxxxxxxxx,shard: 0,index: my_index},status: 409
}
在这个示例中由于提供的 if_seq_no 和 if_primary_term 条件与实际存储的文档序列号和主要项不匹配操作失败并返回版本冲突的错误。
通过使用 if_seq_no 和 if_primary_term 参数我们可以精确控制对文档的并发操作并避免冲突。