RWmutex:站在巨人肩膀上跳舞

我们在使用mutex 的过程中,不管是读还是写,都通过 Mutex 来保证只有一个 goroutine 访问共享资源,这在某些情况下有点“浪费”。比如说,在写少读多的情况下,即使一段时间内没有写操作,大量并发的读访问也不得不在 Mutex 的保护下变成了串行访问,这个时候使用 Mutex,对性能的影响就比较大。 RWmutex 就是为了优化这问题而实现的

如果您没有阅读过 mutex 文章,请先阅读我的这篇 关于mutex 的文章,RWmutex 是基于mutex 实现的,就像站在巨人肩膀上跳舞。
https://medium.com/gitconnected/go-source-code-sync-mutex-3082a25ef092

什么是 RWMutex

简单解释一下读写锁 RWMutex。标准库中的 RWMutex 是一个 reader/writer 互斥锁。RWMutex 在某一时刻只能由任意数量的 reader 持有,或者是只被单个的 writer 持有。这就是 readers-writers 问题, readers-writers 问题一般有三类,基于对读和写操作的优先级,读写锁的设计和实现也分成三类。

  • Read-preferring:读优先的设计可以提供很高的并发性,但是,在竞争激烈的情况下可能会导致写饥饿。这是因为,如果有大量的读,这种设计会导致只有所有的读都释放了锁之后,写才可能获取到锁。
  • Write-preferring:写优先的设计意味着,如果已经有一个 writer 在等待请求锁的话,它会阻止新来的请求锁的 reader 获取到锁,所以优先保障 writer。当然,如果有一些 reader 已经请求了锁的话,新请求的 writer 也会等待已经存在的 reader 都释放锁之后才能获取。所以,写优先级设计中的优先权是针对新来的请求而言的。这种设计主要避免了 writer 的饥饿问题。
  • 不指定优先级:这种设计比较简单,不区分 reader 和 writer 优先级,某些场景下这种不指定优先级的设计反而更有效,因为第一类优先级会导致写饥饿,第二类优先级可能会导致读饥饿,这种不指定优先级的访问不再区分读写,大家都是同一个优先级,解决了饥饿的问题。
    Go 标准库中的 RWMutex 设计是 Write-preferring 方案。一个正在阻塞的 Lock 调用会排除新的 reader 请求到锁。

RWMutex 的实现

本文基于go 1.21.4

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
type RWMutex struct {  
w Mutex // held if there are pending writers
writerSem uint32 // semaphore for writers to wait for completing readers
readerSem uint32 // semaphore for readers to wait for completing writers
readerCount atomic.Int32 // number of pending readers
readerWait atomic.Int32 // number of departing readers
}
const rwmutexMaxReaders = 1 << 30
//
func (rw *RWMutex) Lock(){}
func (rw *RWMutex) Unlock() {}
func (rw *RWMutex) RLock(){}
func (rw *RWMutex) RUnlock(){}
func (rw *RWMutex) TryLock() bool{}
func (rw *RWMutex) TryRLock() bool{}
func (rw *RWMutex) RLocker() Locker{}
  • 字段 readerCount:记录当前 reader 的数量(以及是否有 writer 竞争锁);
  • readerWait:记录 writer 请求锁时需要等待 read 完成的 reader 的数量;

RLock/RUnlock 的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func (rw *RWMutex) RLock() {
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
// rw.readerCount 是负值的时候,意味着此时有writer等待请求锁,因为writer优先级高,所以把后来的reader阻塞休眠,
runtime_SemacquireMutex(&rw.readerSem, false, 0)
}
}
func (rw *RWMutex) RUnlock() {
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
rw.rUnlockSlow(r) // 有等待的writer
}
}
func (rw *RWMutex) rUnlockSlow(r int32) {
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
// 最后一个reader了,writer终于有机会获得锁了
runtime_Semrelease(&rw.writerSem, false, 1)
}
}

第 2 行是对 reader 计数加 1。
readerCount 这个字段有双重含义:没有 writer 竞争或持有锁时,readerCount 和我们正常理解的 reader 的计数是一样的;
如果有 writer 竞争锁或者持有锁时,那么,readerCount 不仅仅承担着 reader 的计数功能,还能够标识当前是否有 writer 竞争或持有锁(后面会说明),在这种情况下,reader会阻塞等待writer 唤醒(第四行)。
调用 RUnlock 的时候,我们需要将 Reader 的计数减去 1(第 8 行),因为 reader 的数量减少了一个。但是,第 8 行的 AddInt32 的返回值还有另外一个含义。如果它是负值,就表示当前有 writer 竞争锁,在这种情况下,还会调用 rUnlockSlow 方法,检查是不是 reader 都释放读锁了,如果读锁都释放了,那么可以唤醒请求写锁的 writer 了。当一个或者多个 reader 持有锁的时候,竞争锁的 writer 会等待这些 reader 释放完,才可能持有这把锁

LOCK

1
2
3
4
5
6
7
8
9
10
func (rw *RWMutex) Lock() {
// 首先解决其他writer竞争问题
rw.w.Lock()
// 反转readerCount,告诉reader有writer竞争锁
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
// 如果当前有reader持有锁,那么需要等待
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
runtime_SemacquireMutex(&rw.writerSem, false, 0)
}
}

RWMutex 是一个多 writer 多 reader 的读写锁,所以同时可能有多个 writer 和 reader。那么,为了避免 writer 之间的竞争,RWMutex 就会使用一个 Mutex 来保证 writer 的互斥。一旦一个 writer 获得了内部的互斥锁,就会反转 readerCount 字段,把它从原来的正整数 readerCount(>=0) 修改为负数(readerCount-rwmutexMaxReaders),让这个字段保持两个含义(既保存了 reader 的数量,又表示当前有 writer)。我们来看下下面的代码。第 5 行,还会记录当前活跃的 reader 数量,所谓活跃的 reader,就是指持有读锁还没有释放的那些 reader。

如果 readerCount 不是 0,就说明当前有持有读锁的 reader,RWMutex 需要把这个当前 readerCount 赋值给 readerWait 字段保存下来(第 7 行), 同时,这个 writer 进入阻塞等待状态(第 8 行)。
每当一个 reader 释放读锁的时候(调用 RUnlock 方法时),readerWait 字段就减 1,直到所有的活跃的 reader 都释放了读锁,才会唤醒这个 writer。

Unlock

1
2
3
4
5
6
7
8
9
10
11
func (rw *RWMutex) Unlock() {
// 告诉reader没有活跃的writer了
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)

// 唤醒阻塞的reader们
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false, 0)
}
// 释放内部的互斥锁
rw.w.Unlock()
}

当一个 writer 释放锁的时候,它会再次反转 readerCount 字段。(由负数修改为正数),然后唤醒阻塞的reader。在 RWMutex 的 Unlock 返回之前,需要把内部的互斥锁释放。释放完毕后,其他的 writer 才可以继续竞争这把锁。

TryLock & TryRLock

TryRLock 和 TryLock 的实现都很简单,都是尝试获取读锁或者写锁,如果获取不到就返回 false,获取到了就返回 true,这两个方法不会阻塞等待。直接看源代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func (rw *RWMutex) TryLock() bool {  
if !rw.w.TryLock() { // 有 goroutine 持有写锁
return false
}
// 读锁被占用
if !rw.readerCount.CompareAndSwap(0, -rwmutexMaxReaders) {
rw.w.Unlock()
return false
}
return true
}

// TryRLock 尝试锁定 rw 以进行读取,并报告是否成功。
func (rw *RWMutex) TryRLock() bool {
for {
c := rw.readerCount.Load()
// 有 goroutine 持有写锁
if c < 0 {
return false
}
// 尝试获取读锁
if rw.readerCount.CompareAndSwap(c, c+1) {
return true
}
}
}

争议点

Mutex那些争议点,在RWMutex 这里也存在,比如不可重入,不可复制,写代码的时候需要仔细考虑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// 一个RWmutex 错误的使用例子
func main() {
var mu sync.RWMutex

// writer,稍微等待,然后制造一个调用Lock的场景
go func() {
time.Sleep(200 * time.Millisecond)
mu.Lock()
fmt.Println("Lock")
time.Sleep(100 * time.Millisecond)
mu.Unlock()
fmt.Println("Unlock")
}()

go func() {
factorial(&mu, 10) // 计算10的阶乘, 10!
}()

select {}
}

// 递归调用计算阶乘
func factorial(m *sync.RWMutex, n int) int {
if n < 1 { // 阶乘退出条件
return 0
}
fmt.Println("RLock")
m.RLock()
defer func() {
fmt.Println("RUnlock")
m.RUnlock()
}()
time.Sleep(100 * time.Millisecond)
return factorial(m, n-1) * n // 递归调用
}

总结

  1. RWMutex 是基于Mutex 实现的。
  2. 在读多写少的场景,我们可以使用RWMutex 替代Mutex 提高性能。
  3. 同一时间内,可以有多个reader通过RLock()持有RWMutex,
  4. 当一个wirter 通过Lock()加锁时,会先阻止新的reader 加锁,然后等待所有的所有的reader释放后再加锁。
  5. writer 通过ULock()释放锁时,会唤醒等待的reader。