Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NSM connection (VIPs and routes) update in TAPA #178

Merged
merged 3 commits into from
Apr 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions examples/target/helm/templates/target.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,6 @@ spec:
- name: meridio-socket
mountPath: /var/lib/meridio
readOnly: false
securityContext:
capabilities:
add: ["NET_ADMIN"]
volumes:
- name: spire-agent-socket
hostPath:
Expand Down
127 changes: 77 additions & 50 deletions pkg/ambassador/tap/conduit/conduit.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ import (
"context"
"errors"
"fmt"
"net"
"strings"
"sync"
"time"

"github.com/networkservicemesh/api/pkg/api/networkservice"
"github.com/networkservicemesh/api/pkg/api/networkservice/mechanisms/cls"
Expand All @@ -32,6 +35,7 @@ import (
"github.com/nordix/meridio/pkg/ambassador/tap/types"
"github.com/nordix/meridio/pkg/conduit"
"github.com/nordix/meridio/pkg/networking"
"github.com/nordix/meridio/pkg/retry"
"github.com/sirupsen/logrus"
)

Expand All @@ -53,8 +57,9 @@ type Conduit struct {
StreamFactory StreamFactory
connection *networkservice.Connection
mu sync.Mutex
vips []*virtualIP
tableID int
localIPs []string
ctx context.Context
cancel context.CancelFunc
}

// New is the constructor of Conduit.
Expand All @@ -76,8 +81,7 @@ func New(conduit *ambassadorAPI.Conduit,
NetworkServiceClient: networkServiceClient,
NetUtils: netUtils,
connection: nil,
vips: []*virtualIP{},
tableID: 1,
localIPs: []string{},
}
c.StreamFactory = stream.NewFactory(targetRegistryClient, stream.MaxNumberOfTargets, c)
c.StreamManager = NewStreamManager(configurationManagerClient, targetRegistryClient, streamRegistry, c.StreamFactory, PendingTime)
Expand All @@ -93,9 +97,10 @@ func (c *Conduit) Connect(ctx context.Context) error {
if c.isConnected() {
return nil
}
c.ctx, c.cancel = context.WithCancel(ctx)
logrus.Infof("Attempt to connect conduit: %v", c.Conduit)
nsName := conduit.GetNetworkServiceNameWithProxy(c.Conduit.GetName(), c.Conduit.GetTrench().GetName(), c.Namespace)
connection, err := c.NetworkServiceClient.Request(ctx,
connection, err := c.NetworkServiceClient.Request(c.ctx,
&networkservice.NetworkServiceRequest{
Connection: &networkservice.Connection{
Id: fmt.Sprintf("%s-%s-%d", c.TargetName, nsName, 0),
Expand All @@ -117,6 +122,7 @@ func (c *Conduit) Connect(ctx context.Context) error {
}
logrus.Infof("Conduit connected: %v", c.Conduit)
c.connection = connection
c.localIPs = c.connection.GetContext().GetIpContext().GetSrcIpAddrs()

c.Configuration.Watch()

Expand All @@ -127,14 +133,14 @@ func (c *Conduit) Connect(ctx context.Context) error {
// Disconnect closes the connection from NSM, closes all streams
// and stop the VIP watcher
func (c *Conduit) Disconnect(ctx context.Context) error {
if c.cancel != nil {
c.cancel()
}
c.mu.Lock()
defer c.mu.Unlock()
logrus.Infof("Disconnect from conduit: %v", c.Conduit)
// Stops the configuration
c.Configuration.Stop()
// reset the VIPs related configuration
c.deleteVIPs(c.vips)
c.tableID = 1
var errFinal error
// Stop the stream manager (close the streams)
errFinal = c.StreamManager.Stop(ctx)
Expand Down Expand Up @@ -171,10 +177,10 @@ func (c *Conduit) GetConduit() *ambassadorAPI.Conduit {
return c.Conduit
}

// GetStreams returns the local IPs for this conduit
// GetIPs returns the local IPs for this conduit
func (c *Conduit) GetIPs() []string {
if c.connection != nil {
return c.connection.GetContext().GetIpContext().GetSrcIpAddrs()
return c.localIPs
}
return []string{}
}
Expand All @@ -186,34 +192,69 @@ func (c *Conduit) SetVIPs(vips []string) error {
if !c.isConnected() {
return nil
}
currentVIPs := make(map[string]*virtualIP)
for _, vip := range c.vips {
currentVIPs[vip.prefix] = vip
// prepare SrcIpAddrs (IPs allocated by the proxy + VIPs)
c.connection.Context.IpContext.SrcIpAddrs = append(c.GetIPs(), vips...)
// prepare the routes (nexthops = proxy bridge IPs)
ipv4Nexthops := []*networkservice.Route{}
ipv6Nexthops := []*networkservice.Route{}
for _, nexthop := range c.getGateways() {
gw, _, err := net.ParseCIDR(nexthop)
if err != nil {
continue
}
route := &networkservice.Route{
NextHop: gw.String(),
}
if isIPv6(nexthop) {
route.Prefix = "::/0"
ipv6Nexthops = append(ipv6Nexthops, route)
} else {
route.Prefix = "0.0.0.0/0"
ipv4Nexthops = append(ipv4Nexthops, route)
}
}
// prepare the policies (only based on VIP address for now)
c.connection.Context.IpContext.Policies = []*networkservice.PolicyRoute{}
for _, vip := range vips {
if _, ok := currentVIPs[vip]; !ok {
newVIP, err := newVirtualIP(vip, c.tableID, c.NetUtils)
if err != nil {
logrus.Errorf("SimpleTarget: Error adding SourceBaseRoute: %v", err) // todo: err handling
continue
}
c.tableID++
c.vips = append(c.vips, newVIP)
for _, nexthop := range c.getGateways() {
err = newVIP.AddNexthop(nexthop)
if err != nil {
logrus.Errorf("Client: Error adding nexthop: %v", err) // todo: err handling
}
}
nexthops := ipv4Nexthops
if isIPv6(vip) {
nexthops = ipv6Nexthops
}
delete(currentVIPs, vip)
}
// delete remaining vips
vipsSlice := []*virtualIP{}
for _, vip := range currentVIPs {
vipsSlice = append(vipsSlice, vip)
newPolicyRoute := &networkservice.PolicyRoute{
From: vip,
Routes: nexthops,
}
c.connection.Context.IpContext.Policies = append(c.connection.Context.IpContext.Policies, newPolicyRoute)
}
c.deleteVIPs(vipsSlice)
var err error
c.ctx, c.cancel = context.WithCancel(context.TODO())
// update the NSM connection
err = retry.Do(func() error {
c.connection, err = c.NetworkServiceClient.Request(c.ctx,
&networkservice.NetworkServiceRequest{
Connection: &networkservice.Connection{
Id: c.connection.GetId(),
NetworkService: c.connection.GetNetworkService(),
Labels: c.connection.GetLabels(),
Payload: c.connection.GetPayload(),
Context: &networkservice.ConnectionContext{
IpContext: c.connection.GetContext().GetIpContext(),
},
},
MechanismPreferences: []*networkservice.Mechanism{
{
Cls: cls.LOCAL,
Type: kernelmech.MECHANISM,
},
},
})
if err != nil {
return fmt.Errorf("error updating the VIPs in conduit: %v - %v", c.Conduit, err)
}
return nil
}, retry.WithContext(c.ctx),
retry.WithDelay(500*time.Millisecond))
logrus.Infof("VIPs in conduit updated: %v - %v", c.Conduit, vips)
return nil
}

Expand All @@ -226,22 +267,8 @@ func (c *Conduit) isConnected() bool {
return c.connection != nil
}

func (c *Conduit) deleteVIPs(vips []*virtualIP) {
vipsMap := make(map[string]*virtualIP)
for _, vip := range vips {
vipsMap[vip.prefix] = vip
}
for index := 0; index < len(c.vips); index++ {
vip := c.vips[index]
if _, ok := vipsMap[vip.prefix]; ok {
c.vips = append(c.vips[:index], c.vips[index+1:]...)
index--
err := vip.Delete()
if err != nil {
logrus.Errorf("Client: Error deleting vip: %v", err) // todo: err handling
}
}
}
func isIPv6(address string) bool {
return strings.Count(address, ":") >= 2
}

// TODO: Requires the IPs of the bridge
Expand Down
74 changes: 0 additions & 74 deletions pkg/ambassador/tap/conduit/vip.go

This file was deleted.