1. WaitGroup
WaitGroup 等待一组 Goroutine 完成。主 Goroutine 调用 Add(delta) 来设置要等待的 Goroutine 的数量。然后每个 Goroutine 运行并在完成时调用 Done()。同时,可以使用 Wait() 来阻塞,直到所有 Goroutine 完成。
1type WaitGroup struct {
2 noCopy noCopy
3
4 // 64-bit value: high 32 bits are counter, low 32 bits are waiter count.
5 // 64-bit atomic operations require 64-bit alignment, but 32-bit
6 // compilers do not ensure it. So we allocate 12 bytes and then use
7 // the aligned 8 bytes in them as state, and the other 4 as storage
8 // for the sema.
9 state1 [3]uint32
10}
- wg.Add(delta int):Add 将 delta(可能为负)添加到 WaitGroup 计数器。如果计数器变为 0,所有在 Wait 时阻塞的 Goroutine 将被释放。如果计数器变成负值,Add 会 panic。
- wg.Done():当 WaitGroup 同步等待组中的某个 Goroutine 执行完毕后,设置这个 WaitGroup 的 counter 数值减 1,其实就是调用了 Add(-1)。
- wg.Wait():表示让当前的 Goroutine 等待,进入阻塞状态。一直到 WaitGroup 的计数器为 0,才能解除阻塞,这个 Goroutine 才能继续执行。
总之,WaitGroup 让某个协程等待其它若干协程都先完成它们各自的任务。
1func main() {
2 const N = 5
3 var values [N]int64
4 var wg sync.WaitGroup
5 for i := 0; i < N; i++ {
6 i := i
7 wg.Add(1)
8 go func() {
9 values[i] = 50 + rand.Int63n(50)
10 fmt.Println("Done:", i)
11 wg.Done() // <=> wg.Add(-1)
12 }()
13 }
14
15 wg.Wait()
16 // 所有的元素都保证被初始化了。
17 fmt.Println("values:", values)
18}
2. Once
Once 是只执行一次的对象。Once 使用后不能复制。
1type Once struct {
2 // done indicates whether the action has been performed.
3 // It is first in the struct because it is used in the hot path.
4 // The hot path is inlined at every call site.
5 // Placing done first allows more compact instructions on some architectures (amd64/386),
6 // and fewer instructions (to calculate offset) on other architectures.
7 done uint32
8 m Mutex
9}
Once 只有一个 Do(f func())
方法,该方法只有一个类型为func()
的参数。
1func (o *Once) Do(f func()) {
2 if atomic.LoadUint32(&o.done) == 0 {
3 // Outlined slow-path to allow inlining of the fast-path.
4 o.doSlow(f)
5 }
6}
对一个可寻址的sync.Once
值 o,o.Do()
方法调用可以在多个协程中被多次并发地执行,方法调用的实参应该为同一个函数值。最终,有且只有一个调用的实参函数被调用。被调用的实参函数内的代码将在任何o.Do()
方法返回调用之前被执行。
简单来说,sync.Once
被用来确保一段代码在一个并发程序中被执行且仅被执行一次,常用于单例模式。
1func main() {
2 var once sync.Once
3 onceBody := func() {
4 fmt.Println("Only Once")
5 }
6 doneChan := make(chan bool)
7 for i := 0; i < 10; i++ {
8 go func() {
9 once.Do(onceBody)
10 doneChan <- true
11 }()
12 }
13
14 for i := 0; i < 10; i++ {
15 <- doneChan
16 }
17}
18
19// Only Once
3. Mutex
Go 语言包中的 sync 包提供了两种锁类型:sync.Mutex 和 sync.RWMutex。
Mutex 是最简单的一种锁类型 —— 互斥锁,同时也比较暴力,当一个 Goroutine 获得了 Mutex 后,其他 Goroutine 就只能等到该 Goroutine 释放 Mutex。
互斥锁是传统并发编程对共享资源进行访问控制的主要手段,它由标准库 sync 中的 Mutex 结构体类型表示。sync.Mutex 类型只有两个公开的指针方法:Lock 和 Unlock。
- Lock 锁定当前的共享资源
- Unlock 进行解锁
1// A Mutex is a mutual exclusion lock.
2// The zero value for a Mutex is an unlocked mutex.
3//
4// A Mutex must not be copied after first use.
5type Mutex struct {
6 state int32
7 sema uint32
8}
9
10// Lock locks m.
11// If the lock is already in use, the calling goroutine
12// blocks until the mutex is available.
13func (m *Mutex) Lock() {
14 // Fast path: grab unlocked mutex.
15 if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
16 if race.Enabled {
17 race.Acquire(unsafe.Pointer(m))
18 }
19 return
20 }
21 // Slow path (outlined so that the fast path can be inlined)
22 m.lockSlow()
23}
24
25// Unlock unlocks m.
26// It is a run-time error if m is not locked on entry to Unlock.
27//
28// A locked Mutex is not associated with a particular goroutine.
29// It is allowed for one goroutine to lock a Mutex and then
30// arrange for another goroutine to unlock it.
31func (m *Mutex) Unlock() {
32 if race.Enabled {
33 _ = m.state
34 race.Release(unsafe.Pointer(m))
35 }
36
37 // Fast path: drop lock bit.
38 new := atomic.AddInt32(&m.state, -mutexLocked)
39 if new != 0 {
40 // Outlined slow path to allow inlining the fast path.
41 // To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
42 m.unlockSlow(new)
43 }
44}
3.1 RWMutex
RWMutex 是读写锁,该锁可以由任意数量的读者或单个写者持有。RWMutex 的零值是未锁定的互斥锁。RWMutex 被第一次使用后不得复制。
如果一个 Goroutine 持有一个 RWMutex 的读锁并且另一个 Goroutine 可能会调用 Lock,那么在前面的读锁被释放之前,任何 Goroutine 都不应该期望能够获取读锁。
特别的,禁止递归占有读锁,这是为了确保锁最终会被释放;被阻塞的 Lock 调用会将新的读者排除在获取锁之外。
- rwm.RLock():上读锁
- rwm.RUnlock():解读锁
- rwm.Lock():上锁
- rwm.Unlock():解锁
总结为如下三条:
- 同时只能有一个 Goroutine 能够获得写锁
- 同时可以有任意多个 Gorouinte 获得读锁
- 同时只能存在写锁或读锁(读和写互斥)
因此,RWMutex 经常用于读远远多于写的场景。
4. Map
sync.Map 对于多个 Goroutine 并发使用是安全的,无需额外的锁定或协调。Load、Store 和 Delete 在分摊常数时间内运行。
sync.Map 针对两种常见用例进行了优化:
- 当给定 key 的条目只写入一次但读取多次
- 当多个 Goroutine 读取、写入和覆盖不相交的 key 集的条目
在这两种情况下,与使用单独的 Mutex 或 RWMutex 配对的 Go map 相比,使用 Map 可以显着减少锁的竞争。
1type Map struct {
2 mu Mutex
3 read atomic.Value // readOnly
4 dirty map[interface{}]*entry
5 misses int
6}
- m.Load(k) (v, ok):并发安全的 get。
- m.Store(k, v):并发安全的 put。
- m.Delete(k):并发安全的 delete。
- m.LoadAndDelete(k) (v, loaded):LoadAndDelete 删除键的值,如果之前有值,则返回之前的值,loaded 表示是否有之前的值。
- m.LoadAndStore(k, v) (actual, loaded):LoadOrStore 返回键的现有值(如果存在)。否则,它存储并返回给定的值。loaded 为 true 表示返回现有值,loaded 为 false 表示存储给定值。
- m.Range(func(k, v) bool):按照传入的 func 规则遍历 Map。
1func main() {
2 m := sync.Map{}
3 go func() {
4 m.Store(1,2)
5 fmt.Println(m.Load(1))
6 }()
7 go func() {
8 m.Store(1,2)
9 fmt.Println(m.Load(1))
10 }()
11 fmt.Println(m.Load(1))
12 time.Sleep(5 * time.Second)
13}