如何控制并发速率

为什么需要控制

1
2
3
4
5
6
7
8
9
10
11
12
func main() {
var wg sync.WaitGroup
for i := 0; i < math.MaxInt32; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
fmt.Println(i)
time.Sleep(time.Second)
}(i)
}
wg.Wait()
}

过高的并发会将系统资源消耗殆尽,导致程序运行最终panic,关键的报错信息如下:
panic: too many concurrent operations on a single file or socket (max 1048575)

  • 导致出现上述错误的原因是源自fmt.Printf函数输出到标准输出,标准输出也可以视为文件,总之就是系统的资源被耗尽了。
  • 就算注释掉fmt.Printf函数,也会因为内存不足而最终崩溃。

如何解决

解决的主要方式就是限制并发的协程数量。

使用带缓冲的channel进行控制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func main() {
var wg sync.WaitGroup
ch := make(chan struct{}, 3)
for i := 0; i < 10; i++ {
ch <- struct{}{}
wg.Add(1)
go func(i int) {
defer wg.Done()
log.Println(i)
time.Sleep(time.Second)
<-ch
}(i)
}
wg.Wait()
}
  • 创建一个缓冲区大小为3的channel,在没有被接收的情况下,最多发送3个消息则被阻塞
  • 开启协程前,调用ch <- struct{}{},若缓冲区满则阻塞
  • 协程任务结束,调用<-ch释放缓冲区

利用第三方库

目前有很多第三方库实现了协程池,可以很方便的用来控制协程的并发数量,如Jeffail/tunnypanjf2000/ants,以tunny举例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
func main() {
pool := tunny.NewFunc(3, func(i interface{}) interface{} {
log.Println(i)
time.Sleep(time.Second)
return nil
})
defer pool.Close()

for i := 0; i < 10; i++ {
go pool.Process(i)
}
time.Sleep(time.Second * 4)
}
  • tunny.NewFunc(3, f)第一个参数是协程池的大小(poolSize),第二个参数是协程运行的函数(worker)
  • pool.Process(i)将参数i传递给协程池定义好的worker处理
  • pool.Close()关闭协程池

调整系统资源上限

ulimit

有些情况下,即使我们有效的限制了协程的并发数量,但是仍旧出现某一类资源不足的问题,例如:

  • too many open files
  • out of memory

操作系统通常会限制同时打开文件数量,栈空间大小等,ulimit -a可以看到系统的当前设置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
[root@master ~]# ulimit -a
core file size (blocks, -c) unlimited
data seg size (kbytes, -d) unlimited
scheduling priority (-e) 0
file size (blocks, -f) unlimited
pending signals (-i) 14997
max locked memory (kbytes, -l) 64
max memory size (kbytes, -m) unlimited
open files (-n) 100001
pipe size (512 bytes, -p) 8
POSIX message queues (bytes, -q) 819200
real-time priority (-r) 0
stack size (kbytes, -s) 8192
cpu time (seconds, -t) unlimited
max user processes (-u) 14997
virtual memory (kbytes, -v) unlimited
file locks (-x) unlimited

进而进行按需调整即可。

虚拟内存virtual memory

虚拟内存是一项非常常见的技术,当内存不足时,将磁盘映射为内存使用,比如linux下的交换分区(swap space)。在linux上创建并使用交换分区是一件非常简单的事情:

1
2
3
4
5
sudo fallocate -l 20G /mnt/.swapfile # 创建 20G 空文件
sudo mkswap /mnt/.swapfile # 转换为交换分区文件
sudo chmod 600 /mnt/.swapfile # 修改权限为 600
sudo swapon /mnt/.swapfile # 激活交换分区
free -m # 查看当前内存使用情况(包括交换分区)

关闭交换分区也非常简单:

1
2
sudo swapoff /mnt/.swapfile
rm -rf /mnt/.swapfile

磁盘的 I/O 读写性能和内存条相差是非常大的,例如 DDR3 的内存条读写速率很容易达到 20GB/s,但是 SSD 固态硬盘的读写性能通常只能达到 0.5GB/s,相差 40倍之多。因此,使用虚拟内存技术将硬盘映射为内存使用,显然会对性能产生一定的影响。如果应用程序只是在较短的时间内需要较大的内存,那么虚拟内存能够有效避免 out of memory 的问题。如果应用程序长期高频度读写大量内存,那么虚拟内存对性能的影响就比较明显了。