diff --git a/pkg/registry/common/querycache/cache.go b/pkg/registry/common/querycache/ns_cache.go similarity index 57% rename from pkg/registry/common/querycache/cache.go rename to pkg/registry/common/querycache/ns_cache.go index f9558dc42..f9606afbf 100644 --- a/pkg/registry/common/querycache/cache.go +++ b/pkg/registry/common/querycache/ns_cache.go @@ -1,6 +1,4 @@ -// Copyright (c) 2021 Doc.ai and/or its affiliates. -// -// Copyright (c) 2023 Cisco and/or its affiliates. +// Copyright (c) 2024 Cisco and/or its affiliates. // // SPDX-License-Identifier: Apache-2.0 // @@ -24,19 +22,19 @@ import ( "time" "github.com/edwarnicke/genericsync" - "github.com/networkservicemesh/api/pkg/api/registry" + "github.com/networkservicemesh/api/pkg/api/registry" "github.com/networkservicemesh/sdk/pkg/tools/clock" ) -type cache struct { +type nsCache struct { expireTimeout time.Duration - entries genericsync.Map[string, *cacheEntry] + entries genericsync.Map[string, *cacheEntry[registry.NetworkService]] clockTime clock.Clock } -func newCache(ctx context.Context, opts ...Option) *cache { - c := &cache{ +func newNSCache(ctx context.Context, opts ...Option) *nsCache { + c := &nsCache{ expireTimeout: time.Minute, clockTime: clock.FromContext(ctx), } @@ -53,14 +51,13 @@ func newCache(ctx context.Context, opts ...Option) *cache { ticker.Stop() return case <-ticker.C(): - c.entries.Range(func(_ string, e *cacheEntry) bool { + c.entries.Range(func(_ string, e *cacheEntry[registry.NetworkService]) bool { e.lock.Lock() defer e.lock.Unlock() if c.clockTime.Until(e.expirationTime) < 0 { e.cleanup() } - return true }) } @@ -70,54 +67,53 @@ func newCache(ctx context.Context, opts ...Option) *cache { return c } -func (c *cache) LoadOrStore(key string, nse *registry.NetworkServiceEndpoint, cancel context.CancelFunc) (*cacheEntry, bool) { +func (c *nsCache) LoadOrStore(value *registry.NetworkService, cancel context.CancelFunc) (*cacheEntry[registry.NetworkService], bool) { var once sync.Once - return c.entries.LoadOrStore(key, &cacheEntry{ - nse: nse, + + entry, ok := c.entries.LoadOrStore(value.GetName(), &cacheEntry[registry.NetworkService]{ + value: value, expirationTime: c.clockTime.Now().Add(c.expireTimeout), cleanup: func() { once.Do(func() { - c.entries.Delete(key) + c.entries.Delete(value.GetName()) cancel() }) - }, - }) -} + }}) -func (c *cache) Load(key string) (*registry.NetworkServiceEndpoint, bool) { - e, ok := c.entries.Load(key) - if !ok { - return nil, false - } - - e.lock.Lock() - defer e.lock.Unlock() + return entry, ok +} - if c.clockTime.Until(e.expirationTime) < 0 { - e.cleanup() - return nil, false +func (c *nsCache) Load(ctx context.Context, query *registry.NetworkService) *registry.NetworkService { + entry, ok := c.entries.Load(query.Name) + if ok { + entry.lock.Lock() + defer entry.lock.Unlock() + if c.clockTime.Until(entry.expirationTime) < 0 { + entry.cleanup() + } else { + entry.expirationTime = c.clockTime.Now().Add(c.expireTimeout) + return entry.value + } } - e.expirationTime = c.clockTime.Now().Add(c.expireTimeout) - - return e.nse, true + return nil } -type cacheEntry struct { - nse *registry.NetworkServiceEndpoint +type cacheEntry[T registry.NetworkService | registry.NetworkServiceEndpoint] struct { + value *T expirationTime time.Time lock sync.Mutex cleanup func() } -func (e *cacheEntry) Update(nse *registry.NetworkServiceEndpoint) { +func (e *cacheEntry[T]) Update(value *T) { e.lock.Lock() defer e.lock.Unlock() - e.nse = nse + e.value = value } -func (e *cacheEntry) Cleanup() { +func (e *cacheEntry[_]) Cleanup() { e.lock.Lock() defer e.lock.Unlock() diff --git a/pkg/registry/common/querycache/ns_client.go b/pkg/registry/common/querycache/ns_client.go new file mode 100644 index 000000000..93b61f647 --- /dev/null +++ b/pkg/registry/common/querycache/ns_client.go @@ -0,0 +1,138 @@ +// Copyright (c) 2020-2021 Doc.ai and/or its affiliates. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package querycache adds possibility to cache Find queries +package querycache + +import ( + "context" + "time" + + "github.com/golang/protobuf/ptypes/empty" + "google.golang.org/grpc" + + "github.com/networkservicemesh/api/pkg/api/registry" + + "github.com/networkservicemesh/sdk/pkg/registry/core/next" + "github.com/networkservicemesh/sdk/pkg/registry/core/streamchannel" + "github.com/networkservicemesh/sdk/pkg/tools/log" +) + +type queryCacheNSClient struct { + ctx context.Context + cache *nsCache +} + +// NewNetworkServiceClient creates new querycache NS registry client that caches all resolved NSs +func NewNetworkServiceClient(ctx context.Context, opts ...Option) registry.NetworkServiceRegistryClient { + return &queryCacheNSClient{ + ctx: ctx, + cache: newNSCache(ctx, opts...), + } +} + +func (q *queryCacheNSClient) Register(ctx context.Context, nse *registry.NetworkService, opts ...grpc.CallOption) (*registry.NetworkService, error) { + return next.NetworkServiceRegistryClient(ctx).Register(ctx, nse, opts...) +} + +func (q *queryCacheNSClient) Find(ctx context.Context, query *registry.NetworkServiceQuery, opts ...grpc.CallOption) (registry.NetworkServiceRegistry_FindClient, error) { + log.FromContext(ctx).WithField("time", time.Now()).Infof("queryCacheNSClient forth") + if query.Watch { + return next.NetworkServiceRegistryClient(ctx).Find(ctx, query, opts...) + } + + log.FromContext(ctx).WithField("time", time.Now()).Info("queryCacheNSClient search in cache") + if client, ok := q.findInCache(ctx, query); ok { + log.FromContext(ctx).Info("queryCacheNSClient found in cache") + return client, nil + } + + log.FromContext(ctx).WithField("time", time.Now()).Info("queryCacheNSClient not found in cache") + + client, err := next.NetworkServiceRegistryClient(ctx).Find(ctx, query, opts...) + if err != nil { + return nil, err + } + + nses := registry.ReadNetworkServiceList(client) + + resultCh := make(chan *registry.NetworkServiceResponse, len(nses)) + for _, nse := range nses { + resultCh <- ®istry.NetworkServiceResponse{NetworkService: nse} + q.storeInCache(ctx, nse.Clone(), opts...) + } + close(resultCh) + + return streamchannel.NewNetworkServiceFindClient(ctx, resultCh), nil +} + +func (q *queryCacheNSClient) findInCache(ctx context.Context, query *registry.NetworkServiceQuery) (registry.NetworkServiceRegistry_FindClient, bool) { + log.FromContext(ctx).WithField("time", time.Now()).Infof("queryCacheNSClient checking key: %v", query.NetworkService) + ns := q.cache.Load(ctx, query.NetworkService) + if ns == nil { + return nil, false + } + + log.FromContext(ctx).WithField("time", time.Now()).Infof("found NS in cache: %v", ns) + + resultCh := make(chan *registry.NetworkServiceResponse, 1) + resultCh <- ®istry.NetworkServiceResponse{NetworkService: ns.Clone()} + close(resultCh) + + return streamchannel.NewNetworkServiceFindClient(ctx, resultCh), true +} + +func (q *queryCacheNSClient) storeInCache(ctx context.Context, ns *registry.NetworkService, opts ...grpc.CallOption) { + nsQuery := ®istry.NetworkServiceQuery{ + NetworkService: ®istry.NetworkService{ + Name: ns.Name, + }, + } + + findCtx, cancel := context.WithCancel(q.ctx) + + entry, loaded := q.cache.LoadOrStore(ns, cancel) + if loaded { + cancel() + return + } + + go func() { + defer entry.Cleanup() + + nsQuery.Watch = true + + stream, err := next.NetworkServiceRegistryClient(ctx).Find(findCtx, nsQuery, opts...) + if err != nil { + return + } + + for nsResp, err := stream.Recv(); err == nil; nsResp, err = stream.Recv() { + if nsResp.NetworkService.Name != nsQuery.NetworkService.Name { + continue + } + if nsResp.Deleted { + break + } + + entry.Update(nsResp.NetworkService) + } + }() +} + +func (q *queryCacheNSClient) Unregister(ctx context.Context, in *registry.NetworkService, opts ...grpc.CallOption) (*empty.Empty, error) { + return next.NetworkServiceRegistryClient(ctx).Unregister(ctx, in, opts...) +} diff --git a/pkg/registry/common/querycache/nse_cache.go b/pkg/registry/common/querycache/nse_cache.go new file mode 100644 index 000000000..cc6c07ab2 --- /dev/null +++ b/pkg/registry/common/querycache/nse_cache.go @@ -0,0 +1,145 @@ +// Copyright (c) 2021 Doc.ai and/or its affiliates. +// +// Copyright (c) 2023 Cisco and/or its affiliates. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package querycache + +import ( + "context" + "sync" + "time" + + "github.com/edwarnicke/genericsync" + + "github.com/networkservicemesh/api/pkg/api/registry" + "github.com/networkservicemesh/sdk/pkg/tools/clock" + "github.com/networkservicemesh/sdk/pkg/tools/log" +) + +type nseCache struct { + expireTimeout time.Duration + entries genericsync.Map[string, *cacheEntry[registry.NetworkServiceEndpoint]] + clockTime clock.Clock +} + +func newNSECache(ctx context.Context, opts ...Option) *nseCache { + c := &nseCache{ + expireTimeout: time.Minute, + clockTime: clock.FromContext(ctx), + } + + // for _, opt := range opts { + // opt(c) + // } + + ticker := c.clockTime.Ticker(c.expireTimeout) + go func() { + for { + select { + case <-ctx.Done(): + ticker.Stop() + return + case <-ticker.C(): + c.entries.Range(func(_ string, e *cacheEntry[registry.NetworkServiceEndpoint]) bool { + e.lock.Lock() + defer e.lock.Unlock() + + if c.clockTime.Until(e.expirationTime) < 0 { + e.cleanup() + } + return true + }) + } + } + }() + + return c +} + +func (c *nseCache) LoadOrStore(value *registry.NetworkServiceEndpoint, cancel context.CancelFunc) (*cacheEntry[registry.NetworkServiceEndpoint], bool) { + var once sync.Once + + entry, ok := c.entries.LoadOrStore(value.GetName(), &cacheEntry[registry.NetworkServiceEndpoint]{ + value: value, + expirationTime: c.clockTime.Now().Add(c.expireTimeout), + cleanup: func() { + once.Do(func() { + c.entries.Delete(value.GetName()) + cancel() + }) + }}) + + return entry, ok +} + +func (c *nseCache) add(entry *cacheEntry[registry.NetworkServiceEndpoint], values []*registry.NetworkServiceEndpoint) []*registry.NetworkServiceEndpoint { + entry.lock.Lock() + defer entry.lock.Unlock() + if c.clockTime.Until(entry.expirationTime) < 0 { + entry.cleanup() + } else { + entry.expirationTime = c.clockTime.Now().Add(c.expireTimeout) + values = append(values, entry.value) + } + + return values +} + +// Checks if a is a subset of b +func subset(a, b []string) bool { + set := make(map[string]struct{}) + for _, value := range a { + set[value] = struct{}{} + } + + for _, value := range b { + if _, found := set[value]; !found { + return false + } + } + + return true +} + +func (c *nseCache) Load(ctx context.Context, query *registry.NetworkServiceEndpointQuery) []*registry.NetworkServiceEndpoint { + values := make([]*registry.NetworkServiceEndpoint, 0) + + log.FromContext(ctx).WithField("time", time.Now()).Infof("query: %v\n", query) + + if query.NetworkServiceEndpoint.Name != "" { + entry, ok := c.entries.Load(query.NetworkServiceEndpoint.Name) + if ok { + values = c.add(entry, values) + } + return values + } + + log.FromContext(ctx).WithField("time", time.Now()).Infof("Range") + c.entries.Range(func(key string, entry *cacheEntry[registry.NetworkServiceEndpoint]) bool { + log.FromContext(ctx).WithField("time", time.Now()).Infof("key: %v\n", key) + log.FromContext(ctx).WithField("time", time.Now()).Infof("entry.value: %v\n", entry.value) + if subset(query.NetworkServiceEndpoint.NetworkServiceNames, entry.value.NetworkServiceNames) { + log.FromContext(ctx).WithField("time", time.Now()).Infof("adding entry to nses\n") + values = c.add(entry, values) + } + return true + }) + + log.FromContext(ctx).WithField("time", time.Now()).Infof("values: %v\n", values) + + return values +} diff --git a/pkg/registry/common/querycache/nse_client.go b/pkg/registry/common/querycache/nse_client.go index efc37f204..b7a7797af 100644 --- a/pkg/registry/common/querycache/nse_client.go +++ b/pkg/registry/common/querycache/nse_client.go @@ -27,18 +27,19 @@ import ( "github.com/networkservicemesh/sdk/pkg/registry/core/next" "github.com/networkservicemesh/sdk/pkg/registry/core/streamchannel" + "github.com/networkservicemesh/sdk/pkg/tools/log" ) type queryCacheNSEClient struct { ctx context.Context - cache *cache + cache *nseCache } -// NewClient creates new querycache NSE registry client that caches all resolved NSEs -func NewClient(ctx context.Context, opts ...Option) registry.NetworkServiceEndpointRegistryClient { +// NewNetworkServiceEndpointClient creates new querycache NSE registry client that caches all resolved NSEs +func NewNetworkServiceEndpointClient(ctx context.Context, opts ...Option) registry.NetworkServiceEndpointRegistryClient { return &queryCacheNSEClient{ ctx: ctx, - cache: newCache(ctx, opts...), + cache: newNSECache(ctx, opts...), } } @@ -47,14 +48,19 @@ func (q *queryCacheNSEClient) Register(ctx context.Context, nse *registry.Networ } func (q *queryCacheNSEClient) Find(ctx context.Context, query *registry.NetworkServiceEndpointQuery, opts ...grpc.CallOption) (registry.NetworkServiceEndpointRegistry_FindClient, error) { + log.FromContext(ctx).Infof("queryCacheNSEClient forth") if query.Watch { return next.NetworkServiceEndpointRegistryClient(ctx).Find(ctx, query, opts...) } - if client, ok := q.findInCache(ctx, query.String()); ok { + log.FromContext(ctx).Info("queryCacheNSEClient search in cache") + if client, ok := q.findInCache(ctx, query); ok { + log.FromContext(ctx).Info("queryCacheNSEClient found in cache") return client, nil } + log.FromContext(ctx).Info("queryCacheNSEClient not found in cache") + client, err := next.NetworkServiceEndpointRegistryClient(ctx).Find(ctx, query, opts...) if err != nil { return nil, err @@ -72,14 +78,26 @@ func (q *queryCacheNSEClient) Find(ctx context.Context, query *registry.NetworkS return streamchannel.NewNetworkServiceEndpointFindClient(ctx, resultCh), nil } -func (q *queryCacheNSEClient) findInCache(ctx context.Context, key string) (registry.NetworkServiceEndpointRegistry_FindClient, bool) { - nse, ok := q.cache.Load(key) - if !ok { +func (q *queryCacheNSEClient) findInCache(ctx context.Context, query *registry.NetworkServiceEndpointQuery) (registry.NetworkServiceEndpointRegistry_FindClient, bool) { + log.FromContext(ctx).Infof("queryCacheNSEClient checking key: %v", query.NetworkServiceEndpoint) + + q.cache.entries.Range(func(key string, value *cacheEntry[registry.NetworkServiceEndpoint]) bool { + log.FromContext(ctx).Infof("Entries: %v, %v", key, value.value) + + return true + }) + + nses := q.cache.Load(ctx, query) + if len(nses) == 0 { return nil, false } - resultCh := make(chan *registry.NetworkServiceEndpointResponse, 1) - resultCh <- ®istry.NetworkServiceEndpointResponse{NetworkServiceEndpoint: nse.Clone()} + log.FromContext(ctx).Infof("found NSEs in cache: %v", nses) + + resultCh := make(chan *registry.NetworkServiceEndpointResponse, len(nses)) + for _, nse := range nses { + resultCh <- ®istry.NetworkServiceEndpointResponse{NetworkServiceEndpoint: nse.Clone()} + } close(resultCh) return streamchannel.NewNetworkServiceEndpointFindClient(ctx, resultCh), true @@ -92,11 +110,9 @@ func (q *queryCacheNSEClient) storeInCache(ctx context.Context, nse *registry.Ne }, } - key := nseQuery.String() - findCtx, cancel := context.WithCancel(q.ctx) - entry, loaded := q.cache.LoadOrStore(key, nse, cancel) + entry, loaded := q.cache.LoadOrStore(nse, cancel) if loaded { cancel() return diff --git a/pkg/registry/common/querycache/nse_client_test.go b/pkg/registry/common/querycache/nse_client_test.go index 516a911ba..10a5c8800 100644 --- a/pkg/registry/common/querycache/nse_client_test.go +++ b/pkg/registry/common/querycache/nse_client_test.go @@ -64,7 +64,7 @@ func Test_QueryCacheClient_ShouldCacheNSEs(t *testing.T) { failureClient := new(failureNSEClient) c := next.NewNetworkServiceEndpointRegistryClient( - querycache.NewClient(ctx, querycache.WithExpireTimeout(expireTimeout)), + querycache.NewNetworkServiceEndpointClient(ctx, querycache.WithExpireTimeout(expireTimeout)), failureClient, adapters.NetworkServiceEndpointServerToClient(mem), ) @@ -142,7 +142,7 @@ func Test_QueryCacheClient_ShouldCleanUpOnTimeout(t *testing.T) { failureClient := new(failureNSEClient) c := next.NewNetworkServiceEndpointRegistryClient( - querycache.NewClient(ctx, querycache.WithExpireTimeout(expireTimeout)), + querycache.NewNetworkServiceEndpointClient(ctx, querycache.WithExpireTimeout(expireTimeout)), failureClient, adapters.NetworkServiceEndpointServerToClient(mem), ) diff --git a/pkg/registry/common/querycache/option.go b/pkg/registry/common/querycache/option.go index f70220737..fd1288f48 100644 --- a/pkg/registry/common/querycache/option.go +++ b/pkg/registry/common/querycache/option.go @@ -19,11 +19,11 @@ package querycache import "time" // Option is an option for cache -type Option func(c *cache) +type Option func(c *nsCache) // WithExpireTimeout sets cache expire timeout func WithExpireTimeout(expireTimeout time.Duration) Option { - return func(c *cache) { + return func(c *nsCache) { c.expireTimeout = expireTimeout } }