同步工具

* 本页面主要介绍Go语言同步工具的相关内容。

提到并发编程、多线程编程时,我们往往都离不开『锁』这一概念。锁其实是一种并发编程中的同步原语(Synchronization Primitives)。所以,在总结同步工具之前,咱们先了解一下什么叫同步原语。

一、同步原语(Synchronization Primitives)

Go 语言中常见的同步原语有Mutex、RWMutex、WaitGroup、Once 和 Cond,以及扩展原语 ErrGroup、Semaphore和 SingleFlight,如下图所示。简单理解,就是为了解决“竞态”问题,出现的一些同步方案。

Go 语言中常见的同步原语
Go 语言中常见的同步原语

Go 语言在 sync 包中提供了用于同步的一些基本原语,包括常见的互斥锁 Mutex 与读写互斥锁 RWMutex 以及 Once、WaitGroup。其中互斥锁 Mutex 与读写互斥锁 RWMutex 以及 Once已经在 共享变量实现并发 一节中详细讲解,这里就不再讲解了。我们从 WaitGroup开始。

二、WaitGroup

WaitGroup 是 Go 语言 sync 包中比较常见的同步机制,它可以用于等待一系列的 Goroutine 的返回,一个比较常见的使用场景是批量执行 RPC 或者调用外部服务:

  requests := []*Request{...}
   
  wg := &sync.WaitGroup{}
  wg.Add(len(requests))
   
  for _, request := range requests {
      go func(r *Request) {
          defer wg.Done()
          
          // res, err := service.call(r)
      }(request)
  }
  wg.Wait() 

在上述代码中只有在所有的 Goroutine 都执行完毕之后 Wait 方法才会返回,程序可以继续执行其他的逻辑。

WaitGroup结构体
  type WaitGroup struct {
      noCopy noCopy
   
      state1 [3]uint32
  } 

WaitGroup 结构体中的成员变量非常简单,其中的 noCopy 的主要作用就是保证 WaitGroup 不会被开发者通过再赋值的方式进行拷贝,进而导致一些诡异的行为。

state1是一个总共占用 12 字节大小的数组,这个数组中会存储当前结构体持有的状态和信号量,在 64 位与 32 位的机器上表现也非常不同。WaitGroup 提供了私有方法 state 能够帮助我们从 state1 字段中取出它的状态和信号量。

WaitGroup方法

WaitGroup 对外暴露的接口只有三个 Add、Wait 和 Done,其中 Done 方法只是调用了 wg.Add(-1) 本身并没有什么特殊的逻辑,我们来了解一下剩余的两个方法:

  //Add方法
  func (wg *WaitGroup) Add(delta int) {
      statep, semap := wg.state()
      state := atomic.AddUint64(statep, uint64(delta)<<32)
      v := int32(state >> 32)
      w := uint32(state)
      if v < 0 {
          panic("sync: negative WaitGroup counter")
      }
      if v > 0 || w == 0 {
          return
      }
      *statep = 0
      for ; w != 0; w-- {
          runtime_Semrelease(semap, false, 0)
      }
  }

  //Wait方法
  func (wg *WaitGroup) Wait() {
      statep, semap := wg.state()
      for {
          state := atomic.LoadUint64(statep)
          v := int32(state >> 32)
          if v == 0 {
              return
          }
          if atomic.CompareAndSwapUint64(statep, state, state+1) {
              runtime_Semacquire(semap)
              if +statep != 0 {
                  panic("sync: WaitGroup is reused before previous Wait has returned")
              }
              return
          }
      }
  } 

Add 方法的主要作用就是更新 WaitGroup 中持有的计数器 counter,64 位状态的高 32 位,虽然 Add 方法传入的参数可以为负数,但是一个 WaitGroup 的计数器只能是非负数,当调用 Add 方法导致计数器归零并且还有等待的 Goroutine 时,就会通过 runtime_Semrelease 唤醒处于等待状态的所有 Goroutine。方法 Wait 就会在当前计数器中保存的数据大于 0 时修改等待 Goroutine 的个数 waiter 并调用 runtime_Semacquire 陷入睡眠状态。陷入睡眠的 Goroutine 就会等待 Add 方法在计数器为 0 时唤醒。

WaitGroup小结
  • Add 不能在和 Wait 方法在 Goroutine 中并发调用,一旦出现就会造成程序崩溃
  • WaitGroup 必须在 Wait 方法返回之后才能被重新使用
  • Done 只是对 Add 方法的简单封装,我们可以向 Add 方法传入任意负数(需要保证计数器非负)快速将计数器归零以唤醒其他等待的 Goroutine
  • 可以同时有多个 Goroutine 等待当前 WaitGroup 计数器的归零,这些 Goroutine 也会被『同时』唤醒
三、Cond

Cond 其实是一个条件变量,通过 Cond 我们可以让一系列的 Goroutine 都在触发某个事件或者条件时才被唤醒,每一个 Cond 结构体都包含一个互斥锁 L,我们先来看一下 Cond 是如何使用的:

  func main() {
      c := sync.NewCond(&sync.Mutex{})
   
      for i := 0; i < 10; i++ {
          go listen(c)
      }
   
      go broadcast(c)
   
      ch := make(chan os.Signal, 1)
      signal.Notify(ch, os.Interrupt)
      <-ch
  }
   
  func broadcast(c *sync.Cond) {
      c.L.Lock()
      c.Broadcast()
      c.L.Unlock()
  }
   
  func listen(c *sync.Cond) {
      c.L.Lock()
      c.Wait()
      fmt.Println("listen")
      c.L.Unlock()
  } 

我们同时运行了 11 个 Goroutine,其中的 10 个 Goroutine 会通过 Wait 等待期望的信号或者事件,而剩下的一个 Goroutine 会调用 Broadcast 方法通知所有陷入等待的 Goroutine,当调用 Boardcast 方法之后,就会打印出 10 次 "listen" 并结束调用。

Cond 的结构体

Cond 的结构体中包含 noCopy 和 copyChecker 两个字段,前者用于保证 Cond 不会再编译期间拷贝,后者保证在运行期间发生拷贝会直接 panic,持有的另一个锁 L 其实是一个接口 Locker,任意实现 Lock 和 Unlock 方法的结构体都可以作为 NewCond 方法的参数:

  type Cond struct {
      noCopy noCopy
   
      L Locker
   
      notify  notifyList
      checker copyChecker
  }

  type notifyList struct {
      wait uint32
      notify uint32
   
      lock mutex
      head *sudog
      tail *sudog
  } 

notifyList 是为了实现 Cond 同步机制,该结构体其实就是一个 Goroutine 的链表。在这个结构体中,head 和 tail 分别指向的就是整个链表的头和尾,而 wait 和 notify 分别表示当前正在等待的 Goroutine 和已经通知到的 Goroutine,我们通过这两个变量就能确认当前待通知和已通知的 Goroutine。

Cond 的方法

Cond 对外暴露的 Wait 方法会将当前 Goroutine 陷入休眠状态,它会先调用 runtime_notifyListAdd 将等待计数器 +1,然后解锁并调用 runtime_notifyListWait 等待其他 Goroutine 的唤醒。notifyListWait 方法的主要作用就是获取当前的 Goroutine 并将它追加到 notifyList 链表的最末端。

  //Wait方法
  func (c *Cond) Wait() {
      c.checker.check()
      t := runtime_notifyListAdd(&c.notify)
      c.L.Unlock()
      runtime_notifyListWait(&c.notify, t)
      c.L.Lock()
  }
   
  func notifyListAdd(l *notifyList) uint32 {
      return atomic.Xadd(&l.wait, 1) - 1
  }

  //notifyListWait方法
  func notifyListWait(l *notifyList, t uint32) {
      lock(&l.lock)
   
      if less(t, l.notify) {
          unlock(&l.lock)
          return
      }
   
      s := acquireSudog()
      s.g = getg()
      s.ticket = t
      if l.tail == nil {
          l.head = s
      } else {
          l.tail.next = s
      }
      l.tail = s
      goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3)
      releaseSudog(s)
  }

  //Signal方法
  func (c *Cond) Signal() {
      c.checker.check()
      runtime_notifyListNotifyOne(&c.notify)
  }
   
  //Broadcast方法
  func (c *Cond) Broadcast() {
      c.checker.check()
      runtime_notifyListNotifyAll(&c.notify)
  } 

Cond 对外提供的 Signal 和 Broadcast 方法就是用来唤醒调用 Wait 陷入休眠的 Goroutine,从两个方法的名字来看,前者会唤醒队列最前面的 Goroutine,后者会唤醒队列中全部的 Goroutine(按照加入队列的先后顺序,先加入的会先被 goready 唤醒)。

需要特殊强调注意的是,Wait 方法在调用之前一定要使用 L.Lock 持有该资源,否则会发生 panic 导致程序崩溃。

四、扩展原语

除了这些标准库中提供的同步原语之外,Go 语言还在子仓库 x/sync 中提供了额外的四种同步原语,ErrGroup、Semaphore、SingleFlight 和 SyncMap,其中的 SyncMap 其实就是 sync 包中的 sync.Map,它在 1.9 版本的 Go 语言中被引入了 x/sync 包,随着 API 的成熟和稳定最后被移到了标准库 sync 包中。

  • ErrGroup:为我们在一组 Goroutine 中提供了同步、错误传播以及上下文取消的功能,示例如下:
  •   var g errgroup.Group
      var urls = []string{
          "http://www.golang.org/",
          "http://www.google.com/",
          "http://www.somestupidname.com/",
      }
      for i := range urls {
          url := urls[i]
          g.Go(func() error {
              resp, err := http.Get(url)
              if err == nil {
                  resp.Body.Close()
              }
              return err
          })
      }
      if err := g.Wait(); err == nil {
          fmt.Println("Successfully fetched all URLs.")
      } 
  • Semaphore:带权重的信号量,我们可以按照不同的权重对资源的访问进行管理
  • SingleFlight:它能够在一个服务中抑制对下游的多次重复请求,一个比较常见的使用场景是 — 我们在使用 Redis 对数据库中的一些热门数据进行了缓存并设置了超时时间,缓存超时的一瞬间可能有非常多的并行请求发现了 Redis 中已经不包含任何缓存所以大量的流量会打到数据库上影响服务的延时和质量
  • Semaphore:带权重的信号量,我们可以按照不同的权重对资源的访问进行管理。它的主要作用就是对于同一个 Key 最终只会进行一次函数调用,在这个上下文中就是只会进行一次数据库查询,查询的结果会写回 Redis 并同步给所有请求对应 Key 的用户。使用示例如下:
  •   //使用 singleflight.Group{} 创建一个新的 Group 结构体
      //然后通过调用 Do 方法就能对相同的请求进行抑制
      type service struct {
          requestGroup singleflight.Group
      }
       
      func (s *service) handleRequest(ctx context.Context, request Request) (Response, error) {
          v, err, _ := requestGroup.Do(request.Hash(), func() (interface{}, error) {
              rows, err := // select * from tables
              if err != nil {
                  return nil, err
              }
              return rows, nil
          })
          if err != nil {
              return nil, err
          }
          
          return Response{
              rows: rows,
          }, nil
      } 

以上就是本节所有的总结内容,你学会了吗?


* 本页内容参考以下数据源:

  • 《Go程序设计语言》
  • https://blog.csdn.net/kevin_tech/article/details/105872864

凯冰科技 · 代码改变世界,技术改变生活
Next Page→