
Commit

update(workflow_executor): extract the monitoring logic into a separate function
bkatsevych committed Mar 29, 2024
1 parent cdadb17 commit 0c1531f
Showing 1 changed file with 85 additions and 181 deletions.
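In short: the per-task CPU / RSS sampling that previously lived inline in the `wait_for_any` scheduling loop is moved into a dedicated `monitor(&mut self, system: &System)` method, and the loop now simply calls `self.monitor(&system)` on each iteration. Below is a minimal, dependency-free sketch of that shape; the `Executor` struct, its fields, and the `run` loop are hypothetical stand-ins, while the real code threads a `sysinfo::System` handle through and records per-task metrics.

    struct Executor {
        internal_monitor_counter: u64,
        internal_monitor_id: u64,
    }

    impl Executor {
        // Extracted monitoring step: only does real work every fifth
        // iteration, mirroring the `internal_monitor_counter % 5 == 0`
        // guard in the commit.
        fn monitor(&mut self) {
            if self.internal_monitor_counter % 5 == 0 {
                self.internal_monitor_id += 1;
                println!("DOING MONITORING (id {})", self.internal_monitor_id);
                // per-task CPU / RSS accounting would go here
            }
        }

        // Stand-in for the `while self.wait_for_any(..)` scheduling loop.
        fn run(&mut self) {
            for _ in 0..10 {
                self.monitor();
                self.internal_monitor_counter += 1;
            }
        }
    }

    fn main() {
        Executor { internal_monitor_counter: 0, internal_monitor_id: 0 }.run();
    }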
266 changes: 85 additions & 181 deletions src/workflow_executor.rs
@@ -464,9 +464,9 @@ Use the `--produce-script myscript.sh` option for this.";
None => Vec::new(),
};
let mut finished_tasks: Vec<usize> = Vec::new(); // Vector of finished tasks

// instance used to query memory / cpu for process pids
let mut system = sysinfo::System::new_all();
let system = System::new_all();

println!("candidates: {:?}", candidates);

@@ -520,98 +520,7 @@

while self.wait_for_any(&mut finished_from_started, &mut failing) {
if !self.arguments.dry_run {

if self.internal_monitor_counter % 5 == 0 {
self.internal_monitor_id += 1;

println!("DOING MONITORING");

let mut global_cpu: f32 = 0.0;
let mut global_rss: u64 = 0;
let mut resources_per_task: HashMap<
&usize,
HashMap<&str, serde_json::Value>,
> = HashMap::new();

for (tid, proc) in &self.process_list {
let pid = proc.id();
let mut allprocs: Vec<Pid> = Vec::new();

allprocs.push(Pid::from_u32(pid));
allprocs.extend(find_child_processes_recursive(&system, Pid::from_u32(pid)));

// accumulate total metrics
let mut total_cpu = 0.0;
let mut total_rss: u64 = 0;

for child_proc_id in allprocs {
let mut this_rss: u64 = 0;

// get process reference
if let Some(p) = system.process(child_proc_id) {
// MEMORY part
this_rss = p.memory();
total_rss += this_rss;

// CPU part
let this_cpu = p.cpu_usage();
total_cpu += this_cpu;
}
}
let time_delta = self
.start_time
.map_or(0, |t| Instant::now().duration_since(t).as_millis() as i32);

total_rss = total_rss / 1024 / 1024;

let nice_value = unsafe { getpriority(PRIO_PROCESS, pid) };
let mut task_resources: HashMap<&str, serde_json::Value> =
HashMap::new();

task_resources.insert(
"iter",
serde_json::Value::Number(self.internal_monitor_id.into()),
);
task_resources.insert(
"name",
serde_json::Value::String(self.id_to_task[*tid].clone()),
);
task_resources.insert(
"cpu",
serde_json::Value::Number((total_cpu as u64).into()),
);
task_resources
.insert("rss", serde_json::Value::Number(total_rss.into()));
task_resources
.insert("nice", serde_json::Value::Number(nice_value.into()));
task_resources.insert(
"label",
self.workflow_spec["stages"][tid]["labels"].clone(),
);

resources_per_task.insert(tid, task_resources);

self.resource_manager.add_monitored_resources(
tid,
time_delta,
total_cpu / 100.0,
total_rss,
);

if nice_value == self.resource_manager.nice_default {
global_cpu += total_cpu;
global_rss += total_rss;
}

self.metric_logger
.info(&format!("{:?}", resources_per_task.get(tid).unwrap()));
}

if global_rss > self.resource_manager.resource_boundaries.mem_limit {
self.metric_logger
.info(&format!("***MEMORY LIMIT EXCEEDED***",));
}
}
self.monitor(&system);
thread::sleep(Duration::from_secs(1));
self.internal_monitor_counter += 1;
} else {
@@ -737,93 +646,88 @@
fs::remove_file(timef).unwrap();
}

// fn monitor(&'a mut self) {
// self.internal_monitor_counter += 1;

// if self.internal_monitor_counter % 5 != 0 {
// return;
// }

// self.internal_monitor_id += 1;

// let mut global_cpu: f32 = 0.0;
// let mut global_rss: u64 = 0;
// let mut resources_per_task: HashMap<&usize, HashMap<&str, serde_json::Value>> =
// HashMap::new();

// for (tid, proc) in &self.process_list {
// let pid = proc.id();
// let mut sysinfo_procs = Vec::new();

// sysinfo_procs.push(Pid::from_u32(pid));

// sysinfo_procs.extend(children(pid, &self.sysinfo));

// // accumulate total metrics
// let mut total_cpu = 0.0;
// let mut total_rss: u64 = 0;

// for p in sysinfo_procs {
// let mut this_rss: u64 = 0;

// if let Some(process) = sysinfo.process(p) {
// // MEMORY part
// this_rss = process.memory();
// total_rss += this_rss;

// // CPU part
// if let Some(cached_proc) = self.pid_to_sysinfo_proc.get(&p) {
// let this_cpu = cached_proc.cpu_usage();
// total_cpu += this_cpu;
// } else {
// self.pid_to_sysinfo_proc.insert(p, process);
// }
// }
// }
// let time_delta = self
// .start_time
// .map_or(0, |t| Instant::now().duration_since(t).as_millis() as i32);

// total_rss = total_rss / 1024 / 1024;

// let nice_value = unsafe { getpriority(PRIO_PROCESS, pid) };
// let mut task_resources: HashMap<&str, serde_json::Value> = HashMap::new();

// task_resources.insert(
// "iter",
// serde_json::Value::Number(self.internal_monitor_id.into()),
// );
// task_resources.insert(
// "name",
// serde_json::Value::String(self.id_to_task[*tid].clone()),
// );
// task_resources.insert("cpu", serde_json::Value::Number((total_cpu as u64).into()));
// task_resources.insert("rss", serde_json::Value::Number(total_rss.into()));
// task_resources.insert("nice", serde_json::Value::Number(nice_value.into()));
// task_resources.insert("label", self.workflow_spec["stages"][tid]["labels"].clone());

// resources_per_task.insert(tid, task_resources);

// self.resource_manager.add_monitored_resources(
// tid,
// time_delta,
// total_cpu / 100.0,
// total_rss,
// );

// if nice_value == self.resource_manager.nice_default {
// global_cpu += total_cpu;
// global_rss += total_rss;
// }

// self.metric_logger
// .info(&format!("{:?}", resources_per_task.get(tid).unwrap()));
// }
// if global_rss > self.resource_manager.resource_boundaries.mem_limit {
// self.metric_logger
// .info(&format!("***MEMORY LIMIT EXCEEDED***",));
// }
// }
fn monitor(&mut self, system: &System) {
if self.internal_monitor_counter % 5 == 0 {
self.internal_monitor_id += 1;

println!("DOING MONITORING");

let mut global_cpu: f32 = 0.0;
let mut global_rss: u64 = 0;
let mut resources_per_task: HashMap<&usize, HashMap<&str, serde_json::Value>> =
HashMap::new();

for (tid, proc) in &self.process_list {
let pid = proc.id();
let mut allprocs: Vec<Pid> = Vec::new();

allprocs.push(Pid::from_u32(pid));
allprocs.extend(find_child_processes_recursive(&system, Pid::from_u32(pid)));

// accumulate total metrics
let mut total_cpu = 0.0;
let mut total_rss: u64 = 0;

for child_proc_id in allprocs {
let mut this_rss: u64 = 0;

// get process reference
if let Some(p) = system.process(child_proc_id) {
// MEMORY part
this_rss = p.memory();
total_rss += this_rss;

// CPU part
let this_cpu = p.cpu_usage();
total_cpu += this_cpu;
}
}
let time_delta = self
.start_time
.map_or(0, |t| Instant::now().duration_since(t).as_millis() as i32);

total_rss = total_rss / 1024 / 1024;

let nice_value = unsafe { getpriority(PRIO_PROCESS, pid) };
let mut task_resources: HashMap<&str, serde_json::Value> = HashMap::new();

task_resources.insert(
"iter",
serde_json::Value::Number(self.internal_monitor_id.into()),
);
task_resources.insert(
"name",
serde_json::Value::String(self.id_to_task[*tid].clone()),
);
task_resources.insert("cpu", serde_json::Value::Number((total_cpu as u64).into()));
task_resources.insert("rss", serde_json::Value::Number(total_rss.into()));
task_resources.insert("nice", serde_json::Value::Number(nice_value.into()));
task_resources.insert("label", self.workflow_spec["stages"][tid]["labels"].clone());

resources_per_task.insert(tid, task_resources);

self.resource_manager.add_monitored_resources(
tid,
time_delta,
total_cpu / 100.0,
total_rss,
);

if nice_value == self.resource_manager.nice_default {
global_cpu += total_cpu;
global_rss += total_rss;
}

self.metric_logger
.info(&format!("{:?}", resources_per_task.get(tid).unwrap()));
}

if global_rss > self.resource_manager.resource_boundaries.mem_limit {
self.metric_logger
.info(&format!("***MEMORY LIMIT EXCEEDED***",));
}
}
}

fn wait_for_any(&mut self, finished: &mut Vec<usize>, failingtasks: &mut Vec<usize>) -> bool {
let mut failure_detected = false;
