Skip to content

Commit

Permalink
Add vl3 load balancer (#744)
Browse files Browse the repository at this point in the history
* Add vl3 load balancer

Signed-off-by: Artem Glazychev <[email protected]>

* Get rid of executor

Signed-off-by: Artem Glazychev <[email protected]>

* Add logs

Signed-off-by: Artem Glazychev <[email protected]>

* Moved to chain element

Signed-off-by: Artem Glazychev <[email protected]>

---------

Signed-off-by: Artem Glazychev <[email protected]>
  • Loading branch information
glazychev-art authored Sep 14, 2023
1 parent 89b437e commit 68f99e3
Show file tree
Hide file tree
Showing 7 changed files with 642 additions and 1 deletion.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.20

require (
github.com/cilium/ebpf v0.10.0
github.com/edwarnicke/genericsync v0.0.0-20220910010113-61a344f9bc29
github.com/edwarnicke/serialize v1.0.7
github.com/golang/protobuf v1.5.3
github.com/google/uuid v1.3.0
Expand Down Expand Up @@ -32,7 +33,6 @@ require (
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/edwarnicke/exechelper v1.0.2 // indirect
github.com/edwarnicke/genericsync v0.0.0-20220910010113-61a344f9bc29 // indirect
github.com/edwarnicke/grpcfd v1.1.2 // indirect
github.com/fsnotify/fsnotify v1.5.4 // indirect
github.com/ftrvxmtrx/fd v0.0.0-20150925145434-c6d800382fff // indirect
Expand Down
114 changes: 114 additions & 0 deletions pkg/networkservice/vl3lb/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
// Copyright (c) 2023 Cisco and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package vl3lb

import (
"context"
"net"
"net/url"
"time"

"go.fd.io/govpp/api"
"google.golang.org/grpc"

"github.com/golang/protobuf/ptypes/empty"
"github.com/networkservicemesh/api/pkg/api/networkservice"
"github.com/networkservicemesh/api/pkg/api/registry"
"github.com/networkservicemesh/govpp/binapi/ip_types"
"github.com/networkservicemesh/sdk/pkg/networkservice/core/next"
registryclient "github.com/networkservicemesh/sdk/pkg/registry/chains/client"
registryrecvfd "github.com/networkservicemesh/sdk/pkg/registry/common/recvfd"
registrysendfd "github.com/networkservicemesh/sdk/pkg/registry/common/sendfd"
)

type vl3lbClient struct {
port uint16
targetPort uint16
protocol ip_types.IPProto
selector map[string]string

chainCtx context.Context
vppConn api.Connection
nseRegistryClient registry.NetworkServiceEndpointRegistryClient
dialTimeout time.Duration
dialOpts []grpc.DialOption
}

// NewClient - return a new Client chain element implementing the vl3 load balancer
func NewClient(chainCtx context.Context, vppConn api.Connection, options ...Option) networkservice.NetworkServiceClient {
opts := &vl3LBOptions{
port: 80,
targetPort: 80,
protocol: ip_types.IP_API_PROTO_TCP,
selector: make(map[string]string),
clientURL: &url.URL{Scheme: "unix", Host: "connect.to.socket"},
}
for _, opt := range options {
opt(opts)
}

nseRegistryClient := registryclient.NewNetworkServiceEndpointRegistryClient(chainCtx,
registryclient.WithClientURL(opts.clientURL),
registryclient.WithNSEAdditionalFunctionality(
registryrecvfd.NewNetworkServiceEndpointRegistryClient(),
registrysendfd.NewNetworkServiceEndpointRegistryClient(),
),
registryclient.WithDialOptions(opts.dialOpts...),
)

return &vl3lbClient{
port: opts.port,
targetPort: opts.targetPort,
protocol: opts.protocol,
selector: opts.selector,
chainCtx: chainCtx,
vppConn: vppConn,
nseRegistryClient: nseRegistryClient,
dialTimeout: opts.dialTimeout,
dialOpts: opts.dialOpts,
}
}

func (lb *vl3lbClient) Request(ctx context.Context, request *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) (*networkservice.Connection, error) {
requestContext := request.GetConnection().GetContext()
var previousIP net.IP
if requestContext != nil && requestContext.GetIpContext() != nil {
previousIP = requestContext.GetIpContext().GetSrcIPNets()[0].IP
}
conn, err := next.Client(ctx).Request(ctx, request, opts...)
if err != nil {
return nil, err
}
if !previousIP.Equal(conn.GetContext().GetIpContext().GetSrcIPNets()[0].IP) {
if cancel, ok := loadAndDeleteCancel(ctx); ok {
cancel()
}
cancelCtx, cancel := context.WithCancel(lb.chainCtx)
go lb.balanceService(cancelCtx, conn)

storeCancel(ctx, cancel)
}

return conn, nil
}

func (lb *vl3lbClient) Close(ctx context.Context, conn *networkservice.Connection, opts ...grpc.CallOption) (*empty.Empty, error) {
if cancel, ok := loadAndDeleteCancel(ctx); ok {
cancel()
}
return next.Client(ctx).Close(ctx, conn, opts...)
}
160 changes: 160 additions & 0 deletions pkg/networkservice/vl3lb/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
// Copyright (c) 2023 Cisco and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package vl3lb

import (
"context"
"net/url"

"google.golang.org/grpc"

"github.com/networkservicemesh/api/pkg/api/networkservice"
"github.com/networkservicemesh/api/pkg/api/registry"
"github.com/networkservicemesh/sdk/pkg/tools/grpcutils"
"github.com/networkservicemesh/sdk/pkg/tools/log"
)

// This function balances the load between the 'real' servers.
// The 'real' servers are searched by 'Selector'
//
// Briefly, the work looks like this:
// 1. Find all vl3-NSE
// 2. Connect to grpc monitor services of these vl3-NSE
// 3. Monitor all connections containing the vl3-NSE name
// 4. Filter out those connections that contain 'Selector' labels
// 5. Get SrcIPs from these connections
// 6. Configure VPP load balancing

func (lb *vl3lbClient) balanceService(ctx context.Context, conn *networkservice.Connection) {
loggerLb := log.FromContext(ctx).WithField("LoadBalancer", conn.NetworkService)

// 1. Find all vl3-NSE
nseStream, err := lb.nseRegistryClient.Find(ctx, &registry.NetworkServiceEndpointQuery{
NetworkServiceEndpoint: &registry.NetworkServiceEndpoint{
NetworkServiceNames: []string{conn.NetworkService},
},
Watch: true,
})
if err != nil {
loggerLb.Errorf("error getting nses: %+v", err)
return
}

lbVpp := newHandler(lb.vppConn, &endpoint{
IP: conn.GetContext().GetIpContext().GetSrcIPNets()[0].IP,
Port: lb.port,
}, lb.protocol)

monitoredNSEs := make(map[string]string)
for {
msg, err := nseStream.Recv()
if err != nil {
break
}

// Do not monitor the same NSE multiple times
if msg.Deleted {
delete(monitoredNSEs, msg.GetNetworkServiceEndpoint().GetName())
continue
}
if _, ok := monitoredNSEs[msg.GetNetworkServiceEndpoint().GetName()]; ok {
continue
}
monitoredNSEs[msg.GetNetworkServiceEndpoint().GetName()] = ""

go lb.balanceNSE(ctx, loggerLb, lbVpp, msg.NetworkServiceEndpoint)
}
}

func (lb *vl3lbClient) balanceNSE(ctx context.Context, loggerLb log.Logger, lbVpp *handler, nse *registry.NetworkServiceEndpoint) {
logger := loggerLb.WithField("NSE", nse.Name)
urlNSE, err := url.Parse(nse.Url)
if err != nil {
logger.Errorf("url.Parse: %+v", err)
return
}

// 2. Connect to grpc monitor services of these vl3-NSE
dialCtx, cancelDial := context.WithTimeout(ctx, lb.dialTimeout)
defer cancelDial()

ccMonitor, err := grpc.DialContext(dialCtx, grpcutils.URLToTarget(urlNSE), lb.dialOpts...)
if err != nil {
logger.Errorf("failed to dial: %v, URL: %v, err: %v", nse.Name, urlNSE.String(), err.Error())
return
}
logger.Info("connected")

// 3. Monitor all connections containing the vl3-NSE name
monitorClientNse := networkservice.NewMonitorConnectionClient(ccMonitor)
monitorCtx, cancelMonitor := context.WithCancel(ctx)
defer cancelMonitor()

stream, err := monitorClientNse.MonitorConnections(monitorCtx, &networkservice.MonitorScopeSelector{
PathSegments: []*networkservice.PathSegment{{
Name: nse.Name,
}},
})
if err != nil {
logger.WithField("NSE", nse.Name).Errorf("failed to MonitorConnections: %v", err.Error())
return
}
for {
event, err := stream.Recv()
if err != nil {
logger.WithField("NSE", nse.Name).Errorf("error MonitorConnections stream: %v", err.Error())
_ = lbVpp.deleteServers(context.Background(), nse.Name, lbVpp.getServerIDsByVL3Name(nse.Name))
break
}

// 4. Filter out those connections that contain 'Selector' labels
add, del := filterConnections(event, lb.selector, lb.targetPort)
// 6. Configure VPP load balancing
if err = lbVpp.addServers(ctx, nse.Name, add); err != nil {
logger.Errorf("addServers error: %v", err.Error())
}
if err = lbVpp.deleteServers(ctx, nse.Name, del); err != nil {
logger.Errorf("deleteServers error: %v", err.Error())
}
}
}

func filterConnections(event *networkservice.ConnectionEvent, selector map[string]string, targetPort uint16) (add map[string]*endpoint, del []string) {
add = make(map[string]*endpoint)
for _, eventConnection := range event.Connections {
for k, v := range selector {
if eventConnection.Labels[k] == v {
if event.GetType() == networkservice.ConnectionEventType_DELETE {
del = append(del, eventConnection.Id)
} else {
// 5. Get SrcIPs from these connections
switch eventConnection.GetState() {
case networkservice.State_DOWN:
del = append(del, eventConnection.Id)
default:
add[eventConnection.Id] = &endpoint{
IP: eventConnection.GetContext().GetIpContext().GetSrcIPNets()[0].IP,
Port: targetPort,
}
}
break
}
}
}
}
return
}
18 changes: 18 additions & 0 deletions pkg/networkservice/vl3lb/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright (c) 2023 Cisco and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package vl3lb provides vl3 load balancing chain element
package vl3lb
Loading

0 comments on commit 68f99e3

Please sign in to comment.