Excessive concurrency crashes the program

package main

import (
	"fmt"
	"math"
	"sync"
	"time"
)

func main() {
	var wg sync.WaitGroup
	for i := 0; i < math.MaxInt32; i++ { // no upper bound on live goroutines
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			fmt.Println(i)
			time.Sleep(time.Second)
		}(i)
	}
	wg.Wait()
}
goroutine 1489841 [running]:
internal/poll.(*fdMutex).rwlock(0xc000130060, 0x0?)
        /usr/local/go/src/internal/poll/fd_mutex.go:147 +0x11b
internal/poll.(*FD).writeLock(...)
        /usr/local/go/src/internal/poll/fd_mutex.go:239
internal/poll.(*FD).Write(0xc000130060, {0xc0ca328a90, 0x8, 0x8})
        /usr/local/go/src/internal/poll/fd_unix.go:370 +0x72
os.(*File).write(...)
        /usr/local/go/src/os/file_posix.go:48
os.(*File).Write(0xc00012e008, {0xc0ca328a90?, 0x8, 0xc0cd90b750?})
        /usr/local/go/src/os/file.go:175 +0x65
fmt.Fprintln({0x4b7ff8, 0xc00012e008}, {0xc0cd90b790, 0x1, 0x1})
        /usr/local/go/src/fmt/print.go:305 +0x75
fmt.Println(...)
        /usr/local/go/src/fmt/print.go:314
main.main.func1(0x0?)
        /root/zhuangqf/Blank/cmd/main.go:16 +0x8f
created by main.main
        /root/zhuangqf/Blank/cmd/main.go:14 +0x3c
panic: too many concurrent operations on a single file or socket (max 1048575)

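The number of concurrent operations on a single file/socket exceeded the limit of 1048575 (0xFFFFF). Here that file is standard output, which every goroutine writes to through fmt.Println. In short, a system resource was exhausted.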
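The limit in the panic message equals 2^20 - 1, the largest value a 20-bit counter can hold. The sketch below only checks that arithmetic. That the runtime keeps this count in a 20-bit field is an assumption here, not something the stack trace shows, and maxCounter is a hypothetical helper name.

```go
package main

import "fmt"

// maxCounter returns the largest value an unsigned counter with the given
// number of bits can hold. Hypothetical helper, for illustration only.
func maxCounter(bits uint) uint64 {
	return 1<<bits - 1
}

func main() {
	limit := maxCounter(20)
	fmt.Println(limit)          // 1048575
	fmt.Printf("0x%X\n", limit) // 0xFFFFF
}
```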

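If the fmt.Println line is removed, the program will most likely crash from running out of memory instead. Each goroutine needs at least 2 KB of stack space, so on a machine with 2 GB of memory at most 2 GB / 2 KB = 1M goroutines can exist at the same time. If the goroutines also perform other operations that allocate memory, the number that can run concurrently drops by orders of magnitude.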
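The estimate above can be reproduced with a few lines of Go. This is a rough sketch: maxGoroutines is a hypothetical helper, it counts only the per-goroutine footprint and ignores everything else the process allocates, and the 64 KB per-goroutine buffer in the second call is an assumed figure chosen purely for illustration.

```go
package main

import "fmt"

// maxGoroutines returns how many goroutines of perGoroutine bytes each fit
// into memBytes. It ignores all other allocations, so it is an optimistic
// upper bound rather than a prediction.
func maxGoroutines(memBytes, perGoroutine uint64) uint64 {
	return memBytes / perGoroutine
}

func main() {
	const (
		kb = 1 << 10
		gb = 1 << 30
	)
	// Stack only: 2 GB / 2 KB.
	fmt.Println(maxGoroutines(2*gb, 2*kb)) // 1048576, i.e. 1M

	// Assumed extra 64 KB buffer per goroutine: the bound shrinks sharply.
	fmt.Println(maxGoroutines(2*gb, 2*kb+64*kb)) // 31775
}
```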

Solutions

Recommended solution: have the code actively limit the number of concurrent goroutines.

channel

Use the channel's buffer size as the upper bound on the number of concurrent goroutines.

package main

import (
	"log"
	"sync"
	"time"
)

func main() {
	var wg sync.WaitGroup
	ch := make(chan struct{}, 10) // buffer size = maximum number of concurrent goroutines

	for i := 0; i < 100; i++ {
		wg.Add(1)
		ch <- struct{}{} // acquire a slot; blocks while 10 goroutines are running
		go func(i int) {
			defer wg.Done()
			log.Println(i)
			time.Sleep(time.Second * 2)
			<-ch // release the slot
		}(i)
	}

	wg.Wait()
}

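Here:

  • make(chan struct{}, 10) creates a channel with a buffer size of 10; if nothing is received, at most 10 messages can be sent before the sender blocks.
  • Before each goroutine is started, ch <- struct{}{} is called; if the buffer is full, the call blocks.
  • When a goroutine finishes its task, it calls <-ch to free a slot in the buffer.
  • sync.WaitGroup is not mandatory. In an HTTP service, for example, requests are inherently concurrent, so a channel alone can cap the number of tasks processed at once and no sync.WaitGroup is needed.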
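The HTTP case in the last bullet might look like the sketch below. The names limitConcurrency and peakInFlight are hypothetical and not from the original article. The handler side uses only a buffered channel; the sync.WaitGroup appears solely in the simulated client that drives the handler, and httptest recorders stand in for real network requests.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"sync"
	"sync/atomic"
	"time"
)

// limitConcurrency wraps next so that at most n requests run at once.
// No WaitGroup is involved: the server already runs each request in its
// own goroutine, and the channel only caps how many proceed.
func limitConcurrency(n int, next http.Handler) http.Handler {
	sem := make(chan struct{}, n)
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		sem <- struct{}{}        // blocks while n requests are in flight
		defer func() { <-sem }() // free the slot when the request ends
		next.ServeHTTP(w, r)
	})
}

// peakInFlight fires `requests` simulated requests at a slow handler and
// returns the highest number observed inside the handler at one time.
func peakInFlight(limit, requests int) int64 {
	var inFlight, peak int64
	slow := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		cur := atomic.AddInt64(&inFlight, 1)
		for { // record a new peak if cur exceeds the current one
			old := atomic.LoadInt64(&peak)
			if cur <= old || atomic.CompareAndSwapInt64(&peak, old, cur) {
				break
			}
		}
		time.Sleep(10 * time.Millisecond)
		atomic.AddInt64(&inFlight, -1)
	})
	h := limitConcurrency(limit, slow)

	var wg sync.WaitGroup // client side only, to wait for the simulation
	for i := 0; i < requests; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			req := httptest.NewRequest(http.MethodGet, "/", nil)
			h.ServeHTTP(httptest.NewRecorder(), req)
		}()
	}
	wg.Wait()
	return atomic.LoadInt64(&peak)
}

func main() {
	fmt.Println("peak in-flight requests:", peakInFlight(3, 20))
}
```

Releasing the slot in a defer means it is freed even if the wrapped handler returns early or panics, which is slightly more defensive than calling <-ch on the last line of the task.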

Goroutine pool

https://github.com/panjf2000/ants

Example:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"

	"github.com/panjf2000/ants/v2"
)

var sum int32

func myFunc(i interface{}) {
	n := i.(int32)
	atomic.AddInt32(&sum, n)
	fmt.Printf("run with %d\n", n)
}

func main() {
	defer ants.Release()

	runTimes := 1000
	var wg sync.WaitGroup

	// Use the pool with a function,
	// set 10 to the capacity of goroutine pool and 1 second for expired duration.
	p, _ := ants.NewPoolWithFunc(10, func(i interface{}) {
		myFunc(i)
		wg.Done()
	})
	defer p.Release()
	// Submit tasks one by one.
	for i := 0; i < runTimes; i++ {
		wg.Add(1)
		_ = p.Invoke(int32(i))
	}
	wg.Wait()
	fmt.Printf("running goroutines: %d\n", p.Running())
	fmt.Printf("finish all tasks, result is %d\n", sum)
	if sum != 499500 {
		panic("the final result is wrong!!!")
	}
}

The code above creates a goroutine pool with a capacity of 10; we only need to submit tasks to the pool.
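To show the idea behind a pool without the dependency, here is a minimal sketch using only the standard library. It is not how ants is implemented; it is a plain fixed-size worker pool, and runPool is a hypothetical name. A fixed number of workers read tasks from a channel, so the number of goroutines doing work never exceeds the pool size.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// runPool starts `workers` goroutines that pull numbers from a channel and
// add them to a shared sum. It returns the sum once every task is done.
func runPool(workers, tasks int) int32 {
	var sum int32
	jobs := make(chan int32)
	var wg sync.WaitGroup

	// Start the fixed set of workers; each exits when jobs is closed.
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for n := range jobs {
				atomic.AddInt32(&sum, n)
			}
		}()
	}

	// Submit tasks one by one; the send blocks until a worker is free.
	for i := 0; i < tasks; i++ {
		jobs <- int32(i)
	}
	close(jobs)
	wg.Wait()
	return atomic.LoadInt32(&sum)
}

func main() {
	fmt.Println(runPool(10, 1000)) // 499500
}
```

Compared with the buffered-channel approach, a pool reuses the same goroutines for every task instead of creating one per task, which is the main reason to reach for a library such as ants when tasks are numerous and short.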