微服务限流之二:令牌桶算法源码解析


0. 写在前面

上一篇文章微服务限流之一:令牌桶算法介绍了令牌桶算法的原理、官方实现和用法,本文就来看看官方是如何实现的,深入分析它的源码。

官方在golang.org/x/time/rate库中实现的令牌桶算法,这个实现很有意思,并没有真正的使用一个定时器不断的生成令牌,而是靠计算的方式来完成。

1. 介绍

这个库并没有使用定时器来发放 token 而是用了 lazyload 的方式,等需要消费 token 的时候才通过时间去计算然后更新 token 的数量,下面我们先通过一个例子来看一下这个流程是怎么跑的

如上图所示,假设我们有一个限速器,它的 token 生成速度为 1,也就是一秒一个,桶的大小为 10,每个格子表示一秒的时间间隔

  • last表示上一次更新 token时还有 2 个token
  • 现在我们有一个请求竟来, 总共需要7个 token才能完成请求
  • now表示我现在进来的时间,距离last 已经过去了2s, 那么现在就有4个token(每秒生成一个token)
  • 所以,如果需要 7 个 token 那么也就还需要等待 3s 中才真的有 7 个,所以这就是 timeToAct 所在的时间节点
  • 预约成功之后更新 last = now 、token = -3 因为 token 已经被预约出去了所以现在剩下的就是负数了

2. Limiter

limiter是这个包的主要数据结构

type Limiter struct {
    // 互斥锁
	mu     sync.Mutex
 
    // 每秒产生 token 的速度, 其实是 float64 的一个别名
	limit  Limit
 
    // 桶的大小
	burst  int
 
    // 当前时间节点拥有的 tokens 数量
	tokens float64
 
	// 上次更新 token 的时间
	last time.Time
 
	// 上次限速的时间,这个时间可能是过去的某个时间也可能是将来的某个时间
	lastEvent time.Time
}

3. Reservation

预定,表示预约某个时间的 token

type Reservation struct {
    // 是否能预约上
	ok        bool
    // limter
	lim       *Limiter
    // 预约的 token 数量
	tokens    int
    // token 实际使用的时间
	timeToAct time.Time
	// 保存一下速率,因为 lim 的速率是可以被动态调整的,所以不能直接用
	limit Limit
}

4. 消费token

总共有三种消费 token 的方法 AllowN, ReserveN, WaitN最终都是调用的reserveN 这个方法

// now: 需要消费 token 的时间点
// n: 需要多少个 token
// maxFutureReserve: 能够等待的最长时间
func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation {
	lim.mu.Lock()
 
    // 如果发放令牌的速度无穷大的话,那么直接返回就行了,要多少可以给多少
	if lim.limit == Inf {
		lim.mu.Unlock()
		return Reservation{
			ok:        true,
			lim:       lim,
			tokens:    n,
			timeToAct: now,
		}
	}
 
    // advance 方法会去计算当前有多少个 token
    // 后面会讲到,now 其实就是传入的时间,但是 last 可能会变
	now, last, tokens := lim.advance(now)
 
	// 发放 token 之后还剩多少
	tokens -= float64(n)
 
	// 根据 token 数量计算需要等待的时间
	var waitDuration time.Duration
	if tokens < 0 {
		waitDuration = lim.limit.durationFromTokens(-tokens)
	}
 
	// 计算是否可以发放,如果需要的量比桶的容量还大肯定是不行的
    // 然后就是看需要能否容忍需要等待的时间
	ok := n <= lim.burst && waitDuration <= maxFutureReserve
 
	// Prepare reservation
	r := Reservation{
		ok:    ok,
		lim:   lim,
		limit: lim.limit,
	}
    // 如果可以的话,就把 token 分配给预约者
	if ok {
		r.tokens = n
		r.timeToAct = now.Add(waitDuration)
	}
 
	// 更新各个字段的状态
	if ok {
		lim.last = now
		lim.tokens = tokens
		lim.lastEvent = r.timeToAct
	} else {
        // 为什么不 ok 也要更新 last 呢?因为 last 可能会改变
		lim.last = last
	}
 
	lim.mu.Unlock()
	return r
}

advance 方法用于计算 token 的数量

// now 是传入的当前的时间点,返回的 newNow 其实就是传入的参数,没有任何改变
// newLast 是更新 token 的时间
// newTokens 是 token 的数量
func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, newTokens float64) {
	// 如果当前时间比上次更新 token 的时间还要早,那么就重置一下 last
    last := lim.last
	if now.Before(last) {
		last = now
	}
 
	// 这里为了防止溢出,先计算了将桶填满需要花费的最大时间
	maxElapsed := lim.limit.durationFromTokens(float64(lim.burst) - lim.tokens)
    // 计算时间差,如果大于最大时间的话,就取最大值
	elapsed := now.Sub(last)
	if elapsed > maxElapsed {
		elapsed = maxElapsed
	}
 
	// 计算这段时间生成的 token 数量,如果大于桶的容量,就取桶的容量
	delta := lim.limit.tokensFromDuration(elapsed)
	tokens := lim.tokens + delta
	if burst := float64(lim.burst); tokens > burst {
		tokens = burst
	}
 
	return now, last, tokens
}

这个比较有意思的是先去计算了时间的最大值,因为初始化的时候没为 last 赋值,所以 now.Before(last) 出来的结果可能是一个很大的值,再去计算 tokens 数量很可能溢出

durationFromTokens 根据 tokens 的数量计算需要花费的时间

func (limit Limit) durationFromTokens(tokens float64) time.Duration {
	seconds := tokens / float64(limit)
	return time.Nanosecond * time.Duration(1e9*seconds)
}

tokensFromDuration根据时间计算 tokens 的数量

func (limit Limit) tokensFromDuration(d time.Duration) float64 {
	// 这里通过拆分整数和小数部分可以减少时间上的误差
	sec := float64(d/time.Second) * float64(limit)
	nsec := float64(d%time.Second) * float64(limit)
	return sec + nsec/1e9
}

消费 token 的逻辑就讲完了,大概总结一下

  • 需要消费的时候, 先去计算一下,从过去到现在可以生成多少个token
  • 然后通过需要的 token 减去现在拥有的token数量,就得到了需要预约的token数量
  • 再通过token数量 转换成时间,就可以得到需要等待的时间长度,以及是否可以消费
  • 然后再通过不同的消费方式进行消费

5. WaitN

// ctx 用于控制超时, n 是需要消费的 token 数量,如果 context 的 Deadline 早于要等待的时间就会直接返回失败
func (lim *Limiter) WaitN(ctx context.Context, n int) (err error) {
	lim.mu.Lock()
	burst := lim.burst
	limit := lim.limit
	lim.mu.Unlock()
 
    // 先看一下是不是已经超出消费极限了
	if n > burst && limit != Inf {
		return fmt.Errorf("rate: Wait(n=%d) exceeds limiter's burst %d", n, burst)
	}
 
    // 如果 ctx 已经结束了也不用等了
	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
	}
 
	// 计算一下可以等待的时间
	now := time.Now()
	waitLimit := InfDuration
	if deadline, ok := ctx.Deadline(); ok {
		waitLimit = deadline.Sub(now)
	}
 
	// 调用 reserveN 得到预约数据
	r := lim.reserveN(now, n, waitLimit)
 
    // 如果不 ok 说明预约不到
	if !r.ok {
		return fmt.Errorf("rate: Wait(n=%d) would exceed context deadline", n)
	}
 
	// 如果可以预约到,计算一下需要等多久
	delay := r.DelayFrom(now)
	if delay == 0 {
		return nil
	}
 
    // 启动一个 timer 进行定时
	t := time.NewTimer(delay)
	defer t.Stop()
	select {
	case <-t.C:
		// We can proceed.
		return nil
	case <-ctx.Done():
		// 如果 context 主动取消了,那么之前预约的 token 数量需要归还
		r.Cancel()
		return ctx.Err()
	}
}

6. CancelAt

WaitN 当中如果预约上了,但是 Context 取消了,会调用 CancelAt 归还 tokens, 实现原理如下

func (r *Reservation) CancelAt(now time.Time) {
    // 不 ok 说明没有预约上,直接返回就行了
	if !r.ok {
		return
	}
 
	r.lim.mu.Lock()
	defer r.lim.mu.Unlock()
 
    // 如果没有速率限制,或者没有消费 token 或 token 已经被消费了,都不用还了
	if r.lim.limit == Inf || r.tokens == 0 || r.timeToAct.Before(now) {
		return
	}
 
	// 计算需要还的 token 数量
    // 这里说是需要减去已经预支的 token 数量,但是我发现应该是个 bug,感觉这里减重复了
	restoreTokens := float64(r.tokens) - r.limit.tokensFromDuration(r.lim.lastEvent.Sub(r.timeToAct))
	if restoreTokens <= 0 {
		return
	}
 
    // 计算当前拥有的 tokens 数量
	now, _, tokens := r.lim.advance(now)
 
	// 当前拥有的加上需要归还的就是现有的,但是不能大于桶的容量
	tokens += restoreTokens
	if burst := float64(r.lim.burst); tokens > burst {
		tokens = burst
	}
 
	// 更新 tokens 数量
	r.lim.last = now
	r.lim.tokens = tokens
 
    // 如果相等说明后面没有新的 token 消费,所以将状态重置到上一次
	if r.timeToAct == r.lim.lastEvent {
		prevEvent := r.timeToAct.Add(r.limit.durationFromTokens(float64(-r.tokens)))
		if !prevEvent.Before(now) {
			r.lim.lastEvent = prevEvent
		}
	}
 
	return
}

7. 参考

[1]三.Go微服务–令牌桶实现原理

br>


文章作者: Alex
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Alex !
  目录