赞
赏
groupcache 中的 singleflight 用于控制多个相同的并发请求只查询一次,从而优化查询效率,具体的代码在 singleflight 文件夹下面的 singleflight.go 文件中,我们先写个代码,看具体如何使用,具体代码如下:
package main
import (
"fmt"
"github.com/golang/groupcache/singleflight"
"time"
)
func main(){
fmt.Println("嗨客网(www.haicoder.net)")
singleDoGroup := singleflight.Group{}
go func() {
retVal, err := singleDoGroup.Do("haicoder", func() (data interface{}, e error) {
time.Sleep(time.Second * 5)
fmt.Println("Doing1...")
return "HAICODER1", nil
})
if err != nil{
fmt.Println("Do Err1, Err =", err)
}else{
fmt.Println("RetVal1 =", retVal)
}
}()
go func() {
time.Sleep(time.Second)
retVal, err := singleDoGroup.Do("haicoder", func() (data interface{}, e error) {
fmt.Println("Doing2...")
return "HAICODER1", nil
})
if err != nil{
fmt.Println("Do Err2, Err =", err)
}else{
fmt.Println("RetVal2 =", retVal)
}
}()
time.Sleep(time.Second*10)
}
执行完毕后,控制台输出如下:
我们创建了两个 协程 用于同时模拟请求键为 haicoder 的数据,并且我们在第一个协程里面使用了 Sleep 模拟了耗时操作,在第二个协程开始的时候我们使用了 Sleep 等待了 1 秒钟,从而让第一个协程先执行。
在第二个协程里面我们同样模拟了请求键为 haicoder 的数据,从运行结果来看,第二个请求被第一个请求阻塞了,即,第二个请求虽然已经在执行了,但并没有出结果,一直在等待第一个请求的结束,因为第二个请求的键与第一个请求的键相同。
即,我们通过 singleflight 实现了合并多个相同的请求为一个请求。现在,我们修改你代码如下,请求不同的键:
package main
import (
"fmt"
"github.com/golang/groupcache/singleflight"
"time"
)
func main(){
fmt.Println("嗨客网(www.haicoder.net)")
singleDoGroup := singleflight.Group{}
go func() {
retVal, err := singleDoGroup.Do("haicoder", func() (data interface{}, e error) {
time.Sleep(time.Second * 5)
fmt.Println("Doing1...")
return "HAICODER1", nil
})
if err != nil{
fmt.Println("Do Err1, Err =", err)
}else{
fmt.Println("RetVal1 =", retVal)
}
}()
go func() {
time.Sleep(time.Second)
retVal, err := singleDoGroup.Do("haicoder1", func() (data interface{}, e error) {
fmt.Println("Doing2...")
return "HAICODER2", nil
})
if err != nil{
fmt.Println("Do Err2, Err =", err)
}else{
fmt.Println("RetVal2 =", retVal)
}
}()
time.Sleep(time.Second*10)
}
执行完毕后,控制台输出如下:
这次,我们可以看到,第二个请求并没有被阻塞,因为第二个请求的键与第一个不一样。
我们查看 singleflight.go 文件,首先看到的是 call 结构体,具体代码如下:
// call is an in-flight or completed Do call
type call struct {
wg sync.WaitGroup
val interface{}
err error
}
calll 的结构就是一个 WaitGroup 类型的属性一个 接口类型 的 val 和一个 err,这里的 val 和 err 正好是下面我们要使用的 Do 方法回调的返回值,因为在 Do 函数的实现中,需要将回调的具体值赋值给 call 对象的 val 和 err 属性。接着,我们来看 Group 结构,具体代码如下:
// Group represents a class of work and forms a namespace in which
// units of work can be executed with duplicate suppression.
type Group struct {
mu sync.Mutex // protects m
m map[string]*call // lazily initialized
}
Group 结构的第一个 互斥锁 用于保护下面的 map 的并发安全,接着下面的 m 是一个 map 类型的结构,其中的键为 string 类型的,就是我们请求传递的键,最后我们来看 Do 函数的实现代码,具体代码如下:
// Do executes and returns the results of the given function, making
// sure that only one execution is in-flight for a given key at a
// time. If a duplicate comes in, the duplicate caller waits for the
// original to complete and receives the same results.
func (g *Group) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
//首先使用Group对象的mu实例进行加锁,保证并发安全
g.mu.Lock()
//如果map为空,则使用make创建下
if g.m == nil {
g.m = make(map[string]*call)
}
//判断当前请求的键是否已经存在map中了
if c, ok := g.m[key]; ok {
//如果已经存在了,则先解锁
g.mu.Unlock()
//使用WaitGroup对象的Wait方法,等待call对象的结束
c.wg.Wait()
//结束后,返回call对象的val值
return c.val, c.err
}
//如果键不存在map中,则先创建一个call对象
c := new(call)
//先使用WaitGroup对象Add
c.wg.Add(1)
//存放到map中
g.m[key] = c
//解锁
g.mu.Unlock()
//执行传入的函数,并将返回值返回给call对象
c.val, c.err = fn()
//使用Done表明这个call对象执行完毕
c.wg.Done()
//再次加锁
g.mu.Lock()
//从map中删除这个键
delete(g.m, key)
//解锁
g.mu.Unlock()
return c.val, c.err
}
Do 方法实现的原理就是,一个请求过来的时候,首先查看 Group 对象的 map 中是否存在相同的请求,如果有,则使用该请求的 WaitGroup 对象进行等待,如果没有,则在请求开始的时候,加入到 map 中,并使用 WaitGroup 对象的 Add 方法使相同的请求进行阻塞,接着,开始执行传入的回调函数,执行完毕后,赋值给 call 对象,并使用 WaitGroup 对象的 Done 方法,表明请求结束,此时,如果有相同的请求的 Wait 方法就会接触阻塞,最后,再从 map 中删除该请求即可。