前景提要
下面的代码是《go语言圣经》这本书的其中一个案例,其中主协程和子协程兼具生产消费两种身份了,最终当没有新的消息时代码会阻塞住,而书中没有给出该案例的终止方式,自己也是思考了很久,看来还是对go的channel理解不够深,在使用channel的时候一定要有自己的思考,不然可能会引发很多问题,小到程序莫名其妙的panic,大到出现goroutine以及channel的泄露等等!
1 | func crawl(url string) []string { |
虽然上述并发案例书中未提及如何终止的问题,不过书里也提供了另外一种并发方式且可自动终止,如下:
1 | // tokens is a counting semaphore used to |
这种实现方式很巧妙,使用了计数器n进行了限制,主协程在n减为0的时候会终止,子协程也会随之退出。这里的channel在没有被goroutine引用的时候也会被gc所销毁,结合第一个没有终止的案例,我们必须手动去关闭掉生产消费,让程序达到所有消息消费完后自动终止的目的。那么以下知识点就是必须要掌握的!
什么情况下关闭channel会引发panic?
示例如下:
1 | // 1. 关闭未初始化的chan |
总结, 下述四种情况下关闭channel会引发panic:
- 关闭未初始化的channel
- 关闭已经关闭的channel
- 在关闭channel后发送数据
- 在发送数据时关闭channel
另外,可总结出以下规律:
- 只能让channel的唯一发送者关闭此channel
- 如果有多个发送者,应该使用专门的信号通知stop channel
是否有必要关闭channel?不关闭又如何?
当channel的发送次数等于接收次数
channel的发送次数等于接收次数时,发送者的goroutine和接收者的goroutine分别会在发送和接收结束时结束自己的goroutine,用于传输数据的channel也会由于代码没有使用被垃圾收集器回收。所以该种情况下不需手动关闭chanel。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28// 1. 当channel的发送次数等于接收次数
func TestIsCloseChannelNecessary_on_equal(t *testing.T) {
fmt.Println("NumGoroutine:", runtime.NumGoroutine())
ch := make(chan int)
// sender
go func() {
for i := 0; i < 3; i++ {
ch <- i
}
}()
// receiver
go func() {
for i := 0; i < 3; i++ {
fmt.Println(<-ch)
}
}()
time.Sleep(time.Second * 1)
fmt.Println("NumGoroutine:", runtime.NumGoroutine())
// Output:
// NumGoroutine: 2
// 0
// 1
// 2
// NumGoroutine: 2
}当channel的发送次数大于/小于接收次数
channel的发送次数大于接收次数时,发送者goroutine会等待接收者接收而一直阻塞。所以发送者goroutine一直未退出,channel也会由于一直被发送者使用而无法被垃圾回收。未退出的goroutine和channel会造成内存泄露等问题。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31// 2. 当channel的发送次数大于/小于接收次数
func TestIsCloseChannelNecessary_on_more_equal(t *testing.T) {
fmt.Println("NumGoroutine:", runtime.NumGoroutine())
ch := make(chan int)
// sender
go func() {
defer fmt.Println("exit 1")
for i := 0; i < 4; i++ {
ch <- i
}
}()
// receiver
go func() {
defer fmt.Println("exit 2")
for i := 0; i < 3; i++ {
fmt.Println(<-ch)
}
}()
time.Sleep(time.Second * 1)
fmt.Println("NumGoroutine:", runtime.NumGoroutine())
// Output:
// NumGoroutine: 2
// 0
// 1
// 2
// exit 2
// NumGoroutine: 3
}总结:
- 在只有一个发送者和一个接收者的情况下,只要确保发送者或接收者不会阻塞,不关闭channel是可行的。
- 在无法判断channel的发送次数和接收次数时,应当在合适的时机关闭channel。
- 另外使用for range从channel取值的时候,需要手动close掉channel,否则消费者会一直阻塞进而panic抛出错误,会被判定为死锁。
如何判断channel是否关闭?
channel关闭后继续读取该chennel不会阻塞,而是返回对应类型的零值。
使用channel的多重返回值(如err, ok := <- errCh)
1 | // 1. 使用channel的返回值判断其是否关闭 |
err, ok := <- errCh的第二个值ok返回false表示该channel已关闭。
使用for range简化语法
1 | // 2. 使用for range简化语法 |
如何优雅的关闭channel?
详细案例参考文章:https://gfw.go101.org/article/channel-closing.html
在使用单通道的函数中错误的关闭channel的话,编译的时候就会报错
参考上述文章,针对常规情况下需要关闭channel的四种场景,做了以下总结:
- 一个发送者,一个接收者:发送者关闭channel;接收者使用select或for range判断channel是否关闭
- 一个发送者,多个接收者:同上
- 多个发送者,一个接收者:接收者接收完后,使用专门的信号channel关闭;发送者使用select监听该信号channel是否关闭
- 多个发送者,多个接收者:任意一方或第三方使用专门的信号channel关闭;发送者,接收者都使用select监听该信号channel是否关闭
总结
回到开头例举的爬虫案例,个人进行了改写,主要是添加了超过递归深度时的退出以及超时退出和使用信号channel通知所用生产消费的goroutine关闭。具体代码如下,还有很多完善的点,以后理解更深后再进行修改:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75type work struct {
url string
depth int
}
func crawl(w work, quit chan struct{}) []work {
fmt.Printf("depth: %d, url: %s\n", w.depth, w.url)
if w.depth > 3 {
quit <- struct{}{}
return nil
}
urls, err := links.Extract(w.url)
if err != nil {
log.Print(err)
}
var works []work
for _, url := range urls {
works = append(works, work{url, w.depth + 1})
}
return works
}
//!+
func main() {
worklist := make(chan []work) // lists of URLs, may have duplicates
unseenLinks := make(chan work) // de-duplicated URLs
stopCh := make(chan struct{}) // signal chan to stop all goroutine
quit := make(chan struct{})
urls := work{"http://example.com/", 1}
// Add command-line arguments to worklist.
go func() { worklist <- []work{urls} }()
// Create 20 crawler goroutines to fetch each unseen link.
for i := 0; i < 20; i++ {
go func() {
for {
select {
case <-stopCh:
return
case link, _ := <-unseenLinks:
foundLinks := crawl(link, quit)
go func() {worklist <- foundLinks}()
}
}
}()
}
// The main goroutine de-duplicates worklist items
// and sends the unseen ones to the crawlers.
seen := make(map[string]bool)
for {
select {
case list := <-worklist:
for _, link := range list {
if !seen[link.url] {
seen[link.url] = true
unseenLinks <- link
}
}
case <-quit:
fmt.Println("Exit, 111")
close(stopCh)
return
case <-time.After(3 * time.Second): // 如果上面的ch一直没数据会阻塞, 那么select也会检测其他case条件, 检测到后3s超时退出
fmt.Println("Exit, timeout")
close(stopCh)
return
}
}
}