go踩坑合集

RWMutex RLock重入导致死锁

RWMutex,即读写锁,可以被多个的reader或一个writer获取使用。

死锁例子

在使用RWMutex的时候,同一个reader是不应该连续调用Rlock多次的,这样做不但没有意义,还有可能导致死锁,具体代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func main() {
var l = sync.RWMutex{}
var wg sync.WaitGroup
wg.Add(2)

c := make(chan int)
go func() {
l.RLock()
defer l.RUnlock()
c <- 1
runtime.Gosched()

l.RLock()
defer l.RUnlock()
wg.Done()
}()

go func() {
<-c
l.Lock()
defer l.Unlock()
wg.Done()
}()

wg.Wait()
}

sync.RWMutex分析

下面RWMutex的实现,我们来看这段代码的具体执行。为了方便理解,把if race.Enabled {...}的相关代码都去除了。

  1. goroutine 1l.RLock()

    1
    2
    3
    4
    5
    6
    func (rw *RWMutex) RLock() {
    if atomic.AddInt32(&rw.readerCount, 1) < 0 {
    // A writer is pending, wait for it.
    runtime_SemacquireMutex(&rw.readerSem, false, 0)
    }
    }

    执行l.RLock()后,goroutine 1获得写锁。

    • 状态:获得读锁
    • readerCount = 1,readerWait = 0
  2. goroutine 2l.Lock()

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    func (rw *RWMutex) Lock() {
    // First, resolve competition with other writers.
    rw.w.Lock()
    // Announce to readers there is a pending writer.
    r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
    // Wait for active readers.
    if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
    runtime_SemacquireMutex(&rw.writerSem, false, 0)
    }
    }

    由于goroutine 1已获得写锁,此时goroutine 2等待。

    • 状态:等待reader释放读锁
    • readerCount = 1 - rwmutexMaxReaders,readerWait = 1
  3. goroutine 1l.RLock()

    1
    2
    3
    4
    5
    6
    func (rw *RWMutex) RLock() {
    if atomic.AddInt32(&rw.readerCount, 1) < 0 {
    // A writer is pending, wait for it.
    runtime_SemacquireMutex(&rw.readerSem, false, 0)
    }
    }

    goroutine 1发现readerCount为负,认为有writer获得了写锁,接着也进入了等待状态。

    • 状态:等待
    • readerCount = 2 - rwmutexMaxReaders,readerWait = 1

最后goroutine 1和goroutine 2都进入了等待状态。

总结

  1. readerCount的作用? 持有读锁的reader数。置为负时,代表了writer正在或者已经获得了读锁,此时其他reader不能再获得写锁。

  2. readerWait的作用,以及在Lock()中,为何需要同时判断r != 0atomic.AddInt32(&rw.readerWait, r) != 0? 置readerCount为负的时候,获得了写锁,但尚未RULock的reader数。writer需要等待这些reader执行结束。

    • r == 0,则无正在持有读锁的reader,可以直接完成读锁的加锁。
    • r != 0,writer需要等待获得了写锁,但尚未RULock的reader执行结束。如何判断是否需要等待呢,即readerWait不为0的时候。同时reader决定是否能唤醒writer,也需要等到readerWait为0的时候。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    func (rw *RWMutex) Lock() {
    rw.w.Lock()
    r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
    // 此刻起,其他reader不再能够获得读锁。
    // 此时,尚未释放写锁的reader数为readerWait个,等待他们结束才能完成读锁的加锁。
    if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
    runtime_SemacquireMutex(&rw.writerSem, false)
    }
    }

sync.WaitGroup使用的问题

例子

在实现一个需求的时候,需要等待一定数目的go协程执行完毕,但这个数目事先并不好确定。想到了可以用sync.WaitGroup来完成,在使用时候发现,Wait()没有生效,并未等待协程结束,代码大致如下,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func main() {
wg := sync.WaitGroup{}
wg.Add(1)
ch := make(chan struct{})

f := func(){
wg.Add(1)
defer wg.Done()
select {
case <- ch:
fmt.Println("hi")
}
}

go func() {
defer wg.Done()
go f()
go f()
go f()
}()
close(ch)
wg.Wait()
}

执行后程序不会有任何的输出就退出了。

sync.WaitGroup源码分析

Typically this means the calls to Add should execute before the statement creating the goroutine or other event to be waited for.

总结

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func main() {
wg := sync.WaitGroup{}
wg.Add(1)
ch := make(chan struct{})

f := func(){
defer wg.Done()
select {
case <- ch:
fmt.Println("hi")
}
}

go func() {
defer wg.Done()
wg.Add(1)
go f()
wg.Add(1)
go f()
wg.Add(1)
go f()
}()
close(ch)
wg.Wait()
}

References

  1. Waiting on an indeterminate number of goroutines

goroutine

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
func process(ctx context.Context, wg *sync.WaitGroup, retCh chan int) {
defer wg.Done()
for {
time.Sleep(time.Second*1)
// process
// send process result back
select {
case <-ctx.Done():
retCh <- 1
retCh <- 2
return
default:
retCh <- 1
retCh <- 2
}
}
}

func loop(ctx context.Context, wg *sync.WaitGroup, retCh chan int) {
defer wg.Done()
wg.Add(1)
go process(ctx, wg, retCh)
for {
select {
case <-ctx.Done():
return
case ret := <-retCh:
fmt.Println(ret)

}
}
}

func main() {
ctx, cancel := context.WithCancel(context.Background())
wg := &sync.WaitGroup{}
retCh := make(chan int, 1)
wg.Add(1)
go loop(ctx, wg, retCh)

time.Sleep(time.Second*5)
cancel()
wg.Wait()
}