channel到底需不需要手动关闭

前景提要

下面的代码是《go语言圣经》这本书的其中一个案例,其中主协程和子协程兼具生产消费两种身份了,最终当没有新的消息时代码会阻塞住,而书中没有给出该案例的终止方式,自己也是思考了很久,看来还是对go的channel理解不够深,在使用channel的时候一定要有自己的思考,不然可能会引发很多问题,小到程序莫名其妙的panic,大到出现goroutine以及channel的泄露等等!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
func crawl(url string) []string {
urls, err := links.Extract(url) // 书中其他章节, 具体逻辑是提取网页中所有a标签的链接
if err != nil {
log.Print(err)
}
return urls
}

func main() {
worklist := make(chan []string) // lists of URLs, may have duplicates
unseenLinks := make(chan string) // de-duplicated URLs
urls := []string{"http://xxxx.com"}

// Add command-line arguments to worklist.
go func() { worklist <- urls }()

// Create 20 crawler goroutines to fetch each unseen link.
for i := 0; i < 20; i++ {
go func() {
for link := range unseenLinks {
foundLinks := crawl(link)
go func() { worklist <- foundLinks }()
}
}()
}

// The main goroutine de-duplicates worklist items
// and sends the unseen ones to the crawlers.
seen := make(map[string]bool)
for list := range worklist {
for _, link := range list {
if !seen[link] {
seen[link] = true
unseenLinks <- link
}
}
}
}

虽然上述并发案例书中未提及如何终止的问题,不过书里也提供了另外一种并发方式且可自动终止,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// tokens is a counting semaphore used to
// enforce a limit of 20 concurrent requests.
var tokens = make(chan struct{}, 20)


func crawl(w work, mutex sync.Locker) []work {
fmt.Println(url)
tokens <- struct{}{} // acquire a token
urls, err := links.Extract(w.url)
<-tokens // release the token
if err != nil {
log.Print(err)
}
return urls
}

func main() {
worklist := make(chan []string)
var n int // number of pending sends to worklist
url := []string{"http://xxxx.com"}

// start
n++
go func() { worklist <- []work{url} }()

// crawl the web concurrently
visited := make(map[string]bool)

for ; n > 0; n-- {
works := <-worklist
for _, w := range works {
if !visited[w.url] {
visited[w.url] = true
n++
go func(w work) {
worklist <- crawl(w, &lock)
}(w)
}
}
}
}

这种实现方式很巧妙,使用了计数器n进行了限制,主协程在n减为0的时候会终止,子协程也会随之退出。这里的channel在没有被goroutine引用的时候也会被gc所销毁,结合第一个没有终止的案例,我们必须手动去关闭掉生产消费,让程序达到所有消息消费完后自动终止的目的。那么以下知识点就是必须要掌握的!

什么情况下关闭channel会引发panic?

示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// 1. 关闭未初始化的chan
func TestCloseNilChan(t *testing.T) {
var errCh chan error
close(errCh)

// Output:
// panic: close of nil channel
}

// 2. 关闭已经关闭的chan
func TestRepeatClosingChan(t *testing.T) {
errCh := make(chan error)
close(errCh)
close(errCh)

// Output:
// panic: close of closed channel
}

// 3. 关闭chan后发送数据
func TestSendOnClosingChan(t *testing.T) {
errCh := make(chan error)
close(errCh)
errCh <- errors.New("chan error")

// Output:
// panic: send on closed channel
}

// 4. 发送数据时关闭chan
func TestCloseOnSendingToChan(t *testing.T) {
errCh := make(chan error)
go func() {
errCh <- errors.New("chan error")
}()
time.Sleep(1 * time.Second)
close(errCh)

// Output:
// panic: send on closed channel
}

总结, 下述四种情况下关闭channel会引发panic:

  1. 关闭未初始化的channel
  2. 关闭已经关闭的channel
  3. 在关闭channel后发送数据
  4. 在发送数据时关闭channel

另外,可总结出以下规律:

  • 只能让channel的唯一发送者关闭此channel
  • 如果有多个发送者,应该使用专门的信号通知stop channel

    是否有必要关闭channel?不关闭又如何?

    当channel的发送次数等于接收次数

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    // 1. 当channel的发送次数等于接收次数
    func TestIsCloseChannelNecessary_on_equal(t *testing.T) {
    fmt.Println("NumGoroutine:", runtime.NumGoroutine())
    ch := make(chan int)

    // sender
    go func() {
    for i := 0; i < 3; i++ {
    ch <- i
    }
    }()

    // receiver
    go func() {
    for i := 0; i < 3; i++ {
    fmt.Println(<-ch)
    }
    }()
    time.Sleep(time.Second * 1)
    fmt.Println("NumGoroutine:", runtime.NumGoroutine())

    // Output:
    // NumGoroutine: 2
    // 0
    // 1
    // 2
    // NumGoroutine: 2
    }
    channel的发送次数等于接收次数时,发送者的goroutine和接收者的goroutine分别会在发送和接收结束时结束自己的goroutine,用于传输数据的channel也会由于代码没有使用被垃圾收集器回收。所以该种情况下不需手动关闭chanel。

    当channel的发送次数大于/小于接收次数

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    // 2. 当channel的发送次数大于/小于接收次数
    func TestIsCloseChannelNecessary_on_more_equal(t *testing.T) {
    fmt.Println("NumGoroutine:", runtime.NumGoroutine())
    ch := make(chan int)

    // sender
    go func() {
    defer fmt.Println("exit 1")
    for i := 0; i < 4; i++ {
    ch <- i
    }
    }()

    // receiver
    go func() {
    defer fmt.Println("exit 2")
    for i := 0; i < 3; i++ {
    fmt.Println(<-ch)
    }
    }()
    time.Sleep(time.Second * 1)
    fmt.Println("NumGoroutine:", runtime.NumGoroutine())

    // Output:
    // NumGoroutine: 2
    // 0
    // 1
    // 2
    // exit 2
    // NumGoroutine: 3
    }
    channel的发送次数大于接收次数时,发送者goroutine会等待接收者接收而一直阻塞。所以发送者goroutine一直未退出,channel也会由于一直被发送者使用而无法被垃圾回收。未退出的goroutine和channel会造成内存泄露等问题。

    总结:

    • 在只有一个发送者和一个接收者的情况下,只要确保发送者或接收者不会阻塞,不关闭channel是可行的。
    • 在无法判断channel的发送次数和接收次数时,应当在合适的时机关闭channel。
    • 另外使用for range从channel取值的时候,需要手动close掉channel,否则消费者会一直阻塞进而panic抛出错误,会被判定为死锁。

如何判断channel是否关闭?

channel关闭后继续读取该chennel不会阻塞,而是返回对应类型的零值。

使用channel的多重返回值(如err, ok := <- errCh)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 1. 使用channel的返回值判断其是否关闭
func TestReadFromClosedChan(t *testing.T) {
var errCh = make(chan error)
go func() {
defer close(errCh)
errCh <- errors.New("chan error")
}()

go func() {
for i := 0; i < 3; i++ {
err, ok := <-errCh;
fmt.Println(i, err, ok)
}
}()

time.Sleep(time.Second)

// Output:
// 0 chan error true
// 1 <nil> false
// 2 <nil> false
}

err, ok := <- errCh的第二个值ok返回false表示该channel已关闭。

使用for range简化语法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 2. 使用for range简化语法
func TestReadFromClosedChan2(t *testing.T) {
var errCh = make(chan error)
go func() {
defer close(errCh)
errCh <- errors.New("chan error")
}()

go func() {
i := 0
for err := range errCh {
fmt.Println(i, err)
i++
}
}()

time.Sleep(time.Second)

// Output:
// 0 chan error
}

如何优雅的关闭channel?

详细案例参考文章:https://gfw.go101.org/article/channel-closing.html

在使用单通道的函数中错误的关闭channel的话,编译的时候就会报错

参考上述文章,针对常规情况下需要关闭channel的四种场景,做了以下总结:

  • 一个发送者,一个接收者:发送者关闭channel;接收者使用select或for range判断channel是否关闭
  • 一个发送者,多个接收者:同上
  • 多个发送者,一个接收者:接收者接收完后,使用专门的信号channel关闭;发送者使用select监听该信号channel是否关闭
  • 多个发送者,多个接收者:任意一方或第三方使用专门的信号channel关闭;发送者,接收者都使用select监听该信号channel是否关闭

    总结

    回到开头例举的爬虫案例,个人进行了改写,主要是添加了超过递归深度时的退出以及超时退出和使用信号channel通知所用生产消费的goroutine关闭。具体代码如下,还有很多完善的点,以后理解更深后再进行修改:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    type work struct {
    url string
    depth int
    }

    func crawl(w work, quit chan struct{}) []work {
    fmt.Printf("depth: %d, url: %s\n", w.depth, w.url)
    if w.depth > 3 {
    quit <- struct{}{}
    return nil
    }
    urls, err := links.Extract(w.url)
    if err != nil {
    log.Print(err)
    }

    var works []work
    for _, url := range urls {
    works = append(works, work{url, w.depth + 1})
    }

    return works
    }

    //!+
    func main() {
    worklist := make(chan []work) // lists of URLs, may have duplicates
    unseenLinks := make(chan work) // de-duplicated URLs
    stopCh := make(chan struct{}) // signal chan to stop all goroutine

    quit := make(chan struct{})
    urls := work{"http://example.com/", 1}

    // Add command-line arguments to worklist.
    go func() { worklist <- []work{urls} }()

    // Create 20 crawler goroutines to fetch each unseen link.
    for i := 0; i < 20; i++ {
    go func() {
    for {
    select {
    case <-stopCh:
    return
    case link, _ := <-unseenLinks:
    foundLinks := crawl(link, quit)
    go func() {worklist <- foundLinks}()
    }
    }
    }()
    }

    // The main goroutine de-duplicates worklist items
    // and sends the unseen ones to the crawlers.
    seen := make(map[string]bool)

    for {
    select {
    case list := <-worklist:
    for _, link := range list {
    if !seen[link.url] {
    seen[link.url] = true
    unseenLinks <- link
    }
    }
    case <-quit:
    fmt.Println("Exit, 111")
    close(stopCh)
    return
    case <-time.After(3 * time.Second): // 如果上面的ch一直没数据会阻塞, 那么select也会检测其他case条件, 检测到后3s超时退出
    fmt.Println("Exit, timeout")
    close(stopCh)
    return
    }
    }
    }