diff --git a/src/workflow_executor.rs b/src/workflow_executor.rs
index 0533e43..ff2d01e 100644
--- a/src/workflow_executor.rs
+++ b/src/workflow_executor.rs
@@ -464,9 +464,9 @@ Use the `--produce-script myscript.sh` option for this.";
             None => Vec::new(),
         };
         let mut finished_tasks: Vec<usize> = Vec::new(); // Vector of finished tasks
-
+
         // instance use to query memory / cpu for process pids
-        let mut system = sysinfo::System::new_all();
+        let system = System::new_all();
 
         println!("candidates: {:?}", candidates);
 
@@ -520,98 +520,7 @@ Use the `--produce-script myscript.sh` option for this.";
         while self.wait_for_any(&mut finished_from_started, &mut failing) {
             if !self.arguments.dry_run {
-
-                if self.internal_monitor_counter % 5 == 0 {
-                    self.internal_monitor_id += 1;
-
-                    println!("DOING MONITORING");
-
-                    let mut global_cpu: f32 = 0.0;
-                    let mut global_rss: u64 = 0;
-                    let mut resources_per_task: HashMap<
-                        &usize,
-                        HashMap<&str, serde_json::Value>,
-                    > = HashMap::new();
-
-                    for (tid, proc) in &self.process_list {
-                        let pid = proc.id();
-                        let mut allprocs: Vec<Pid> = Vec::new();
-
-                        allprocs.push(Pid::from_u32(pid));
-                        allprocs.extend(find_child_processes_recursive(&system, Pid::from_u32(pid)));
-
-                        // accumulate total metrics
-                        let mut total_cpu = 0.0;
-                        let mut total_rss: u64 = 0;
-
-                        for child_proc_id in allprocs {
-                            let mut this_rss: u64 = 0;
-
-                            // get process reference
-                            if let Some(p) = system.process(child_proc_id) {
-                                // MEMORY part
-                                this_rss = p.memory();
-                                total_rss += this_rss;
-
-                                // CPU part
-                                let this_cpu = p.cpu_usage();
-                                total_cpu += this_cpu;
-                            }
-                        }
-                        let time_delta = self
-                            .start_time
-                            .map_or(0, |t| Instant::now().duration_since(t).as_millis() as i32);
-
-                        total_rss = total_rss / 1024 / 1024;
-
-                        let nice_value = unsafe { getpriority(PRIO_PROCESS, pid) };
-                        let mut task_resources: HashMap<&str, serde_json::Value> =
-                            HashMap::new();
-
-                        task_resources.insert(
-                            "iter",
-                            serde_json::Value::Number(self.internal_monitor_id.into()),
-                        );
-                        task_resources.insert(
-                            "name",
-                            serde_json::Value::String(self.id_to_task[*tid].clone()),
-                        );
-                        task_resources.insert(
-                            "cpu",
-                            serde_json::Value::Number((total_cpu as u64).into()),
-                        );
-                        task_resources
-                            .insert("rss", serde_json::Value::Number(total_rss.into()));
-                        task_resources
-                            .insert("nice", serde_json::Value::Number(nice_value.into()));
-                        task_resources.insert(
-                            "label",
-                            self.workflow_spec["stages"][tid]["labels"].clone(),
-                        );
-
-                        resources_per_task.insert(tid, task_resources);
-
-                        self.resource_manager.add_monitored_resources(
-                            tid,
-                            time_delta,
-                            total_cpu / 100.0,
-                            total_rss,
-                        );
-
-                        if nice_value == self.resource_manager.nice_default {
-                            global_cpu += total_cpu;
-                            global_rss += total_rss;
-                        }
-
-                        self.metric_logger
-                            .info(&format!("{:?}", resources_per_task.get(tid).unwrap()));
-                    }
-
-                    if global_rss > self.resource_manager.resource_boundaries.mem_limit {
-                        self.metric_logger
-                            .info(&format!("***MEMORY LIMIT EXCEEDED***",));
-                    }
-                }
+                self.monitor(&system);
                 thread::sleep(Duration::from_secs(1));
                 self.internal_monitor_counter += 1;
             } else {
@@ -737,93 +646,88 @@ Use the `--produce-script myscript.sh` option for this.";
         fs::remove_file(timef).unwrap();
     }
 
-    // fn monitor(&'a mut self) {
-    //     self.internal_monitor_counter += 1;
-
-    //     if self.internal_monitor_counter % 5 != 0 {
-    //         return;
-    //     }
-
-    //     self.internal_monitor_id += 1;
-
-    //     let mut global_cpu: f32 = 0.0;
-    //     let mut global_rss: u64 = 0;
-    //     let mut resources_per_task: HashMap<&usize, HashMap<&str, serde_json::Value>> =
-    //         HashMap::new();
-
-    //     for (tid, proc) in &self.process_list {
-    //         let pid = proc.id();
-    //         let mut sysinfo_procs = Vec::new();
-
-    //         sysinfo_procs.push(Pid::from_u32(pid));
-
-    //         sysinfo_procs.extend(children(pid, &self.sysinfo));
-
-    //         // accumulate total metrics
-    //         let mut total_cpu = 0.0;
-    //         let mut total_rss: u64 = 0;
-
-    //         for p in sysinfo_procs {
-    //             let mut this_rss: u64 = 0;
-
-    //             if let Some(process) = sysinfo.process(p) {
-    //                 // MEMORY part
-    //                 this_rss = process.memory();
-    //                 total_rss += this_rss;
-
-    //                 // CPU part
-    //                 if let Some(cached_proc) = self.pid_to_sysinfo_proc.get(&p) {
-    //                     let this_cpu = cached_proc.cpu_usage();
-    //                     total_cpu += this_cpu;
-    //                 } else {
-    //                     self.pid_to_sysinfo_proc.insert(p, process);
-    //                 }
-    //             }
-    //         }
-    //         let time_delta = self
-    //             .start_time
-    //             .map_or(0, |t| Instant::now().duration_since(t).as_millis() as i32);
-
-    //         total_rss = total_rss / 1024 / 1024;
-
-    //         let nice_value = unsafe { getpriority(PRIO_PROCESS, pid) };
-    //         let mut task_resources: HashMap<&str, serde_json::Value> = HashMap::new();
-
-    //         task_resources.insert(
-    //             "iter",
-    //             serde_json::Value::Number(self.internal_monitor_id.into()),
-    //         );
-    //         task_resources.insert(
-    //             "name",
-    //             serde_json::Value::String(self.id_to_task[*tid].clone()),
-    //         );
-    //         task_resources.insert("cpu", serde_json::Value::Number((total_cpu as u64).into()));
-    //         task_resources.insert("rss", serde_json::Value::Number(total_rss.into()));
-    //         task_resources.insert("nice", serde_json::Value::Number(nice_value.into()));
-    //         task_resources.insert("label", self.workflow_spec["stages"][tid]["labels"].clone());
-
-    //         resources_per_task.insert(tid, task_resources);
-
-    //         self.resource_manager.add_monitored_resources(
-    //             tid,
-    //             time_delta,
-    //             total_cpu / 100.0,
-    //             total_rss,
-    //         );
-
-    //         if nice_value == self.resource_manager.nice_default {
-    //             global_cpu += total_cpu;
-    //             global_rss += total_rss;
-    //         }
-
-    //         self.metric_logger
-    //             .info(&format!("{:?}", resources_per_task.get(tid).unwrap()));
-    //     }
-    //     if global_rss > self.resource_manager.resource_boundaries.mem_limit {
-    //         self.metric_logger
-    //             .info(&format!("***MEMORY LIMIT EXCEEDED***",));
-    //     }
-    // }
+    fn monitor(&mut self, system: &System) {
+        if self.internal_monitor_counter % 5 == 0 {
+            self.internal_monitor_id += 1;
+
+            println!("DOING MONITORING");
+
+            let mut global_cpu: f32 = 0.0;
+            let mut global_rss: u64 = 0;
+            let mut resources_per_task: HashMap<&usize, HashMap<&str, serde_json::Value>> =
+                HashMap::new();
+
+            for (tid, proc) in &self.process_list {
+                let pid = proc.id();
+                let mut allprocs: Vec<Pid> = Vec::new();
+
+                allprocs.push(Pid::from_u32(pid));
+                allprocs.extend(find_child_processes_recursive(&system, Pid::from_u32(pid)));
+
+                // accumulate total metrics
+                let mut total_cpu = 0.0;
+                let mut total_rss: u64 = 0;
+
+                for child_proc_id in allprocs {
+                    let mut this_rss: u64 = 0;
+
+                    // get process reference
+                    if let Some(p) = system.process(child_proc_id) {
+                        // MEMORY part
+                        this_rss = p.memory();
+                        total_rss += this_rss;
+
+                        // CPU part
+                        let this_cpu = p.cpu_usage();
+                        total_cpu += this_cpu;
+                    }
+                }
+                let time_delta = self
+                    .start_time
+                    .map_or(0, |t| Instant::now().duration_since(t).as_millis() as i32);
+
+                total_rss = total_rss / 1024 / 1024;
+
+                let nice_value = unsafe { getpriority(PRIO_PROCESS, pid) };
+                let mut task_resources: HashMap<&str, serde_json::Value> = HashMap::new();
+
+                task_resources.insert(
+                    "iter",
+                    serde_json::Value::Number(self.internal_monitor_id.into()),
+                );
+                task_resources.insert(
+                    "name",
+                    serde_json::Value::String(self.id_to_task[*tid].clone()),
+                );
+                task_resources.insert("cpu", serde_json::Value::Number((total_cpu as u64).into()));
+                task_resources.insert("rss", serde_json::Value::Number(total_rss.into()));
+                task_resources.insert("nice", serde_json::Value::Number(nice_value.into()));
+                task_resources.insert("label", self.workflow_spec["stages"][tid]["labels"].clone());
+
+                resources_per_task.insert(tid, task_resources);
+
+                self.resource_manager.add_monitored_resources(
+                    tid,
+                    time_delta,
+                    total_cpu / 100.0,
+                    total_rss,
+                );
+
+                if nice_value == self.resource_manager.nice_default {
+                    global_cpu += total_cpu;
+                    global_rss += total_rss;
+                }
+
+                self.metric_logger
+                    .info(&format!("{:?}", resources_per_task.get(tid).unwrap()));
+            }
+
+            if global_rss > self.resource_manager.resource_boundaries.mem_limit {
+                self.metric_logger
+                    .info(&format!("***MEMORY LIMIT EXCEEDED***",));
+            }
+        }
+    }
 
     fn wait_for_any(&mut self, finished: &mut Vec<usize>, failingtasks: &mut Vec<usize>) -> bool {
         let mut failure_detected = false;