1. future/promise
1.1 单向接收 Channel 作为函数返回
sumSquares
函数调用的两个实参请求并发进行。每个通道读取操作将阻塞到请求返回结果为止。
1func longTimeRequest() <-chan int64 {
2 r := make(chan int64)
3
4 go func() {
5 time.Sleep(time.Second * 3) // 模拟一个工作负载
6 r <- rand.Int63n(100)
7 }()
8
9 return r
10}
11
12func sumSquares(a, b int64) int64 {
13 return a*a + b*b
14}
15
16func main() {
17
18 a, b := longTimeRequest(), longTimeRequest()
19 fmt.Println(sumSquares(<-a, <-b))
20}
1.2 单向发送 Channel 作为函数实参
sumSquares
函数调用的两个实参的请求也是并发进行的。和上例不同的是longTimeRequest
函数接收一个单向发送通道类型参数而不是返回一个单向接收通道结果。
1func longTimeRequest(r chan<- int64) {
2 time.Sleep(time.Second * 3) // 模拟一个工作负载
3 r <- rand.Int63n(100)
4}
5
6func sumSquares(a, b int64) int64 {
7 return a*a + b*b
8}
9
10func main() {
11
12 ra, rb := make(chan int64), make(chan int64)
13 go longTimeRequest(ra)
14 go longTimeRequest(rb)
15
16 fmt.Println(sumSquares(<-ra, <-rb))
17}
特别的,我们也可以只使用一个通道来接收回应结果,因为两个参数的作用是对等的。
1...
2 results := make(chan int32, 2) // 缓冲与否不重要
3 go longTimeRequest(results)
4 go longTimeRequest(results)
5
6 fmt.Println(sumSquares(<-results, <-results))
7}
1.3 采用最快回应
本例是上例中只使用一个通道变种的增强。
有时候,一份数据可能同时从多个数据源获取,这些数据源将返回相同的数据,因为各种因素,这些数据源的回应速度参差不一,甚至某个特定数据源的多次回应速度之间也可能相差很大。
同时从多个数据源获取一份相同的数据可以有效保障低延迟。我们只需采用最快的回应并舍弃其它较慢回应。
若有 N 个数据源,为了防止被舍弃的回应对应的协程永久阻塞,则传输数据用的通道必须为一个容量至少为 N-1 的缓冲通道。
1func source(c chan<- int32) {
2 ra, rb := rand.Int31(), rand.Intn(3)+1
3 // 随机睡眠1-3秒
4 time.Sleep(time.Duration(rb) * time.Second)
5 c <- ra
6}
7
8func main() {
9 startTime := time.Now()
10 c := make(chan int32, 5) // 必须用一个缓冲通道
11 for i := 0; i < cap(c); i++ {
12 go source(c)
13 }
14 rnd := <-c // 只有第一个回应被使用了
15 fmt.Println(time.Since(startTime))
16 fmt.Println(rnd)
17 // 1.000202388s
18 // 839171462
19}
2. 通知
通知,是一种特殊的请求/回应用例。
在一个通知用例中,我们并不关心回应的值,我们只关心是否回应。因此,空结构体类型 struct{}
常被做为通道的元素类型,因为空结构体类型的尺寸为零,有效节省内存。
2.1 单对单通知
如果一个通道中无值可接收,则此通道上的下一个接收操作将阻塞,直到另一个协程发送值到此通道为止。
所以,一个协程可以向此通道发送一个值来通知另一个等待着从此通道接收数据的协程。
在下面这个例子中,通道done
被用来做为一个信号通道来实现单对单通知。
1func main() {
2 values := make([]byte, 32*1024*1024)
3 if _, err := rand.Read(values); err != nil {
4 fmt.Println(err)
5 os.Exit(1)
6 }
7
8 done := make(chan struct{}) // 也可以是缓冲的
9
10 // 排序协程
11 go func() {
12 sort.Slice(values, func(i, j int) bool {
13 return values[i] < values[j]
14 })
15 done <- struct{}{} // 通知排序已完成
16 }()
17
18 // 并发地做一些其它事情...
19
20 <-done // 等待通知
21 fmt.Println(values[0], values[len(values)-1])
22}
2.2 多对单通知
在实践中,常使用 sync.WaitGroup
来实现多对单通知
2.3 单对多/多对多通知
从一个已关闭的通道可以接收到无穷个值,我们可以利用这一特性,来使用关闭一个通道的方式实现单对多通知群发通知。
以下代码展示了多对多和单对多的通知。
1
2var wg sync.WaitGroup
3
4func worker(id int, ready <-chan struct{}) {
5 <-ready // 阻塞在此,等待通知
6 log.Print("Worker#", id, "开始工作")
7 time.Sleep(time.Second * time.Duration(id+1))
8 log.Print("Worker#", id, "工作完成")
9 wg.Done()
10}
11
12func main() {
13 log.SetFlags(0)
14
15 ready := make(chan struct{})
16 for i := 0; i <= 2; i++ {
17 wg.Add(1)
18 go worker(i, ready)
19 }
20
21 // 模拟初始化过程
22 time.Sleep(time.Second * 3)
23
24 // 单对多通知
25 close(ready)
26
27 // 等待被多对单通知
28 wg.Wait()
29}
3. 限制协程并发数量
1func main() {
2 var wg sync.WaitGroup
3 ch := make(chan struct{}, 10)
4
5 for i := 0; i < 100; i++ {
6 wg.Add(1)
7 ch <- struct{}{}
8 go func(i int) {
9 defer wg.Done()
10 log.Println(i)
11 time.Sleep(time.Second * 2)
12 <-ch
13 }(i)
14 }
15
16 wg.Wait()
17}
4. 使用 Channel 传送 Channel
一个通道类型的元素类型可以是另一个通道类型。
在下面这个例子中, 单向发送通道类型chan<- int
是另一个通道类型chan chan<- int
的元素类型。
1var counter = func(n int) chan<- chan<- int {
2 requests := make(chan chan<- int)
3 go func() {
4 for request := range requests {
5 if request == nil {
6 n++ // 递增计数
7 } else {
8 request <- n // 返回当前计数
9 }
10 }
11 }()
12 return requests // 隐式转换到类型chan<- (chan<- int)
13}(0)
14
15func main() {
16 increase1000 := func(done chan<- struct{}) {
17 for i := 0; i < 1000; i++ {
18 counter <- nil
19 }
20 done <- struct{}{}
21 }
22
23 done := make(chan struct{})
24 go increase1000(done)
25 go increase1000(done)
26 <-done
27 <-done
28
29 request := make(chan int, 1)
30 counter <- request
31 fmt.Println(<-request) // 2000
32}
5. 使当前协程永久阻塞
一个无分支的 select
流程控制代码块使当前协程永久处于阻塞状态。这是select
流程控制的最简单的应用。一般,select{}
用在主协程中以防止程序退出。
1func DoSomething() {
2 for {
3 // 做点什么...
4 fmt.Println(rand.Int31())
5 time.Sleep(time.Second)
6 runtime.Gosched() // 防止本协程霸占CPU不放
7 }
8}
9
10func main() {
11 go DoSomething()
12 go DoSomething()
13 select {}
14}
6. 尝试发送与接收
含有一个default
分支和一个case
分支的select
代码块可以被用做一个尝试发送或者尝试接收操作,取决于case
关键字后跟随的是一个发送操作还是一个接收操作。
- 如果
case
关键字后跟随的是一个发送操作,则此select
代码块为一个尝试发送操作。 如果case
分支的发送操作是阻塞的,则default
分支将被执行,发送失败;否则发送成功,case
分支得到执行。 - 如果
case
关键字后跟随的是一个接收操作,则此select
代码块为一个尝试接收操作。 如果case
分支的接收操作是阻塞的,则default
分支将被执行,接收失败;否则接收成功,case
分支得到执行。
尝试发送和尝试接收代码块永不阻塞。
标准编译器对尝试发送和尝试接收代码块做了特别的优化,使得它们的执行效率比多case
分支的普通select
代码块执行效率高得多。
1type Book struct {
2 id int
3}
4
5func main() {
6
7 bookshelf := make(chan Book, 3)
8
9 for i := 0; i < cap(bookshelf)*2; i++ {
10 select {
11 case bookshelf <- Book{id: i}:
12 fmt.Println("成功将书放在书架上", i)
13 default:
14 fmt.Println("书架已经被占满了")
15 }
16 }
17
18 for i := 0; i < cap(bookshelf)*2; i++ {
19 select {
20 case book := <-bookshelf:
21 fmt.Println("成功从书架上取下一本书", book.id)
22 default:
23 fmt.Println("书架上已经没有书了")
24 }
25 }
26
27}
7. 无阻塞地检查 Channel 是否已经关闭
假设,我们可以保证没有任何协程会向一个通道发送数据,则我们可以使用下面的代码来(并发安全地)检查此通道是否已经关闭,此检查不会阻塞当前协程。
1func IsClosed(c chan T) bool {
2 select {
3 case <-c:
4 return true
5 default:
6 }
7 return false
8}
此方法常用来查看某个期待中的通知是否已经来临。此通知将由另一个协程通过关闭一个通道来发送。
8. 超时机制(timeout)
在一些请求/回应用例中,一个请求可能因为种种原因导致需要超出预期的时长才能得到回应,有时甚至永远得不到回应。
对于这样的情形,我们可以使用一个超时方案给请求者返回一个错误信息。使用 select
可以很轻松地实现这样的一个超时方案。
1func requestWithTimeout(timeout time.Duration) (int, error) {
2 c := make(chan int)
3 go doRequest(c) // 可能需要超出预期的时长回应
4
5 select {
6 case data := <-c:
7 return data, nil
8 case <-time.After(timeout):
9 return 0, errors.New("超时了!")
10 }
11}
8.1 速率限制
速率限制常用来限制吞吐和确保在一段时间内的资源使用不会超标。
下面的例子中,任何一分钟时段内处理的请求数不会超过200。
也就是漏桶算法,但漏桶算法无法处理峰值情况。
1type Request interface{}
2
3func handle(r Request) {
4 fmt.Println(r.(int))
5}
6
7const RateLimitPeriod = time.Minute
8const RateLimit = 5 // 任何一分钟内最多处理200个请求
9
10func handleRequests(requests <-chan Request) {
11 quotas := make(chan time.Time, RateLimit)
12
13 go func() {
14 tick := time.NewTicker(RateLimitPeriod / RateLimit)
15 defer tick.Stop()
16 for t := range tick.C {
17 select {
18 case quotas <- t:
19 default:
20 }
21 }
22 }()
23
24 for r := range requests {
25 <-quotas
26 go handle(r)
27 }
28}
29
30func main() {
31 requests := make(chan Request)
32 go handleRequests(requests)
33 // time.Sleep(time.Minute)
34 for i := 0; ; i++ {
35 requests <- i
36 }
37}