Skip to content

Commit

Permalink
Fall back to polling the supervisor for apiserver addresses when the …
Browse files Browse the repository at this point in the history
…watch fails

Signed-off-by: Brad Davidson <[email protected]>
  • Loading branch information
brandond committed Nov 22, 2024
1 parent e160f73 commit 18cf2b4
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 61 deletions.
25 changes: 14 additions & 11 deletions pkg/agent/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,15 +91,24 @@ func KubeProxyDisabled(ctx context.Context, node *config.Node, proxy proxy.Proxy
return disabled
}

// APIServers returns a list of apiserver endpoints, suitable for seeding client loadbalancer configurations.
// WaitForAPIServers returns a list of apiserver endpoints, suitable for seeding client loadbalancer configurations.
// This function will block until it can return a populated list of apiservers, or if the remote server returns
// an error (indicating that it does not support this functionality).
func APIServers(ctx context.Context, node *config.Node, proxy proxy.Proxy) []string {
func WaitForAPIServers(ctx context.Context, node *config.Node, proxy proxy.Proxy) []string {
var addresses []string
var info *clientaccess.Info
var err error

_ = wait.PollUntilContextCancel(ctx, 5*time.Second, true, func(ctx context.Context) (bool, error) {
addresses, err = getAPIServers(ctx, node, proxy)
if info == nil {
withCert := clientaccess.WithClientCertificate(node.AgentConfig.ClientKubeletCert, node.AgentConfig.ClientKubeletKey)
info, err = clientaccess.ParseAndValidateToken(proxy.SupervisorURL(), node.Token, withCert)
if err != nil {
logrus.Warnf("Failed to validate server token: %v", err)
return false, nil
}
}
addresses, err = GetAPIServers(ctx, info)
if err != nil {
logrus.Infof("Failed to retrieve list of apiservers from server: %v", err)
return false, err
Expand Down Expand Up @@ -772,14 +781,8 @@ func get(ctx context.Context, envInfo *cmds.Agent, proxy proxy.Proxy) (*config.N
return nodeConfig, nil
}

// getAPIServers attempts to return a list of apiservers from the server.
func getAPIServers(ctx context.Context, node *config.Node, proxy proxy.Proxy) ([]string, error) {
withCert := clientaccess.WithClientCertificate(node.AgentConfig.ClientKubeletCert, node.AgentConfig.ClientKubeletKey)
info, err := clientaccess.ParseAndValidateToken(proxy.SupervisorURL(), node.Token, withCert)
if err != nil {
return nil, err
}

// GetAPIServers attempts to return a list of apiservers from the server.
func GetAPIServers(ctx context.Context, info *clientaccess.Info) ([]string, error) {
data, err := info.Get("/v1-" + version.Program + "/apiservers")
if err != nil {
return nil, err
Expand Down
146 changes: 96 additions & 50 deletions pkg/agent/tunnel/tunnel.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"fmt"
"net"
"os"
"reflect"
"strconv"
"sync"
"time"
Expand All @@ -16,6 +15,7 @@ import (
agentconfig "github.com/k3s-io/k3s/pkg/agent/config"
"github.com/k3s-io/k3s/pkg/agent/loadbalancer"
"github.com/k3s-io/k3s/pkg/agent/proxy"
"github.com/k3s-io/k3s/pkg/clientaccess"
daemonconfig "github.com/k3s-io/k3s/pkg/daemons/config"
"github.com/k3s-io/k3s/pkg/util"
"github.com/k3s-io/k3s/pkg/version"
Expand All @@ -27,6 +27,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
Expand Down Expand Up @@ -138,17 +139,18 @@ func Setup(ctx context.Context, config *daemonconfig.Node, proxy proxy.Proxy) er
// connecting to. If that fails, fall back to querying the endpoints list from Kubernetes. This
// fallback requires that the server we're joining be running an apiserver, but is the only safe
// thing to do if its supervisor is down-level and can't provide us with an endpoint list.
addresses := agentconfig.APIServers(ctx, config, proxy)
logrus.Infof("Got apiserver addresses from supervisor: %v", addresses)

addresses := agentconfig.WaitForAPIServers(ctx, config, proxy)
if len(addresses) > 0 {
logrus.Infof("Got apiserver addresses from supervisor: %v", addresses)
if localSupervisorDefault {
proxy.SetSupervisorDefault(addresses[0])
}
proxy.Update(addresses)
} else {
if endpoint, _ := client.CoreV1().Endpoints(metav1.NamespaceDefault).Get(ctx, "kubernetes", metav1.GetOptions{}); endpoint != nil {
addresses = util.GetAddresses(endpoint)
if endpoint, err := client.CoreV1().Endpoints(metav1.NamespaceDefault).Get(ctx, "kubernetes", metav1.GetOptions{}); err != nil {
logrus.Errorf("Failed to get apiserver addresses from kubernetes endpoints: %v", err)
} else {
addresses := util.GetAddresses(endpoint)
logrus.Infof("Got apiserver addresses from kubernetes endpoints: %v", addresses)
if len(addresses) > 0 {
proxy.Update(addresses)
Expand All @@ -159,7 +161,7 @@ func Setup(ctx context.Context, config *daemonconfig.Node, proxy proxy.Proxy) er

wg := &sync.WaitGroup{}

go tunnel.watchEndpoints(ctx, apiServerReady, wg, tlsConfig, proxy)
go tunnel.watchEndpoints(ctx, apiServerReady, wg, tlsConfig, config, proxy)

wait := make(chan int, 1)
go func() {
Expand Down Expand Up @@ -302,23 +304,21 @@ func (a *agentTunnel) watchPods(ctx context.Context, apiServerReady <-chan struc
// WatchEndpoints attempts to create tunnels to all supervisor addresses. Once the
// apiserver is up, go into a watch loop, adding and removing tunnels as endpoints come
// and go from the cluster.
func (a *agentTunnel) watchEndpoints(ctx context.Context, apiServerReady <-chan struct{}, wg *sync.WaitGroup, tlsConfig *tls.Config, proxy proxy.Proxy) {
// Attempt to connect to supervisors, storing their cancellation function for later when we
// need to disconnect.
disconnect := map[string]context.CancelFunc{}
for _, address := range proxy.SupervisorAddresses() {
if _, ok := disconnect[address]; !ok {
conn := a.connect(ctx, wg, address, tlsConfig)
disconnect[address] = conn.cancel
proxy.SetHealthCheck(address, conn.healthCheck)
}
}
func (a *agentTunnel) watchEndpoints(ctx context.Context, apiServerReady <-chan struct{}, wg *sync.WaitGroup, tlsConfig *tls.Config, node *daemonconfig.Node, proxy proxy.Proxy) {
syncProxyAddresses := a.getProxySyncer(ctx, wg, tlsConfig, proxy)
refreshFromSupervisor := getAPIServersRequester(node, proxy, syncProxyAddresses)

<-apiServerReady

endpoints := a.client.CoreV1().Endpoints(metav1.NamespaceDefault)
fieldSelector := fields.Set{metav1.ObjectNameField: "kubernetes"}.String()
lw := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (object runtime.Object, e error) {
// if we're being called to re-list, then likely there was an
// interruption to the apiserver connection and the listwatch is retrying
// its connection. This is a good suggestion that it might be necessary
// to refresh the apiserver address from the supervisor.
go refreshFromSupervisor(ctx)
options.FieldSelector = fieldSelector
return endpoints.List(ctx, options)
},
Expand Down Expand Up @@ -364,38 +364,7 @@ func (a *agentTunnel) watchEndpoints(ctx context.Context, apiServerReady <-chan
// goroutine that sleeps for a short period before checking for changes and updating
// the proxy addresses. If another update occurs, the previous update operation
// will be cancelled and a new one queued.
go func() {
select {
case <-time.After(endpointDebounceDelay):
case <-debounceCtx.Done():
return
}

newAddresses := util.GetAddresses(endpoint)
if reflect.DeepEqual(newAddresses, proxy.SupervisorAddresses()) {
return
}
proxy.Update(newAddresses)

validEndpoint := map[string]bool{}

for _, address := range proxy.SupervisorAddresses() {
validEndpoint[address] = true
if _, ok := disconnect[address]; !ok {
conn := a.connect(ctx, nil, address, tlsConfig)
disconnect[address] = conn.cancel
proxy.SetHealthCheck(address, conn.healthCheck)
}
}

for address, cancel := range disconnect {
if !validEndpoint[address] {
cancel()
delete(disconnect, address)
logrus.Infof("Stopped tunnel to %s", address)
}
}
}()
go syncProxyAddresses(debounceCtx, util.GetAddresses(endpoint))
}
}
}
Expand Down Expand Up @@ -494,6 +463,9 @@ func (a *agentTunnel) isKubeletOrStreamPort(proto, host, port string) bool {
return proto == "tcp" && (host == "127.0.0.1" || host == "::1") && (port == a.kubeletPort || port == daemonconfig.StreamServerPort)
}

// proxySyncer is a common signature for functions that sync the proxy address list with a context
type proxySyncer func(ctx context.Context, addresses []string)

// dialContext dials a local connection on behalf of the remote server. If the
// connection is to the kubelet port on the loopback address, the kubelet is dialed
// at its configured bind address. Otherwise, the connection is dialed normally.
Expand All @@ -507,3 +479,77 @@ func (a *agentTunnel) dialContext(ctx context.Context, network, address string)
}
return defaultDialer.DialContext(ctx, network, address)
}

// getProxySyncer returns a function that can be called to update the list of supervisors.
// This function is responsible for connecting to or disconnecting websocket tunnels,
// as well as updating the proxy loadbalancer server list.
func (a *agentTunnel) getProxySyncer(ctx context.Context, wg *sync.WaitGroup, tlsConfig *tls.Config, proxy proxy.Proxy) proxySyncer {
disconnect := map[string]context.CancelFunc{}
// Attempt to connect to supervisors, storing their cancellation function for later when we
// need to disconnect.
for _, address := range proxy.SupervisorAddresses() {
if _, ok := disconnect[address]; !ok {
conn := a.connect(ctx, wg, address, tlsConfig)
disconnect[address] = conn.cancel
proxy.SetHealthCheck(address, conn.healthCheck)
}
}

return func(debounceCtx context.Context, addresses []string) {
select {
case <-time.After(endpointDebounceDelay):
case <-debounceCtx.Done():
return
}

newAddresses := sets.New(addresses...)
curAddresses := sets.New(proxy.SupervisorAddresses()...)
if newAddresses.Equal(curAddresses) {
return
}

proxy.Update(addresses)

// add new servers
for address := range newAddresses.Difference(curAddresses) {
if _, ok := disconnect[address]; !ok {
conn := a.connect(ctx, nil, address, tlsConfig)
logrus.Infof("Started tunnel to %s", address)
disconnect[address] = conn.cancel
proxy.SetHealthCheck(address, conn.healthCheck)
}
}

// remove old servers
for address := range curAddresses.Difference(newAddresses) {
if cancel, ok := disconnect[address]; ok {
cancel()
delete(disconnect, address)
logrus.Infof("Stopped tunnel to %s", address)
}
}
}
}

// getAPIServersRequester returns a function that can be called to update the
// proxy apiserver endpoints with addresses retrieved from the supervisor.
func getAPIServersRequester(node *daemonconfig.Node, proxy proxy.Proxy, syncProxyAddresses proxySyncer) func(ctx context.Context) {
var info *clientaccess.Info
return func(ctx context.Context) {
if info == nil {
var err error
withCert := clientaccess.WithClientCertificate(node.AgentConfig.ClientKubeletCert, node.AgentConfig.ClientKubeletKey)
info, err = clientaccess.ParseAndValidateToken(proxy.SupervisorURL(), node.Token, withCert)
if err != nil {
logrus.Warnf("Failed to validate server token: %v", err)
return
}
}

if addresses, err := agentconfig.GetAPIServers(ctx, info); err != nil {
logrus.Warnf("Failed to get apiserver addresses from supervisor: %v", err)
} else {
syncProxyAddresses(ctx, addresses)
}
}
}

0 comments on commit 18cf2b4

Please sign in to comment.