Skip to content

Commit

Permalink
fix: bbr limiter maxPass minRt cache problem (#690)
Browse files Browse the repository at this point in the history
  • Loading branch information
Snoopyjoy authored Feb 17, 2021
1 parent 9270386 commit 3daca16
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 19 deletions.
60 changes: 44 additions & 16 deletions pkg/ratelimit/bbr/bbr.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,20 @@ type BBR struct {
rtStat metric.RollingCounter
inFlight int64
winBucketPerSec int64
bucketDuration time.Duration
winSize int
conf *Config
prevDrop atomic.Value
prevDropHit int32
rawMaxPASS int64
rawMinRt int64
maxPASSCache atomic.Value
minRtCache atomic.Value
}

// CounterCache is used to cache maxPASS and minRt result.
// Value of current bucket is not counted in real time.
// Cache time is equal to a bucket duration.
type CounterCache struct {
val int64
time time.Time
}

// Config contains configs of bbr limiter.
Expand All @@ -89,11 +98,14 @@ type Config struct {
}

func (l *BBR) maxPASS() int64 {
rawMaxPass := atomic.LoadInt64(&l.rawMaxPASS)
if rawMaxPass > 0 && l.passStat.Timespan() < 1 {
return rawMaxPass
passCache := l.maxPASSCache.Load()
if passCache != nil {
ps := passCache.(*CounterCache)
if l.timespan(ps.time) < 1 {
return ps.val
}
}
rawMaxPass = int64(l.passStat.Reduce(func(iterator metric.Iterator) float64 {
rawMaxPass := int64(l.passStat.Reduce(func(iterator metric.Iterator) float64 {
var result = 1.0
for i := 1; iterator.Next() && i < l.conf.WinBucket; i++ {
bucket := iterator.Bucket()
Expand All @@ -108,16 +120,30 @@ func (l *BBR) maxPASS() int64 {
if rawMaxPass == 0 {
rawMaxPass = 1
}
atomic.StoreInt64(&l.rawMaxPASS, rawMaxPass)
l.maxPASSCache.Store(&CounterCache{
val: rawMaxPass,
time: time.Now(),
})
return rawMaxPass
}

func (l *BBR) timespan(lastTime time.Time) int {
v := int(time.Since(lastTime) / l.bucketDuration)
if v > -1 {
return v
}
return l.winSize
}

func (l *BBR) minRT() int64 {
rawMinRT := atomic.LoadInt64(&l.rawMinRt)
if rawMinRT > 0 && l.rtStat.Timespan() < 1 {
return rawMinRT
rtCache := l.minRtCache.Load()
if rtCache != nil {
rc := rtCache.(*CounterCache)
if l.timespan(rc.time) < 1 {
return rc.val
}
}
rawMinRT = int64(math.Ceil(l.rtStat.Reduce(func(iterator metric.Iterator) float64 {
rawMinRT := int64(math.Ceil(l.rtStat.Reduce(func(iterator metric.Iterator) float64 {
var result = math.MaxFloat64
for i := 1; iterator.Next() && i < l.conf.WinBucket; i++ {
bucket := iterator.Bucket()
Expand All @@ -136,7 +162,10 @@ func (l *BBR) minRT() int64 {
if rawMinRT <= 0 {
rawMinRT = 1
}
atomic.StoreInt64(&l.rawMinRt, rawMinRT)
l.minRtCache.Store(&CounterCache{
val: rawMinRT,
time: time.Now(),
})
return rawMinRT
}

Expand All @@ -151,9 +180,6 @@ func (l *BBR) shouldDrop() bool {
return false
}
if time.Since(initTime)-prevDrop <= time.Second {
if atomic.LoadInt32(&l.prevDropHit) == 0 {
atomic.StoreInt32(&l.prevDropHit, 1)
}
inFlight := atomic.LoadInt64(&l.inFlight)
return inFlight > 1 && inFlight > l.maxFlight()
}
Expand Down Expand Up @@ -226,6 +252,8 @@ func newLimiter(conf *Config) limit.Limiter {
passStat: passStat,
rtStat: rtStat,
winBucketPerSec: int64(time.Second) / (int64(conf.Window) / int64(conf.WinBucket)),
bucketDuration: bucketDuration,
winSize: conf.WinBucket,
}
return limiter
}
Expand Down
41 changes: 38 additions & 3 deletions pkg/ratelimit/bbr/bbr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,19 +86,33 @@ func TestBBRMaxPass(t *testing.T) {
assert.Equal(t, int64(1), bbr.maxPASS())
}

func TestBBRMaxPassWithCache(t *testing.T) {
bucketDuration := time.Millisecond * 100
bbr := newLimiter(confForTest()).(*BBR)
// witch cache, value of latest bucket is not counted instently.
// after a bucket duration time, this bucket will be fullly counted.
for i := 1; i <= 11; i++ {
bbr.passStat.Add(int64(i * 50))
time.Sleep(bucketDuration / 2)
_ = bbr.maxPASS()
bbr.passStat.Add(int64(i * 50))
time.Sleep(bucketDuration / 2)
}
bbr.passStat.Add(int64(1))
assert.Equal(t, int64(1000), bbr.maxPASS())
}

func TestBBRMinRt(t *testing.T) {
bucketDuration := time.Millisecond * 100
bbr := newLimiter(confForTest()).(*BBR)
rtStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration})
for i := 0; i < 10; i++ {
for j := i*10 + 1; j <= i*10+10; j++ {
rtStat.Add(int64(j))
bbr.rtStat.Add(int64(j))
}
if i != 9 {
time.Sleep(bucketDuration)
}
}
bbr.rtStat = rtStat
assert.Equal(t, int64(6), bbr.minRT())

// default max min rt is equal to maxFloat64.
Expand All @@ -108,6 +122,27 @@ func TestBBRMinRt(t *testing.T) {
assert.Equal(t, int64(1), bbr.minRT())
}

func TestBBRMinRtWithCache(t *testing.T) {
bucketDuration := time.Millisecond * 100
bbr := newLimiter(confForTest()).(*BBR)
for i := 0; i < 10; i++ {
for j := i*10 + 1; j <= i*10+5; j++ {
bbr.rtStat.Add(int64(j))
}
if i != 9 {
time.Sleep(bucketDuration / 2)
}
_ = bbr.minRT()
for j := i*10 + 6; j <= i*10+10; j++ {
bbr.rtStat.Add(int64(j))
}
if i != 9 {
time.Sleep(bucketDuration / 2)
}
}
assert.Equal(t, int64(6), bbr.minRT())
}

func TestBBRMaxQps(t *testing.T) {
bbr := newLimiter(confForTest()).(*BBR)
bucketDuration := time.Millisecond * 100
Expand Down

0 comments on commit 3daca16

Please sign in to comment.