Golang的协程同步

问题背景

最近遇到了一个BUG,由于对Golang的同步模型理解不够深入所导致的资源泄露。

伪代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
var res Connection
done := make(chan bool)
go func(){
defer close(done)
res, err := acquire(200 * time.Millisecond) //计数器递增
if err != nil {
log.Printf("err: %v", err) //第7行,这里报错
return
}
doWork(res)
}()
select {
case <-done:
case <-time.After(300 * time.Millisecond):
}
release(res) //如果res不等于nil,计数器递减

咋一看来,代码没有逻辑问题。当时我在Review这份代码的时候,直观的感觉是写的不规范,把release和acquire分开到两个不同的routine,容易被遗漏,代码可读性不佳,也就是说代码写的不优雅。但是,同事说服了我,业务逻辑里需要将资源及时释放,因此有必要在超时的时候主动释放这个资源,所以不优雅这个小问题就是可以被容忍的。

这段代码上线后不久,我们就在业务日志里发现大量“资源耗尽”的错误日志,很容易就定位到这个错误是在代码里的第7行出现,由acquire()抛出的。我们维护了一个“资源计数器”,acquire的时候递增,而release的时候递减,计数器超过阈值的时候就会抛出“资源耗尽”的错误。

问题排查

资源耗尽?第一反应是我们的线上流量太大,导致资源使用量超过了计数器的阈值。我们通过netstat检查了连接信息,发现连接数不到个位数,远远未到超过阈值的程度。

接着,我们还是不死心,希望可以更透明地观察到程序内部的运行状态,确认计数器的状态。于是,我们引入exp/var包,这样就可以实时查看计数器,阈值,连接数, inflght request(正在处理的请求数). 结果发现,抛出错误的时候,计数器与阈值相等,inflght request与连接数差不多且远小于计数器/阈值。

到了这里,我们开始怀疑计数器的实现有问题,可能没有对并发做好控制。检查了acquire/release的代码,在读写计数器的时候都正确的加了锁,看起来无懈可击。

问题到底出在了哪里?我们陷入了苦恼中,直到我们发现其他指标也出现了异常。在我们的服务中,有个在最坏情况下不会超过150毫秒(因为有定时器控制)的延时指标,居然出现了延时达到1秒的情况。机器时钟坏掉或者golang的定时器出现了BUG这种小概率事件暂不考虑,对此比较合理的猜想是CPU过于繁忙,协程调度器的调度时间比较长,导致最后延时指标异常。那么回过头来看我们的代码,在调度异常的情况下会发生什么情况呢?release 可能会发生在 acquire 前,由于release只有在资源非空的情况下才会递减计数器,因此计数器就会在acquire里递增,重复出现几次这样的情况下,计数器就会达到阈值,抛出“资源耗尽”的错误。

为了维护计数器的正确,必须确保acquire发生在release之前,也就是所谓的happen-before。而我们的代码里是希望通过定时器的超时长短来确保happen-before,也就是说,超时短的事件一定会提前结束,发生在超时长的事件前。很遗憾,Golang的happen-before原则里不包含定时器超时事件,超时不能保证先后顺序。Golang的内存模型具体请参考Golang内存模型,简单概括如下:

  1. init 发生在 main 前,而且根据包的依赖顺序,被依赖方的init发生在依赖方的init前
  2. 对于有buffer的channel,send与close发生在receive前;无buffer的channel,receive发生在send前;没有对有buffer的channel的close事件做说明,待考证
  3. 对于锁,Unlock发生在Lock前

问题的修复

因此,要想确保协程间的同步顺序,只能依靠channel或者lock。

最后,我们对代码做了修复,计数器问题得到了解决:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
var res Connection
done := make(chan bool)
exit := make(chan bool)
go func(){
defer close(done)
res, err := acquire(200 * time.Millisecond) //计数器递增
If err != nil {
log.Printf("err: %v", err)
return
}
// here, setup cleaner
go func(res Connection){
<- exit
release(res)
}(res) //catch res
doWork(res)
}()
select {
case <-done:
case <-time.After(300 * time.Millisecond):
}
close(exit) //notify exit