1. WaitGroup

WaitGroup 等待一组 Goroutine 完成。主 Goroutine 调用 Add(delta) 来设置要等待的 Goroutine 的数量。然后每个 Goroutine 运行并在完成时调用 Done()。同时,可以使用 Wait() 来阻塞,直到所有 Goroutine 完成。

 1type WaitGroup struct {
 2	noCopy noCopy
 3
 4	// 64-bit value: high 32 bits are counter, low 32 bits are waiter count.
 5	// 64-bit atomic operations require 64-bit alignment, but 32-bit
 6	// compilers do not ensure it. So we allocate 12 bytes and then use
 7	// the aligned 8 bytes in them as state, and the other 4 as storage
 8	// for the sema.
 9	state1 [3]uint32
10}
  • wg.Add(delta int):Add 将 delta(可能为负)添加到 WaitGroup 计数器。如果计数器变为 0,所有在 Wait 时阻塞的 Goroutine 将被释放。如果计数器变成负值,Add 会 panic。
  • wg.Done():当 WaitGroup 同步等待组中的某个 Goroutine 执行完毕后,设置这个 WaitGroup 的 counter 数值减 1,其实就是调用了 Add(-1)。
  • wg.Wait():表示让当前的 Goroutine 等待,进入阻塞状态。一直到 WaitGroup 的计数器为 0,才能解除阻塞,这个 Goroutine 才能继续执行。

总之,WaitGroup 让某个协程等待其它若干协程都先完成它们各自的任务

 1func main() {
 2	const N = 5
 3	var values [N]int64
 4	var wg sync.WaitGroup
 5	for i := 0; i < N; i++ {
 6		i := i
 7		wg.Add(1)
 8		go func() {
 9			values[i] = 50 + rand.Int63n(50)
10			fmt.Println("Done:", i)
11			wg.Done() // <=> wg.Add(-1)
12		}()
13	}
14
15	wg.Wait()
16	// 所有的元素都保证被初始化了。
17	fmt.Println("values:", values)
18}

2. Once

Once 是只执行一次的对象。Once 使用后不能复制。

1type Once struct {
2   // done indicates whether the action has been performed.
3   // It is first in the struct because it is used in the hot path.
4   // The hot path is inlined at every call site.
5   // Placing done first allows more compact instructions on some architectures (amd64/386),
6   // and fewer instructions (to calculate offset) on other architectures.
7   done uint32
8   m    Mutex
9}

Once 只有一个 Do(f func()) 方法,该方法只有一个类型为func()的参数。

1func (o *Once) Do(f func()) {
2	if atomic.LoadUint32(&o.done) == 0 {
3		// Outlined slow-path to allow inlining of the fast-path.
4		o.doSlow(f)
5	}
6}

对一个可寻址的sync.Once值 o,o.Do() 方法调用可以在多个协程中被多次并发地执行,方法调用的实参应该为同一个函数值。最终,有且只有一个调用的实参函数被调用。被调用的实参函数内的代码将在任何o.Do()方法返回调用之前被执行。

简单来说,sync.Once被用来确保一段代码在一个并发程序中被执行且仅被执行一次,常用于单例模式。

 1func main() {
 2	var once sync.Once
 3	onceBody := func() {
 4		fmt.Println("Only Once")
 5	}
 6	doneChan := make(chan bool)
 7	for i := 0; i < 10; i++ {
 8		go func() {
 9			once.Do(onceBody)
10			doneChan <- true
11		}()
12	}
13
14	for i := 0; i < 10; i++ {
15		<- doneChan
16	}
17}
18
19// Only Once

3. Mutex

Go 语言包中的 sync 包提供了两种锁类型:sync.Mutex 和 sync.RWMutex。

Mutex 是最简单的一种锁类型 —— 互斥锁,同时也比较暴力,当一个 Goroutine 获得了 Mutex 后,其他 Goroutine 就只能等到该 Goroutine 释放 Mutex。

互斥锁是传统并发编程对共享资源进行访问控制的主要手段,它由标准库 sync 中的 Mutex 结构体类型表示。sync.Mutex 类型只有两个公开的指针方法:Lock 和 Unlock。

  • Lock 锁定当前的共享资源
  • Unlock 进行解锁
 1// A Mutex is a mutual exclusion lock.
 2// The zero value for a Mutex is an unlocked mutex.
 3//
 4// A Mutex must not be copied after first use.
 5type Mutex struct {
 6   state int32
 7   sema  uint32
 8}
 9
10// Lock locks m.
11// If the lock is already in use, the calling goroutine
12// blocks until the mutex is available.
13func (m *Mutex) Lock() {
14	// Fast path: grab unlocked mutex.
15	if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
16		if race.Enabled {
17			race.Acquire(unsafe.Pointer(m))
18		}
19		return
20	}
21	// Slow path (outlined so that the fast path can be inlined)
22	m.lockSlow()
23}
24
25// Unlock unlocks m.
26// It is a run-time error if m is not locked on entry to Unlock.
27//
28// A locked Mutex is not associated with a particular goroutine.
29// It is allowed for one goroutine to lock a Mutex and then
30// arrange for another goroutine to unlock it.
31func (m *Mutex) Unlock() {
32	if race.Enabled {
33		_ = m.state
34		race.Release(unsafe.Pointer(m))
35	}
36
37	// Fast path: drop lock bit.
38	new := atomic.AddInt32(&m.state, -mutexLocked)
39	if new != 0 {
40		// Outlined slow path to allow inlining the fast path.
41		// To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
42		m.unlockSlow(new)
43	}
44}

3.1 RWMutex

RWMutex 是读写锁,该锁可以由任意数量的读者或单个写者持有。RWMutex 的零值是未锁定的互斥锁。RWMutex 被第一次使用后不得复制。

如果一个 Goroutine 持有一个 RWMutex 的读锁并且另一个 Goroutine 可能会调用 Lock,那么在前面的读锁被释放之前,任何 Goroutine 都不应该期望能够获取读锁。

特别的,禁止递归占有读锁,这是为了确保锁最终会被释放;被阻塞的 Lock 调用会将新的读者排除在获取锁之外。

  • rwm.RLock():上读锁
  • rwm.RUnlock():解读锁
  • rwm.Lock():上锁
  • rwm.Unlock():解锁

总结为如下三条:

  1. 同时只能有一个 Goroutine 能够获得写锁
  2. 同时可以有任意多个 Gorouinte 获得读锁
  3. 同时只能存在写锁或读锁(读和写互斥)

因此,RWMutex 经常用于读远远多于写的场景。

4. Map

sync.Map 对于多个 Goroutine 并发使用是安全的,无需额外的锁定或协调。Load、Store 和 Delete 在分摊常数时间内运行。

sync.Map 针对两种常见用例进行了优化:

  1. 当给定 key 的条目只写入一次但读取多次
  2. 当多个 Goroutine 读取、写入和覆盖不相交的 key 集的条目

在这两种情况下,与使用单独的 Mutex 或 RWMutex 配对的 Go map 相比,使用 Map 可以显着减少锁的竞争。

1type Map struct {
2	mu Mutex
3	read atomic.Value // readOnly
4	dirty map[interface{}]*entry
5	misses int
6}
  • m.Load(k) (v, ok):并发安全的 get。
  • m.Store(k, v):并发安全的 put。
  • m.Delete(k):并发安全的 delete。
  • m.LoadAndDelete(k) (v, loaded):LoadAndDelete 删除键的值,如果之前有值,则返回之前的值,loaded 表示是否有之前的值。
  • m.LoadAndStore(k, v) (actual, loaded):LoadOrStore 返回键的现有值(如果存在)。否则,它存储并返回给定的值。loaded 为 true 表示返回现有值,loaded 为 false 表示存储给定值。
  • m.Range(func(k, v) bool):按照传入的 func 规则遍历 Map。
 1func main() {
 2	m := sync.Map{}
 3	go func() {
 4		m.Store(1,2)
 5		fmt.Println(m.Load(1))
 6	}()
 7	go func() {
 8		m.Store(1,2)
 9		fmt.Println(m.Load(1))
10	}()
11	fmt.Println(m.Load(1))
12	time.Sleep(5 * time.Second)
13}

参考资料

  1. https://hedon954.github.io/noteSite/backend/golang/standard_packages/sync.html