接下来我们分析Mutex
的源码:
Mutex
采用了一个int32
类型的字段:state
来描述当前锁的状态,包括:
- 当前
mutex
是否被持有 - 是否当前已经唤醒了一个
goroutine
- 是否处于饥饿状态
- 当前等待唤醒的
goroutine
数量
其排列如下:
通过同一个字段表示所有的状态,则需要用到位运算,这里先简单总结一下源码中常见的位运算的的作用。
先看各种状态的定义:
const (
//除第一位为1之外,其他位都是0
mutexLocked = 1 << iota
//除第二位为1之前,其他位都是0
mutexWoken
//除第三位为1之前,其他位都是0
mutexStarving
//还原waiterNum时需要位移的位数
mutexWaiterShift = iota
)
mutexLocked、mutexWoken、mutexStarving
都是标志位,除指定位为1之前,其他位全是0,因此通过&
操作可以判断其对应的位是否为1,例如:
判断:
//判断old第mutexLocked位是否为1
if old&mutexLocked!=0{
//xxx
}
取值:
从上图可以看到state
的第3-30位被用来表示当前等待唤醒的goroutine
数量。因此每次需要取值时,需要先右移mutexWaiterShift
位,当需要赋值时,需要左移mutexWaiterShift
位,通过右移/左移可以屏蔽前面三位的影响,例如:
if old&(mutexLocked|mutexStarving) != 0 {
//将waiterNum的值加一
new += 1 << mutexWaiterShift
}
Mutex.Lock()
func (m *Mutex) Lock() {
//m.state没有任何g持有锁时,其值为0
//如果修改成功则说明成功获取锁
//借助atomic 实现Happens Before语义
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
//没有成功快速获取锁,进入复杂逻辑处理
m.lockSlow()
}
可以看到在Lock
第一步便是想直接通过atomic
简单的尝试获取锁,如果获取失败,则说明此时锁已经被其他goroutine
持有。
func (m *Mutex) lockSlow() {
//阻塞时间,用来计算是否需要进入饥饿模式
var waitStartTime int64
//当前请求是否进入了饥饿模式
starving := false
//当前是否为唤醒状态
awoke := false
iter := 0
old := m.state
for {
//尝试自旋
//只有Mutex被锁定并且没有进入饥饿模式以及满足自旋的条件才允许自旋
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
//当当前请求为非awoke状态
//并且Mutex mutexWoken标志为0
//并且当前Mutex存在其他等待的goroutine
//尝试设置Mutex的mutexWoken标志为1
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
//设置成功,则当前请求进入awoke状态
//awoke表示当前有goroutine正在尝试获取锁
//解锁时通过判断是否有处于awoke状态的goroutine可以避免再唤醒队列中goroutine
awoke = true
}
//让出cpu,自旋
runtime_doSpin()
//增加自选次数
iter++
old = m.state
continue
}
//执行到这段代码有两种可能
//1. 通过自旋发现锁为非锁定状态,开始尝试争抢锁
//2. 不满足自旋条件
new := old
//当前为非饥饿模式才争抢锁
if old&mutexStarving == 0 {
new |= mutexLocked
}
//如果当前Mutex处于锁定或饥饿状态,则增加WaiteNum,准备休眠
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}
//如果当前goroutine需要进入饥饿状态并且锁未被释放则设置饥饿标记
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
//如果goroutine是被唤醒状态
//awoke设置表示当前goroutine是由休眠转为唤醒状态
if awoke {
//检查mutexWoken标志
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
//关闭mutexWoken标志
new &^= mutexWoken
}
//通过CAS修改m.state
if atomic.CompareAndSwapInt32(&m.state, old, new) {
//如果之前的状态未被锁定并且不是饥饿状态,则说明抢锁成功,返回
//此时已经通过new字段将m.state的mutexLocked标志设置为1
if old&(mutexLocked|mutexStarving) == 0 {
break // locked the mutex with CAS
}
// 如果是g是被唤醒的,则将g排在队首
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
//休眠
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
//到这里说明g又被唤醒
//通过休眠的时间判断休眠时间,是否需要进入休眠模式
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
//获取m.state最新状态
old = m.state
//如果Mutex的饥饿标志已经被设置,则进入饥饿流程
if old&mutexStarving != 0 {
//检查饥饿模式的条件:
//如果当前Mutex未被锁定或者存在其他被唤醒的goroutine
//或者当前等待唤醒的g数量为0则抛出异常
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
delta := int32(mutexLocked - 1<<mutexWaiterShift)
//如果当前goroutine退出了饥饿模式
//或则当前Mutex没有其他的等待者
//则退出饥饿模式
if !starving || old>>mutexWaiterShift == 1 {
delta -= mutexStarving
}
//这里是通过直接修改改变的state状态,而不是通过CAS修改,因为判断状态用的CAS,因此这样做也不会有什么问题
//state=state+mutexLocked - 1
atomic.AddInt32(&m.state, delta)
break
}
//未进入饥饿模式,设置自己被唤醒,继续循环
awoke = true
iter = 0
} else {
//cas失败,重新赋值old,继续循环
old = m.state
}
}
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
}
上面一整段代码比较复杂,毕竟是需要通过CAS
实现串行执行的效果。而比较复杂的另一个原因便是上面的代码融合了3中情况:自旋、饥饿和获取锁。其实只要单独对上面的三种情况分开分析,思路便变得清晰起来,下面我们将分能拆分来分析代码。
锁定
对于锁功能来说,其实主要是mutexLocked
标志位与waiterNum
。加锁时,谁成功通过CAS
将mutexLocked
由0修改为1,则代表谁成功获取到锁;解锁时,通过CAS
修改waiterNum
数量,如果waiterNum
大于0,则还需要唤醒等待的goroutine
。伪代码如下:
func (m *Mutex) Lock() {
old:=m.state
for{
new:=old | mutexLocked
//如果锁已经被其他g获取,则添加waiterNum,准备休眠
if old & mutexLocked != 0{
new+=1<<mutexWaiterShift
}
//cas修改状态
if atomic.CompareAndSwapInt32(&m.state, old, new){
if old& mutexLocked == 0 {
//获取锁成功,退出
return
}
//休眠
semacquire(&m.sema)
}
}
}
自旋
为了减少goroutine
的休眠与唤醒的次数,在Lock()
方法中可以循环检查mutexWoken
标志位,如果是被其他g
持有状态,则暂停一小段时间,但是不休眠。通过多次循环,能够增加g
在不休眠的情况下成功获取锁的概率。
func (m *Mutex)Lock(){
iter:=0
for {
if runtime_canSpin(iter){
//通过awoke标志位标识当前有唤醒的goroutine正在争抢锁
//当执行解锁时通过判断awoke标志位如果为true则可以不唤醒休眠的goroutine,避免一次唤醒浪费
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
//执行休眠,对应汇编代码CPU PUSE指令
runtime_doSpin()
}
}
//抢锁
//...
}
runtime_canSpin
对应的源码为runtime#porc.go
func sync_runtime_canSpin(i int) bool {
// 自旋次数不大于4
// CPU核数大于1
// 有其他非idle的p正在运行 && GOMAXPROCS > 1
if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {
return false
}
//与当前`g`绑定的`p`本地运行队列为空
if p := getg().m.p.ptr(); !runqempty(p) {
return false
}
return true
}
以上条件总结起来便是g
的自旋不能影响其他的g
的调度。
当满足以上自旋条件时,Mutex
便会调用sync_runtime_doSpin
开始空转。
func sync_runtime_doSpin() {
//通过汇编调用CPU PAUSE指令30次
procyield(active_spin_cnt)
}
同时可以看到上面的代码多了一个awoke
标记,通过这个awoke
标记可以在UnLock()
方法解锁是判断是否有活跃的goroutine
,如果有,则说明此时已经存在活跃的goroutine
在争抢锁,那么此时不用再唤醒休眠的goroutine
,避免无效唤醒。
func (m *Mutex) UnLock(){
//解锁
new := atomic.AddInt32(&m.state, -mutexLocked)
old := new
for{
//如果锁已经被其他g获取
//或者有其他的g已经被唤醒
//则不用再唤醒休眠的g
if old & (mutexxLocked | mutexWoken)!=0{
return
}
//waiterNum数量减一,准备唤醒一个休眠的g
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
//修改成功,唤醒g
runtime_Semrelease(&m.sema, false, 1)
return
}
}
}
饥饿
g
的饥饿来自于休眠时间的检测,当一个goroutine
被唤醒之后,会首先检查自己的休眠时间,如果休眠时间超过1ms
,则会在新的一次尝试CAS
过程中修改Mutex
的mutexStarving
,修改成功之后,新来的goroutine
会避免自旋和争抢锁,而是直接进入休眠队列等待唤醒。
func (m *Mutex) Lock(){
for{
//自旋
//...
//如果Mutex mutexStarving标志位为被设置,才尝试修改mutexLocked标志
if old&mutexStarving == 0 {
new |= mutexLocked
}
//如果Mutex已经被抢占或则处于饥饿状态,则直接增加WaiterNum数,准备休眠
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}
//如果在循环中发现自己满足饥饿模式的条件并且锁被其他g抢占
//则设置Mutex的mutexStarving标志位为1
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
if atomic.CompareAndSwapInt32(&m.state, old, new) {
//修改成功
if old&(mutexLocked|mutexStarving) == 0 {
break // locked the mutex with CAS
}
//记录休眠的起始时间
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
//休眠
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
//到这里已经被唤醒
//通过时间对比出是否需要进入饥饿模式
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
//更新old为m当前的状态
old = m.state
//如果old进入了饥饿模式,则说明我们是在饥饿模式中被唤醒
//此时我们一定能获取锁,因为饥饿模式是按序唤醒
if old&mutexStarving != 0 {
//如果old被锁定或者有其他活跃的g在争抢锁,则抛出异常,因为饥饿模式一定是按顺序唤醒
//如果WaiterNum等于0则抛出异常,因为如果在饥饿模式只有一个等待的g,则应该退出饥饿模式
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
//直接减少一个waiterNum
delta := int32(mutexLocked - 1<<mutexWaiterShift)
//如果当前g不满足饥饿模式的条件或者后面没有其他的等待的g,则退出饥饿模式
if !starving || old>>mutexWaiterShift == 1 {
delta -= mutexStarving
}
//将值直接添加到m.state上
atomic.AddInt32(&m.state, delta)
//锁获取成功,退出循环
break
}
}
}
}
可以看到,对于饥饿模式的锁来说,获取锁与非公平模式完全是两套代码,非公平模式需要在for
循环中执行,因为可能存在争抢,所以存在失败的可能,而饥饿模式则是直接修改state
即可,因为不存在争抢。
这里可能存在一个疑问。在通过CAS
成功修改m.state
状态之后,为什么需要判断if old&(mutexLocked|mutexStarving) == 0
才认为成功获取锁?这行代码的意思为:old
状态未被锁定并且不为饥饿模式:
- 需要为未被锁定的原因:因为是通过
CAS
修改的状态,所以由锁定修改为锁定,CAS
也是成功的。因此只有是由未被锁定修改为锁定状态才能认为是抢锁成功。 - 需要为非饥饿模式的原因:当
old
为饥饿模式时,能执行到这行代码的只能为刚好有活跃的goroutine
执行Lock()
方法,如果正好处于饥饿模式且持有锁的g
调用UnLock()
释放了锁,那么活跃的goroutine
可能能刚好通过CAS
将state
由unlock
修改为lock
状态,但是饥饿模式的要求是按顺序获取锁,因此这里需要再判断mutexStarving
标记。
结合以上3种功能的代码,便是最终的Mutex#Lock()
方法。接下来看UnLock()
UnLock()
func (m *Mutex) Unlock() {
if race.Enabled {
_ = m.state
race.Release(unsafe.Pointer(m))
}
//将state的mutexLocked设置为0
new := atomic.AddInt32(&m.state, -mutexLocked)
//new!=0表示当前锁没有其他等待者,此时需要考虑唤醒其他等待中的`g`
if new != 0 {
m.unlockSlow(new)
}
}
func (m *Mutex) unlockSlow(new int32) {
//校验new的值,因为是解锁,所以new应该为非锁定状态
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
//如果是非饥饿状态
if new&mutexStarving == 0 {
old := new
for {
//如果当前waiterNum为0,说明没有等待的waiter,直接返回
//如果当前状态被锁定,说明可能被其他活跃的goroutine争抢到锁,直接返回
//如果当前状态mutexWoken被标记,说明存在其他的活跃goroutine正在争抢锁,直接返回
//如果当前状态被修改为饥饿模式,说明一定存在某个被唤醒的gorotuine,直接返回
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
//减少等待者的数量,并设置Woken标记,准备唤醒休眠的goroutine
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
//CAS成功,唤醒队列中的g
runtime_Semrelease(&m.sema, false, 1)
return
}
old = m.state
}
} else {
//饥饿模式,直接按顺序唤醒gorout
runtime_Semrelease(&m.sema, true, 1)
}
}
总结
golang
的Mutex
代码通过CAS
操作Mutex
的一个属性,完全融合了锁定、自旋、饥饿三种功能,想要理解Mutex
的源码,需要将其Lock()
方法分为3个部分来分析;理解Mutex
的原理,可以进一步帮忙我们实现更具有鲁棒性的代码,在下一篇中,我们将继续分析Mutex
的核心:runtime_SemacquireMutex
与runtime_Semrelease
。