并发过高导致程序崩溃
// main intentionally starts one goroutine per loop iteration with no upper
// bound on how many may be alive at once. It exists to demonstrate the failure
// mode of unbounded concurrency, not as a pattern to copy.
func main() {
	var pending sync.WaitGroup
	for n := 0; n < math.MaxInt32; n++ {
		pending.Add(1)
		go func(id int) {
			defer pending.Done()
			// Every goroutine writes to the same standard output.
			fmt.Println(id)
			// Sleeping keeps the goroutine alive, so goroutines pile up
			// faster than they can finish.
			time.Sleep(time.Second)
		}(n)
	}
	pending.Wait()
}
1goroutine 1489841 [running]:
2internal/poll.(*fdMutex).rwlock(0xc000130060, 0x0?)
3 /usr/local/go/src/internal/poll/fd_mutex.go:147 +0x11b
4internal/poll.(*FD).writeLock(...)
5 /usr/local/go/src/internal/poll/fd_mutex.go:239
6internal/poll.(*FD).Write(0xc000130060, {0xc0ca328a90, 0x8, 0x8})
7 /usr/local/go/src/internal/poll/fd_unix.go:370 +0x72
8os.(*File).write(...)
9 /usr/local/go/src/os/file_posix.go:48
10os.(*File).Write(0xc00012e008, {0xc0ca328a90?, 0x8, 0xc0cd90b750?})
11 /usr/local/go/src/os/file.go:175 +0x65
12fmt.Fprintln({0x4b7ff8, 0xc00012e008}, {0xc0cd90b790, 0x1, 0x1})
13 /usr/local/go/src/fmt/print.go:305 +0x75
14fmt.Println(...)
15 /usr/local/go/src/fmt/print.go:314
16main.main.func1(0x0?)
17 /root/zhuangqf/Blank/cmd/main.go:16 +0x8f
18created by main.main
19 /root/zhuangqf/Blank/cmd/main.go:14 +0x3c
20panic: too many concurrent operations on a single file or socket (max 1048575)
1panic: too many concurrent operations on a single file or socket (max 1048575)
单个 file/socket 上的并发操作个数超过了上限(1048575,即 0xFFFFF),简而言之,系统的资源被耗尽了。
如果将 fmt.Println
这行代码去掉,那程序很可能会因为内存不足而崩溃。每个协程至少需要消耗 2KB 的空间,假设计算机的内存是 2GB,那么至多允许 2GB/2KB = 1M 个协程同时存在。如果协程中还存在着其他需要分配内存的操作,那么允许并发执行的协程数量将会成数量级地减少。
解决方案
推荐解决方案:代码主动限制并发的协程数量。
channel
使用 channel 的缓冲区大小作为协程并发上限
1package main
2
3import (
4 "log"
5 "sync"
6 "time"
7)
8
// main runs 100 tasks while allowing at most 10 of them to execute at the
// same time.
//
// The buffered channel ch is used as a counting semaphore: sending a value
// acquires a slot, receiving a value releases it. Its capacity (10) is the
// concurrency limit.
func main() {
	var wg sync.WaitGroup
	ch := make(chan struct{}, 10)

	for i := 0; i < 100; i++ {
		wg.Add(1)
		// Acquire a slot before starting the goroutine. When 10 tasks are
		// already in flight this send blocks, so the loop cannot create
		// goroutines faster than running tasks finish.
		ch <- struct{}{}
		go func(i int) {
			defer wg.Done()
			// Release the slot with defer so it is handed back on every exit
			// path of the task, not only when the last statement is reached.
			// Deferred calls run in LIFO order, so the slot is still released
			// before wg.Done is called.
			defer func() { <-ch }()

			log.Println(i)
			time.Sleep(time.Second * 2)
		}(i)
	}

	wg.Wait()
}
其中,
make(chan struct{}, 10)
创建缓冲区大小为 10 的 channel,在没有被接收的情况下,至多发送 10 个消息,之后再发送就会被阻塞。- 开启协程前,调用
ch <- struct{}{}
,若缓冲区满,则阻塞。 - 协程任务结束,调用
<-ch
释放缓冲区。 sync.WaitGroup
并不是必须的,例如 http 服务,每个请求天然是并发的,此时使用 channel 控制并发处理的任务数量,就不需要sync.WaitGroup
。
协程池
https://github.com/panjf2000/ants
样例:
1package main
2
3import (
4 "fmt"
5 "sync"
6 "sync/atomic"
7
8 "github.com/panjf2000/ants/v2"
9)
10
// sum accumulates the arguments of every myFunc call. It is updated with
// atomic operations because myFunc is executed by several goroutines.
var sum int32

// myFunc is the task executed for each submitted value. The argument must be
// an int32 (the type assertion panics otherwise); it is atomically added to
// sum and then printed.
func myFunc(i interface{}) {
	delta := i.(int32)
	atomic.AddInt32(&sum, delta)
	fmt.Printf("run with %d\n", delta)
}
18
19func main() {
20 defer ants.Release()
21
22 runTimes := 1000
23 var wg sync.WaitGroup
24
25 // Use the pool with a function,
26 // set 10 to the capacity of goroutine pool and 1 second for expired duration.
27 p, _ := ants.NewPoolWithFunc(10, func(i interface{}) {
28 myFunc(i)
29 wg.Done()
30 })
31 defer p.Release()
32 // Submit tasks one by one.
33 for i := 0; i < runTimes; i++ {
34 wg.Add(1)
35 _ = p.Invoke(int32(i))
36 }
37 wg.Wait()
38 fmt.Printf("running goroutines: %d\n", p.Running())
39 fmt.Printf("finish all tasks, result is %d\n", sum)
40 if sum != 499500 {
41 panic("the final result is wrong!!!")
42 }
43
44}
上述代码自定义了一个 size 为 10 的协程池,我们只需要往协程池提交任务就可以了。