之前有记录过在go中如何控制并发速率,要么就是通过带缓冲的channel,要么就是通过协程池。不论是通过什么方式,选择最适合在当前场景使用的即可,下面记录下学习过程中发现的开源项目中不同的实现方式。
ratelimit
来自
projectdiscovery/ratelimit
个人理解该项目的特点是能控制的是每隔一段时间(自己定义),起多少个并发,之前起的goroutine还在执行的话也没有影响,故如果goroutine执行的若是比较耗时的操作,那么会不会存在goroutine的数量一直在增加,消耗完所有内存,导致程序崩溃的情况。
不过这主要还是和使用方式有关,排除上述疑问的话,这个实现速率控制的方式也非常值得学习
源码分析
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36type Limiter struct {
maxCount int64 // 定义的最大并发数
count int64 // 当前剩余的令牌数(token)
ticker *time.Ticker // 定时器,执行定时任务(标准库)
tokens chan struct{} // 无缓冲channel,用于获取令牌
ctx context.Context // 上下文,终止定时器及创建的子协程
}
func (limiter *Limiter) run() {
for {
// 当剩余的令牌数小于等于零时,等待设置的时间间隔后,重置剩余令牌数
if limiter.count <= 0 {
<-limiter.ticker.C
limiter.count = limiter.maxCount
}
select {
// 接收退出信号,关闭定时器并返回
case <-limiter.ctx.Done():
limiter.ticker.Stop()
return
// 获取
case limiter.tokens <- struct{}{}:
limiter.count--
// 每过一段时间(设置的时间间隔),重置剩余令牌数
// 与其他case是否执行无关,当其他case执行的时间小于该间隔,则其他case不会被执行
case <-limiter.ticker.C:
limiter.count = limiter.maxCount
}
}
}
// Take one token from the bucket
func (rateLimiter *Limiter) Take() {
<-rateLimiter.tokens
}使用方式
官方示例中还有两个新建
Limiter
的方法,一个是控制并发速率和时间间隔的New
,一个是不做任何控制的NewUnlimited
,如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27// New creates a new limiter instance with the tokens amount and the interval
func New(ctx context.Context, max int64, duration time.Duration) *Limiter {
limiter := &Limiter{
maxCount: max,
count: max,
ticker: time.NewTicker(duration),
tokens: make(chan struct{}),
ctx: ctx,
}
go limiter.run()
return limiter
}
// NewUnlimited create a bucket with approximated unlimited tokens
func NewUnlimited(ctx context.Context) *Limiter {
limiter := &Limiter{
maxCount: math.MaxInt64,
count: math.MaxInt64,
ticker: time.NewTicker(time.Millisecond),
tokens: make(chan struct{}),
ctx: ctx,
}
go limiter.run()
return limiter
}个人写的一个使用案例如下,还是得多思考程序的一个运行流程:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45func main() {
expected := 5 * time.Second
limiter := New(context.Background(), 5, expected)
start := time.Now()
for i := 0; i < 20; i++ {
limiter.Take()
go func(_i int) {
fmt.Println(_i, "+++", time.Since(start))
time.Sleep(10 * time.Second)
fmt.Println(_i, "---", time.Since(start))
}(i)
}
took := time.Since(start)
fmt.Println(took)
limiter.Take()
took = time.Since(start)
fmt.Println(took)
select {
}
}
// 部分输出结果如下:
// 1 +++ 0s
// 0 +++ 0s
// 3 +++ 0s
// 2 +++ 0s
// 4 +++ 0s
// 6 +++ 5.0139749s
// 5 +++ 5.0139749s
// 8 +++ 5.0139749s
// 7 +++ 5.0139749s
// 9 +++ 5.0139749s
// 4 --- 10.0149828s
// 10 +++ 10.0149828s
// 11 +++ 10.0149828s
// 1 --- 10.0149828s
// 2 --- 10.0149828s
// 3 --- 10.0149828s
// 0 --- 10.0149828s
// 13 +++ 10.0149828s
// 12 +++ 10.0149828s
// 14 +++ 10.0149828ssizedwaitgroup
来自
remeh/sizedwaitgroup
这个项目是对标准库
sync.WaitGroup
进行了进一步的封装,定义了一个大小和控制并发的channel,从实现上来看还是非常简单易懂,就不做注释了对比
ratelimit
,个人理解这个是严格控制了同一时刻goroutine的并发数源码分析
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67// SizedWaitGroup has the same role and close to the
// same API as the Golang sync.WaitGroup but adds a limit of
// the amount of goroutines started concurrently.
type SizedWaitGroup struct {
Size int
current chan struct{}
wg sync.WaitGroup
}
// New creates a SizedWaitGroup.
// The limit parameter is the maximum amount of
// goroutines which can be started concurrently.
func New(limit int) SizedWaitGroup {
size := math.MaxInt32 // 2^31 - 1
if limit > 0 {
size = limit
}
return SizedWaitGroup{
Size: size,
current: make(chan struct{}, size),
wg: sync.WaitGroup{},
}
}
// Add increments the internal WaitGroup counter.
// It can be blocking if the limit of spawned goroutines
// has been reached. It will stop blocking when Done is
// been called.
//
// See sync.WaitGroup documentation for more information.
func (s *SizedWaitGroup) Add() {
s.AddWithContext(context.Background())
}
// AddWithContext increments the internal WaitGroup counter.
// It can be blocking if the limit of spawned goroutines
// has been reached. It will stop blocking when Done is
// been called, or when the context is canceled. Returns nil on
// success or an error if the context is canceled before the lock
// is acquired.
//
// See sync.WaitGroup documentation for more information.
func (s *SizedWaitGroup) AddWithContext(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
case s.current <- struct{}{}:
break
}
s.wg.Add(1)
return nil
}
// Done decrements the SizedWaitGroup counter.
// See sync.WaitGroup documentation for more information.
func (s *SizedWaitGroup) Done() {
<-s.current
s.wg.Done()
}
// Wait blocks until the SizedWaitGroup counter is zero.
// See sync.WaitGroup documentation for more information.
func (s *SizedWaitGroup) Wait() {
s.wg.Wait()
}使用方式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32func query(i int) {
fmt.Println(i)
time.Sleep(5 * time.Second)
}
func main() {
swg := New(5)
for i := 0; i < 50; i++ {
swg.Add()
go func(i int) {
defer swg.Done()
query(i)
}(i)
}
}
// 部分输出如下
// 0 5.0046797s
// 1 5.0046797s
// 2 5.0048117s
// 3 5.0046797s
// 4 5.0048117s
// 6 10.020529s
// 8 10.020529s
// 9 10.020529s
// 7 10.020529s
// 5 10.020529s
// 14 15.0282816s
// 12 15.0283974s
// 11 15.0283974s
// 10 15.0283974s
// 13 15.0283974s总结
通过上述案例可以发现,不论是通过哪种方式去控制并发速率,它们都是通过channel去进行控制的,区别在于
ratelimit
是使用的无缓冲的channel,而sizedwaitgroup
使用的是有缓冲的channel。故回归到本质,熟练掌握基础,深刻理解底层实现才是后期不断进阶的基石。