diff --git a/pkg/collector/container_hc_collector.go b/pkg/collector/container_hc_collector.go index 060dff8492..ac8172e918 100644 --- a/pkg/collector/container_hc_collector.go +++ b/pkg/collector/container_hc_collector.go @@ -17,12 +17,14 @@ limitations under the License. package collector import ( + "strconv" "unsafe" "github.com/sustainable-computing-io/kepler/pkg/bpfassets/attacher" "github.com/sustainable-computing-io/kepler/pkg/cgroup" collector_metric "github.com/sustainable-computing-io/kepler/pkg/collector/metric" "github.com/sustainable-computing-io/kepler/pkg/config" + "github.com/sustainable-computing-io/kepler/pkg/libvirt" "k8s.io/klog/v2" ) @@ -33,7 +35,7 @@ import "C" type ProcessBPFMetrics = attacher.ProcessBPFMetrics // updateBasicBPF -func (c *Collector) updateBasicBPF(containerID string, ct *ProcessBPFMetrics, isSystemProcess bool) { +func (c *Collector) updateBasicBPF(containerID string, ct *ProcessBPFMetrics, isSystemProcess, isSystemVM bool) { // update ebpf metrics // first update CPU time err := c.ContainersMetrics[containerID].CPUTime.AddNewDelta(ct.ProcessRunTime) @@ -56,10 +58,16 @@ func (c *Collector) updateBasicBPF(containerID string, ct *ProcessBPFMetrics, is } } } + // track virtual machine metrics + if isSystemVM && config.EnableProcessMetrics { + for i := 0; i < config.MaxIRQ; i++ { + c.VMMetrics[ct.PID].SoftIRQCount = c.ProcessMetrics[ct.PID].SoftIRQCount + } + } } // updateHWCounters -func (c *Collector) updateHWCounters(containerID string, ct *ProcessBPFMetrics, isSystemProcess bool) { +func (c *Collector) updateHWCounters(containerID string, ct *ProcessBPFMetrics, isSystemProcess, isSystemVM bool) { // update HW counters for _, counterKey := range collector_metric.AvailableHWCounters { var val uint64 @@ -84,6 +92,10 @@ func (c *Collector) updateHWCounters(containerID string, ct *ProcessBPFMetrics, klog.V(5).Infoln(err) } } + // track virtual machine metrics + if isSystemVM && config.EnableProcessMetrics { + c.VMMetrics[ct.PID].CounterStats = c.ProcessMetrics[ct.PID].CounterStats + } } } @@ -91,7 +103,9 @@ func (c *Collector) updateHWCounters(containerID string, ct *ProcessBPFMetrics, func (c *Collector) updateBPFMetrics() { foundContainer := make(map[string]bool) foundProcess := make(map[uint64]bool) + foundVM := make(map[uint64]bool) processesData, err := attacher.CollectProcesses() + vmPIDList, _ := libvirt.GetCurrentVMPID() if err != nil { return } @@ -116,12 +130,21 @@ func (c *Collector) updateBPFMetrics() { if err != nil { klog.V(5).Infoln(err) } + for vmpid, name := range vmPIDList { + pid, _ := strconv.ParseUint(vmpid, 10, 64) + + if pid == ct.PID { + c.createVMMetricsIfNotExist(ct.PID, name) + foundVM[pid] = true + c.VMMetrics[ct.PID].CPUTime = c.ProcessMetrics[ct.PID].CPUTime + } + } } c.ContainersMetrics[containerID].CurrProcesses++ - c.updateBasicBPF(containerID, &ct, isSystemProcess) - c.updateHWCounters(containerID, &ct, isSystemProcess) + c.updateBasicBPF(containerID, &ct, isSystemProcess, foundVM[ct.PID]) + c.updateHWCounters(containerID, &ct, isSystemProcess, foundVM[ct.PID]) // TODO: improve the removal of deleted containers from ContainersMetrics. Currently we verify the maxInactiveContainers using the foundContainer map foundContainer[containerID] = true @@ -132,6 +155,7 @@ func (c *Collector) updateBPFMetrics() { c.handleInactiveContainers(foundContainer) if config.EnableProcessMetrics { c.handleInactiveProcesses(foundProcess) + c.handleInactiveVM(foundVM) } } @@ -166,3 +190,15 @@ func (c *Collector) handleInactiveProcesses(foundProcess map[uint64]bool) { } } } + +// handleInactiveVirtualMachine +func (c *Collector) handleInactiveVM(foundVM map[uint64]bool) { + numOfInactive := len(c.VMMetrics) - len(foundVM) + if numOfInactive > maxInactiveVM { + for pid := range c.VMMetrics { + if _, found := foundVM[pid]; !found { + delete(c.VMMetrics, pid) + } + } + } +} diff --git a/pkg/collector/metric/vm_metric.go b/pkg/collector/metric/vm_metric.go new file mode 100644 index 0000000000..566a1266a2 --- /dev/null +++ b/pkg/collector/metric/vm_metric.go @@ -0,0 +1,118 @@ +/* +Copyright 2023. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metric + +import ( + "github.com/sustainable-computing-io/kepler/pkg/collector/metric/types" + "github.com/sustainable-computing-io/kepler/pkg/config" + "github.com/sustainable-computing-io/kepler/pkg/power/accelerator/gpu" +) + +var ( + // VMMetricNames holds the list of names of the vm metric + VMMetricNames []string + // VMFloatFeatureNames holds the feature name of the vm float collector_metric. This is specific for the machine-learning based models. + VMFloatFeatureNames []string = []string{} + // VMUintFeaturesNames holds the feature name of the vm utint collector_metric. This is specific for the machine-learning based models. + VMUintFeaturesNames []string + // VMFeaturesNames holds all the feature name of the vm collector_metric. This is specific for the machine-learning based models. + VMFeaturesNames []string +) + +type VMMetrics struct { + PID uint64 + Name string + CounterStats map[string]*types.UInt64Stat + // ebpf metrics + CPUTime *types.UInt64Stat + SoftIRQCount []types.UInt64Stat + DynEnergyInCore *types.UInt64Stat + DynEnergyInDRAM *types.UInt64Stat + DynEnergyInUncore *types.UInt64Stat + DynEnergyInPkg *types.UInt64Stat + DynEnergyInGPU *types.UInt64Stat + DynEnergyInOther *types.UInt64Stat + DynEnergyInPlatform *types.UInt64Stat + + IdleEnergyInCore *types.UInt64Stat + IdleEnergyInDRAM *types.UInt64Stat + IdleEnergyInUncore *types.UInt64Stat + IdleEnergyInPkg *types.UInt64Stat + IdleEnergyInGPU *types.UInt64Stat + IdleEnergyInOther *types.UInt64Stat + IdleEnergyInPlatform *types.UInt64Stat +} + +// NewVMMetrics creates a new VMMetrics instance +func NewVMMetrics(pid uint64, name string) *VMMetrics { + p := &VMMetrics{ + PID: pid, + Name: name, + CPUTime: &types.UInt64Stat{}, + CounterStats: make(map[string]*types.UInt64Stat), + SoftIRQCount: make([]types.UInt64Stat, config.MaxIRQ), + DynEnergyInCore: &types.UInt64Stat{}, + DynEnergyInDRAM: &types.UInt64Stat{}, + DynEnergyInUncore: &types.UInt64Stat{}, + DynEnergyInPkg: &types.UInt64Stat{}, + DynEnergyInOther: &types.UInt64Stat{}, + DynEnergyInGPU: &types.UInt64Stat{}, + DynEnergyInPlatform: &types.UInt64Stat{}, + IdleEnergyInCore: &types.UInt64Stat{}, + IdleEnergyInDRAM: &types.UInt64Stat{}, + IdleEnergyInUncore: &types.UInt64Stat{}, + IdleEnergyInPkg: &types.UInt64Stat{}, + IdleEnergyInOther: &types.UInt64Stat{}, + IdleEnergyInGPU: &types.UInt64Stat{}, + IdleEnergyInPlatform: &types.UInt64Stat{}, + } + + for _, metricName := range AvailableHWCounters { + p.CounterStats[metricName] = &types.UInt64Stat{} + } + // TODO: transparently list the other metrics and do not initialize them when they are not supported, e.g. HC + if gpu.IsGPUCollectionSupported() { + p.CounterStats[config.GPUSMUtilization] = &types.UInt64Stat{} + p.CounterStats[config.GPUMemUtilization] = &types.UInt64Stat{} + } + return p +} + +// ResetCurr reset all current value to 0 +func (p *VMMetrics) ResetDeltaValues() { + p.CPUTime.ResetDeltaValues() + for counterKey := range p.CounterStats { + p.CounterStats[counterKey].ResetDeltaValues() + } + for i := 0; i < config.MaxIRQ; i++ { + p.SoftIRQCount[i].ResetDeltaValues() + } + p.DynEnergyInCore.ResetDeltaValues() + p.DynEnergyInDRAM.ResetDeltaValues() + p.DynEnergyInUncore.ResetDeltaValues() + p.DynEnergyInPkg.ResetDeltaValues() + p.DynEnergyInOther.ResetDeltaValues() + p.DynEnergyInGPU.ResetDeltaValues() + p.DynEnergyInPlatform.ResetDeltaValues() + p.IdleEnergyInCore.ResetDeltaValues() + p.IdleEnergyInDRAM.ResetDeltaValues() + p.IdleEnergyInUncore.ResetDeltaValues() + p.IdleEnergyInPkg.ResetDeltaValues() + p.IdleEnergyInOther.ResetDeltaValues() + p.IdleEnergyInGPU.ResetDeltaValues() + p.IdleEnergyInPlatform.ResetDeltaValues() +} diff --git a/pkg/collector/metric/vm_metric_test.go b/pkg/collector/metric/vm_metric_test.go new file mode 100644 index 0000000000..d0a273a1db --- /dev/null +++ b/pkg/collector/metric/vm_metric_test.go @@ -0,0 +1,15 @@ +package metric + +import ( + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +var _ = Describe("VMMetric", func() { + + It("Test ResetDeltaValues", func() { + p := NewVMMetrics(0, "name") + p.ResetDeltaValues() + Expect(p.CPUTime.Delta).To(Equal(uint64(0))) + }) +}) diff --git a/pkg/collector/metric_collector.go b/pkg/collector/metric_collector.go index 5f9e2adc9c..c14d6ae997 100644 --- a/pkg/collector/metric_collector.go +++ b/pkg/collector/metric_collector.go @@ -36,6 +36,7 @@ import ( const ( maxInactiveContainers = 10 maxInactiveProcesses = 5 + maxInactiveVM = 3 ) type Collector struct { @@ -48,6 +49,9 @@ type Collector struct { // ProcessMetrics hold all process energy and resource usage metrics ProcessMetrics map[uint64]*collector_metric.ProcessMetrics + // VMMetrics hold all Virtual Machine energy and resource usage metrics + VMMetrics map[uint64]*collector_metric.VMMetrics + // generic names to be used for process that are not within a pod systemProcessName string systemProcessNamespace string @@ -58,6 +62,7 @@ func NewCollector() *Collector { NodeMetrics: *collector_metric.NewNodeMetrics(), ContainersMetrics: map[string]*collector_metric.ContainerMetrics{}, ProcessMetrics: map[uint64]*collector_metric.ProcessMetrics{}, + VMMetrics: map[uint64]*collector_metric.VMMetrics{}, systemProcessName: utils.SystemProcessName, systemProcessNamespace: utils.SystemProcessNamespace, } @@ -127,6 +132,7 @@ func (c *Collector) Update() { // calculate the process energy consumption using its resource utilization and the node components energy consumption if config.EnableProcessMetrics { c.updateProcessEnergy() + c.updateVMEnergy() } // check the log verbosity level before iterating in all container diff --git a/pkg/collector/metric_collector_test.go b/pkg/collector/metric_collector_test.go index bd3166e2f8..7e2c8291de 100644 --- a/pkg/collector/metric_collector_test.go +++ b/pkg/collector/metric_collector_test.go @@ -139,6 +139,7 @@ func newMockCollector() *Collector { metricCollector := NewCollector() metricCollector.ContainersMetrics = createMockContainersMetrics() metricCollector.ProcessMetrics = map[uint64]*collector_metric.ProcessMetrics{} + metricCollector.VMMetrics = map[uint64]*collector_metric.VMMetrics{} metricCollector.NodeMetrics = createMockNodeMetrics(metricCollector.ContainersMetrics) return metricCollector diff --git a/pkg/collector/prometheus_collector.go b/pkg/collector/prometheus_collector.go index fb75fde794..1024801577 100644 --- a/pkg/collector/prometheus_collector.go +++ b/pkg/collector/prometheus_collector.go @@ -102,6 +102,7 @@ type PrometheusCollector struct { containerDesc *ContainerDesc podDesc *PodDesc processDesc *processDesc + vmDesc *vmDesc // NodeMetrics holds all node energy and resource usage metrics NodeMetrics *collector_metric.NodeMetrics @@ -112,6 +113,9 @@ type PrometheusCollector struct { // ProcessMetrics hold all process energy and resource usage metrics ProcessMetrics *map[uint64]*collector_metric.ProcessMetrics + // VMMetrics hold all Virtual Machine energy and resource usage metrics + VMMetrics *map[uint64]*collector_metric.VMMetrics + // SamplePeriodSec the collector metric collection interval SamplePeriodSec float64 @@ -132,11 +136,13 @@ func NewPrometheusExporter() *PrometheusCollector { nodeDesc: &NodeDesc{}, podDesc: &PodDesc{}, processDesc: &processDesc{}, + vmDesc: &vmDesc{}, } exporter.newNodeMetrics() exporter.newContainerMetrics() exporter.newPodMetrics() exporter.newprocessMetrics() + exporter.newVMMetrics() return &exporter } @@ -216,6 +222,7 @@ func (p *PrometheusCollector) Describe(ch chan<- *prometheus.Desc) { ch <- p.containerDesc.containerBlockIRQTotal } p.describeProcess(ch) + p.describeVM(ch) } func (p *PrometheusCollector) newNodeMetrics() { @@ -451,6 +458,7 @@ func (p *PrometheusCollector) Collect(ch chan<- prometheus.Metric) { p.updateNodeMetrics(&wg, ch) p.updatePodMetrics(&wg, ch) p.updateProcessMetrics(&wg, ch) + p.updateVMMetrics(&wg, ch) wg.Wait() } diff --git a/pkg/collector/prometheus_collector_test.go b/pkg/collector/prometheus_collector_test.go index ff2faf16c1..332637ae90 100644 --- a/pkg/collector/prometheus_collector_test.go +++ b/pkg/collector/prometheus_collector_test.go @@ -67,6 +67,7 @@ func newMockPrometheusExporter() *PrometheusCollector { exporter.NodeMetrics = collector_metric.NewNodeMetrics() exporter.ContainersMetrics = &map[string]*collector_metric.ContainerMetrics{} exporter.ProcessMetrics = &map[uint64]*collector_metric.ProcessMetrics{} + exporter.VMMetrics = &map[uint64]*collector_metric.VMMetrics{} exporter.SamplePeriodSec = 3.0 collector_metric.ContainerFeaturesNames = []string{config.CoreUsageMetric} collector_metric.NodeMetadataFeatureNames = []string{"cpu_architecture"} diff --git a/pkg/collector/prometheus_process_collector.go b/pkg/collector/prometheus_process_collector.go index 26eb631162..5de03bd658 100644 --- a/pkg/collector/prometheus_process_collector.go +++ b/pkg/collector/prometheus_process_collector.go @@ -13,7 +13,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ - +//nolint:dupl // should be refactor with vm collector package collector import ( diff --git a/pkg/collector/prometheus_vm_collector.go b/pkg/collector/prometheus_vm_collector.go new file mode 100644 index 0000000000..7d2ad04ae5 --- /dev/null +++ b/pkg/collector/prometheus_vm_collector.go @@ -0,0 +1,337 @@ +/* +Copyright 2023. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +//nolint:dupl // should be refactor with process collector +package collector + +import ( + "strconv" + "sync" + + "github.com/prometheus/client_golang/prometheus" + + "github.com/sustainable-computing-io/kepler/pkg/bpfassets/attacher" + collector_metric "github.com/sustainable-computing-io/kepler/pkg/collector/metric" + "github.com/sustainable-computing-io/kepler/pkg/config" +) + +type vmDesc struct { + // Energy (counter) + vmCoreJoulesTotal *prometheus.Desc + vmUncoreJoulesTotal *prometheus.Desc + vmDramJoulesTotal *prometheus.Desc + vmPackageJoulesTotal *prometheus.Desc + vmOtherComponentsJoulesTotal *prometheus.Desc + vmGPUJoulesTotal *prometheus.Desc + vmJoulesTotal *prometheus.Desc + + // Hardware Counters (counter) + vmCPUCyclesTotal *prometheus.Desc + vmCPUInstrTotal *prometheus.Desc + vmCacheMissTotal *prometheus.Desc + + // Additional metrics (gauge) + vmCPUTime *prometheus.Desc + + // IRQ metrics + vmNetTxIRQTotal *prometheus.Desc + vmNetRxIRQTotal *prometheus.Desc + vmBlockIRQTotal *prometheus.Desc +} + +// describevm is called by Describe to implement the prometheus.Collector interface +func (p *PrometheusCollector) describeVM(ch chan<- *prometheus.Desc) { + // vm Energy (counter) + ch <- p.vmDesc.vmCoreJoulesTotal + ch <- p.vmDesc.vmUncoreJoulesTotal + ch <- p.vmDesc.vmDramJoulesTotal + ch <- p.vmDesc.vmPackageJoulesTotal + ch <- p.vmDesc.vmOtherComponentsJoulesTotal + if config.EnabledGPU { + ch <- p.vmDesc.vmGPUJoulesTotal + } + ch <- p.vmDesc.vmJoulesTotal + + // vm Hardware Counters (counter) + if collector_metric.CPUHardwareCounterEnabled { + ch <- p.vmDesc.vmCPUCyclesTotal + ch <- p.vmDesc.vmCPUInstrTotal + ch <- p.vmDesc.vmCacheMissTotal + } + + if config.ExposeIRQCounterMetrics { + ch <- p.vmDesc.vmNetTxIRQTotal + ch <- p.vmDesc.vmNetRxIRQTotal + ch <- p.vmDesc.vmBlockIRQTotal + } +} + +func (p *PrometheusCollector) newVMMetrics() { + // Energy (counter) + vmCoreJoulesTotal := prometheus.NewDesc( + prometheus.BuildFQName(namespace, "vm", "core_joules_total"), + "Aggregated RAPL value in core in joules", + []string{"pid", "name", "mode"}, nil, + ) + vmUncoreJoulesTotal := prometheus.NewDesc( + prometheus.BuildFQName(namespace, "vm", "uncore_joules_total"), + "Aggregated RAPL value in uncore in joules", + []string{"pid", "name", "mode"}, nil, + ) + vmDramJoulesTotal := prometheus.NewDesc( + prometheus.BuildFQName(namespace, "vm", "dram_joules_total"), + "Aggregated RAPL value in dram in joules", + []string{"pid", "name", "mode"}, nil, + ) + vmPackageJoulesTotal := prometheus.NewDesc( + prometheus.BuildFQName(namespace, "vm", "package_joules_total"), + "Aggregated RAPL value in package (socket) in joules", + []string{"pid", "name", "mode"}, nil, + ) + vmOtherComponentsJoulesTotal := prometheus.NewDesc( + prometheus.BuildFQName(namespace, "vm", "other_host_components_joules_total"), + "Aggregated value in other host components (platform - package - dram) in joules", + []string{"pid", "name", "mode"}, nil, + ) + vmGPUJoulesTotal := prometheus.NewDesc( + prometheus.BuildFQName(namespace, "vm", "gpu_joules_total"), + "Aggregated GPU value in joules", + []string{"pid", "name", "mode"}, nil, + ) + vmJoulesTotal := prometheus.NewDesc( + prometheus.BuildFQName(namespace, "vm", "joules_total"), + "Aggregated RAPL Package + Uncore + DRAM + GPU + other host components (platform - package - dram) in joules", + []string{"pid", "name", "mode"}, nil, + ) + + // Hardware Counters (counter) + vmCPUCyclesTotal := prometheus.NewDesc( + prometheus.BuildFQName(namespace, "vm", "cpu_cycles_total"), + "Aggregated CPU cycle value", + []string{"pid", "name"}, nil, + ) + vmCPUInstrTotal := prometheus.NewDesc( + prometheus.BuildFQName(namespace, "vm", "cpu_instructions_total"), + "Aggregated CPU instruction value", + []string{"pid", "name"}, nil, + ) + vmCacheMissTotal := prometheus.NewDesc( + prometheus.BuildFQName(namespace, "vm", "cache_miss_total"), + "Aggregated cache miss value", + []string{"pid", "name"}, nil, + ) + // Additional metrics (gauge) + vmCPUTime := prometheus.NewDesc( + prometheus.BuildFQName(namespace, "vm", "cpu_cpu_time_us"), + "Aggregated CPU time", + []string{"pid", "name"}, nil) + + // network irq metrics + vmNetTxIRQTotal := prometheus.NewDesc( + prometheus.BuildFQName(namespace, "vm", "bpf_net_tx_irq_total"), + "Aggregated network tx irq value obtained from BPF", + []string{"pid", "name"}, nil, + ) + vmNetRxIRQTotal := prometheus.NewDesc( + prometheus.BuildFQName(namespace, "vm", "bpf_net_rx_irq_total"), + "Aggregated network rx irq value obtained from BPF", + []string{"pid", "name"}, nil, + ) + vmBlockIRQTotal := prometheus.NewDesc( + prometheus.BuildFQName(namespace, "vm", "bpf_block_irq_total"), + "Aggregated block irq value obtained from BPF", + []string{"pid", "name"}, nil, + ) + + p.vmDesc = &vmDesc{ + vmCoreJoulesTotal: vmCoreJoulesTotal, + vmUncoreJoulesTotal: vmUncoreJoulesTotal, + vmDramJoulesTotal: vmDramJoulesTotal, + vmPackageJoulesTotal: vmPackageJoulesTotal, + vmOtherComponentsJoulesTotal: vmOtherComponentsJoulesTotal, + vmGPUJoulesTotal: vmGPUJoulesTotal, + vmJoulesTotal: vmJoulesTotal, + vmCPUCyclesTotal: vmCPUCyclesTotal, + vmCPUInstrTotal: vmCPUInstrTotal, + vmCacheMissTotal: vmCacheMissTotal, + vmCPUTime: vmCPUTime, + vmNetTxIRQTotal: vmNetTxIRQTotal, + vmNetRxIRQTotal: vmNetRxIRQTotal, + vmBlockIRQTotal: vmBlockIRQTotal, + } +} + +// updatevmMetrics send vm metrics to prometheus +func (p *PrometheusCollector) updateVMMetrics(wg *sync.WaitGroup, ch chan<- prometheus.Metric) { + // "Instance Name" in openstack are 17 characters strings + const nameLenLimit = 17 + for pid, vm := range *p.VMMetrics { + wg.Add(1) + go func(pid uint64, vm *collector_metric.VMMetrics) { + defer wg.Done() + vmName := vm.Name + if len(vmName) > nameLenLimit { + vmName = vm.Name[:nameLenLimit] + } + pidStr := strconv.FormatUint(pid, 10) + ch <- prometheus.MustNewConstMetric( + p.vmDesc.vmCPUTime, + prometheus.CounterValue, + float64(vm.CPUTime.Aggr), + pidStr, vmName, + ) + ch <- prometheus.MustNewConstMetric( + p.vmDesc.vmCoreJoulesTotal, + prometheus.CounterValue, + float64(vm.DynEnergyInCore.Aggr)/miliJouleToJoule, + pidStr, vmName, "dynamic", + ) + ch <- prometheus.MustNewConstMetric( + p.vmDesc.vmCoreJoulesTotal, + prometheus.CounterValue, + float64(vm.IdleEnergyInCore.Aggr)/miliJouleToJoule, + pidStr, vmName, "idle", + ) + ch <- prometheus.MustNewConstMetric( + p.vmDesc.vmUncoreJoulesTotal, + prometheus.CounterValue, + float64(vm.DynEnergyInUncore.Aggr)/miliJouleToJoule, + pidStr, vmName, "dynamic", + ) + ch <- prometheus.MustNewConstMetric( + p.vmDesc.vmUncoreJoulesTotal, + prometheus.CounterValue, + float64(vm.IdleEnergyInUncore.Aggr)/miliJouleToJoule, + pidStr, vmName, "idle", + ) + ch <- prometheus.MustNewConstMetric( + p.vmDesc.vmDramJoulesTotal, + prometheus.CounterValue, + float64(vm.DynEnergyInDRAM.Aggr)/miliJouleToJoule, + pidStr, vmName, "dynamic", + ) + ch <- prometheus.MustNewConstMetric( + p.vmDesc.vmDramJoulesTotal, + prometheus.CounterValue, + float64(vm.IdleEnergyInDRAM.Aggr)/miliJouleToJoule, + pidStr, vmName, "idle", + ) + ch <- prometheus.MustNewConstMetric( + p.vmDesc.vmPackageJoulesTotal, + prometheus.CounterValue, + float64(vm.DynEnergyInPkg.Aggr)/miliJouleToJoule, + pidStr, vmName, "dynamic", + ) + ch <- prometheus.MustNewConstMetric( + p.vmDesc.vmPackageJoulesTotal, + prometheus.CounterValue, + float64(vm.IdleEnergyInPkg.Aggr)/miliJouleToJoule, + pidStr, vmName, "idle", + ) + ch <- prometheus.MustNewConstMetric( + p.vmDesc.vmOtherComponentsJoulesTotal, + prometheus.CounterValue, + float64(vm.DynEnergyInOther.Aggr)/miliJouleToJoule, + pidStr, vmName, "dynamic", + ) + ch <- prometheus.MustNewConstMetric( + p.vmDesc.vmOtherComponentsJoulesTotal, + prometheus.CounterValue, + float64(vm.IdleEnergyInOther.Aggr)/miliJouleToJoule, + pidStr, vmName, "idle", + ) + if config.EnabledGPU { + ch <- prometheus.MustNewConstMetric( + p.vmDesc.vmGPUJoulesTotal, + prometheus.CounterValue, + float64(vm.DynEnergyInGPU.Aggr)/miliJouleToJoule, + pidStr, vmName, "dynamic", + ) + ch <- prometheus.MustNewConstMetric( + p.vmDesc.vmGPUJoulesTotal, + prometheus.CounterValue, + float64(vm.IdleEnergyInGPU.Aggr)/miliJouleToJoule, + pidStr, vmName, "idle", + ) + } + ch <- prometheus.MustNewConstMetric( + p.vmDesc.vmJoulesTotal, + prometheus.CounterValue, + (float64(vm.DynEnergyInPkg.Aggr)/miliJouleToJoule + + float64(vm.DynEnergyInUncore.Aggr)/miliJouleToJoule + + float64(vm.DynEnergyInDRAM.Aggr)/miliJouleToJoule + + float64(vm.DynEnergyInGPU.Aggr)/miliJouleToJoule + + float64(vm.DynEnergyInOther.Aggr)/miliJouleToJoule), + pidStr, vmName, "dynamic", + ) + ch <- prometheus.MustNewConstMetric( + p.vmDesc.vmJoulesTotal, + prometheus.CounterValue, + (float64(vm.IdleEnergyInPkg.Aggr)/miliJouleToJoule + + float64(vm.IdleEnergyInUncore.Aggr)/miliJouleToJoule + + float64(vm.IdleEnergyInDRAM.Aggr)/miliJouleToJoule + + float64(vm.IdleEnergyInGPU.Aggr)/miliJouleToJoule + + float64(vm.IdleEnergyInOther.Aggr)/miliJouleToJoule), + pidStr, vmName, "idle", + ) + if collector_metric.CPUHardwareCounterEnabled { + if vm.CounterStats[attacher.CPUCycleLabel] != nil { + ch <- prometheus.MustNewConstMetric( + p.vmDesc.vmCPUCyclesTotal, + prometheus.CounterValue, + float64(vm.CounterStats[attacher.CPUCycleLabel].Aggr), + pidStr, vmName, + ) + } + if vm.CounterStats[attacher.CPUInstructionLabel] != nil { + ch <- prometheus.MustNewConstMetric( + p.vmDesc.vmCPUInstrTotal, + prometheus.CounterValue, + float64(vm.CounterStats[attacher.CPUInstructionLabel].Aggr), + pidStr, vmName, + ) + } + if vm.CounterStats[attacher.CacheMissLabel] != nil { + ch <- prometheus.MustNewConstMetric( + p.vmDesc.vmCacheMissTotal, + prometheus.CounterValue, + float64(vm.CounterStats[attacher.CacheMissLabel].Aggr), + pidStr, vmName, + ) + } + } + if config.ExposeIRQCounterMetrics { + ch <- prometheus.MustNewConstMetric( + p.vmDesc.vmNetTxIRQTotal, + prometheus.CounterValue, + float64(vm.SoftIRQCount[attacher.IRQNetTX].Aggr), + pidStr, vmName, + ) + ch <- prometheus.MustNewConstMetric( + p.vmDesc.vmNetRxIRQTotal, + prometheus.CounterValue, + float64(vm.SoftIRQCount[attacher.IRQNetRX].Aggr), + pidStr, vmName, + ) + ch <- prometheus.MustNewConstMetric( + p.vmDesc.vmBlockIRQTotal, + prometheus.CounterValue, + float64(vm.SoftIRQCount[attacher.IRQBlock].Aggr), + pidStr, vmName, + ) + } + }(pid, vm) + } +} diff --git a/pkg/collector/utils.go b/pkg/collector/utils.go index 019deefcec..a5dad62847 100644 --- a/pkg/collector/utils.go +++ b/pkg/collector/utils.go @@ -60,3 +60,11 @@ func (c *Collector) createProcessMetricsIfNotExist(pid uint64, command string) { func addSuffix(name, suffix string) string { return fmt.Sprintf("%s_%s", name, suffix) } + +func (c *Collector) createVMMetricsIfNotExist(pid uint64, name string) { + if p, ok := c.VMMetrics[pid]; !ok { + c.VMMetrics[pid] = collector_metric.NewVMMetrics(pid, name) + } else if p.Name == "" { + p.Name = name + } +} diff --git a/pkg/collector/vm_energy_collector.go b/pkg/collector/vm_energy_collector.go new file mode 100644 index 0000000000..87c754d46f --- /dev/null +++ b/pkg/collector/vm_energy_collector.go @@ -0,0 +1,26 @@ +/* +Copyright 2023. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package collector + +import ( + "github.com/sustainable-computing-io/kepler/pkg/model" +) + +// updateVMEnergy matches the virtual machine metrics with process metrics +func (c *Collector) updateVMEnergy() { + model.UpdateVMEnergy(c.VMMetrics, c.ProcessMetrics) +} diff --git a/pkg/libvirt/resolve_vm.go b/pkg/libvirt/resolve_vm.go new file mode 100644 index 0000000000..aa58caa69f --- /dev/null +++ b/pkg/libvirt/resolve_vm.go @@ -0,0 +1,91 @@ +/* +Copyright 2023. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package libvirt + +import ( + "fmt" + "io/ioutil" + "path/filepath" +) + +const ( + libvirtPath string = "/var/run/libvirt/qemu/" + procPath string = "/proc/%s/task" +) + +func getThreadIDsForPID(pid, extraPath string) []string { + threadIDs := []string{} + fullPath := "" + + if procPath != "" { + fullPath = filepath.Join(extraPath, procPath) + } else { + fullPath = procPath + } + + procDir := fmt.Sprintf(fullPath, pid) + files, err := ioutil.ReadDir(procDir) + if err != nil { + return nil + } + + for _, file := range files { + threadIDs = append(threadIDs, file.Name()) + } + + return threadIDs +} + +func GetCurrentVMPID(path ...string) (map[string]string, error) { + pidFiles := make(map[string]string) + + if len(path) == 0 { + path = []string{libvirtPath, procPath} + } + + files, err := ioutil.ReadDir(path[0]) + if err != nil { + return nil, err + } + + for _, file := range files { + if file.IsDir() { + continue + } + + if filepath.Ext(file.Name()) == ".pid" { + filePath := filepath.Join(path[0], file.Name()) + content, err := ioutil.ReadFile(filePath) + if err != nil { + fmt.Printf("Error reading %s: %v\n", filePath, err) + continue + } + + currentPid := string(content) + currentName := file.Name() + + tid := getThreadIDsForPID(currentPid, path[1]) + + for _, currentTid := range tid { + // Get rid of the ".pid" before storing the name + pidFiles[currentTid] = currentName[:len(currentName)-4] + } + } + } + + return pidFiles, nil +} diff --git a/pkg/libvirt/resolve_vm_test.go b/pkg/libvirt/resolve_vm_test.go new file mode 100644 index 0000000000..1b300908b3 --- /dev/null +++ b/pkg/libvirt/resolve_vm_test.go @@ -0,0 +1,85 @@ +/* +Copyright 2023. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package libvirt + +import ( + "io/ioutil" + "os" + "path/filepath" + "reflect" + "testing" +) + +func createMockLibvirtDir(directory string) { + mockFiles := []struct { + name string + content string + }{ + {"vm1.pid", "1234"}, + {"vm2.pid", "5678"}, + } + + for _, file := range mockFiles { + err := ioutil.WriteFile(filepath.Join(directory, file.name), []byte(file.content), 0644) + if err != nil { + panic(err) + } + } +} + +func createMockProcDir(directory string) { + mockThreadDirs := []string{ + "/proc/1234/task/123", + "/proc/1234/task/456", + "/proc/1234/task/789", + "/proc/5678/task/1234", + "/proc/5678/task/4567", + "/proc/5678/task/7890", + } + for _, dir := range mockThreadDirs { + err := os.MkdirAll(filepath.Join(directory, dir), 0755) + if err != nil { + panic(err) + } + } +} + +func TestGetCurrentVMPID(t *testing.T) { + mockLibvirtDir := t.TempDir() + createMockLibvirtDir(mockLibvirtDir) + + mockProcDir := t.TempDir() + createMockProcDir(mockProcDir) + + pidFiles, err := GetCurrentVMPID(mockLibvirtDir, mockProcDir) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + expectedResult := map[string]string{ + "123": "vm1", + "456": "vm1", + "789": "vm1", + "1234": "vm2", + "4567": "vm2", + "7890": "vm2", + } + + if !reflect.DeepEqual(pidFiles, expectedResult) { + t.Errorf("Expected: %v, Got: %v", expectedResult, pidFiles) + } +} diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index 1a013fb6b3..63e1e3654e 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -47,6 +47,7 @@ func New() *CollectorManager { manager.PrometheusCollector.NodeMetrics = &manager.MetricCollector.NodeMetrics manager.PrometheusCollector.ContainersMetrics = &manager.MetricCollector.ContainersMetrics manager.PrometheusCollector.ProcessMetrics = &manager.MetricCollector.ProcessMetrics + manager.PrometheusCollector.VMMetrics = &manager.MetricCollector.VMMetrics manager.PrometheusCollector.SamplePeriodSec = config.SamplePeriodSec // configure the wather manager.Watcher = kubernetes.NewObjListWatcher() diff --git a/pkg/model/vm_power.go b/pkg/model/vm_power.go new file mode 100644 index 0000000000..8bff3f9ead --- /dev/null +++ b/pkg/model/vm_power.go @@ -0,0 +1,53 @@ +/* +Copyright 2023. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package model + +import ( + collector_metric "github.com/sustainable-computing-io/kepler/pkg/collector/metric" +) + +// UpdateVMEnergy matches the VM metrics with the process metrics already computed +func UpdateVMEnergy(vmMetrics map[uint64]*collector_metric.VMMetrics, processMetrics map[uint64]*collector_metric.ProcessMetrics) { + for _, vmmetrics := range vmMetrics { + for _, procmetrics := range processMetrics { + if procmetrics.PID != vmmetrics.PID { + continue + } + + vmmetrics.CPUTime = procmetrics.CPUTime + vmmetrics.CounterStats = procmetrics.CounterStats + + vmmetrics.SoftIRQCount = procmetrics.SoftIRQCount + + vmmetrics.DynEnergyInCore = procmetrics.DynEnergyInCore + vmmetrics.DynEnergyInDRAM = procmetrics.DynEnergyInDRAM + vmmetrics.DynEnergyInUncore = procmetrics.DynEnergyInUncore + vmmetrics.DynEnergyInPkg = procmetrics.DynEnergyInPkg + vmmetrics.DynEnergyInGPU = procmetrics.DynEnergyInGPU + vmmetrics.DynEnergyInOther = procmetrics.DynEnergyInOther + vmmetrics.DynEnergyInPlatform = procmetrics.DynEnergyInPlatform + + vmmetrics.IdleEnergyInCore = procmetrics.IdleEnergyInCore + vmmetrics.IdleEnergyInDRAM = procmetrics.IdleEnergyInDRAM + vmmetrics.IdleEnergyInUncore = procmetrics.IdleEnergyInUncore + vmmetrics.IdleEnergyInPkg = procmetrics.IdleEnergyInPkg + vmmetrics.IdleEnergyInGPU = procmetrics.IdleEnergyInGPU + vmmetrics.IdleEnergyInOther = procmetrics.IdleEnergyInOther + vmmetrics.IdleEnergyInPlatform = procmetrics.IdleEnergyInPlatform + } + } +}