Skip to content

Commit

Permalink
add unit tests + cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
NikitaSkrynnik committed Oct 9, 2024
1 parent da96efa commit 4340742
Show file tree
Hide file tree
Showing 7 changed files with 233 additions and 70 deletions.
2 changes: 1 addition & 1 deletion pkg/registry/common/querycache/ns_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type nsCache struct {
clockTime clock.Clock
}

func newNSCache(ctx context.Context, opts ...Option) *nsCache {
func newNSCache(ctx context.Context, opts ...NSCacheOption) *nsCache {
c := &nsCache{
expireTimeout: time.Minute,
clockTime: clock.FromContext(ctx),
Expand Down
14 changes: 1 addition & 13 deletions pkg/registry/common/querycache/ns_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package querycache

import (
"context"
"time"

"github.com/golang/protobuf/ptypes/empty"
"google.golang.org/grpc"
Expand All @@ -28,7 +27,6 @@ import (

"github.com/networkservicemesh/sdk/pkg/registry/core/next"
"github.com/networkservicemesh/sdk/pkg/registry/core/streamchannel"
"github.com/networkservicemesh/sdk/pkg/tools/log"
)

type queryCacheNSClient struct {
Expand All @@ -37,7 +35,7 @@ type queryCacheNSClient struct {
}

// NewNetworkServiceClient creates new querycache NS registry client that caches all resolved NSs
func NewNetworkServiceClient(ctx context.Context, opts ...Option) registry.NetworkServiceRegistryClient {
func NewNetworkServiceClient(ctx context.Context, opts ...NSCacheOption) registry.NetworkServiceRegistryClient {
return &queryCacheNSClient{
ctx: ctx,
cache: newNSCache(ctx, opts...),
Expand All @@ -49,19 +47,14 @@ func (q *queryCacheNSClient) Register(ctx context.Context, nse *registry.Network
}

func (q *queryCacheNSClient) Find(ctx context.Context, query *registry.NetworkServiceQuery, opts ...grpc.CallOption) (registry.NetworkServiceRegistry_FindClient, error) {
log.FromContext(ctx).WithField("time", time.Now()).Infof("queryCacheNSClient forth")
if query.Watch {
return next.NetworkServiceRegistryClient(ctx).Find(ctx, query, opts...)
}

log.FromContext(ctx).WithField("time", time.Now()).Info("queryCacheNSClient search in cache")
if client, ok := q.findInCache(ctx, query); ok {
log.FromContext(ctx).Info("queryCacheNSClient found in cache")
return client, nil
}

log.FromContext(ctx).WithField("time", time.Now()).Info("queryCacheNSClient not found in cache")

client, err := next.NetworkServiceRegistryClient(ctx).Find(ctx, query, opts...)
if err != nil {
return nil, err
Expand All @@ -80,14 +73,11 @@ func (q *queryCacheNSClient) Find(ctx context.Context, query *registry.NetworkSe
}

func (q *queryCacheNSClient) findInCache(ctx context.Context, query *registry.NetworkServiceQuery) (registry.NetworkServiceRegistry_FindClient, bool) {
log.FromContext(ctx).WithField("time", time.Now()).Infof("queryCacheNSClient checking key: %v", query.NetworkService)
ns := q.cache.Load(ctx, query.NetworkService)
if ns == nil {
return nil, false
}

log.FromContext(ctx).WithField("time", time.Now()).Infof("found NS in cache: %v", ns)

resultCh := make(chan *registry.NetworkServiceResponse, 1)
resultCh <- &registry.NetworkServiceResponse{NetworkService: ns.Clone()}
close(resultCh)
Expand All @@ -103,7 +93,6 @@ func (q *queryCacheNSClient) storeInCache(ctx context.Context, ns *registry.Netw
}

findCtx, cancel := context.WithCancel(q.ctx)

entry, loaded := q.cache.LoadOrStore(ns, cancel)
if loaded {
cancel()
Expand All @@ -114,7 +103,6 @@ func (q *queryCacheNSClient) storeInCache(ctx context.Context, ns *registry.Netw
defer entry.Cleanup()

nsQuery.Watch = true

stream, err := next.NetworkServiceRegistryClient(ctx).Find(findCtx, nsQuery, opts...)
if err != nil {
return
Expand Down
197 changes: 197 additions & 0 deletions pkg/registry/common/querycache/ns_client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
// Copyright (c) 2020-2021 Doc.ai and/or its affiliates.
//
// SPDX-Licens-Identifier: Apache-2.0
//
// Licensd under the Apache Licens, Version 2.0 (the "Licens");
// you may not use this file except in compliance with the Licens.
// You may obtain a copy of the Licens at:
//
// http://www.apache.org/licenss/LICEns-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the Licens is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the Licens for the specific language governing permissions and
// limitations under the Licens.

package querycache_test

import (
"context"
"sync/atomic"
"testing"

"github.com/golang/protobuf/ptypes/empty"
"github.com/networkservicemesh/api/pkg/api/registry"
"github.com/pkg/errors"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
"google.golang.org/grpc"

"github.com/networkservicemesh/sdk/pkg/registry/common/memory"
"github.com/networkservicemesh/sdk/pkg/registry/common/querycache"
"github.com/networkservicemesh/sdk/pkg/registry/core/adapters"
"github.com/networkservicemesh/sdk/pkg/registry/core/next"
"github.com/networkservicemesh/sdk/pkg/tools/clock"
"github.com/networkservicemesh/sdk/pkg/tools/clockmock"
)

const (
payload1 = "ethernet"
payload2 = "ip"
)

func testNSQuery(nsName string) *registry.NetworkServiceQuery {
return &registry.NetworkServiceQuery{
NetworkService: &registry.NetworkService{
Name: nsName,
},
}
}

func Test_QueryCacheClient_ShouldCacheNSs(t *testing.T) {
t.Cleanup(func() { goleak.VerifyNone(t) })

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

mem := memory.NewNetworkServiceRegistryServer()

failureClient := new(failureNSClient)
c := next.NewNetworkServiceRegistryClient(
querycache.NewNetworkServiceClient(ctx, querycache.WithNSExpireTimeout(expireTimeout)),
failureClient,
adapters.NetworkServiceServerToClient(mem),
)

reg, err := mem.Register(ctx, &registry.NetworkService{
Name: name,
Payload: payload1,
})
require.NoError(t, err)

// Goroutines should be cleaned up on ns unregister
t.Cleanup(func() { goleak.VerifyNone(t) })

// 1. Find from memory
atomic.StoreInt32(&failureClient.shouldFail, 0)

stream, err := c.Find(ctx, testNSQuery(""))
require.NoError(t, err)
nsResp, err := stream.Recv()
require.NoError(t, err)
require.Equal(t, name, nsResp.NetworkService.Name)

// 2. Find from cache
atomic.StoreInt32(&failureClient.shouldFail, 1)

stream, err = c.Find(ctx, testNSQuery(name))
require.NoError(t, err)
nsResp, err = stream.Recv()
require.NoError(t, err)
require.Equal(t, name, nsResp.NetworkService.Name)

// 3. Update NS in memory
reg.Payload = payload2
reg, err = mem.Register(ctx, reg)
require.NoError(t, err)

require.Eventually(t, func() bool {
if stream, err = c.Find(ctx, testNSQuery(name)); err != nil {
return false
}
if nsResp, err = stream.Recv(); err != nil {
return false
}
return name == nsResp.NetworkService.Name && payload2 == nsResp.NetworkService.Payload
}, testWait, testTick)

// 4. Delete ns from memory
_, err = mem.Unregister(ctx, reg)
require.NoError(t, err)

require.Eventually(t, func() bool {
_, err = c.Find(ctx, testNSQuery(name))
return err != nil
}, testWait, testTick)
}

func Test_QueryCacheClient_ShouldCleanUpNSOnTimeout(t *testing.T) {
t.Cleanup(func() { goleak.VerifyNone(t) })

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

clockMock := clockmock.New(ctx)
ctx = clock.WithClock(ctx, clockMock)

mem := memory.NewNetworkServiceRegistryServer()

failureClient := new(failureNSClient)
c := next.NewNetworkServiceRegistryClient(
querycache.NewNetworkServiceClient(ctx, querycache.WithNSExpireTimeout(expireTimeout)),
failureClient,
adapters.NetworkServiceServerToClient(mem),
)

_, err := mem.Register(ctx, &registry.NetworkService{
Name: name,
})
require.NoError(t, err)

// Goroutines should be cleaned up on cache entry expiration
t.Cleanup(func() { goleak.VerifyNone(t) })

// 1. Find from memory
atomic.StoreInt32(&failureClient.shouldFail, 0)

stream, err := c.Find(ctx, testNSQuery(""))
require.NoError(t, err)

_, err = stream.Recv()
require.NoError(t, err)

// 2. Find from cache
atomic.StoreInt32(&failureClient.shouldFail, 1)

require.Eventually(t, func() bool {
if stream, err = c.Find(ctx, testNSQuery(name)); err == nil {
_, err = stream.Recv()
}
return err == nil
}, testWait, testTick)

// 3. Keep finding from cache to prevent expiration
for start := clockMock.Now(); clockMock.Since(start) < 2*expireTimeout; clockMock.Add(expireTimeout / 3) {
stream, err = c.Find(ctx, testNSQuery(name))
require.NoError(t, err)

_, err = stream.Recv()
require.NoError(t, err)
}

// 4. Wait for the expire to happen
clockMock.Add(expireTimeout)

_, err = c.Find(ctx, testNSQuery(name))
require.Errorf(t, err, "find error")
}

type failureNSClient struct {
shouldFail int32
}

func (c *failureNSClient) Register(ctx context.Context, ns *registry.NetworkService, opts ...grpc.CallOption) (*registry.NetworkService, error) {
return next.NetworkServiceRegistryClient(ctx).Register(ctx, ns, opts...)
}

func (c *failureNSClient) Find(ctx context.Context, query *registry.NetworkServiceQuery, opts ...grpc.CallOption) (registry.NetworkServiceRegistry_FindClient, error) {
if atomic.LoadInt32(&c.shouldFail) == 1 && !query.Watch {
return nil, errors.New("find error")
}
return next.NetworkServiceRegistryClient(ctx).Find(ctx, query, opts...)
}

func (c *failureNSClient) Unregister(ctx context.Context, ns *registry.NetworkService, opts ...grpc.CallOption) (*empty.Empty, error) {
return next.NetworkServiceRegistryClient(ctx).Unregister(ctx, ns, opts...)
}
17 changes: 4 additions & 13 deletions pkg/registry/common/querycache/nse_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (

"github.com/networkservicemesh/api/pkg/api/registry"
"github.com/networkservicemesh/sdk/pkg/tools/clock"
"github.com/networkservicemesh/sdk/pkg/tools/log"
)

type nseCache struct {
Expand All @@ -36,15 +35,15 @@ type nseCache struct {
clockTime clock.Clock
}

func newNSECache(ctx context.Context, opts ...Option) *nseCache {
func newNSECache(ctx context.Context, opts ...NSECacheOption) *nseCache {
c := &nseCache{
expireTimeout: time.Minute,
clockTime: clock.FromContext(ctx),
}

// for _, opt := range opts {
// opt(c)
// }
for _, opt := range opts {
opt(c)
}

ticker := c.clockTime.Ticker(c.expireTimeout)
go func() {
Expand Down Expand Up @@ -118,8 +117,6 @@ func subset(a, b []string) bool {
func (c *nseCache) Load(ctx context.Context, query *registry.NetworkServiceEndpointQuery) []*registry.NetworkServiceEndpoint {
values := make([]*registry.NetworkServiceEndpoint, 0)

log.FromContext(ctx).WithField("time", time.Now()).Infof("query: %v\n", query)

if query.NetworkServiceEndpoint.Name != "" {
entry, ok := c.entries.Load(query.NetworkServiceEndpoint.Name)
if ok {
Expand All @@ -128,18 +125,12 @@ func (c *nseCache) Load(ctx context.Context, query *registry.NetworkServiceEndpo
return values
}

log.FromContext(ctx).WithField("time", time.Now()).Infof("Range")
c.entries.Range(func(key string, entry *cacheEntry[registry.NetworkServiceEndpoint]) bool {
log.FromContext(ctx).WithField("time", time.Now()).Infof("key: %v\n", key)
log.FromContext(ctx).WithField("time", time.Now()).Infof("entry.value: %v\n", entry.value)
if subset(query.NetworkServiceEndpoint.NetworkServiceNames, entry.value.NetworkServiceNames) {
log.FromContext(ctx).WithField("time", time.Now()).Infof("adding entry to nses\n")
values = c.add(entry, values)
}
return true
})

log.FromContext(ctx).WithField("time", time.Now()).Infof("values: %v\n", values)

return values
}
Loading

0 comments on commit 4340742

Please sign in to comment.