go singleflight源码解读


写在前面

go singleflight提供了一个重复的函数调用抑制机制,这个库的主要作用就是将一组相同的请求合并成一个请求,实际上只会去请求一次,然后对所有的请求返回相同的结果。其本质是对函数调用的结果进行复用。

为什么需要singleflight

一般情况下我们对外的服务都会有一层 cache 作为缓存,用来减少底层数据库的压力,但是在遇到例如 redis 抖动或者其他情况可能会导致大量的 cache miss 出现。

如下图所示,可能存在来自桌面端和移动端的用户有 1000 的并发请求,他们都访问的获取文章列表的接口,获取前 20 条信息,如果这个时候我们服务直接去访问 redis 出现 cache miss 那么我们就会去请求 1000 次数据库,这时可能会给数据库带来较大的压力(这里的 1000 只是一个例子,实际上可能远大于这个值)导致我们的服务异常或者超时。

如下图所示,使用 singleflight 之后,我们在一个请求的时间周期内实际上只会向底层的数据库发起一次请求大大减少对数据库的压力。

应用场景

  • 应对缓存击穿

缓存在某个时间点过期的时候,恰好在这个时间点对这个Key有大量的并发请求过来,这些请求发现缓存过期一般都会从后端DB加载数据并回设到缓存,这个时候大量的请求可能会瞬间把后端DB打垮。这时可以通过使用singlefilght 可以有效合并重复请求,避免数据库被打爆。

  • 去除重复请求

与一致性hash负载均衡配合组成一个特殊的服务。用户根据key使用一致性hash请求到特定的服务机器上,服务对请求执行singlefilght后,再去请求下游,以此收束重复请求。

设计思想

singleflight为了解决请求去重的问题,使用了map[key]*call 结构,并且通过锁解决call访问的并发问题。每次请求到来,都会去根据key获取call的信息。

call内部使用了WaitGroup来管理各个并发请求,首次请求执行Add(1)操作,请求完成后执行Done()操作,其他请求wait首次请求完成并共享结果。

对于异步控制,其对chan的使用非常经典。将chan的写接口传入goroutine进行写操作,读接口交由用户自己控制。

源码阅读// Group 对外的核心结构体

type Group struct {
	mu sync.Mutex       // 保护 m
	m  map[string]*call // lazily initialized
}

// Do 执行函数, 对同一个 key 多次调用的时候,在第一次调用没有执行完的时候
// 只会执行一次 fn,其他的调用会阻塞住等待这次调用返回
// v, err 是传入的 fn 的返回值
// shared 表示fn的结果是否被共享
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool)

// DoChan 和 Do 类似,只是 DoChan 返回一个 channel,也就是同步与异步的区别
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result

// Forget 用于通知 Group 删除某个 key 这样后面继续这个 key 的调用的时候就不会在阻塞等待了
func (g *Group) Forget(key string){
	g.mu.Lock()
	if c, ok := g.m[key]; ok {
		c.forgotten = true
	}
	delete(g.m, key)
	g.mu.Unlock()
}

其将调用对象封装为call ,每个key对应一个call:

// call is an in-flight or completed singleflight.Do call
type call struct {
	wg sync.WaitGroup

	// val 和 err 是用户调用返回的字段,(err包括panic err)
        // 其在 wg.done 之前只被写入一次
	// 在 wg.done 之后,只能被读取
	val interface{}
	err error

	// 如果执行过 Forget,则会被设置为true,避免重复delete
	forgotten bool
        
        // 此 call 被额外调用的次数,也是结果被额外共享的次数(不算首次)
	dups  int
        // DoChan 的结果数组,用于一次执行后,给所有结果传值
	chans []chan<- Result
}

Do方法

// Do 内部没有额外的goroutine执行,故panic可以被捕获
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
	// 1. 加锁并懒加载内部变量
	g.mu.Lock()
	if g.m == nil {
		g.m = make(map[string]*call)
	}

	// 2. 如果callMap里存在这个key,说明此函数正在被调用
	// c 为singleflight封装的call结构体
	if c, ok := g.m[key]; ok {
		c.dups++      // 2.1 记录此call被执行的次数+1
		g.mu.Unlock() // 2.2 释放锁,让之后被调用的函数也进来
		c.wg.Wait()   // 2.3 阻塞在这里,等待函数调用完成
		
		// 2.4 函数调用完成,进行错误处理,区分系统错误or用户错误
		if e, ok := c.err.(*panicError); ok {
			panic(e)
		} else if c.err == errGoexit {
			runtime.Goexit()
		}
		// 2.5 返回函数执行结果,其结果必定是共享的结果
		return c.val, c.err, true
	}

	// 3. 首次调用,新建call
	c := new(call)
	c.wg.Add(1)
	g.m[key] = c
	g.mu.Unlock() // 完成对map操作,释放锁
	
	// 4. 执行fn函数,并将执行结果返回,并调用c.wg.Done(), 也会删除 callMap 里对应的key
	g.doCall(c, key, fn)
	return c.val, c.err, c.dups > 0 // c.dups 代表此call被额外调用次数,>0 说明结果共享
}

DoChan方法

// 存放每次调用的结果
type Result struct {
	Val    interface{}
	Err    error
	Shared bool
}

func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
	// 本次执行的结果,一次调用只有1个
	ch := make(chan Result, 1)

	// 1. 加锁并懒加载内部变量
	g.mu.Lock()
	if g.m == nil {
		g.m = make(map[string]*call)
	}
	
	// 2. 如果此key对应的函数已经在执行 
	if c, ok := g.m[key]; ok {
		c.dups++
		// 将接收调用结果的channel 加入 call 的结果channel数组
		c.chans = append(c.chans, ch)
		g.mu.Unlock()
		return ch
	}

	// 3. 如果是首次执行,创建call,加入到Group的CallMap里
	// 之所以是结果数组,是为了用于doCall一次执行后,给所有结果传值
	c := &call{chans: []chan<- Result{ch}}	// doCall内部只准写入值
	c.wg.Add(1)
	g.m[key] = c
	g.mu.Unlock()
	
	// 4. 开goroutine通过doCall执行fn
	// 执行完成后会调用c.wg.Done(), 也会删除callMap里对应的key
	go g.doCall(c, key, fn)

	return ch
}

doCall方法是核心

// doCall handles the single call for a key.
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
	normalReturn := false
	recovered := false

	// 3. 执行第二个defer,对panic或Goexit统一处理
	defer func() {
		// 既不是普通返回,也不是recover,排除法得到Goexit
		if !normalReturn && !recovered {
			c.err = errGoexit
		}

                // 3.1 执行完成,返阻塞在调用Group.Do调用
		// Group.Do的部分到这里就返回结果了
		c.wg.Done()

		// 3.2 加锁删除key
		g.mu.Lock()
		defer g.mu.Unlock()
                // 如果已经 forgot 过了,就不要重复删除这个 key 了
		if !c.forgotten {
			delete(g.m, key)
		}
		
		// 3.3 处理panic
		if e, ok := c.err.(*panicError); ok {
			// DoChan调用
			//    由于ch发生了panic,ch不会被写入,goroutine会一直阻塞,导致死锁
			//    这里通过go panic的方式保证必定panic,
			//    预防调用Group.DoChan后,外部recover导致死锁的问题
			if len(c.chans) > 0 {
				go panic(e)
				select {} // 保留这个goroutine到核心存储
			
			} else { // Do 调用,直接panic 
				panic(e)
			}

		// 3.4 处理Goexit
		} else if c.err == errGoexit {
			// 已经准备退出了,没有啥要处理的,资源在之前就已经释放完成了
		
		// 3.5 处理正常情况下的DoChan结果
		} else {
			for _, ch := range c.chans {
				ch <- Result{c.val, c.err, c.dups > 0}
			}
		}
	}()

	// 1. 首次执行此函数
	func() {
		defer func() {
			// 1.3 判断是否为正常返回
			if !normalReturn {
				// 非正常返回则recover,保留堆栈信息
				// 在下一个defer里统一处理 panic or Goexit
				if r := recover(); r != nil { 
					c.err = newPanicError(r)
				}
			}
		}()

		// 1.1 执行fn
		c.val, c.err = fn()
		
		// 1.2 执行成功则设置正常返回
		normalReturn = true // 如果fn() panic 或者 Goexit则不会执行此步骤
	}()

	// 2. 由于panic被第一个defer recover了,可以执行到此
	// Goexit会直接执行下一个defer
	if !normalReturn {
		recovered = true
	}
}

Forget方法

// Forget tells the singleflight to forget about a key.  Future calls
// to Do for this key will call the function rather than waiting for
// an earlier call to complete.
func (g *Group) Forget(key string) {
	g.mu.Lock()
	delete(g.m, key)
	g.mu.Unlock()
}

参考

[1] Go并发工具SingleFlight实现原理



文章作者: Alex
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Alex !
  目录