提到并发编程、多线程编程时,我们往往都离不开『锁』这一概念。锁其实是一种并发编程中的同步原语(Synchronization Primitives)。所以,在总结同步工具之前,咱们先了解一下什么叫同步原语。
一、同步原语(Synchronization Primitives)
Go 语言中常见的同步原语有Mutex、RWMutex、WaitGroup、Once 和 Cond,以及扩展原语 ErrGroup、Semaphore和 SingleFlight,如下图所示。简单理解,就是为了解决“竞态”问题,出现的一些同步方案。
Go 语言在 sync 包中提供了用于同步的一些基本原语,包括常见的互斥锁 Mutex 与读写互斥锁 RWMutex 以及 Once、WaitGroup。其中互斥锁 Mutex 与读写互斥锁 RWMutex 以及 Once已经在 共享变量实现并发 一节中详细讲解,这里就不再讲解了。我们从 WaitGroup开始。
二、WaitGroup
WaitGroup 是 Go 语言 sync 包中比较常见的同步机制,它可以用于等待一系列的 Goroutine 的返回,一个比较常见的使用场景是批量执行 RPC 或者调用外部服务:
requests := []*Request{...}
wg := &sync.WaitGroup{}
wg.Add(len(requests))
for _, request := range requests {
go func(r *Request) {
defer wg.Done()
// res, err := service.call(r)
}(request)
}
wg.Wait()
在上述代码中只有在所有的 Goroutine 都执行完毕之后 Wait 方法才会返回,程序可以继续执行其他的逻辑。
WaitGroup结构体
type WaitGroup struct {
noCopy noCopy
state1 [3]uint32
}
WaitGroup 结构体中的成员变量非常简单,其中的 noCopy 的主要作用就是保证 WaitGroup 不会被开发者通过再赋值的方式进行拷贝,进而导致一些诡异的行为。
state1是一个总共占用 12 字节大小的数组,这个数组中会存储当前结构体持有的状态和信号量,在 64 位与 32 位的机器上表现也非常不同。WaitGroup 提供了私有方法 state 能够帮助我们从 state1 字段中取出它的状态和信号量。
WaitGroup方法
WaitGroup 对外暴露的接口只有三个 Add、Wait 和 Done,其中 Done 方法只是调用了 wg.Add(-1) 本身并没有什么特殊的逻辑,我们来了解一下剩余的两个方法:
//Add方法
func (wg *WaitGroup) Add(delta int) {
statep, semap := wg.state()
state := atomic.AddUint64(statep, uint64(delta)<<32)
v := int32(state >> 32)
w := uint32(state)
if v < 0 {
panic("sync: negative WaitGroup counter")
}
if v > 0 || w == 0 {
return
}
*statep = 0
for ; w != 0; w-- {
runtime_Semrelease(semap, false, 0)
}
}
//Wait方法
func (wg *WaitGroup) Wait() {
statep, semap := wg.state()
for {
state := atomic.LoadUint64(statep)
v := int32(state >> 32)
if v == 0 {
return
}
if atomic.CompareAndSwapUint64(statep, state, state+1) {
runtime_Semacquire(semap)
if +statep != 0 {
panic("sync: WaitGroup is reused before previous Wait has returned")
}
return
}
}
}
Add 方法的主要作用就是更新 WaitGroup 中持有的计数器 counter,64 位状态的高 32 位,虽然 Add 方法传入的参数可以为负数,但是一个 WaitGroup 的计数器只能是非负数,当调用 Add 方法导致计数器归零并且还有等待的 Goroutine 时,就会通过 runtime_Semrelease 唤醒处于等待状态的所有 Goroutine。方法 Wait 就会在当前计数器中保存的数据大于 0 时修改等待 Goroutine 的个数 waiter 并调用 runtime_Semacquire 陷入睡眠状态。陷入睡眠的 Goroutine 就会等待 Add 方法在计数器为 0 时唤醒。
WaitGroup小结
- Add 不能在和 Wait 方法在 Goroutine 中并发调用,一旦出现就会造成程序崩溃
- WaitGroup 必须在 Wait 方法返回之后才能被重新使用
- Done 只是对 Add 方法的简单封装,我们可以向 Add 方法传入任意负数(需要保证计数器非负)快速将计数器归零以唤醒其他等待的 Goroutine
- 可以同时有多个 Goroutine 等待当前 WaitGroup 计数器的归零,这些 Goroutine 也会被『同时』唤醒
三、Cond
Cond 其实是一个条件变量,通过 Cond 我们可以让一系列的 Goroutine 都在触发某个事件或者条件时才被唤醒,每一个 Cond 结构体都包含一个互斥锁 L,我们先来看一下 Cond 是如何使用的:
func main() {
c := sync.NewCond(&sync.Mutex{})
for i := 0; i < 10; i++ {
go listen(c)
}
go broadcast(c)
ch := make(chan os.Signal, 1)
signal.Notify(ch, os.Interrupt)
<-ch
}
func broadcast(c *sync.Cond) {
c.L.Lock()
c.Broadcast()
c.L.Unlock()
}
func listen(c *sync.Cond) {
c.L.Lock()
c.Wait()
fmt.Println("listen")
c.L.Unlock()
}
我们同时运行了 11 个 Goroutine,其中的 10 个 Goroutine 会通过 Wait 等待期望的信号或者事件,而剩下的一个 Goroutine 会调用 Broadcast 方法通知所有陷入等待的 Goroutine,当调用 Boardcast 方法之后,就会打印出 10 次 "listen" 并结束调用。
Cond 的结构体
Cond 的结构体中包含 noCopy 和 copyChecker 两个字段,前者用于保证 Cond 不会再编译期间拷贝,后者保证在运行期间发生拷贝会直接 panic,持有的另一个锁 L 其实是一个接口 Locker,任意实现 Lock 和 Unlock 方法的结构体都可以作为 NewCond 方法的参数:
type Cond struct {
noCopy noCopy
L Locker
notify notifyList
checker copyChecker
}
type notifyList struct {
wait uint32
notify uint32
lock mutex
head *sudog
tail *sudog
}
notifyList 是为了实现 Cond 同步机制,该结构体其实就是一个 Goroutine 的链表。在这个结构体中,head 和 tail 分别指向的就是整个链表的头和尾,而 wait 和 notify 分别表示当前正在等待的 Goroutine 和已经通知到的 Goroutine,我们通过这两个变量就能确认当前待通知和已通知的 Goroutine。
Cond 的方法
Cond 对外暴露的 Wait 方法会将当前 Goroutine 陷入休眠状态,它会先调用 runtime_notifyListAdd 将等待计数器 +1,然后解锁并调用 runtime_notifyListWait 等待其他 Goroutine 的唤醒。notifyListWait 方法的主要作用就是获取当前的 Goroutine 并将它追加到 notifyList 链表的最末端。
//Wait方法
func (c *Cond) Wait() {
c.checker.check()
t := runtime_notifyListAdd(&c.notify)
c.L.Unlock()
runtime_notifyListWait(&c.notify, t)
c.L.Lock()
}
func notifyListAdd(l *notifyList) uint32 {
return atomic.Xadd(&l.wait, 1) - 1
}
//notifyListWait方法
func notifyListWait(l *notifyList, t uint32) {
lock(&l.lock)
if less(t, l.notify) {
unlock(&l.lock)
return
}
s := acquireSudog()
s.g = getg()
s.ticket = t
if l.tail == nil {
l.head = s
} else {
l.tail.next = s
}
l.tail = s
goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3)
releaseSudog(s)
}
//Signal方法
func (c *Cond) Signal() {
c.checker.check()
runtime_notifyListNotifyOne(&c.notify)
}
//Broadcast方法
func (c *Cond) Broadcast() {
c.checker.check()
runtime_notifyListNotifyAll(&c.notify)
}
Cond 对外提供的 Signal 和 Broadcast 方法就是用来唤醒调用 Wait 陷入休眠的 Goroutine,从两个方法的名字来看,前者会唤醒队列最前面的 Goroutine,后者会唤醒队列中全部的 Goroutine(按照加入队列的先后顺序,先加入的会先被 goready 唤醒)。
需要特殊强调注意的是,Wait 方法在调用之前一定要使用 L.Lock 持有该资源,否则会发生 panic 导致程序崩溃。
四、扩展原语
除了这些标准库中提供的同步原语之外,Go 语言还在子仓库 x/sync 中提供了额外的四种同步原语,ErrGroup、Semaphore、SingleFlight 和 SyncMap,其中的 SyncMap 其实就是 sync 包中的 sync.Map,它在 1.9 版本的 Go 语言中被引入了 x/sync 包,随着 API 的成熟和稳定最后被移到了标准库 sync 包中。
- ErrGroup:为我们在一组 Goroutine 中提供了同步、错误传播以及上下文取消的功能,示例如下:
var g errgroup.Group
var urls = []string{
"http://www.golang.org/",
"http://www.google.com/",
"http://www.somestupidname.com/",
}
for i := range urls {
url := urls[i]
g.Go(func() error {
resp, err := http.Get(url)
if err == nil {
resp.Body.Close()
}
return err
})
}
if err := g.Wait(); err == nil {
fmt.Println("Successfully fetched all URLs.")
}
//使用 singleflight.Group{} 创建一个新的 Group 结构体
//然后通过调用 Do 方法就能对相同的请求进行抑制
type service struct {
requestGroup singleflight.Group
}
func (s *service) handleRequest(ctx context.Context, request Request) (Response, error) {
v, err, _ := requestGroup.Do(request.Hash(), func() (interface{}, error) {
rows, err := // select * from tables
if err != nil {
return nil, err
}
return rows, nil
})
if err != nil {
return nil, err
}
return Response{
rows: rows,
}, nil
}
以上就是本节所有的总结内容,你学会了吗?