Skip to content

Commit

Permalink
Notifier update; MemberState update (#14294)
Browse files Browse the repository at this point in the history
This includes preparatory changes as described in #14247:

- Notifier improvements
  - Use `state.ServerClustered`
  - Don't query for `OfflineThreshold` unless needed (see commit msg)
  - Allow passing `db.NodeInfo` to `NewNotifier`
- Pass `db.NodeInfo` to the notifier hook to reduce the need for extra
http calls for basic info (like the target server's name)
- Member state updates
  - Add `CPUThreads` to `ClusterMemberSysInfo` (name tbd)
  - Refactor `MemberState` to expose local `ClusterMemberSysinfo`
- Implement helper to collect member state from all cluster members,
using the notifier improvements above
- Other odds and ends

Performance improvements for project limits will be in a separate PR.
  • Loading branch information
tomponline authored Oct 21, 2024
2 parents 860fdb0 + a608e3b commit d638232
Show file tree
Hide file tree
Showing 26 changed files with 316 additions and 119 deletions.
4 changes: 4 additions & 0 deletions doc/rest-api.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -602,6 +602,10 @@ definitions:
type: number
type: array
x-go-name: LoadAverages
logical_cpus:
format: uint64
type: integer
x-go-name: LogicalCPUs
processes:
format: uint16
type: integer
Expand Down
8 changes: 3 additions & 5 deletions lxd/api_1.0.go
Original file line number Diff line number Diff line change
Expand Up @@ -701,7 +701,7 @@ func doAPI10Update(d *Daemon, r *http.Request, req api.ServerPut, patch bool) re
// Then deal with cluster wide configuration
var clusterChanged map[string]string
var newClusterConfig *clusterConfig.Config
oldClusterConfig := make(map[string]any)
var oldClusterConfig map[string]any

err = s.DB.Cluster.Transaction(context.Background(), func(ctx context.Context, tx *db.ClusterTx) error {
var err error
Expand All @@ -711,9 +711,7 @@ func doAPI10Update(d *Daemon, r *http.Request, req api.ServerPut, patch bool) re
}

// Keep old config around in case something goes wrong. In that case the config will be reverted.
for k, v := range newClusterConfig.Dump() {
oldClusterConfig[k] = v
}
oldClusterConfig = newClusterConfig.Dump()

if patch {
clusterChanged, err = newClusterConfig.Patch(req.Config)
Expand Down Expand Up @@ -767,7 +765,7 @@ func doAPI10Update(d *Daemon, r *http.Request, req api.ServerPut, patch bool) re
return response.SmartError(err)
}

err = notifier(func(client lxd.InstanceServer) error {
err = notifier(func(member db.NodeInfo, client lxd.InstanceServer) error {
server, etag, err := client.GetServer()
if err != nil {
return err
Expand Down
7 changes: 6 additions & 1 deletion lxd/api_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1634,6 +1634,11 @@ func updateClusterNode(s *state.State, gateway *cluster.Gateway, r *http.Request
return response.SmartError(err)
}

resp := forwardedResponseToNode(s, r, name)
if resp != nil {
return resp
}

leaderAddress, err := gateway.LeaderAddress()
if err != nil {
return response.InternalError(err)
Expand Down Expand Up @@ -2974,7 +2979,7 @@ func clusterNodeStateGet(d *Daemon, r *http.Request) response.Response {
return resp
}

memberState, err := cluster.MemberState(r.Context(), s, memberName)
memberState, err := cluster.MemberState(r.Context(), s)
if err != nil {
return response.SmartError(err)
}
Expand Down
4 changes: 2 additions & 2 deletions lxd/auth_groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -713,7 +713,7 @@ func renameAuthGroup(d *Daemon, r *http.Request) response.Response {
return response.SmartError(err)
}

err = notifier(func(client lxd.InstanceServer) error {
err = notifier(func(member db.NodeInfo, client lxd.InstanceServer) error {
_, _, err := client.RawQuery(http.MethodPost, "/internal/identity-cache-refresh", nil, "")
return err
})
Expand Down Expand Up @@ -774,7 +774,7 @@ func deleteAuthGroup(d *Daemon, r *http.Request) response.Response {
return response.SmartError(err)
}

err = notifier(func(client lxd.InstanceServer) error {
err = notifier(func(member db.NodeInfo, client lxd.InstanceServer) error {
_, _, err := client.RawQuery(http.MethodPost, "/internal/identity-cache-refresh", nil, "")
return err
})
Expand Down
6 changes: 3 additions & 3 deletions lxd/certificates.go
Original file line number Diff line number Diff line change
Expand Up @@ -710,7 +710,7 @@ func certificatesPost(d *Daemon, r *http.Request) response.Response {
Type: api.CertificateTypeClient,
}

err = notifier(func(client lxd.InstanceServer) error {
err = notifier(func(member db.NodeInfo, client lxd.InstanceServer) error {
_, _, err := client.RawQuery(http.MethodPost, "/internal/identity-cache-refresh", nil, "")
return err
})
Expand Down Expand Up @@ -1051,7 +1051,7 @@ func doCertificateUpdate(ctx context.Context, d *Daemon, dbInfo api.Certificate,
return response.SmartError(err)
}

err = notifier(func(client lxd.InstanceServer) error {
err = notifier(func(member db.NodeInfo, client lxd.InstanceServer) error {
_, _, err := client.RawQuery(http.MethodPost, "/internal/identity-cache-refresh", nil, "")
return err
})
Expand Down Expand Up @@ -1162,7 +1162,7 @@ func certificateDelete(d *Daemon, r *http.Request) response.Response {
return response.SmartError(err)
}

err = notifier(func(client lxd.InstanceServer) error {
err = notifier(func(member db.NodeInfo, client lxd.InstanceServer) error {
_, _, err := client.RawQuery(http.MethodPost, "/internal/identity-cache-refresh", nil, "")
return err
})
Expand Down
122 changes: 106 additions & 16 deletions lxd/cluster/member_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,18 @@ import (
"context"
"fmt"
"os"
"runtime"
"strconv"
"strings"
"sync"

"golang.org/x/sys/unix"

"github.com/canonical/lxd/client"
"github.com/canonical/lxd/lxd/db"
"github.com/canonical/lxd/lxd/state"
storagePools "github.com/canonical/lxd/lxd/storage"
"github.com/canonical/lxd/shared"
"github.com/canonical/lxd/shared/api"
"github.com/canonical/lxd/shared/logger"
)
Expand Down Expand Up @@ -45,35 +49,121 @@ func getLoadAvgs() ([]float64, error) {
return loadAvgs, nil
}

// MemberState retrieves state information about the cluster member.
func MemberState(ctx context.Context, s *state.State, memberName string) (*api.ClusterMemberState, error) {
var err error
var memberState api.ClusterMemberState

// LocalSysInfo retrieves system information about a cluster member.
func LocalSysInfo() (*api.ClusterMemberSysInfo, error) {
// Get system info.
info := unix.Sysinfo_t{}
err = unix.Sysinfo(&info)
err := unix.Sysinfo(&info)
if err != nil {
logger.Warn("Failed getting sysinfo", logger.Ctx{"err": err})

return nil, err
}

sysInfo := &api.ClusterMemberSysInfo{}

// Account for different representations of Sysinfo_t on different architectures.
memberState.SysInfo.Uptime = int64(info.Uptime)
memberState.SysInfo.TotalRAM = uint64(info.Totalram)
memberState.SysInfo.SharedRAM = uint64(info.Sharedram)
memberState.SysInfo.BufferRAM = uint64(info.Bufferram)
memberState.SysInfo.FreeRAM = uint64(info.Freeram)
memberState.SysInfo.TotalSwap = uint64(info.Totalswap)
memberState.SysInfo.FreeSwap = uint64(info.Freeswap)

memberState.SysInfo.Processes = info.Procs
memberState.SysInfo.LoadAverages, err = getLoadAvgs()
sysInfo.Uptime = int64(info.Uptime)
sysInfo.TotalRAM = uint64(info.Totalram)
sysInfo.SharedRAM = uint64(info.Sharedram)
sysInfo.BufferRAM = uint64(info.Bufferram)
sysInfo.FreeRAM = uint64(info.Freeram)
sysInfo.TotalSwap = uint64(info.Totalswap)
sysInfo.FreeSwap = uint64(info.Freeswap)

sysInfo.Processes = info.Procs
sysInfo.LoadAverages, err = getLoadAvgs()
if err != nil {
return nil, fmt.Errorf("Failed getting load averages: %w", err)
}

// NumCPU gives the number of threads available to the LXD server at startup,
// not the currently available number of threads.
sysInfo.LogicalCPUs = uint64(runtime.NumCPU())

return sysInfo, nil
}

// ClusterState returns a map from clusterMemberName -> state for every member
// of the cluster. This requires an HTTP call to the rest of the cluster.
func ClusterState(s *state.State, networkCert *shared.CertInfo, members ...db.NodeInfo) (map[string]api.ClusterMemberState, error) {
serverCert := s.ServerCert()

notifier, err := NewNotifier(s, networkCert, serverCert, NotifyAll, members...)
if err != nil {
return nil, err
}

type stateTuple struct {
name string
state *api.ClusterMemberState
}

memberStates := make(map[string]api.ClusterMemberState)
statesChan := make(chan stateTuple)

var wg sync.WaitGroup
wg.Add(1)
go func() {
for state := range statesChan {
memberStates[state.name] = *state.state
}

wg.Done()
}()

err = notifier(func(member db.NodeInfo, client lxd.InstanceServer) error {
state, _, err := client.GetClusterMemberState(member.Name)
if err != nil {
return err
}

statesChan <- stateTuple{
name: member.Name,
state: state,
}

return nil
})
if err != nil {
return nil, err
}

close(statesChan)

includeLocalMember := len(members) == 0
for _, member := range members {
if member.Name == s.ServerName {
includeLocalMember = true
break
}
}

wg.Wait()

if includeLocalMember {
localState, err := MemberState(context.TODO(), s)
if err != nil {
return nil, fmt.Errorf("Failed to get local member state: %w", err)
}

memberStates[s.ServerName] = *localState
}

return memberStates, nil
}

// MemberState retrieves state information about the cluster member.
func MemberState(ctx context.Context, s *state.State) (*api.ClusterMemberState, error) {
var memberState api.ClusterMemberState

sysInfo, err := LocalSysInfo()
if err != nil {
return nil, err
}

memberState.SysInfo = *sysInfo

// Get storage pool states.
stateCreated := db.StoragePoolCreated

Expand Down
77 changes: 77 additions & 0 deletions lxd/cluster/member_state_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package cluster_test

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/canonical/lxd/lxd/cluster"
"github.com/canonical/lxd/lxd/db"
"github.com/canonical/lxd/lxd/node"
"github.com/canonical/lxd/lxd/state"
"github.com/canonical/lxd/shared"
)

func TestClusterState(t *testing.T) {
state, cleanup := state.NewTestState(t)
defer cleanup()

cert := shared.TestingKeyPair()

state.ServerCert = func() *shared.CertInfo { return cert }

f := notifyFixtures{t: t, state: state}
cleanupF := f.Nodes(cert, 3)
defer cleanupF()

// Populate state.LocalConfig after nodes created above.
var err error
var nodeConfig *node.Config
err = state.DB.Node.Transaction(context.TODO(), func(ctx context.Context, tx *db.NodeTx) error {
nodeConfig, err = node.ConfigLoad(ctx, tx)
return err
})
require.NoError(t, err)

state.LocalConfig = nodeConfig

states, err := cluster.ClusterState(state, cert)
require.NoError(t, err)

assert.Equal(t, 3, len(states))

for clusterMemberName, state := range states {
// Local cluster member
if clusterMemberName == "0" {
assert.Greater(t, state.SysInfo.LogicalCPUs, uint64(0))
continue
}

assert.Equal(t, uint64(24), state.SysInfo.LogicalCPUs)
}

var members []db.NodeInfo
err = state.DB.Cluster.Transaction(context.TODO(), func(ctx context.Context, tx *db.ClusterTx) error {
members, err = tx.GetNodes(ctx)
return err
})
require.NoError(t, err)

for i, memberInfo := range members {
if memberInfo.Name == "0" {
members[i] = members[len(members)-1]
members = members[:len(members)-1]
break
}
}

states, err = cluster.ClusterState(state, cert, members...)
require.NoError(t, err)

assert.Equal(t, 2, len(states))
for _, state := range states {
assert.Equal(t, uint64(24), state.SysInfo.LogicalCPUs)
}
}
Loading

0 comments on commit d638232

Please sign in to comment.