Skip to content

Commit

Permalink
collector: add kepler_vm metrics (#931)
Browse files Browse the repository at this point in the history
Signed-off-by: Anthony Harivel <[email protected]>
  • Loading branch information
aharivel authored Sep 18, 2023
1 parent 74cbd7b commit 4bf7a09
Show file tree
Hide file tree
Showing 15 changed files with 791 additions and 5 deletions.
44 changes: 40 additions & 4 deletions pkg/collector/container_hc_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@ limitations under the License.
package collector

import (
"strconv"
"unsafe"

"github.com/sustainable-computing-io/kepler/pkg/bpfassets/attacher"
"github.com/sustainable-computing-io/kepler/pkg/cgroup"
collector_metric "github.com/sustainable-computing-io/kepler/pkg/collector/metric"
"github.com/sustainable-computing-io/kepler/pkg/config"
"github.com/sustainable-computing-io/kepler/pkg/libvirt"

"k8s.io/klog/v2"
)
Expand All @@ -33,7 +35,7 @@ import "C"
type ProcessBPFMetrics = attacher.ProcessBPFMetrics

// updateBasicBPF
func (c *Collector) updateBasicBPF(containerID string, ct *ProcessBPFMetrics, isSystemProcess bool) {
func (c *Collector) updateBasicBPF(containerID string, ct *ProcessBPFMetrics, isSystemProcess, isSystemVM bool) {
// update ebpf metrics
// first update CPU time
err := c.ContainersMetrics[containerID].CPUTime.AddNewDelta(ct.ProcessRunTime)
Expand All @@ -56,10 +58,16 @@ func (c *Collector) updateBasicBPF(containerID string, ct *ProcessBPFMetrics, is
}
}
}
// track virtual machine metrics
if isSystemVM && config.EnableProcessMetrics {
for i := 0; i < config.MaxIRQ; i++ {
c.VMMetrics[ct.PID].SoftIRQCount = c.ProcessMetrics[ct.PID].SoftIRQCount
}
}
}

// updateHWCounters
func (c *Collector) updateHWCounters(containerID string, ct *ProcessBPFMetrics, isSystemProcess bool) {
func (c *Collector) updateHWCounters(containerID string, ct *ProcessBPFMetrics, isSystemProcess, isSystemVM bool) {
// update HW counters
for _, counterKey := range collector_metric.AvailableHWCounters {
var val uint64
Expand All @@ -84,14 +92,20 @@ func (c *Collector) updateHWCounters(containerID string, ct *ProcessBPFMetrics,
klog.V(5).Infoln(err)
}
}
// track virtual machine metrics
if isSystemVM && config.EnableProcessMetrics {
c.VMMetrics[ct.PID].CounterStats = c.ProcessMetrics[ct.PID].CounterStats
}
}
}

// updateBPFMetrics reads the BPF tables with process/pid/cgroupid metrics (CPU time, available HW counters)
func (c *Collector) updateBPFMetrics() {
foundContainer := make(map[string]bool)
foundProcess := make(map[uint64]bool)
foundVM := make(map[uint64]bool)
processesData, err := attacher.CollectProcesses()
vmPIDList, _ := libvirt.GetCurrentVMPID()
if err != nil {
return
}
Expand All @@ -116,12 +130,21 @@ func (c *Collector) updateBPFMetrics() {
if err != nil {
klog.V(5).Infoln(err)
}
for vmpid, name := range vmPIDList {
pid, _ := strconv.ParseUint(vmpid, 10, 64)

if pid == ct.PID {
c.createVMMetricsIfNotExist(ct.PID, name)
foundVM[pid] = true
c.VMMetrics[ct.PID].CPUTime = c.ProcessMetrics[ct.PID].CPUTime
}
}
}

c.ContainersMetrics[containerID].CurrProcesses++

c.updateBasicBPF(containerID, &ct, isSystemProcess)
c.updateHWCounters(containerID, &ct, isSystemProcess)
c.updateBasicBPF(containerID, &ct, isSystemProcess, foundVM[ct.PID])
c.updateHWCounters(containerID, &ct, isSystemProcess, foundVM[ct.PID])

// TODO: improve the removal of deleted containers from ContainersMetrics. Currently we verify the maxInactiveContainers using the foundContainer map
foundContainer[containerID] = true
Expand All @@ -132,6 +155,7 @@ func (c *Collector) updateBPFMetrics() {
c.handleInactiveContainers(foundContainer)
if config.EnableProcessMetrics {
c.handleInactiveProcesses(foundProcess)
c.handleInactiveVM(foundVM)
}
}

Expand Down Expand Up @@ -166,3 +190,15 @@ func (c *Collector) handleInactiveProcesses(foundProcess map[uint64]bool) {
}
}
}

// handleInactiveVirtualMachine
func (c *Collector) handleInactiveVM(foundVM map[uint64]bool) {
numOfInactive := len(c.VMMetrics) - len(foundVM)
if numOfInactive > maxInactiveVM {
for pid := range c.VMMetrics {
if _, found := foundVM[pid]; !found {
delete(c.VMMetrics, pid)
}
}
}
}
118 changes: 118 additions & 0 deletions pkg/collector/metric/vm_metric.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
Copyright 2023.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package metric

import (
"github.com/sustainable-computing-io/kepler/pkg/collector/metric/types"
"github.com/sustainable-computing-io/kepler/pkg/config"
"github.com/sustainable-computing-io/kepler/pkg/power/accelerator/gpu"
)

var (
// VMMetricNames holds the list of names of the vm metric
VMMetricNames []string
// VMFloatFeatureNames holds the feature name of the vm float collector_metric. This is specific for the machine-learning based models.
VMFloatFeatureNames []string = []string{}
// VMUintFeaturesNames holds the feature name of the vm utint collector_metric. This is specific for the machine-learning based models.
VMUintFeaturesNames []string
// VMFeaturesNames holds all the feature name of the vm collector_metric. This is specific for the machine-learning based models.
VMFeaturesNames []string
)

type VMMetrics struct {
PID uint64
Name string
CounterStats map[string]*types.UInt64Stat
// ebpf metrics
CPUTime *types.UInt64Stat
SoftIRQCount []types.UInt64Stat
DynEnergyInCore *types.UInt64Stat
DynEnergyInDRAM *types.UInt64Stat
DynEnergyInUncore *types.UInt64Stat
DynEnergyInPkg *types.UInt64Stat
DynEnergyInGPU *types.UInt64Stat
DynEnergyInOther *types.UInt64Stat
DynEnergyInPlatform *types.UInt64Stat

IdleEnergyInCore *types.UInt64Stat
IdleEnergyInDRAM *types.UInt64Stat
IdleEnergyInUncore *types.UInt64Stat
IdleEnergyInPkg *types.UInt64Stat
IdleEnergyInGPU *types.UInt64Stat
IdleEnergyInOther *types.UInt64Stat
IdleEnergyInPlatform *types.UInt64Stat
}

// NewVMMetrics creates a new VMMetrics instance
func NewVMMetrics(pid uint64, name string) *VMMetrics {
p := &VMMetrics{
PID: pid,
Name: name,
CPUTime: &types.UInt64Stat{},
CounterStats: make(map[string]*types.UInt64Stat),
SoftIRQCount: make([]types.UInt64Stat, config.MaxIRQ),
DynEnergyInCore: &types.UInt64Stat{},
DynEnergyInDRAM: &types.UInt64Stat{},
DynEnergyInUncore: &types.UInt64Stat{},
DynEnergyInPkg: &types.UInt64Stat{},
DynEnergyInOther: &types.UInt64Stat{},
DynEnergyInGPU: &types.UInt64Stat{},
DynEnergyInPlatform: &types.UInt64Stat{},
IdleEnergyInCore: &types.UInt64Stat{},
IdleEnergyInDRAM: &types.UInt64Stat{},
IdleEnergyInUncore: &types.UInt64Stat{},
IdleEnergyInPkg: &types.UInt64Stat{},
IdleEnergyInOther: &types.UInt64Stat{},
IdleEnergyInGPU: &types.UInt64Stat{},
IdleEnergyInPlatform: &types.UInt64Stat{},
}

for _, metricName := range AvailableHWCounters {
p.CounterStats[metricName] = &types.UInt64Stat{}
}
// TODO: transparently list the other metrics and do not initialize them when they are not supported, e.g. HC
if gpu.IsGPUCollectionSupported() {
p.CounterStats[config.GPUSMUtilization] = &types.UInt64Stat{}
p.CounterStats[config.GPUMemUtilization] = &types.UInt64Stat{}
}
return p
}

// ResetCurr reset all current value to 0
func (p *VMMetrics) ResetDeltaValues() {
p.CPUTime.ResetDeltaValues()
for counterKey := range p.CounterStats {
p.CounterStats[counterKey].ResetDeltaValues()
}
for i := 0; i < config.MaxIRQ; i++ {
p.SoftIRQCount[i].ResetDeltaValues()
}
p.DynEnergyInCore.ResetDeltaValues()
p.DynEnergyInDRAM.ResetDeltaValues()
p.DynEnergyInUncore.ResetDeltaValues()
p.DynEnergyInPkg.ResetDeltaValues()
p.DynEnergyInOther.ResetDeltaValues()
p.DynEnergyInGPU.ResetDeltaValues()
p.DynEnergyInPlatform.ResetDeltaValues()
p.IdleEnergyInCore.ResetDeltaValues()
p.IdleEnergyInDRAM.ResetDeltaValues()
p.IdleEnergyInUncore.ResetDeltaValues()
p.IdleEnergyInPkg.ResetDeltaValues()
p.IdleEnergyInOther.ResetDeltaValues()
p.IdleEnergyInGPU.ResetDeltaValues()
p.IdleEnergyInPlatform.ResetDeltaValues()
}
15 changes: 15 additions & 0 deletions pkg/collector/metric/vm_metric_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package metric

import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

var _ = Describe("VMMetric", func() {

It("Test ResetDeltaValues", func() {
p := NewVMMetrics(0, "name")
p.ResetDeltaValues()
Expect(p.CPUTime.Delta).To(Equal(uint64(0)))
})
})
6 changes: 6 additions & 0 deletions pkg/collector/metric_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
const (
maxInactiveContainers = 10
maxInactiveProcesses = 5
maxInactiveVM = 3
)

type Collector struct {
Expand All @@ -48,6 +49,9 @@ type Collector struct {
// ProcessMetrics hold all process energy and resource usage metrics
ProcessMetrics map[uint64]*collector_metric.ProcessMetrics

// VMMetrics hold all Virtual Machine energy and resource usage metrics
VMMetrics map[uint64]*collector_metric.VMMetrics

// generic names to be used for process that are not within a pod
systemProcessName string
systemProcessNamespace string
Expand All @@ -58,6 +62,7 @@ func NewCollector() *Collector {
NodeMetrics: *collector_metric.NewNodeMetrics(),
ContainersMetrics: map[string]*collector_metric.ContainerMetrics{},
ProcessMetrics: map[uint64]*collector_metric.ProcessMetrics{},
VMMetrics: map[uint64]*collector_metric.VMMetrics{},
systemProcessName: utils.SystemProcessName,
systemProcessNamespace: utils.SystemProcessNamespace,
}
Expand Down Expand Up @@ -127,6 +132,7 @@ func (c *Collector) Update() {
// calculate the process energy consumption using its resource utilization and the node components energy consumption
if config.EnableProcessMetrics {
c.updateProcessEnergy()
c.updateVMEnergy()
}

// check the log verbosity level before iterating in all container
Expand Down
1 change: 1 addition & 0 deletions pkg/collector/metric_collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ func newMockCollector() *Collector {
metricCollector := NewCollector()
metricCollector.ContainersMetrics = createMockContainersMetrics()
metricCollector.ProcessMetrics = map[uint64]*collector_metric.ProcessMetrics{}
metricCollector.VMMetrics = map[uint64]*collector_metric.VMMetrics{}
metricCollector.NodeMetrics = createMockNodeMetrics(metricCollector.ContainersMetrics)

return metricCollector
Expand Down
8 changes: 8 additions & 0 deletions pkg/collector/prometheus_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ type PrometheusCollector struct {
containerDesc *ContainerDesc
podDesc *PodDesc
processDesc *processDesc
vmDesc *vmDesc

// NodeMetrics holds all node energy and resource usage metrics
NodeMetrics *collector_metric.NodeMetrics
Expand All @@ -112,6 +113,9 @@ type PrometheusCollector struct {
// ProcessMetrics hold all process energy and resource usage metrics
ProcessMetrics *map[uint64]*collector_metric.ProcessMetrics

// VMMetrics hold all Virtual Machine energy and resource usage metrics
VMMetrics *map[uint64]*collector_metric.VMMetrics

// SamplePeriodSec the collector metric collection interval
SamplePeriodSec float64

Expand All @@ -132,11 +136,13 @@ func NewPrometheusExporter() *PrometheusCollector {
nodeDesc: &NodeDesc{},
podDesc: &PodDesc{},
processDesc: &processDesc{},
vmDesc: &vmDesc{},
}
exporter.newNodeMetrics()
exporter.newContainerMetrics()
exporter.newPodMetrics()
exporter.newprocessMetrics()
exporter.newVMMetrics()
return &exporter
}

Expand Down Expand Up @@ -216,6 +222,7 @@ func (p *PrometheusCollector) Describe(ch chan<- *prometheus.Desc) {
ch <- p.containerDesc.containerBlockIRQTotal
}
p.describeProcess(ch)
p.describeVM(ch)
}

func (p *PrometheusCollector) newNodeMetrics() {
Expand Down Expand Up @@ -451,6 +458,7 @@ func (p *PrometheusCollector) Collect(ch chan<- prometheus.Metric) {
p.updateNodeMetrics(&wg, ch)
p.updatePodMetrics(&wg, ch)
p.updateProcessMetrics(&wg, ch)
p.updateVMMetrics(&wg, ch)
wg.Wait()
}

Expand Down
1 change: 1 addition & 0 deletions pkg/collector/prometheus_collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ func newMockPrometheusExporter() *PrometheusCollector {
exporter.NodeMetrics = collector_metric.NewNodeMetrics()
exporter.ContainersMetrics = &map[string]*collector_metric.ContainerMetrics{}
exporter.ProcessMetrics = &map[uint64]*collector_metric.ProcessMetrics{}
exporter.VMMetrics = &map[uint64]*collector_metric.VMMetrics{}
exporter.SamplePeriodSec = 3.0
collector_metric.ContainerFeaturesNames = []string{config.CoreUsageMetric}
collector_metric.NodeMetadataFeatureNames = []string{"cpu_architecture"}
Expand Down
2 changes: 1 addition & 1 deletion pkg/collector/prometheus_process_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

//nolint:dupl // should be refactor with vm collector
package collector

import (
Expand Down
Loading

0 comments on commit 4bf7a09

Please sign in to comment.