Skip to content

Commit

Permalink
filters/auth: use sync.Map for tokeninfo cache
Browse files Browse the repository at this point in the history
* use sync.Map for tokeninfo cache to avoid synchronizing all callers
  on a single mutex
* evict stale entries periodically instead of evicting the least recently used entry
* store token expiration time instead of creation time

```
                                                  │      master      │                HEAD                 │
                                                  │      sec/op      │   sec/op     vs base                │
TokeninfoCache/tokens=1,cacheSize=1,p=0-8               275.5n ±  6%   170.1n ± 4%  -38.26% (p=0.000 n=10)
TokeninfoCache/tokens=2,cacheSize=2,p=0-8               492.9n ± 21%   176.8n ± 2%  -64.12% (p=0.000 n=10)
TokeninfoCache/tokens=100,cacheSize=100,p=0-8           455.9n ±  7%   165.5n ± 1%  -63.70% (p=0.000 n=10)
TokeninfoCache/tokens=100,cacheSize=100,p=10000-8       593.4n ±  4%   179.8n ± 4%  -69.71% (p=0.000 n=10)
TokeninfoCache/tokens=4,cacheSize=2,p=0-8           2571424.0n ±  0%   149.7n ± 3%  -99.99% (p=0.000 n=10)
TokeninfoCache/tokens=100,cacheSize=10,p=0-8        2579227.5n ±  0%   139.3n ± 1%  -99.99% (p=0.000 n=10)
geomean                                                 7.903µ         162.9n       -97.94%

                                                  │   master   │                  HEAD                   │
                                                  │    B/op    │    B/op      vs base                    │
TokeninfoCache/tokens=1,cacheSize=1,p=0-8           344.0 ± 0%    344.0 ± 0%          ~ (p=1.000 n=10) ¹
TokeninfoCache/tokens=2,cacheSize=2,p=0-8           344.0 ± 0%    344.0 ± 0%          ~ (p=1.000 n=10) ¹
TokeninfoCache/tokens=100,cacheSize=100,p=0-8       344.0 ± 0%    344.0 ± 0%          ~ (p=1.000 n=10) ¹
TokeninfoCache/tokens=100,cacheSize=100,p=10000-8   368.0 ± 1%    350.0 ± 0%     -4.89% (p=0.000 n=10)
TokeninfoCache/tokens=4,cacheSize=2,p=0-8           27.00 ± 0%   344.00 ± 0%  +1174.07% (p=0.000 n=10)
TokeninfoCache/tokens=100,cacheSize=10,p=0-8        27.00 ± 7%   344.00 ± 0%  +1174.07% (p=0.000 n=10)
geomean                                             149.0         345.0        +131.62%
¹ all samples are equal

                                                  │    master    │              HEAD              │
                                                  │  allocs/op   │ allocs/op   vs base            │
TokeninfoCache/tokens=1,cacheSize=1,p=0-8           3.000 ± 0%     3.000 ± 0%  ~ (p=1.000 n=10) ¹
TokeninfoCache/tokens=2,cacheSize=2,p=0-8           3.000 ± 0%     3.000 ± 0%  ~ (p=1.000 n=10) ¹
TokeninfoCache/tokens=100,cacheSize=100,p=0-8       3.000 ± 0%     3.000 ± 0%  ~ (p=1.000 n=10) ¹
TokeninfoCache/tokens=100,cacheSize=100,p=10000-8   3.000 ± 0%     3.000 ± 0%  ~ (p=1.000 n=10) ¹
TokeninfoCache/tokens=4,cacheSize=2,p=0-8           0.000 ± 0%     3.000 ± 0%  ? (p=0.000 n=10)
TokeninfoCache/tokens=100,cacheSize=10,p=0-8        0.000 ± 0%     3.000 ± 0%  ? (p=0.000 n=10)
geomean                                                        ²   3.000       ?
¹ all samples are equal
² summaries must be >0 to compute geomean
```

Signed-off-by: Alexander Yastrebov <[email protected]>
  • Loading branch information
AlexanderYastrebov committed Oct 10, 2024
1 parent 3d9c020 commit 818dd4d
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 83 deletions.
1 change: 1 addition & 0 deletions filters/auth/authclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type authClient struct {

type tokeninfoClient interface {
getTokeninfo(token string, ctx filters.FilterContext) (map[string]any, error)
Close()
}

var _ tokeninfoClient = &authClient{}
Expand Down
6 changes: 1 addition & 5 deletions filters/auth/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,7 @@ func TestMain(m *testing.M) {

func cleanupAuthClients() {
for _, c := range tokeninfoAuthClient {
if ac, ok := c.(*authClient); ok {
ac.Close()
} else if cc, ok := c.(*tokeninfoCache); ok {
cc.client.(*authClient).Close()
}
c.Close()
}

for _, c := range issuerAuthClient {
Expand Down
6 changes: 4 additions & 2 deletions filters/auth/tokeninfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/opentracing/opentracing-go"
"github.com/zalando/skipper/filters"
"github.com/zalando/skipper/filters/annotate"
"github.com/zalando/skipper/metrics"
)

const (
Expand All @@ -32,9 +33,10 @@ type TokeninfoOptions struct {
Timeout time.Duration
MaxIdleConns int
Tracer opentracing.Tracer
Metrics metrics.Metrics

// CacheSize configures the maximum number of cached tokens.
// The cache evicts least recently used items first.
// The cache periodically evicts random items when the number of cached tokens exceeds CacheSize.
// Zero value disables tokeninfo cache.
CacheSize int

Expand Down Expand Up @@ -100,7 +102,7 @@ func (o *TokeninfoOptions) newTokeninfoClient() (tokeninfoClient, error) {
}

if o.CacheSize > 0 {
c = newTokeninfoCache(c, o.CacheSize, o.CacheTTL)
c = newTokeninfoCache(c, o.Metrics, o.CacheSize, o.CacheTTL)
}
return c, nil
}
Expand Down
148 changes: 78 additions & 70 deletions filters/auth/tokeninfocache.go
Original file line number Diff line number Diff line change
@@ -1,48 +1,55 @@
package auth

import (
"container/list"
"maps"
"sync"
"sync/atomic"
"time"

"github.com/zalando/skipper/filters"
"github.com/zalando/skipper/metrics"
)

type (
tokeninfoCache struct {
client tokeninfoClient
size int
ttl time.Duration
now func() time.Time

mu sync.Mutex
cache map[string]*entry
// least recently used token at the end
history *list.List
client tokeninfoClient
metrics metrics.Metrics
size int
ttl time.Duration
now func() time.Time

cache sync.Map // map[string]*entry
count atomic.Int64 // estimated number of cached entries, see https://github.com/golang/go/issues/20680
quit chan struct{}
}

entry struct {
cachedAt time.Time
expiresAt time.Time
info map[string]any
// reference in the history
href *list.Element
expiresAt time.Time
info map[string]any
infoExpiresAt time.Time
}
)

var _ tokeninfoClient = &tokeninfoCache{}

const expiresInField = "expires_in"

func newTokeninfoCache(client tokeninfoClient, size int, ttl time.Duration) *tokeninfoCache {
return &tokeninfoCache{
func newTokeninfoCache(client tokeninfoClient, metrics metrics.Metrics, size int, ttl time.Duration) *tokeninfoCache {
c := &tokeninfoCache{
client: client,
metrics: metrics,
size: size,
ttl: ttl,
now: time.Now,
cache: make(map[string]*entry, size),
history: list.New(),
quit: make(chan struct{}),
}
go c.evictLoop()
return c
}

// Close stops the background eviction loop and releases the wrapped client.
func (c *tokeninfoCache) Close() {
	close(c.quit) // signals evictLoop to return
	c.client.Close()
}

func (c *tokeninfoCache) getTokeninfo(token string, ctx filters.FilterContext) (map[string]any, error) {
Expand All @@ -58,35 +65,21 @@ func (c *tokeninfoCache) getTokeninfo(token string, ctx filters.FilterContext) (
}

func (c *tokeninfoCache) cached(token string) map[string]any {
now := c.now()

c.mu.Lock()

if e, ok := c.cache[token]; ok {
if v, ok := c.cache.Load(token); ok {
now := c.now()
e := v.(*entry)
if now.Before(e.expiresAt) {
c.history.MoveToFront(e.href)
cachedInfo := e.info
c.mu.Unlock()

// It might be ok to return cached value
// without adjusting "expires_in" to avoid copy
// if caller never modifies the result and
// when "expires_in" did not change (same second)
// or for small TTL values
info := shallowCopyOf(cachedInfo)
info := maps.Clone(e.info)

elapsed := now.Sub(e.cachedAt).Truncate(time.Second).Seconds()
info[expiresInField] = info[expiresInField].(float64) - elapsed
info[expiresInField] = e.infoExpiresAt.Sub(now).Truncate(time.Second).Seconds()
return info
} else {
// remove expired
delete(c.cache, token)
c.history.Remove(e.href)
}
}

c.mu.Unlock()

return nil
}

Expand All @@ -95,38 +88,61 @@ func (c *tokeninfoCache) tryCache(token string, info map[string]any) {
if expiresIn <= 0 {
return
}
if c.ttl > 0 && expiresIn > c.ttl {
expiresIn = c.ttl
}

now := c.now()
expiresAt := now.Add(expiresIn)
e := &entry{
info: info,
infoExpiresAt: now.Add(expiresIn),
}

c.mu.Lock()
defer c.mu.Unlock()
if c.ttl > 0 && expiresIn > c.ttl {
e.expiresAt = now.Add(c.ttl)
} else {
e.expiresAt = e.infoExpiresAt
}

if e, ok := c.cache[token]; ok {
// update
e.cachedAt = now
e.expiresAt = expiresAt
e.info = info
c.history.MoveToFront(e.href)
return
if _, loaded := c.cache.Swap(token, e); !loaded {
c.count.Add(1)
}
}

// create
c.cache[token] = &entry{
cachedAt: now,
expiresAt: expiresAt,
info: info,
href: c.history.PushFront(token),
// evictLoop runs evict once a minute until the cache is closed.
func (c *tokeninfoCache) evictLoop() {
	t := time.NewTicker(time.Minute)
	defer t.Stop()
	for {
		select {
		case <-t.C:
			c.evict()
		case <-c.quit:
			return
		}
	}
}

// remove least used
if len(c.cache) > c.size {
leastUsed := c.history.Back()
delete(c.cache, leastUsed.Value.(string))
c.history.Remove(leastUsed)
// evict removes expired entries, then — if the cache still holds more than
// c.size entries — deletes arbitrary entries until the count is within the
// limit, and finally reports the entry count as a gauge when metrics are
// configured.
func (c *tokeninfoCache) evict() {
	// The expiry check point is loop-invariant: read the clock once
	// instead of once per cached entry.
	now := c.now()

	// Evict expired entries. CompareAndDelete guards against racing with a
	// concurrent tryCache that swapped in a fresh entry for the same token.
	c.cache.Range(func(key, value any) bool {
		e := value.(*entry)
		if now.After(e.expiresAt) {
			if c.cache.CompareAndDelete(key, value) {
				c.count.Add(-1)
			}
		}
		return true
	})

	// Evict random entries until the cache size is within limits.
	// sync.Map.Range iteration order is unspecified, so this removes an
	// arbitrary subset of the remaining entries.
	if c.count.Load() > int64(c.size) {
		c.cache.Range(func(key, value any) bool {
			if c.cache.CompareAndDelete(key, value) {
				c.count.Add(-1)
			}
			return c.count.Load() > int64(c.size)
		})
	}

	if c.metrics != nil {
		c.metrics.UpdateGauge("tokeninfocache.count", float64(c.count.Load()))
	}
}

Expand All @@ -141,11 +157,3 @@ func expiresIn(info map[string]any) time.Duration {
}
return 0
}

func shallowCopyOf(info map[string]any) map[string]any {
m := make(map[string]any, len(info))
for k, v := range info {
m[k] = v
}
return m
}
34 changes: 28 additions & 6 deletions filters/auth/tokeninfocache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/zalando/skipper/filters"
"github.com/zalando/skipper/filters/filtertest"
"github.com/zalando/skipper/metrics/metricstest"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand All @@ -22,6 +23,7 @@ type tokeninfoClientFunc func(string, filters.FilterContext) (map[string]any, er
// getTokeninfo delegates to the wrapped function, allowing a plain func to
// serve as a tokeninfoClient test double.
func (f tokeninfoClientFunc) getTokeninfo(token string, ctx filters.FilterContext) (map[string]any, error) {
	return f(token, ctx)
}
// Close is a no-op, present only so tokeninfoClientFunc satisfies the
// tokeninfoClient interface.
func (f tokeninfoClientFunc) Close() {}

type testTokeninfoToken string

Expand Down Expand Up @@ -58,6 +60,7 @@ func (c *testClock) now() time.Time {
func TestTokeninfoCache(t *testing.T) {
const (
TokenTTLSeconds = 600
CacheSize = 1
CacheTTL = 300 * time.Second // less than TokenTTLSeconds
)

Expand All @@ -79,15 +82,19 @@ func TestTokeninfoCache(t *testing.T) {
}))
defer authServer.Close()

m := &metricstest.MockMetrics{}
defer m.Close()

o := TokeninfoOptions{
URL: authServer.URL + "/oauth2/tokeninfo",
CacheSize: 1,
CacheSize: CacheSize,
CacheTTL: CacheTTL,
Metrics: m,
}
c, err := o.newTokeninfoClient()
require.NoError(t, err)

defer c.(*tokeninfoCache).client.(*authClient).Close()
defer c.Close()
c.(*tokeninfoCache).now = clock.now

ctx := &filtertest.Context{FRequest: &http.Request{}}
Expand All @@ -111,7 +118,7 @@ func TestTokeninfoCache(t *testing.T) {

assert.Equal(t, int32(1), authRequests, "expected no request to auth sever")
assert.Equal(t, token, info["uid"])
assert.Equal(t, float64(595), info["expires_in"], "expected TokenTTLSeconds - truncate(delay)")
assert.Equal(t, float64(594), info["expires_in"], "expected truncate(TokenTTLSeconds - delay)")

// Third request after "sleeping" longer than cache TTL
clock.add(CacheTTL)
Expand All @@ -123,7 +130,7 @@ func TestTokeninfoCache(t *testing.T) {
assert.Equal(t, token, info["uid"])
assert.Equal(t, float64(294), info["expires_in"], "expected truncate(TokenTTLSeconds - CacheTTL - delay)")

// Fourth request with a new token evicts cached value
// Fourth request with a new token
token = newTestTokeninfoToken(clock.now()).String()

info, err = c.getTokeninfo(token, ctx)
Expand All @@ -132,6 +139,19 @@ func TestTokeninfoCache(t *testing.T) {
assert.Equal(t, int32(3), authRequests, "expected new request to auth sever")
assert.Equal(t, token, info["uid"])
assert.Equal(t, float64(600), info["expires_in"], "expected TokenTTLSeconds")

// Force eviction and verify cache size is within limits
c.(*tokeninfoCache).evict()
m.WithGauges(func(g map[string]float64) {
assert.Equal(t, float64(CacheSize), g["tokeninfocache.count"])
})

// Force eviction after all entries expired and verify cache is empty
clock.add(CacheTTL + time.Second)
c.(*tokeninfoCache).evict()
m.WithGauges(func(g map[string]float64) {
assert.Equal(t, float64(0), g["tokeninfocache.count"])
})
}

// Tests race between reading and writing cache for the same token
Expand All @@ -152,7 +172,8 @@ func TestTokeninfoCacheUpdateRace(t *testing.T) {
return map[string]any{"requestNumber": requestNumber, "uid": token, "expires_in": float64(600)}, nil
})

c := newTokeninfoCache(mc, 1, time.Hour)
c := newTokeninfoCache(mc, nil, 1, time.Hour)
defer c.Close()

const token = "atoken"

Expand Down Expand Up @@ -234,7 +255,8 @@ func BenchmarkTokeninfoCache(b *testing.B) {
return tokenValues[token], nil
})

c := newTokeninfoCache(mc, bi.cacheSize, time.Hour)
c := newTokeninfoCache(mc, nil, bi.cacheSize, time.Hour)
defer c.Close()

var tokens []string
for i := 0; i < bi.tokens; i++ {
Expand Down
1 change: 1 addition & 0 deletions skipper.go
Original file line number Diff line number Diff line change
Expand Up @@ -1615,6 +1615,7 @@ func run(o Options, sig chan os.Signal, idleConnsCH chan struct{}) error {
Timeout: o.OAuthTokeninfoTimeout,
MaxIdleConns: o.IdleConnectionsPerHost,
Tracer: tracer,
Metrics: mtr,
CacheSize: o.OAuthTokeninfoCacheSize,
CacheTTL: o.OAuthTokeninfoCacheTTL,
}
Expand Down

0 comments on commit 818dd4d

Please sign in to comment.