郑州建网站的公司,gps建站教程,企业网站 阿里云,企业电子商务网站建设的重要性目录
一、锁的基础知识
1. 互斥量/互斥锁 2. CAS#xff08;compare and swap#xff09;
3. 自旋锁
4. 读写锁
5. 乐观锁 悲观锁
6. 死锁
二、go中锁机制
1. Mutex-互斥锁
2. RWMutex-读写锁
2.1 RWMutex流程概览
2.2 写锁饥饿问题
2.3. golang的读写锁源…目录
一、锁的基础知识
1. 互斥量/互斥锁 2. CAScompare and swap
3. 自旋锁
4. 读写锁
5. 乐观锁 悲观锁
6. 死锁
二、go中锁机制
1. Mutex-互斥锁
2. RWMutex-读写锁
2.1 RWMutex流程概览
2.2 写锁饥饿问题
2.3. golang的读写锁源码剖析
2.3.1 读锁实现
2.3.2 写锁实现
2.3.3 关键核心机制
3. 常见问题 一、锁的基础知识
1. 互斥量/互斥锁 互斥量Mutex 又称为互斥锁 是一种用来保护临界区的特殊变量 它可以处于锁定locked 状态 也可以处于解锁unlocked 状态。 在编程中引入了对象互斥锁的概念来保证共享数据操作的完整性。每个对象都对应于一个可称为 互斥锁 的标记这个标记用来保证在任一时刻只能有一个线程访问该对象。 2. CAScompare and swap 解决多线程并行情况下使用锁造成性能损耗的一种机制CAS操作包含三个操作数——内存位置V、预期原值A和新值(B)。如果内存位置的值与预期原值相匹配那么处理器会自动将该位置值更新为新值。否则处理器不做任何操作。无论哪种情况它都会在CAS指令之前返回该位置的值。CAS有效地说明了“我认为位置V应该包含值A如果包含该值则将B放到这个位置否则不要更改该位置只告诉我这个位置现在的值即可。
CAS机制执行流程 CAS存在的问题1. ABA问题 CAS需要在操作值的时候检查值有没有发生变化如果没有发生变化就更新但是如果一个值原来是A变成了B又变成了A那么使用CAS进行检查时会发现它的值没有发生变化但是实际上却变化了 — 这就是所谓的ABA问题。 ABA问题的解决思路其实也很简单就是使用版本号。在变量前面追加上版本号每次变量更新的时候把版本号加1那么A→B→A就会变成1A→2B→3A了。
2. 循环时间长开销大 自旋CAS如果长时间不成功会给CPU带来非常大的执行开销
3. 只能保证一个共享变量的原子操作 当对一个共享变量执行操作时我们可以使用循环CAS的方式来保证原子操作但是对多个共享变量操作时循环CAS就无法保证操作的原子性这个时候就可以用锁。 3. 自旋锁 自旋锁与互斥锁比较类似它们都是为了解决对某项资源的互斥使用。无论是互斥锁还是自旋锁在任何时刻最多只能有一个保持者也就说在任何时刻最多只能有一个执行单元获得锁。但是两者在调度机制上略有不同。对于互斥锁如果资源已经被占用资源申请者只能进入睡眠状态。但是自旋锁不会引起调用者睡眠如果自旋锁已经被别的执行单元保持调用者就一直循环在那里看是否该自旋锁的保持者已经释放了锁自旋一词就是因此而得名。
自旋锁可能存在的2个问题 试图递归地获得自旋锁必然会引起死锁递归程序的持有实例在第二个实例循环以试图获得相同自旋锁时不会释放此自旋锁。 在递归程序中使用自旋锁应遵守下列策略递归程序决不能在持有自旋锁时调用它自己也决不能在递归调用时试图获得相同的自旋锁。 过多占用cpu资源。如果不加限制由于申请者一直在循环等待因此自旋锁在锁定的时候,如果不成功,不会睡眠,会持续的尝试,单cpu的时候自旋锁会让其它process动不了. 因此一般自旋锁实现会有一个参数限定最多持续尝试次数. 超出后, 自旋锁放弃当前time slice. 等下一次机会。 由此可见自旋锁比较适用于锁使用者保持锁时间比较短的情况。正是由于自旋锁使用者一般保持锁时间非常短因此选择自旋而不是睡眠是非常必要的自旋锁的效率远高于互斥锁。 4. 读写锁 读写锁实际是一种特殊的自旋锁它把对共享资源的访问者划分成读者和写者读者只对共享资源进行读访问写者则需要对共享资源进行写操作。 这种锁相对于自旋锁而言能提高并发性因为在多处理器系统中它允许同时有多个读者来访问共享资源最大可能的读者数为实际的逻辑CPU数。写者是排他性的一个读写锁同时只能有一个写者或多个读者与CPU数相关但不能同时既有读者又有写者。在读写锁保持期间也是抢占失效的。 如果读写锁当前没有读者也没有写者那么写者可以立刻获得读写锁否则它必须自旋在那里直到没有任何写者或读者。如果读写锁没有写者那么读者可以立即获得该读写锁否则读者必须自旋在那里直到写者释放该读写锁。 5. 乐观锁 悲观锁 乐观锁其实主要就是一种思想因为乐观锁的操作过程中其实没有没有任何锁的参与乐观锁只是和悲观锁相对严格的说乐观锁不能称之为锁。 悲观锁总是假设最坏的情况每次去拿数据的时候都认为别人会修改所以每次在拿数据的时候都会上锁这样别人想拿这个数据就会阻塞直到它拿到锁共享资源每次只给一个线程使用其它线程阻塞用完后再把资源转让给其它线程。 乐观锁总是假设最好的情况每次去拿数据的时候都认为别人不会修改所以不会上锁只在更新的时候会判断一下在此期间别人有没有去更新这个数据。 乐观锁适用于写比较少的情况下多读场景即冲突真的很少发生的时候这样可以省去了锁的开销加大了系统的整个吞吐量。但如果是多写的情况一般会经常产生冲突这就会导致上层应用会不断的进行retry这样反倒是降低了性能所以一般多写的场景下用悲观锁就比较合适。
乐观锁常见的两种实现方式 1. 版本号机制 CAS机制保证了在更新数据的时候没有被修改为其他数据的同步机制版本机制就保证了没有被修改过的同步机制 2. CAS机制 当多个线程尝试使用CAS同时更新同一个变量时只有其中一个线程能更新变量的值而其它线程都失败。 6. 死锁 死锁是指两个或两个以上的进程在执行过程中由于竞争资源或者由于彼此通信而造成的一种阻塞的现象若无外力作用它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁这些永远在互相等待的进程称为死锁进程。 虽然进程在运行过程中可能发生死锁但死锁的发生也必须具备一定的条件死锁的发生必须具备以下四个必要条件
互斥条件指进程对所分配到的资源进行排它性使用即在一段时间内某资源只由一个进程占用。如果此时还有其它进程请求资源则请求者只能等待直至占有资源的进程用毕释放。请求和保持条件指进程已经保持至少一个资源但又提出了新的资源请求而该资源已被其它进程占有此时请求进程阻塞但又对自己已获得的其它资源保持不放。不剥夺条件指进程已获得的资源在未使用完之前不能被剥夺只能在使用完时由自己释放。环路等待条件指在发生死锁时必然存在一个进程——资源的环形链即进程集合{P0P1P2···Pn}中的P0正在等待一个P1占用的资源P1正在等待P2占用的资源……Pn正在等待已被P0占用的资源。 二、go中锁机制 在 Golang 里有专门的方法来实现锁就是 sync 包这个包有两个很重要的锁类型。一个叫 Mutex 利用它可以实现互斥锁。一个叫 RWMutex利用它可以实现读写锁。
sync.Mutex 的锁只有一种锁Lock()它是互斥锁同一时间只能有一个锁。sync.RWMutex 叫读写锁它有两种锁 RLock() 和 Lock() RLock() 叫读锁。它不是绝对锁可以有多个读者同时获取此锁调用 mu.RLock。Lock() 叫写锁它是个绝对锁就是说如果一旦某人拿到了这个锁别人就不能再获取此锁了。
1. Mutex-互斥锁 Mutex 的实现主要借助了 CAS 指令 自旋 信号量 数据结构
type Mutex struct {state int32sema uint32
}上述两个加起来只占 8 字节空间的结构体表示了 Go语言中的互斥锁
状态 在默认情况下互斥锁的所有状态位都是 0int32 中的不同位分别表示了不同的状态
1位表示是否被锁定1位表示是否有协程已经被唤醒1位表示是否处于饥饿状态剩下29位表示阻塞的协程数
正常模式和饥饿模式 正常模式正常模式下waiter都是先入先出在队列中等待的waiter被唤醒后不会直接获取锁因为要和新来的goroutine 进行竞争新来的goroutine相对于被唤醒的waiter是具有优势的新的goroutine 正在cpu上运行被唤醒的waiter还要进行调度才能进入状态所以在并发的情况下waiter大概率抢不过新来的goroutine这个时候waiter会被放到队列的头部如果等待的时间超过了1ms这个时候Mutex就会进入饥饿模式。 饥饿模式当Mutex进入饥饿模式之后锁的所有权会从解锁的goroutine移交给队列头部的goroutine这几个时候新来的goroutine会直接放入队列的尾部这样很好的解决了老的goroutine一直抢不到锁的场景。 对于两种模式正常模式下的性能是最好的goroutine可以连续多次获取锁饥饿模式解决了取锁公平的问题但是性能会下降其实是性能和公平的一个平衡模式。所以在lock的源码里面当队列只剩本省goroutine一个并且等待时间没有超过1ms这个时候Mutex会重新恢复到正常模式。
Lock函数
// 加锁
// 如果锁已经被使用调用goroutine阻塞直到锁可用
func (m *Mutex) Lock() {// 快速路径没有竞争直接获取到锁修改状态位为加锁if atomic.CompareAndSwapInt32(m.state, 0, mutexLocked) {// 开启-race之后会进行判断正常情况可忽略if race.Enabled {race.Acquire(unsafe.Pointer(m))}return}// 慢路径以便快速路径可以内联m.lockSlow()
}来看看Lock函数分为两个部分 快速路径先通过CAS尝试直接获取锁如果能获取到直接返回否则进入慢路径的方法这里的代码注释提到了内联 tips方法内联 简单的说方法内联就是将被调用方函数代码“复制”到调用方函数中减少函数调用开销在2018年之前的go版本中所有的逻辑都在Lock函数中并没有拆出来2018年之后Go开发者将slow path拆出来当lock方法被频繁调用的时候有两种情况如果直接获得锁走的是fast path这个时候内联就只有fast path 的代码这样会减少方法调用的堆栈空间和时间的消耗 如果处于自旋锁竞争的情况下走的是slow path这个时候才会把lock slow 的方法内联进来这样方便了编译器做内联。 lockSlow 函数
func (m *Mutex) lockSlow() {var waitStartTime int64 //记录请求锁的初始时间starving : false //饥饿标记awoke : false //唤醒标记iter : 0 //自旋次数old : m.state //当前锁的状态for {//锁处于正常模式还没有释放的时候尝试自旋if old(mutexLocked|mutexStarving) mutexLocked runtime_canSpin(iter) { //在临界区耗时很短的情况下提高性能if !awoke oldmutexWoken 0 oldmutexWaiterShift ! 0 atomic.CompareAndSwapInt32(m.state, old, old|mutexWoken) {awoke true}runtime_doSpin()iter//更新锁的状态old m.statecontinue}new : old// 非饥饿状态进行加锁if oldmutexStarving 0 {new | mutexLocked}// 等待着数量1if old(mutexLocked|mutexStarving) ! 0 {new 1 mutexWaiterShift}// 加锁的情况下切换为饥饿模式if starving oldmutexLocked ! 0 {new | mutexStarving}//goroutine 唤醒的时候进行重置标志if awoke {if newmutexWoken 0 {throw(sync: inconsistent mutex state)}new ^ mutexWoken}// 设置新的状态if atomic.CompareAndSwapInt32(m.state, old, new) {if old(mutexLocked|mutexStarving) 0 {break }// 判断是不是第一次加入队列// 如果之前就在队列里面等待了加入到队头queueLifo : waitStartTime ! 0if waitStartTime 0 {waitStartTime runtime_nanotime()}// 阻塞等待runtime_SemacquireMutex(m.sema, queueLifo, 1)// 检查锁是否处于饥饿状态starving starving || runtime_nanotime()-waitStartTime starvationThresholdNsold m.state// 如果锁处于饥饿状态直接抢到锁if oldmutexStarving ! 0 {if old(mutexLocked|mutexWoken) ! 0 || oldmutexWaiterShift 0 {throw(sync: inconsistent mutex state)}// 设置标志进行加锁并且waiter-1delta : int32(mutexLocked - 1mutexWaiterShift)// 如果是最后一个的话清除饥饿标志if !starving || oldmutexWaiterShift 1 {// 退出饥饿模式 delta - mutexStarving}atomic.AddInt32(m.state, delta)break}awoke trueiter 0} else {old m.state}}// -race开启检测冲突可以忽略if race.Enabled {race.Acquire(unsafe.Pointer(m))}
}Unlock函数
//如果对没有lock 的Mutex进行unlock会报错
//unlock和goroutine是没有绑定的对于一个Mutex可以一个goroutine加锁另一个goroutine进行解锁
func (m *Mutex) Unlock() {if race.Enabled {_ m.staterace.Release(unsafe.Pointer(m))}// 快速之路直接解锁去除加锁位的标记new : atomic.AddInt32(m.state, -mutexLocked)if new ! 0 {// 解锁失败进入慢路径//同样的对慢路径做了单独封装便于内联m.unlockSlow(new)}
}unlockSlow函数
func (m *Mutex) unlockSlow(new int32) {//解锁一个未加锁的Mutex会报错可以想想为什么Mutex使用状态位进行标记锁的状态的if (newmutexLocked)mutexLocked 0 {throw(sync: unlock of unlocked mutex)}if newmutexStarving 0 {old : newfor {//正常模式下没有waiter或者在处理事情的情况下直接返回if oldmutexWaiterShift 0 || old(mutexLocked|mutexWoken|mutexStarving) ! 0 {return}//如果有等待者设置mutexWoken标志waiter-1更新statenew (old - 1mutexWaiterShift) | mutexWokenif atomic.CompareAndSwapInt32(m.state, old, new) {runtime_Semrelease(m.sema, false, 1)return}old m.state}} else {// 饥饿模式下会直接将mutex交给下一个等待的waiter让出时间片以便waiter执行runtime_Semrelease(m.sema, true, 1)}
}同样的在unlock也有fastpath和slowpathfastpath尝试解锁解锁成功就返回否则进入slowpathslowpath分为正常模式的处理和饥饿模式的处理饥饿模式直接将锁的控制权交给队列中等待的waiter正常模式分两种情况 如果当前没有waiter只有自己本身直接解锁返回如果有waiter解锁后唤醒下个等待者。 2. RWMutex-读写锁 RWMutex 是一个读/写互斥锁在某一时刻只能由任意数量的 reader 持有 或者 一个 writer 持有。也就是说要么放行任意数量的 reader多个 reader 可以并行读要么放行一个 writer多个 writer 需要串行写。
RWMutex 对外暴露的方法有五个
RLock()读操作获取锁如果锁已经被 writer 占用会一直阻塞直到 writer 释放锁否则直接获得锁RUnlock()读操作完毕之后释放锁Lock()写操作获取锁如果锁已经被 reader 或者 writer 占用会一直阻塞直到获取到锁否则直接获得锁Unlock()写操作完毕之后释放锁RLocker()返回读操作的 Locker 对象该对象的 Lock() 方法对应 RWMutex 的 - RLock()Unlock() 方法对应 RWMutex 的 RUnlock() 方法。 一旦涉及到多个 reader 和 writer 就需要考虑优先级问题是 reader 优先还是 writer 优先。
2.1 RWMutex流程概览 可以想象 RWMutex 有两个队伍一个是包含 所有reader 和你获得准入权writer 的 队列A一个是还没有获得准入权 writer 的 队列B。 队列 A 最多只允许有 一个writer如果有其他 writer需要在 队列B 等待 当一个 writer 到了 队列A 后只允许它 之前的reader 执行读操作新来的 reader 需要在 队列A 后面排队 当前面的 reader 执行完读操作之后writer 执行写操作 writer 执行完写操作后让 后面的reader 执行读操作再唤醒队列B 的一个 writer 到 队列A 后面排队。 初始时刻 队列A 中 writer W1 前面有三个 reader后面有两个 reader队列B中有两个 writer 并发读 多个 reader 可以同时获取到读锁进入临界区进行读操作writer W1 在 队列A 中等待同时又来了两个 reader直接在 队列A 后面排队 写操作 W1 前面所有的 reader 完成后W1 获得锁进入临界区操作 获得准入权 W1 完成写操作退出先让后面排队的 reader 进行读操作然后从 队列B 中唤醒 W2 到 队列A 排队。W2 从 队列B 到 队列A 的过程中R8 先到了 队列A因此 R8 可以执行读操作。R9、R10、R11 在 W2 之后到的所以在后面排队新来的 W4 直接在队列B 排队。 从上面的示例可以看出RWMutex 可以看作是没有优先级按照先来先到的顺序去执行只不过是 多个reader 可以 并行去执行罢了。
2.2 写锁饥饿问题 因为读锁是共享的所以如果当前已经有读锁那后续goroutine继续加读锁正常情况下是可以加锁成功但是如果一直有读锁进行加锁那尝试加写锁的goroutine则可能会长期获取不到锁这就是因为读锁而导致的写锁饥饿问题
go通过引入以下特性避免出现写锁饥饿
当写锁阻塞时新的读锁是无法申请的 即在sync.RWMutex的使用中一个线程请求了他的写锁mx.Lock()后即便它还没有取到该锁可能由于资源已被其他人锁定后面所有的读锁的申请都将被阻塞只有取写锁的请求得到了锁且用完释放后读锁才能去取。 这种特性可以有效防止写锁饥饿。如果一个线程因为某种原因导致长时间得不到CPU时间片这种状态被称之为饥饿。
2.3. golang的读写锁源码剖析
成员变量
结构体
type RWMutex struct {w Mutex // held if there are pending writerswriterSem uint32 // 用于writer等待读完成排队的信号量readerSem uint32 // 用于reader等待写完成排队的信号量readerCount int32 // 读锁的计数器readerWait int32 // 等待读锁释放的数量
}写锁计数 读写锁中允许加读锁的最大数量是4294967296在go里面对写锁的计数采用了负值进行通过递减最大允许加读锁的数量从而进行写锁对读锁的抢占
const rwmutexMaxReaders 1 302.3.1 读锁实现 读锁加锁逻辑 func (rw *RWMutex) RLock() {if race.Enabled {_ rw.w.staterace.Disable()}// 累加reader计数器如果小于0则表明有writer正在等待if atomic.AddInt32(rw.readerCount, 1) 0 {// 当前有writer正在等待读锁读锁就加入排队runtime_SemacquireMutex(rw.readerSem, false)}if race.Enabled {race.Enable()race.Acquire(unsafe.Pointer(rw.readerSem))}
}读锁释放逻辑 func (rw *RWMutex) RUnlock() {if race.Enabled {_ rw.w.staterace.ReleaseMerge(unsafe.Pointer(rw.writerSem))race.Disable()}// 如果小于0则表明当前有writer正在等待if r : atomic.AddInt32(rw.readerCount, -1); r 0 {if r1 0 || r1 -rwmutexMaxReaders {race.Enable()throw(sync: RUnlock of unlocked RWMutex)}// 将等待reader的计数减1证明当前是已经有一个读的如果值0则进行唤醒等待的if atomic.AddInt32(rw.readerWait, -1) 0 {// The last reader unblocks the writer.runtime_Semrelease(rw.writerSem, false)}}if race.Enabled {race.Enable()}
}2.3.2 写锁实现 加写锁实现 func (rw *RWMutex) Lock() {if race.Enabled {_ rw.w.staterace.Disable()}// 首先获取mutex锁同时多个goroutine只有一个可以进入到下面的逻辑rw.w.Lock()// 对readerCounter进行进行抢占通过递减rwmutexMaxReaders允许最大读的数量// 来实现写锁对读锁的抢占r : atomic.AddInt32(rw.readerCount, -rwmutexMaxReaders) rwmutexMaxReaders// 记录需要等待多少个reader完成,如果发现不为0则表明当前有reader正在读取当前goroutine// 需要进行排队等待if r ! 0 atomic.AddInt32(rw.readerWait, r) ! 0 {runtime_SemacquireMutex(rw.writerSem, false)}if race.Enabled {race.Enable()race.Acquire(unsafe.Pointer(rw.readerSem))race.Acquire(unsafe.Pointer(rw.writerSem))}
}释放写锁 func (rw *RWMutex) Unlock() {if race.Enabled {_ rw.w.staterace.Release(unsafe.Pointer(rw.readerSem))race.Disable()}// 将reader计数器复位上面减去了一个rwmutexMaxReaders现在再重新加回去即可复位r : atomic.AddInt32(rw.readerCount, rwmutexMaxReaders)if r rwmutexMaxReaders {race.Enable()throw(sync: Unlock of unlocked RWMutex)}// 唤醒所有的读锁for i : 0; i int(r); i {runtime_Semrelease(rw.readerSem, false)}// 释放mutexrw.w.Unlock()if race.Enabled {race.Enable()}
}2.3.3 关键核心机制
写锁对读锁的抢占 加写锁的抢占
// 在加写锁的时候通过将readerCount递减最大允许加读锁的数量来实现对加读锁的抢占r : atomic.AddInt32(rw.readerCount, -rwmutexMaxReaders) rwmutexMaxReaders加读锁的抢占检测
// 如果没有写锁的情况下读锁的readerCount进行Add后一定是一个0的数字这里通过检测值为负数
//就实现了读锁对写锁抢占的检测
if atomic.AddInt32(rw.readerCount, 1) 0 {// A writer is pending, wait for it.runtime_SemacquireMutex(rw.readerSem, false)}写锁抢占读锁后后续的读锁就会加锁失败但是如果想加写锁成功还要继续对已经加读锁成功的进行等待
if r ! 0 atomic.AddInt32(rw.readerWait, r) ! 0 {// 写锁发现需要等待的读锁释放的数量不为0就自己自己去休眠了runtime_SemacquireMutex(rw.writerSem, false)
}写锁既然休眠了则必定要有一种唤醒机制其实就是每次释放锁的时候当检查到有加写锁的情况下就递减readerWait并由最后一个释放reader lock的goroutine来实现唤醒写锁
if atomic.AddInt32(rw.readerWait, -1) 0 {// The last reader unblocks the writer.runtime_Semrelease(rw.writerSem, false)
}3. 常见问题
不可复制 和 Mutex 一样RWMutex 也是不可复制。不能复制的原因和互斥锁一样。一旦读写锁被使用它的字段就会记录它当前的一些状态。这个时候你去复制这把锁就会把它的状态也给复制过来。但是原来的锁在释放的时候并不会修改你复制出来的这个读写锁这就会导致复制出来的读写锁的状态不对可能永远无法释放锁。不可重入 不可重入的原因是获得锁之后还没释放锁又申请锁这样有可能造成死锁。比如 reader A 获取到了读锁writer B 等待 reader A 释放锁reader 还没释放锁又申请了一把锁但是这把锁申请不成功他需要等待 writer B。这就形成了一个循环等待的死锁。加锁和释放锁一定要成对出现不能忘记释放锁也不能解锁一个未加锁的锁。