微服务限流之一:令牌桶算法


0. 写在前面

令牌桶算法和漏桶算法是微服务限流算法中2个比较常见的算法,微服务限流系列将从原理、使用方法、具体示例等方向详细讲解一下这2个算法。

1. 原理

  • 我们以 r/s 的速度向桶内放置令牌,桶的容量为 b , 如果桶满了令牌将会丢弃
  • 当请求到达时,我们向桶内获取令牌,如果令牌足够,我们就通过转发请求
  • 如果桶内的令牌数量不够,那么这个请求会被缓存等待令牌足够时转发,或者是被直接丢弃掉

由于桶的存在,所以令牌桶算法不仅可以限流还可以应对突发流量的情况

举个例子:假设我们桶的容量是 100,速度是 10 rps,那么在我们桶满的情况下,如果突然来 100 个请求是可以满足的,但是后续的请求就会被限制到 10 rps

存在下面两种特殊情况

  • 如果桶的容量为 0,那么相当于禁止请求,因为所有的令牌都被丢弃了
  • 如果令牌放置速率为无穷大,那么相当于没有限制

2. 官方实现

令牌桶最常见的实现就是 Go 官方的 golang.org/x/time/rate

官方实现的令牌桶算法提供了如下方法:

type Limiter struct {
	// contains filtered or unexported fields
}
 
// 构建一个限流器,r 是每秒放入的令牌数量,b 是桶的大小
func NewLimiter(r Limit, b int) *Limiter
 
// 分别返回 b 和 r 的值
func (lim *Limiter) Burst() int
func (lim *Limiter) Limit() Limit
 
// token 消费方法
func (lim *Limiter) Allow() bool
func (lim *Limiter) AllowN(now time.Time, n int) bool
func (lim *Limiter) Reserve() *Reservation
func (lim *Limiter) ReserveN(now time.Time, n int) *Reservation
func (lim *Limiter) Wait(ctx context.Context) (err error)
func (lim *Limiter) WaitN(ctx context.Context, n int) (err error)
 
// 动态流控
func (lim *Limiter) SetBurst(newBurst int)
func (lim *Limiter) SetBurstAt(now time.Time, newBurst int)
func (lim *Limiter) SetLimit(newLimit Limit)
func (lim *Limiter) SetLimitAt(now time.Time, newLimit Limit)

2.1. NewLimiter: 初始化令牌桶

直接调用 NewLimiter(r Limit, b int) 即可, r 表示每秒产生 token 的速度, b 表示桶的大小

func NewLimiter(r Limit, b int) *Limiter

2.2. token消费

总共有三种 token 消费的方式,最常用的是使用 Wait 阻塞等待

Allow

Allow 就是 AllowN(now,1) 的别名, AllowN 表示截止到 now 这个时间点,是否存在 n 个 token,如果存在那么就返回 true 反之返回 false,如果我们限流比较严格,没有资源就直接丢弃, 可以使用这个方法

func (lim *Limiter) Allow() bool
func (lim *Limiter) AllowN(now time.Time, n int) bool

Reserve

同理 Reserve 也是 ReserveN(now, 1) 的别名, ReserveN 其实和 AllowN 类似,表示截止到 now 这个时间点,是否存在 n 个 token,只是 AllowN 直接返回 true or false,但是 ReserveN 返回一个 Reservation 对象

func (lim *Limiter) Reserve() *Reservation
func (lim *Limiter) ReserveN(now time.Time, n int) *Reservation

Reservation 有 5 个方法,通过调用 OK 我们可以知道是否通过等待可以获取到 N 个 token,如果可以, 那通过Delay 方法我们可以得知需要等待的时间,如果我们不想等了可以调用 Cancel 方法归还 token

type Reservation
    func (r *Reservation) Cancel()
    func (r *Reservation) CancelAt(now time.Time)
    func (r *Reservation) Delay() time.Duration
    func (r *Reservation) DelayFrom(now time.Time) time.Duration
    func (r *Reservation) OK() bool

Wait

Wait 是最常用的, Wait 是 WaitN(ctx, 1) 的别名, WaitN(ctx, n) 表示如果存在 n 个令牌就直接转发,不存在我们就等,等待存在为止,传入的 ctx 的 Deadline 就是等待的 Deadline

func (lim *Limiter) Wait(ctx context.Context) (err error)
func (lim *Limiter) WaitN(ctx context.Context, n int) (err error)

2.3 动态流控

通过调用 SetBurst 和 SetLimit 可以动态的设置桶的大小和 token 生产速率,其中 SetBurstAt 和 SetLimitAt 会将传入的时间 now 设置为流控最后的更新时间

func (lim *Limiter) SetBurst(newBurst int)
func (lim *Limiter) SetBurstAt(now time.Time, newBurst int)
func (lim *Limiter) SetLimit(newLimit Limit)
func (lim *Limiter) SetLimitAt(now time.Time, newLimit Limit)

3. 用法

3.1. 基于ip的gin限流中间件

主要就是使用了 sync.map 来为每一个 ip 创建一个 limiter,当然这个 key 也可以是其他的值,例如用户名等

func NewLimiter(r rate.Limit, b int, t time.Duration) gin.HandlerFunc {
	limiters := &sync.Map{}
	return func(c *gin.Context) {
		// 获取限速器
		// key 除了 ip 之外也可以是其他的,例如 header,user name 等
		key := c.ClientIP()
		l, _ := limiters.LoadOrStore(key, rate.NewLimiter(r, b))

		// 这里注意不要直接使用 gin 的 context 默认是没有超时时间的
		ctx, cancel := context.WithTimeout(c, t)
		defer cancel()

		if err := l.(*rate.Limiter).Wait(ctx); err != nil {
			// 这里先不处理日志了,如果返回错误就直接 429
			c.AbortWithStatusJSON(http.StatusTooManyRequests, gin.H{"error": err.Error()})
		}
		c.Next()
	}
}

使用的时候只需要 use 一下中间件就可以了

func main() {
	e := gin.Default()
	// 新建一个限速器,允许突发 10 个并发,限速 3rps,超过 500ms 就不再等待
	e.Use(NewLimiter(3, 10, 500*time.Millisecond))
	e.GET("ping", func(c *gin.Context) {
		c.String(http.StatusOK, "pong")
	})
	e.Run(":9877")
}

3.2. 压测

我们使用 go-stress-testing 来压测一下,20 个并发

(base) ➜  go-stress-testing git:(master) ./go-stress-testing  -c 20 -n 1 -u http://127.0.0.1:9877/ping

 开始启动  并发数:20 请求数:1 请求参数:
request:
 form:http
 url:http://127.0.0.1:9877/ping
 method:GET
 headers:map[Content-Type:application/x-www-form-urlencoded; charset=utf-8]
 data:
 verify:statusCode
 timeout:30s
 debug:false
 http2.0:false
 keepalive:false
 maxCon:1


─────┬───────┬───────┬───────┬────────┬────────┬────────┬────────┬────────┬────────┬────────
 耗时│ 并发数│ 成功数│ 失败数│   qps  │最长耗时│最短耗时│平均耗时│下载字节│字节每秒│ 状态码
─────┼───────┼───────┼───────┼────────┼────────┼────────┼────────┼────────┼────────┼────────
   0s│     20│     11│      9│  494.15│  342.18│    2.73│   40.47│   1,283│   3,728│200:11;429:9


*************************  结果 stat  ****************************
处理协程数量: 20
请求总数(并发数*请求数 -c * -n): 20 总请求时间: 0.344 秒 successNum: 11 failureNum: 9
tp90: 9.000
tp95: 342.000
tp99: 342.000
*************************  结果 end   ****************************

可以发现总共成功了 11 个请求,失败了 9 个,这是因为我们桶的大小是 10 ,所以前 10 个请求都很快就结束了,第 11 个请求等待 333.3 ms 就可以完成,小于超时时间 500ms,所以可以放行,但是后面的请求确是等不了了,所以就都失败了,并且可以看到最后一个成功的请求的耗时为 336.83591ms 而其他的请求耗时都很短

4. 参考

[1]二.Go微服务–令牌桶



文章作者: Alex
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Alex !
  目录