关于channel和goroutine的内存泄漏问题

本文主要了解为关闭channel情况下所引发的内存泄露问题。

一个发送者导致的内存泄露

主要原因是接收者提前退出了,导致发送者一直阻塞,最后导致了goroutine泄露,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func TestLeakOfMemory(t *testing.T) {
fmt.Println("NumGoroutine:", runtime.NumGoroutine())
chanLeakOfMemory()
time.Sleep(time.Second * 3) // 等待 goroutine 执行,防止过早输出结果
fmt.Println("NumGoroutine:", runtime.NumGoroutine())
}

func chanLeakOfMemory() {
errCh := make(chan error) // (1)
go func() { // (5)
time.Sleep(2 * time.Second)
errCh <- errors.New("chan error") // (2)
}()

var err error
select {
case <-time.After(time.Second): // (3) 大家也经常在这里使用 <-ctx.Done()
fmt.Println("超时")
case err = <-errCh: // (4)
if err != nil {
fmt.Println(err)
} else {
fmt.Println(nil)
}
}
}

上述代码分析:

  • 由于没有发送方想errCh发送数据,故代码在(4)处阻塞
  • 当(3)处超时后,函数退出且(4)处代码并未接收成功
  • 之后(2)开始执行,由于errCh没有接收者,故一直阻塞在(2)出
  • 因为(2)出的代码所在协程一直没有退出,故发生了内存泄露

这种情况处理起来也较为简单,只需将channel设置为有缓冲的就行。例如将(1)处代码改为errCh := make(chan error, 1)即可。

多个发送者导致的内存泄露

产生原因也和上述相同,当接收者提前退出了,那么至少有一个goroutine无法退出,进而造成内存泄露。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func TestLeakOfMemory2(t *testing.T) {
fmt.Println("NumGoroutine:", runtime.NumGoroutine())
chanLeakOfMemory2()
time.Sleep(time.Second * 3) // 等待 goroutine 执行,防止过早输出结果
fmt.Println("NumGoroutine:", runtime.NumGoroutine())
}

func chanLeakOfMemory2() {
ich := make(chan int, 100) // (3)
// sender
go func() {
defer close(ich)
for i := 0; i < 10000; i++ {
ich <- i
time.Sleep(time.Millisecond) // 控制一下,别发太快
}
}()
// receiver
go func() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
for i := range ich { // (2)
if ctx.Err() != nil { // (1)
fmt.Println(ctx.Err())
return
}
fmt.Println(i)
}
}()
}

尽管上述代码使用了有缓冲的channel,依然可能会出现接收者提前退出,导致有缓冲channel的缓存队列被占满,阻塞在第101个位置。这种情况需要使用一个额外的stop channel来结束发送者所在的goroutine,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
func TestLeakOfMemory2(t *testing.T) {
fmt.Println("NumGoroutine:", runtime.NumGoroutine())
chanLeakOfMemory2()
time.Sleep(time.Second * 3) // 等待 goroutine 执行,防止过早输出结果
fmt.Println("NumGoroutine:", runtime.NumGoroutine())
}

func chanLeakOfMemory2() {
ich := make(chan int, 100)
stopCh := make(chan struct{})
// sender
go func() {
defer close(ich)
for i := 0; i < 10000; i++ {
select {
case <-stopCh:
case ich <- i:
return
}
time.Sleep(time.Millisecond) // 控制一下,别发太快
}
}()
// receiver
go func() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
for i := range ich {
if ctx.Err() != nil {
fmt.Println(ctx.Err())
close(stopCh)
return
}
fmt.Println(i)
}
}()
}

总结

不论发送者发送一次还是多次,如果接收者在接收完channel中的数据之前退出,那么就会造成内存泄露。如果接收者需要在channel关闭之前退出,为了防止内存泄露,在发送者与接收者一对一时,应设置channel缓冲队列为1;在发送者与接收者一对多或多对多时,应使用专门的stop channel通知发送者关闭相应channel。