Main Routine Locked 2 not get lock, waiting... 0 not get lock, waiting... 1 not get lock, waiting... Main Routine Unlocked 2 get lock, doing... 2 Unlocked 0 get lock, doing... 0 Unlocked 1 get lock, doing... 1 Unlocked
// call is an in-flight or completed singleflight.Do call type call struct { wg sync.WaitGroup
// These fields are written once before the WaitGroup is done // and are only read after the WaitGroup is done. // 函数返回值,在wg.Done前只会写入一次,在wg.Done后是只读的。 val interface{} err error
// forgotten indicates whether Forget was called with this call's key // while the call was still in flight. // 标识Forget方法是否被调用 forgotten bool
// These fields are read and written with the singleflight // mutex held before the WaitGroup is done, and are read but // not written after the WaitGroup is done. // 统计调用次数 dups int // 返回的 channel chans []chan<- Result }
Group
1 2 3 4 5 6 7 8
// Group represents a class of work and forms a namespace in // which units of work can be executed with duplicate suppression. type Group struct { // 互斥锁 mu sync.Mutex // protects m // 映射表,调用key->调用,懒加载, m map[string]*call // lazily initialized }
// Do executes and returns the results of the given function, making // sure that only one execution is in-flight for a given key at a // time. If a duplicate comes in, the duplicate caller waits for the // original to complete and receives the same results. // The return value shared indicates whether v was given to multiple callers. // Do执行和返回给定函数的值,确保某一个时间只有一个方法被执行。如果一个重复的请求进入,则重复的请求会等待前一个执行完毕并获取相同的数据,返回值shared标识返回值v是否是传递给重复的调用的 func(g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) { g.mu.Lock() if g.m == nil { // 懒加载,初始化 g.m = make(map[string]*call) } // 检查指定key是否已存在请求 if c, ok := g.m[key]; ok { // 已存在则解锁,调用次数+1, c.dups++ g.mu.Unlock() // 然后等待 call.wg(WaitGroup) 执行完毕,只要一执行完,所有的 wait 都会被唤醒 c.wg.Wait()
// 我的Go知识还没学到异常,暂且不表: // 这里区分 panic 错误和 runtime 的错误,避免出现死锁,后面可以看到为什么这么做[4] if e, ok := c.err.(*panicError); ok { panic(e) } elseif c.err == errGoexit { runtime.Goexit() } return c.val, c.err, true } // 如果我们没有找到这个 key 就 new call c := new(call) // 然后调用 waitgroup 这里只有第一次调用会 add 1,其他的都会调用 wait 阻塞掉 // 所以只要这次调用返回,所有阻塞的调用都会被唤醒 c.wg.Add(1) g.m[key] = c g.mu.Unlock() // 实际执行fn g.doCall(c, key, fn) return c.val, c.err, c.dups > 0 }
doCall
由于本人Go的知识面还没有覆盖到Go的异常部分,其对异常的处理暂且不表,借用文章与代码中的注释的说法:使用了两个 defer 巧妙的将 runtime 的错误和我们传入 function 的 panic 区别开来避免了由于传入的 function panic 导致的死锁
// doCall handles the single call for a key. func(g *Group) doCall(c *call, key string, fn func() (interface{}, error)) { // 表示方法是否正常返回 normalReturn := false recovered := false
// use double-defer to distinguish panic from runtime.Goexit, // more details see https://golang.org/cl/134395 deferfunc() { // the given function invoked runtime.Goexit // 如果既没有正常执行完毕,又没有 recover 那就说明需要直接退出了 if !normalReturn && !recovered { c.err = errGoexit }
// 下面应该主要是异常处理的diamante if e, ok := c.err.(*panicError); ok { // In order to prevent the waiting channels from being blocked forever, // needs to ensure that this panic cannot be recovered. iflen(c.chans) > 0 { gopanic(e) select {} // Keep this goroutine around so that it will appear in the crash dump. } else { panic(e) } } elseif c.err == errGoexit { // Already in the process of goexit, no need to call again } else { // Normal return for _, ch := range c.chans { ch <- Result{c.val, c.err, c.dups > 0} } } }()
func() { // 使用一个匿名函数来执行实际的fn deferfunc() { if !normalReturn { // Ideally, we would wait to take a stack trace until we've determined // whether this is a panic or a runtime.Goexit. // // Unfortunately, the only way we can distinguish the two is to see // whether the recover stopped the goroutine from terminating, and by // the time we know that, the part of the stack trace relevant to the // panic has been discarded. if r := recover(); r != nil { c.err = newPanicError(r) } } }() // 方法实际执行,将值存在c.val中 c.val, c.err = fn() normalReturn = true }()