0%

singleflight和backoff包介绍和组合使用

​ 业务中我们经常遇到一些重复使用的轮子代码,本篇介绍了 singleflight 和 backoff 以及本地缓存!来提高我们平时业务开发的效率和代码的精简度!

singleflight

介绍

  1. 源码位置: https://github.com/golang/groupcache/tree/master/singleflight 或者 golang.org/x/sync/singleflight

  2. 主要是google 开源的group cache 封装的sdk,目的是为了解决 cache 回源的时候,容易出现并发加载一个或者多个key,导致缓存击穿

  3. 其中 golang.org/x/sync/singleflight 它提供了更多方法,比如异步加载等!但是代码量增加了很多,比如很多异步的bug之类的!

简单使用

  1. 简单模拟100个并发请求去加载k1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package main

import (
"fmt"
"sync"

"github.com/golang/groupcache/singleflight"
)

var (
cache sync.Map
sf = singleflight.Group{}
)

func main() {
key := "k1" // 假如现在有100个并发请求访问 k1
wg := sync.WaitGroup{}
wg.Add(100)
for x := 0; x < 100; x++ {
go func() {
defer wg.Done()
loadKey(key)
}()
}
wg.Wait()
fmt.Printf("result key: %s\n", loadKey(key))
}
func loadKey(key string) (v string) {
if data, ok := cache.Load(key); ok {
return data.(string)
}
data, err := sf.Do(key, func() (interface{}, error) {
data := "data" + "|" + key
fmt.Printf("load and set success, data: %s\n", data)
cache.Store(key, data)
return data, nil
})
if err != nil {
// todo handler
panic(err)
}
return data.(string)
}

// output
//load and set success, data: data|k1
//load and set success, data: data|k1
//result key: data|k1

可以看到输出中,其中有 2 次进入了 loadKey 的回源加载逻辑,说明 singleflight 并没有做到完全防止重复加载的效果

  1. 如何解决上述问题呢?问题出在哪呢?我们进行简单的源码分析

源码分析

  1. 数据结构
1
2
3
4
5
6
7
8
9
10
11
12
13
// call is an in-flight or completed Do call
type call struct {
	wg  sync.WaitGroup // released when the underlying fn has returned
	val interface{}    // result value of fn, shared by all waiters
	err error          // result error of fn, shared by all waiters
}

// Group represents a class of work and forms a namespace in which
// units of work can be executed with duplicate suppression.
type Group struct {
	mu sync.Mutex       // protects m
	m  map[string]*call // lazily initialized on first Do
}
  1. 主逻辑
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// Do executes and returns the results of the given function, making sure
// that only one execution is in-flight for a given key at a time. If a
// duplicate comes in, the duplicate caller waits for the original call to
// complete and receives the same results.
func (g *Group) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
	g.mu.Lock() // lock
	if g.m == nil { // lazily initialize the call map
		g.m = make(map[string]*call)
	}
	// If a call for this key is already in flight, wait for it and
	// return its results instead of invoking fn again.
	if c, ok := g.m[key]; ok {
		g.mu.Unlock()
		c.wg.Wait()
		return c.val, c.err
	}
	// new caller + wg add + set
	c := new(call)
	c.wg.Add(1)
	g.m[key] = c
	g.mu.Unlock()

	// Invoke fn without holding the lock.
	c.val, c.err = fn()
	// notify all waiters that the result is ready
	c.wg.Done()

	// Delete the key so the map does not grow without bound; after this
	// point a new request for the same key starts a fresh call.
	g.mu.Lock()
	delete(g.m, key)
	g.mu.Unlock()

	return c.val, c.err
}

  • (1) 首先会去初始化一个 caller,然后waitgroup ++ ,然后set [锁]
  • (2) 然后调用方法,再done [无锁]
  • (3) 最后删除 key [锁]
  • (4) 其他同一个key并发请求,会发现key存在,则直接wait了!

假如现在并发请求同一个key,只有一个请求先进入执行;由于计算机执行很快,第(2)和(3)步很快完成,key 随即被删除。此时尚未进入 Do 方法、或刚执行到 g.m[key] 这一步的请求,检测不到 in-flight 的 call,就会再次重新走一遍加载流程

  1. 问题? 能不能使用读写锁优化加锁呢?

假如读取key加的读锁,那么此时最长流程变为: 读锁 + 写锁 + 写锁, 最短流程变为: 读锁, 当特别高的并发才会有较为大的提升!

优化后用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
package main

import (
"fmt"
"sync"

"github.com/golang/groupcache/singleflight"
)

var (
cache sync.Map
sf = singleflight.Group{}
)

func main() {
key := "k1" // 假如现在有100个并发请求访问 k1
wg := sync.WaitGroup{}
wg.Add(100)
for x := 0; x < 100; x++ {
go func() {
defer wg.Done()
loadKey(key)
}()
}
wg.Wait()
fmt.Printf("result key: %s\n", loadKey(key))
}
func loadKey(key string) (v string) {
if data, ok := cache.Load(key); ok {
return data.(string)
}
data, err := sf.Do(key, func() (interface{}, error) {
if data, ok := cache.Load(key); ok { // 双重检测
return data.(string), nil
}
data := "data" + "|" + key
fmt.Printf("load and set success, data: %s\n", data)
cache.Store(key, data)
return data, nil
})
if err != nil {
// todo handler
panic(err)
}
return data.(string)
}

// output
//load and set success, data: data|k1
//result key: data|k1

backoff

介绍

  1. 源码地址: github.com/cenkalti/backoff

  2. 主要是解决补偿的操作,当业务/方法遇到异常的情况,通常会有补偿的操作,一般就是业务继续重试

  3. 我经常使用这个包做重试,感觉比较好用!不用自己写for循环了

简单使用

  1. 模拟一个异常,去加载一个data数据,当遇到偶数的时候就抛异常!
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package main

import (
"fmt"
"math/rand"
"time"

"github.com/cenkalti/backoff"
)

func main() {
var (
data interface{}
)

if err := backoff.Retry(func() error {
if rand.Int()%2 == 0 { // 模拟异常
err := fmt.Errorf("find data mod 2 is zero")
fmt.Printf("find err, err: %s\n", err)
return err
}
data = "load success"
return nil
}, backoff.WithMaxRetries(backoff.NewConstantBackOff(time.Millisecond*1), 3)); err != nil {
panic(err)
}

fmt.Printf("data: %s\n", data)
}

//output
//find err, err: find data mod 2 is zero
//data: load success

结果可以看到很好的解决了重试的问题!代码很优雅!

  1. 关于为啥业务中重试都喜欢等待一下,其实比较佛学!

sdk介绍

  1. back off
1
2
3
4
5
6
7
8
9
// BackOff is the retry policy: it decides whether to retry again and
// how long to wait before the next attempt.
type BackOff interface {
	// NextBackOff returns the duration to wait before retrying the operation,
	// or backoff.Stop to indicate that no more retries should be made.
	NextBackOff() time.Duration

	// Reset to initial state.
	Reset()
}
  1. 封装了四个基本的Backoff
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
// ZeroBackOff retries immediately, with no wait between attempts.
type ZeroBackOff struct{}

func (b *ZeroBackOff) Reset() {}

func (b *ZeroBackOff) NextBackOff() time.Duration { return 0 }

// StopBackOff never allows a retry.
type StopBackOff struct{}

func (b *StopBackOff) Reset() {}

func (b *StopBackOff) NextBackOff() time.Duration { return Stop }

// ConstantBackOff waits the same Interval before every retry.
type ConstantBackOff struct {
	Interval time.Duration
}

func (b *ConstantBackOff) Reset()                     {}
func (b *ConstantBackOff) NextBackOff() time.Duration { return b.Interval }

// NewConstantBackOff builds a ConstantBackOff with the given interval.
func NewConstantBackOff(d time.Duration) *ConstantBackOff {
	return &ConstantBackOff{Interval: d}
}

// WithMaxRetries wraps b via delegation, counting attempts and stopping
// after max tries — a good example of extending behavior by composition.
// max == 0 means retry forever.
func WithMaxRetries(b BackOff, max uint64) BackOff {
	return &backOffTries{delegate: b, maxTries: max}
}

type backOffTries struct {
	delegate BackOff // underlying policy that supplies the wait duration
	maxTries uint64  // 0 = unlimited
	numTries uint64  // attempts consumed so far
}

func (b *backOffTries) NextBackOff() time.Duration {
	if b.maxTries > 0 {
		if b.maxTries <= b.numTries {
			return Stop
		}
		b.numTries++
	}
	return b.delegate.NextBackOff()
}

func (b *backOffTries) Reset() {
	b.numTries = 0
	b.delegate.Reset()
}
  1. 自适应backoff

整个时间 < 15min,重试时间从500ms开始增长,每次增长1.5倍,直到60s每次!

1
2
3
4
5
6
7
8
9
10
11
12
13
// NewExponentialBackOff creates an instance of ExponentialBackOff using default values:
// per the package defaults, waits start at 500ms and grow 1.5x per retry,
// capped at 60s per wait, giving up after 15 minutes total elapsed time.
func NewExponentialBackOff() *ExponentialBackOff {
	b := &ExponentialBackOff{
		InitialInterval:     DefaultInitialInterval,
		RandomizationFactor: DefaultRandomizationFactor,
		Multiplier:          DefaultMultiplier,
		MaxInterval:         DefaultMaxInterval,
		MaxElapsedTime:      DefaultMaxElapsedTime,
		Clock:               SystemClock,
	}
	b.Reset()
	return b
}

组合使用,构建一个本地缓存!

这个应该是日常开发中经常用到的,本地缓存可以有效解决高频数据但是数据整体占用并不是特别的大,但是每次加载都需要额外的开销,所以基于本地缓存去构建一个可用性比较高的缓存框架!

  1. 核心代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
package main

import (
"context"
"fmt"
"time"

"github.com/cenkalti/backoff"
"golang.org/x/sync/singleflight"
)

var (
	// localCacheCallbackIsNil is returned when GetData misses the cache
	// and no callback was supplied to load the value.
	localCacheCallbackIsNil = fmt.Errorf("cache callback func is nil")
)

// CacheOption is a placeholder for per-Set options (e.g. a TTL).
type CacheOption interface {
}

// Cache is the minimal storage backend: any key-value store with
// get/set semantics can be plugged in.
type Cache interface {
	Get(key string) (value interface{}, isExist bool)
	Set(key string, value interface{}, opts ...CacheOption)
}

// WrapperCache is the read-through cache facade exposed to callers:
// on a miss, callback is invoked to load and populate the value.
type WrapperCache interface {
	GetData(ctx context.Context, key string, callback func(ctx context.Context) (interface{}, error)) (v interface{}, err error)
}

// wrapperCache implements WrapperCache by combining a cache lookup,
// singleflight deduplication, and backoff-based retries around the
// caller-supplied loader.
type wrapperCache struct {
	name           string
	cache          Cache
	singleflight   singleflight.Group
	retrySleepTime time.Duration // wait between callback retry attempts
	retryNum       uint64        // max number of callback retries
}

// NewWrapperCache builds a WrapperCache over the given backend with a
// default retry policy of 3 attempts spaced 10ms apart.
func NewWrapperCache(name string, cache Cache) WrapperCache {
	return &wrapperCache{
		name:           name,
		cache:          cache,
		retryNum:       3,
		retrySleepTime: time.Millisecond * 10,
	}
}

// emitHitCachedMetric records a cache hit/miss for hit-rate metrics.
// Intentionally left empty here; wire it to your metrics system.
func (c *wrapperCache) emitHitCachedMetric(hit bool) {

}
// GetData returns the value for key, consulting the local cache first.
// On a miss it invokes callback — deduplicated across goroutines via
// singleflight and retried with constant backoff — then stores the
// loaded value back into the cache.
func (c *wrapperCache) GetData(ctx context.Context, key string, callback func(ctx context.Context) (interface{}, error)) (v interface{}, err error) {
	if cached, hit := c.cache.Get(key); hit {
		c.emitHitCachedMetric(true)
		return cached, nil
	}
	if callback == nil {
		return nil, localCacheCallbackIsNil
	}
	c.emitHitCachedMetric(false)

	result, err, _ := c.singleflight.Do(key, func() (interface{}, error) {
		// Double check: a concurrent flight may have filled the cache
		// between our miss and entering this flight.
		if cached, hit := c.cache.Get(key); hit {
			return cached, nil
		}
		var loaded interface{}
		op := func() error {
			data, cbErr := callback(ctx)
			if cbErr != nil {
				return cbErr
			}
			loaded = data
			return nil
		}
		policy := backoff.WithMaxRetries(backoff.NewConstantBackOff(c.retrySleepTime), c.retryNum)
		if retryErr := backoff.Retry(op, policy); retryErr != nil {
			// todo add log
			return nil, retryErr
		}
		c.cache.Set(key, loaded)
		return loaded, nil
	})

	if err != nil {
		return nil, err
	}
	return result, nil
}
  1. cache 实现

这里介绍一下sync.Map为一个无过期的本地缓存和 go-cache有ttl的缓存框架!或者你自己去实现一个也可以!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import (
"sync"
"github.com/patrickmn/go-cache"
)
// localCache adapts sync.Map to the Cache interface. Entries never
// expire (no TTL support).
type localCache struct {
	sync.Map
}

// Get returns the value stored for key and whether it was present.
func (l *localCache) Get(key string) (value interface{}, isExist bool) {
	return l.Load(key)
}

// Set stores value under key; opts are ignored by this backend.
func (l *localCache) Set(key string, value interface{}, opts ...CacheOption) {
	l.Store(key, value)
}

// goCache adapts patrickmn/go-cache (TTL-capable) to the Cache
// interface. Get is promoted directly from the embedded *cache.Cache.
type goCache struct {
	*cache.Cache
}

// Set stores value under key using the cache's default expiration;
// opts are ignored by this backend.
func (l goCache) Set(key string, value interface{}, opts ...CacheOption) {
	l.SetDefault(key, value)
}
  1. 测试用例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
import (
"context"
"strconv"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/patrickmn/go-cache"
"github.com/stretchr/testify/assert"
)

// TestNewCached hammers the wrapper cache with 20 goroutines x 200k reads
// over 10 keys and verifies each key's loader ran exactly once.
func TestNewCached(t *testing.T) {
	cached := NewWrapperCache("test", goCache{
		Cache: cache.New(time.Second*10, time.Second*30),
	})
	//cached := NewWrapperCache("test", &localCache{})
	ctx := context.Background()
	wg := sync.WaitGroup{}
	var (
		loadTime uint64 = 0
		currG           = 20
	)
	wg.Add(currG)
	for x := 0; x < currG; x++ {
		go func(x int) {
			defer wg.Done()
			for y := 0; y < 200000; y++ {
				key := y % 10
				result, err := cached.GetData(ctx, strconv.Itoa(key), func(ctx context.Context) (interface{}, error) {
					atomic.AddUint64(&loadTime, 1)
					t.Logf("load key: %s, num: %d, g_id: %d\n", strconv.Itoa(key), y, x)
					return key, nil
				})
				// t.Fatal/FailNow must only be called from the goroutine
				// running the test function (testing package docs), so
				// report with t.Error and bail out of this worker instead.
				if err != nil {
					t.Error(err)
					return
				}
				if result.(int) != key {
					t.Error("data is not eq err")
					return
				}
			}
		}(x)
	}
	wg.Wait()
	for x := 0; x < 10; x++ {
		result, _ := cached.GetData(ctx, strconv.Itoa(x), nil)
		t.Log(result)
		// assert.Equal takes (t, expected, actual) in that order.
		assert.Equal(t, x, result.(int))
	}

	// Each of the 10 keys must have been loaded exactly once.
	assert.Equal(t, 10, int(loadTime))
}
本人坚持原创技术分享,如果你觉得文章对您有用,请随意打赏! 如果有需要咨询的请发送到我的邮箱!