sync.Mutex 源码分析(二)

接下来我们分析Mutex的源码:

Mutex采用了一个int32类型的字段:state来描述当前锁的状态,包括:

  • 当前mutex是否被持有
  • 是否当前已经唤醒了一个goroutine
  • 是否处于饥饿状态
  • 当前等待唤醒的goroutine数量

其排列如下:

image-20221019153333348

通过同一个字段表示所有的状态,则需要用到位运算,这里先简单总结一下源码中常见的位运算的的作用。

先看各种状态的定义:

const (
  //除第一位为1之外,其他位都是0
    mutexLocked = 1 << iota 
    //除第二位为1之前,其他位都是0
  mutexWoken
  //除第三位为1之前,其他位都是0
    mutexStarving

  //还原waiterNum时需要位移的位数
    mutexWaiterShift = iota
)

mutexLocked、mutexWoken、mutexStarving都是标志位,除指定位为1之前,其他位全是0,因此通过&操作可以判断其对应的位是否为1,例如:

判断:

//判断old第mutexLocked位是否为1
if old&mutexLocked!=0{
   //xxx
}

取值:

从上图可以看到state的第3-30位被用来表示当前等待唤醒的goroutine数量。因此每次需要取值时,需要先右移mutexWaiterShift位,当需要赋值时,需要左移mutexWaiterShift位,通过右移/左移可以屏蔽前面三位的影响,例如:

if old&(mutexLocked|mutexStarving) != 0 {
            //将waiterNum的值加一
      new += 1 << mutexWaiterShift
}

Mutex.Lock()

func (m *Mutex) Lock() {
    //m.state没有任何g持有锁时,其值为0
  //如果修改成功则说明成功获取锁
  //借助atomic 实现Happens Before语义
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        if race.Enabled {
            race.Acquire(unsafe.Pointer(m))
        }
        return
    }

  //没有成功快速获取锁,进入复杂逻辑处理
    m.lockSlow()
}

可以看到在Lock第一步便是想直接通过atomic简单的尝试获取锁,如果获取失败,则说明此时锁已经被其他goroutine持有。

func (m *Mutex) lockSlow() {
  //阻塞时间,用来计算是否需要进入饥饿模式
    var waitStartTime int64
  //当前请求是否进入了饥饿模式
    starving := false
  //当前是否为唤醒状态
    awoke := false
    iter := 0
    old := m.state
    for {
        //尝试自旋
    //只有Mutex被锁定并且没有进入饥饿模式以及满足自旋的条件才允许自旋
        if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {

      //当当前请求为非awoke状态
      //并且Mutex mutexWoken标志为0
      //并且当前Mutex存在其他等待的goroutine
      //尝试设置Mutex的mutexWoken标志为1
            if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
                atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                  //设置成功,则当前请求进入awoke状态
          //awoke表示当前有goroutine正在尝试获取锁
          //解锁时通过判断是否有处于awoke状态的goroutine可以避免再唤醒队列中goroutine
          awoke = true
            }
      //让出cpu,自旋
            runtime_doSpin()
      //增加自选次数
            iter++
            old = m.state
            continue
        }

    //执行到这段代码有两种可能
    //1. 通过自旋发现锁为非锁定状态,开始尝试争抢锁
    //2. 不满足自旋条件
        new := old

    //当前为非饥饿模式才争抢锁 
        if old&mutexStarving == 0 {
            new |= mutexLocked
        }

    //如果当前Mutex处于锁定或饥饿状态,则增加WaiteNum,准备休眠
        if old&(mutexLocked|mutexStarving) != 0 {
            new += 1 << mutexWaiterShift
        }

        //如果当前goroutine需要进入饥饿状态并且锁未被释放则设置饥饿标记
        if starving && old&mutexLocked != 0 {
            new |= mutexStarving
        }

    //如果goroutine是被唤醒状态
    //awoke设置表示当前goroutine是由休眠转为唤醒状态
        if awoke {
            //检查mutexWoken标志
            if new&mutexWoken == 0 {
                throw("sync: inconsistent mutex state")
            }

      //关闭mutexWoken标志
            new &^= mutexWoken
        }

    //通过CAS修改m.state
        if atomic.CompareAndSwapInt32(&m.state, old, new) {
      //如果之前的状态未被锁定并且不是饥饿状态,则说明抢锁成功,返回
      //此时已经通过new字段将m.state的mutexLocked标志设置为1
            if old&(mutexLocked|mutexStarving) == 0 {
                break // locked the mutex with CAS
            }
            // 如果是g是被唤醒的,则将g排在队首
            queueLifo := waitStartTime != 0
            if waitStartTime == 0 {
                waitStartTime = runtime_nanotime()
            }
      //休眠
            runtime_SemacquireMutex(&m.sema, queueLifo, 1)

      //到这里说明g又被唤醒
      //通过休眠的时间判断休眠时间,是否需要进入休眠模式
            starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
            //获取m.state最新状态
      old = m.state
      //如果Mutex的饥饿标志已经被设置,则进入饥饿流程
            if old&mutexStarving != 0 {

        //检查饥饿模式的条件:
        //如果当前Mutex未被锁定或者存在其他被唤醒的goroutine
        //或者当前等待唤醒的g数量为0则抛出异常
                if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
                    throw("sync: inconsistent mutex state")
                }

                delta := int32(mutexLocked - 1<<mutexWaiterShift)
        //如果当前goroutine退出了饥饿模式
        //或则当前Mutex没有其他的等待者
        //则退出饥饿模式
                if !starving || old>>mutexWaiterShift == 1 {
                    delta -= mutexStarving
                }
        //这里是通过直接修改改变的state状态,而不是通过CAS修改,因为判断状态用的CAS,因此这样做也不会有什么问题
        //state=state+mutexLocked - 1
                atomic.AddInt32(&m.state, delta)
                break
            }

      //未进入饥饿模式,设置自己被唤醒,继续循环
            awoke = true
            iter = 0
        } else {
      //cas失败,重新赋值old,继续循环
            old = m.state
        }
    }

    if race.Enabled {
        race.Acquire(unsafe.Pointer(m))
    }
}

上面一整段代码比较复杂,毕竟是需要通过CAS实现串行执行的效果。而比较复杂的另一个原因便是上面的代码融合了3中情况:自旋、饥饿和获取锁。其实只要单独对上面的三种情况分开分析,思路便变得清晰起来,下面我们将分能拆分来分析代码。

锁定

对于锁功能来说,其实主要是mutexLocked标志位与waiterNum。加锁时,谁成功通过CASmutexLocked由0修改为1,则代表谁成功获取到锁;解锁时,通过CAS修改waiterNum数量,如果waiterNum大于0,则还需要唤醒等待的goroutine。伪代码如下:

func (m *Mutex) Lock() {
  old:=m.state
  for{
      new:=old | mutexLocked
      //如果锁已经被其他g获取,则添加waiterNum,准备休眠
      if old & mutexLocked != 0{
        new+=1<<mutexWaiterShift
      }

     //cas修改状态
     if atomic.CompareAndSwapInt32(&m.state, old, new){
      if old& mutexLocked == 0 {
                 //获取锁成功,退出 
         return 
            }

      //休眠
      semacquire(&m.sema)      
    } 
  }
}

自旋

为了减少goroutine的休眠与唤醒的次数,在Lock()方法中可以循环检查mutexWoken标志位,如果是被其他g持有状态,则暂停一小段时间,但是不休眠。通过多次循环,能够增加g在不休眠的情况下成功获取锁的概率。

func (m *Mutex)Lock(){
  iter:=0

  for {
    if runtime_canSpin(iter){
        //通过awoke标志位标识当前有唤醒的goroutine正在争抢锁
        //当执行解锁时通过判断awoke标志位如果为true则可以不唤醒休眠的goroutine,避免一次唤醒浪费
        if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
            atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
            awoke = true
        }
         //执行休眠,对应汇编代码CPU PUSE指令
         runtime_doSpin()
        }
    }

   //抢锁
   //...
}

runtime_canSpin对应的源码为runtime#porc.go

func sync_runtime_canSpin(i int) bool {
   // 自旋次数不大于4
   // CPU核数大于1
   // 有其他非idle的p正在运行 && GOMAXPROCS > 1
   if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {
      return false
   }
   //与当前`g`绑定的`p`本地运行队列为空
   if p := getg().m.p.ptr(); !runqempty(p) {
      return false
   }
   return true
}

以上条件总结起来便是g的自旋不能影响其他的g的调度。

当满足以上自旋条件时,Mutex便会调用sync_runtime_doSpin开始空转。

func sync_runtime_doSpin() {
  //通过汇编调用CPU PAUSE指令30次
   procyield(active_spin_cnt)
}

同时可以看到上面的代码多了一个awoke标记,通过这个awoke标记可以在UnLock()方法解锁是判断是否有活跃的goroutine,如果有,则说明此时已经存在活跃的goroutine在争抢锁,那么此时不用再唤醒休眠的goroutine,避免无效唤醒。

func (m *Mutex) UnLock(){
  //解锁
  new := atomic.AddInt32(&m.state, -mutexLocked)
  old := new
  for{
    //如果锁已经被其他g获取
    //或者有其他的g已经被唤醒
    //则不用再唤醒休眠的g
    if old & (mutexxLocked | mutexWoken)!=0{
       return
    }

    //waiterNum数量减一,准备唤醒一个休眠的g
    new = (old - 1<<mutexWaiterShift) | mutexWoken
    if atomic.CompareAndSwapInt32(&m.state, old, new) {
      //修改成功,唤醒g
      runtime_Semrelease(&m.sema, false, 1)
      return
    }
  }
}

饥饿

g的饥饿来自于休眠时间的检测,当一个goroutine被唤醒之后,会首先检查自己的休眠时间,如果休眠时间超过1ms,则会在新的一次尝试CAS过程中修改MutexmutexStarving,修改成功之后,新来的goroutine会避免自旋和争抢锁,而是直接进入休眠队列等待唤醒。

func (m *Mutex) Lock(){
  for{
      //自旋
      //...

      //如果Mutex mutexStarving标志位为被设置,才尝试修改mutexLocked标志
      if old&mutexStarving == 0 {
        new |= mutexLocked
      }

      //如果Mutex已经被抢占或则处于饥饿状态,则直接增加WaiterNum数,准备休眠
      if old&(mutexLocked|mutexStarving) != 0 {
        new += 1 << mutexWaiterShift
      }

      //如果在循环中发现自己满足饥饿模式的条件并且锁被其他g抢占
      //则设置Mutex的mutexStarving标志位为1
      if starving && old&mutexLocked != 0 {
        new |= mutexStarving
      }

    if atomic.CompareAndSwapInt32(&m.state, old, new) {
        //修改成功
        if old&(mutexLocked|mutexStarving) == 0 {
                    break // locked the mutex with CAS
                }

        //记录休眠的起始时间
        if waitStartTime == 0 {
          waitStartTime = runtime_nanotime()
        }
        //休眠
        runtime_SemacquireMutex(&m.sema, queueLifo, 1)
              //到这里已经被唤醒
        //通过时间对比出是否需要进入饥饿模式 
        starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
        //更新old为m当前的状态
        old = m.state

        //如果old进入了饥饿模式,则说明我们是在饥饿模式中被唤醒
        //此时我们一定能获取锁,因为饥饿模式是按序唤醒
        if old&mutexStarving != 0 {
           //如果old被锁定或者有其他活跃的g在争抢锁,则抛出异常,因为饥饿模式一定是按顺序唤醒
           //如果WaiterNum等于0则抛出异常,因为如果在饥饿模式只有一个等待的g,则应该退出饥饿模式
          if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
            throw("sync: inconsistent mutex state")
          }
          //直接减少一个waiterNum
          delta := int32(mutexLocked - 1<<mutexWaiterShift)
          //如果当前g不满足饥饿模式的条件或者后面没有其他的等待的g,则退出饥饿模式
          if !starving || old>>mutexWaiterShift == 1 {
            delta -= mutexStarving
          }
          //将值直接添加到m.state上
          atomic.AddInt32(&m.state, delta)
          //锁获取成功,退出循环
          break
        }
    }

  }
}

可以看到,对于饥饿模式的锁来说,获取锁与非公平模式完全是两套代码,非公平模式需要在for循环中执行,因为可能存在争抢,所以存在失败的可能,而饥饿模式则是直接修改state即可,因为不存在争抢。

这里可能存在一个疑问。在通过CAS成功修改m.state状态之后,为什么需要判断if old&(mutexLocked|mutexStarving) == 0才认为成功获取锁?这行代码的意思为:old状态未被锁定并且不为饥饿模式:

  • 需要为未被锁定的原因:因为是通过CAS修改的状态,所以由锁定修改为锁定,CAS也是成功的。因此只有是由未被锁定修改为锁定状态才能认为是抢锁成功。
  • 需要为非饥饿模式的原因:当old为饥饿模式时,能执行到这行代码的只能为刚好有活跃的goroutine执行Lock()方法,如果正好处于饥饿模式且持有锁的g调用UnLock()释放了锁,那么活跃的goroutine可能能刚好通过CASstateunlock修改为lock状态,但是饥饿模式的要求是按顺序获取锁,因此这里需要再判断mutexStarving标记。

结合以上3种功能的代码,便是最终的Mutex#Lock()方法。接下来看UnLock()

UnLock()

func (m *Mutex) Unlock() {
    if race.Enabled {
        _ = m.state
        race.Release(unsafe.Pointer(m))
    }

    //将state的mutexLocked设置为0
    new := atomic.AddInt32(&m.state, -mutexLocked)
    //new!=0表示当前锁没有其他等待者,此时需要考虑唤醒其他等待中的`g`
  if new != 0 {
        m.unlockSlow(new)
    }
}
func (m *Mutex) unlockSlow(new int32) {

  //校验new的值,因为是解锁,所以new应该为非锁定状态
    if (new+mutexLocked)&mutexLocked == 0 {
        throw("sync: unlock of unlocked mutex")
    }

  //如果是非饥饿状态
    if new&mutexStarving == 0 {
        old := new
        for {
      //如果当前waiterNum为0,说明没有等待的waiter,直接返回
      //如果当前状态被锁定,说明可能被其他活跃的goroutine争抢到锁,直接返回
      //如果当前状态mutexWoken被标记,说明存在其他的活跃goroutine正在争抢锁,直接返回
      //如果当前状态被修改为饥饿模式,说明一定存在某个被唤醒的gorotuine,直接返回
            if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
                return
            }
            //减少等待者的数量,并设置Woken标记,准备唤醒休眠的goroutine
            new = (old - 1<<mutexWaiterShift) | mutexWoken
            if atomic.CompareAndSwapInt32(&m.state, old, new) {
                //CAS成功,唤醒队列中的g
        runtime_Semrelease(&m.sema, false, 1)
                return
            }
            old = m.state
        }
    } else {
        //饥饿模式,直接按顺序唤醒gorout
        runtime_Semrelease(&m.sema, true, 1)
    }
}

总结

golangMutex代码通过CAS操作Mutex的一个属性,完全融合了锁定、自旋、饥饿三种功能,想要理解Mutex的源码,需要将其Lock()方法分为3个部分来分析;理解Mutex的原理,可以进一步帮忙我们实现更具有鲁棒性的代码,在下一篇中,我们将继续分析Mutex的核心:runtime_SemacquireMutexruntime_Semrelease