Merge pull request #248 from pynbody/pt-loop-resume
Improve statistics reporting
apontzen authored Dec 30, 2023
2 parents f7e4f5b + d231830 commit 6358013
Showing 14 changed files with 502 additions and 84 deletions.
3 changes: 3 additions & 0 deletions tangos/config.py
@@ -98,6 +98,9 @@
PROPERTY_WRITER_MINIMUM_TIME_BETWEEN_COMMITS = 300 # seconds

# Minimum time between providing updates to the user during tangos write, when running in parallel
# Note that this is a 'polling' interval, for checking whether to update the display. Internally, the
# statistics are updated whenever a commit is made by any process (and the frequency of such commits
# is determined above).
PROPERTY_WRITER_PARALLEL_STATISTICS_TIME_BETWEEN_UPDATES = 600 # seconds

try:
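For context, the polling described in the new comment is implemented in accumulative_statistics.py below: a background thread blocks on a semaphore with this interval as the timeout, so the same wait serves both as the sleep between display updates and as the shutdown signal. A minimal standalone sketch of that pattern (names here are illustrative, not tangos APIs):

import threading

def make_reporter(report_fn, interval_seconds):
    stop = threading.Semaphore(0)

    def loop():
        while True:
            # acquire() returns True only once stop.release() has been called;
            # hitting the timeout simply means it is time to poll and report again
            if stop.acquire(timeout=interval_seconds):
                break
            report_fn()

    thread = threading.Thread(target=loop)
    thread.start()
    return thread, stop

# shutdown: stop.release(); thread.join()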
58 changes: 47 additions & 11 deletions tangos/parallel_tasks/accumulative_statistics.py
@@ -1,26 +1,59 @@
- import logging
+ import copy
+ import threading
import time
+ from typing import Optional

from ..config import PROPERTY_WRITER_PARALLEL_STATISTICS_TIME_BETWEEN_UPDATES
from ..log import logger
from .message import BarrierMessageWithResponse, Message

_new_accumulator_requested_for_ranks = []
_new_accumulator = None
_existing_accumulators = []
_accumulator_reporting_thread: Optional[threading.Thread] = None
_accumulator_reporting_semaphore: Optional[threading.Semaphore] = None

def _accumulator_reporting():
global _existing_accumulators
while True:
if _accumulator_reporting_semaphore.acquire(timeout=PROPERTY_WRITER_PARALLEL_STATISTICS_TIME_BETWEEN_UPDATES):
break # semaphore has been released, meaning the thread should stop

# otherwise, semaphore has timed out, so report
for a in _existing_accumulators:
a.report_to_log_if_needed(logger)
def _start_accumulator_reporting_thread():
from . import on_exit_parallelism
global _accumulator_reporting_thread, _accumulator_reporting_semaphore
if _accumulator_reporting_thread is None:
_accumulator_reporting_semaphore = threading.Semaphore(0)
_accumulator_reporting_thread = threading.Thread(target=_accumulator_reporting)
_accumulator_reporting_thread.start()
on_exit_parallelism(_stop_accumulator_reporting_thread)

def _stop_accumulator_reporting_thread():
global _accumulator_reporting_thread, _accumulator_reporting_semaphore
if _accumulator_reporting_thread is not None:
_accumulator_reporting_semaphore.release()
_accumulator_reporting_thread.join()
_accumulator_reporting_thread = None
_accumulator_reporting_semaphore = None

class CreateNewAccumulatorMessage(BarrierMessageWithResponse):

def process_global(self):
- from . import backend, on_exit_parallelism
global _existing_accumulators

+ _start_accumulator_reporting_thread()

+ from . import on_exit_parallelism

new_accumulator = self.contents()
accumulator_id = len(_existing_accumulators)
_existing_accumulators.append(new_accumulator)

locally_bound_accumulator = new_accumulator
logger.debug("Created new accumulator of type %s with id %d" % (
- locally_bound_accumulator.__class__.__name__, accumulator_id))
- on_exit_parallelism(lambda: locally_bound_accumulator.report_to_log_if_needed(logger, 0.05))
+ locally_bound_accumulator.__class__.__name__, accumulator_id))
+ on_exit_parallelism(lambda: locally_bound_accumulator.report_to_log_if_needed(logger))

self.respond(accumulator_id)

@@ -45,6 +78,7 @@ def __init__(self, allow_parallel=False):
logger.debug(f"Registering {self.__class__}")
self.id = CreateNewAccumulatorMessage(self.__class__).send_and_get_response(0)
logger.debug(f"Received accumulator id={ self.id}")
self._state_at_last_report = copy.deepcopy(self)

def report_to_server(self):
if self._parallel:
@@ -66,9 +100,11 @@ def report_to_log_or_server(self, logger):
else:
self.report_to_log(logger)

- def report_to_log_if_needed(self, logger, after_time=None):
- if after_time is None:
- after_time = self.REPORT_AFTER
- if time.time() - self._last_reported > after_time:
+ def report_to_log_if_needed(self, logger):
+ if self != self._state_at_last_report:
self.report_to_log(logger)
- self._last_reported = time.time()
+ self._state_at_last_report = None # avoid limitless depth in copy!
+ self._state_at_last_report = copy.deepcopy(self)

def __eq__(self, other):
raise NotImplementedError("This method should be overriden to compare two accumulations")
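To summarise the behavioural change in this file: report throttling is no longer time-based (REPORT_AFTER / _last_reported); instead each accumulator keeps a deep copy of its state at the last report and logs again only when its current state compares unequal to that snapshot, which is why subclasses must now implement __eq__. A self-contained sketch of the pattern, with hypothetical counter names:

import copy

class ExampleAccumulator:
    def __init__(self):
        self.succeeded = 0
        self.failed = 0
        self._state_at_last_report = copy.deepcopy(self)

    def __eq__(self, other):
        return type(other) == type(self) and \
            self.succeeded == other.succeeded and \
            self.failed == other.failed

    def report_if_changed(self, log):
        if self != self._state_at_last_report:
            log(f"succeeded={self.succeeded}, failed={self.failed}")
            # drop the previous snapshot before copying, so deep copies never nest
            self._state_at_last_report = None
            self._state_at_last_report = copy.deepcopy(self)

# acc = ExampleAccumulator(); acc.succeeded += 1
# acc.report_if_changed(print)   # logs once
# acc.report_if_changed(print)   # silent until the counters change again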
66 changes: 38 additions & 28 deletions tangos/parallel_tasks/jobs.py
@@ -1,7 +1,6 @@
import base64
- import getpass
import hashlib
- import os
+ import pathlib
import pickle
import pipes
import sys
@@ -19,7 +18,7 @@ class InconsistentContext(RuntimeError):
pass

class IterationState:
- _stored_iteration_states = {}
+ _this_run_iteration_states = {}
def __init__(self, context, jobs_complete, /, backend_size=None):
from . import backend
self._context = context
@@ -55,50 +54,61 @@ def from_context(cls, num_jobs, argv=None, stack_hash=None, allow_resume=None, b
log.logger.info(
f"Resuming from previous run. {r.count_complete()} of {len(r)} jobs are already complete.")
log.logger.info(
f"To prevent tangos from doing this, you can delete the file {cls._resume_state_filename()}")
f"To prevent tangos from doing this, you can delete the folder {str(cls._resume_state_folder_path()):s}")
return r

return cls(context, [False]*num_jobs, backend_size=backend_size)


@classmethod
- def _resume_state_filename(cls):
- return f"tangos_resume_state_{getpass.getuser()}.pickle"
+ def _resume_state_folder_path(cls):
+ path = pathlib.Path("~").expanduser() / ".tangos_resume_state"
+ path.mkdir(exist_ok=True)
+ return path

@staticmethod
def _resume_state_path():
if not hasattr(IterationState, "_resume_state_path_this_run"):
path = IterationState._resume_state_folder_path()
all_state_files = sorted(list(path.iterdir()))
if len(all_state_files)==0:
i = 0
else:
i = int(all_state_files[-1].name.split("_")[-1].split(".")[0])+1
candidate = path / f"tangos_resume_state_{i:06d}.pickle"
assert not candidate.exists()
IterationState._resume_state_path_this_run = candidate
return IterationState._resume_state_path_this_run

@classmethod
def _get_stored_completion_maps(cls):
maps = {}
- try:
- with open(cls._resume_state_filename(), "rb") as f:
- maps = pickle.load(f)
- except FileNotFoundError:
- pass
- return maps

- @classmethod
- def _store_completion_maps(cls, maps):
- with open(cls._resume_state_filename(), "wb") as f:
- pickle.dump(maps, f)
+ resume_path = cls._resume_state_folder_path()

+ for filename in sorted(list(resume_path.iterdir())):
+ if str(filename).endswith(".pickle"):
+ try:
+ with filename.open('rb') as f:
+ maps.update(pickle.load(f))
+ except (OSError, EOFError):
+ log.logger.warn(f"Error reading resume state from {str(filename):s}. Skipped.")
+ pass

+ return maps
@classmethod
def _get_stored_completion_map_from_context(cls, context):
maps = cls._get_stored_completion_maps()
return maps.get(context, None)

@classmethod
def clear_resume_state(cls):
- try:
- os.unlink(cls._resume_state_filename())
- except FileNotFoundError:
- pass
+ for f in cls._resume_state_folder_path().iterdir():
+ f.unlink()

def _store_completion_map(self):
- # In principle, there could be a race condition if more than one tangos process
- # is ongoing on the same filesystem. However, this is very unlikely to happen
- # since updating the completion map is very quick and happens quite rarely,
- # so we don't worry about it.
- maps = self._get_stored_completion_maps()
- maps[self._context] = self.to_string()
- self._store_completion_maps(maps)
+ self._this_run_iteration_states[self._context] = self.to_string()
+ with open(self._resume_state_path(), "wb") as f:
+ pickle.dump(self._this_run_iteration_states, f)

def mark_complete(self, job):
if job is None:
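In short, the resume state moves from a single per-user pickle to a per-run file inside a shared folder: each run writes its own sequentially numbered pickle, and reading merges every readable file found, so concurrent runs on the same filesystem no longer overwrite each other's state. A rough standalone sketch of the scheme (directory and file names here are illustrative, not the exact tangos paths):

import pathlib
import pickle

STATE_DIR = pathlib.Path("~").expanduser() / ".example_resume_state"

def state_file_for_this_run():
    STATE_DIR.mkdir(exist_ok=True)
    existing = sorted(STATE_DIR.glob("run_*.pickle"))
    next_index = int(existing[-1].stem.split("_")[-1]) + 1 if existing else 0
    return STATE_DIR / f"run_{next_index:06d}.pickle"

def load_merged_state():
    merged = {}
    if STATE_DIR.exists():
        for f in sorted(STATE_DIR.glob("run_*.pickle")):
            try:
                with f.open("rb") as fh:
                    merged.update(pickle.load(fh))  # later runs override earlier ones
            except (OSError, EOFError):
                pass  # unreadable files are simply skipped
    return merged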
5 changes: 1 addition & 4 deletions tangos/parallel_tasks/pynbody_server/snapshot_queue.py
@@ -88,10 +88,7 @@ def _free_if_unused(self):
f"Closing snapshot {self.current_timestep} after processing "
f"{RequestPynbodyArray.get_num_requests()} array fetches")
if RequestPynbodyArray.get_num_requests() > 0:
log.logger.info(" Typical wait time to start retrieving an array: %.1fs +/- %.1fs",
RequestPynbodyArray.get_mean_wait_time(),
RequestPynbodyArray.get_std_wait_time())
log.logger.info(" Total wait time: %.1fs", RequestPynbodyArray.get_total_wait_time())
log.logger.info(" Summed process waiting time: %.1fs", RequestPynbodyArray.get_total_wait_time())
RequestPynbodyArray.reset_performance_stats()

with check_deleted(self.current_snapshot):
8 changes: 7 additions & 1 deletion tangos/properties/__init__.py
@@ -555,9 +555,15 @@ def providing_classes(property_name_list, handler_class, silent_fail=False, expl
def instantiate_classes(simulation, property_name_list, silent_fail=False, explain=False):
"""Instantiate appropriate property calculation classes for a given simulation and list of property names."""
instances = []
already_instantiated_classes = []

handler_class = type(simulation.get_output_handler())

for property_identifier in property_name_list:
- instances.append(providing_class(property_identifier, handler_class, silent_fail, explain)(simulation))
+ cl = providing_class(property_identifier, handler_class, silent_fail, explain)
+ if cl not in already_instantiated_classes:
+ already_instantiated_classes.append(cl)
+ instances.append(cl(simulation))

return instances

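The effect of the instantiate_classes change is that each property-providing class is instantiated at most once per call, even when several requested names resolve to the same class. A compact sketch of the idea with a hypothetical lookup function:

def instantiate_unique(providing_class_for, property_names, simulation):
    seen_classes = []
    instances = []
    for name in property_names:
        cls = providing_class_for(name)  # e.g. "mass" and "mass_error" might share a class
        if cls not in seen_classes:
            seen_classes.append(cls)
            instances.append(cls(simulation))
    return instances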
7 changes: 5 additions & 2 deletions tangos/testing/simulation_generator.py
@@ -79,15 +79,18 @@ def _most_recently_added_timestep(self):
def _two_most_recently_added_timesteps(self):
return self.sim.timesteps[-2:]

- def add_timestep(self):
+ def add_timestep(self, time = None):
"""Add a sequentially-numbered timestep to the specified simulation"""

timestep_num = len(self.sim.timesteps)+1
if timestep_num>self.max_steps:
raise ValueError("Number of steps added exceeds maximum. You can use max_steps=<n> when constructing SimulationGeneratorForTests.")
ts = core.timestep.TimeStep(self.sim, "ts%d"%timestep_num)
ts.redshift = self.max_steps - timestep_num
- ts.time_gyr = 0.9*timestep_num
+ if time is None:
+ ts.time_gyr = 0.9*timestep_num
+ else:
+ ts.time_gyr = time
self.session.add(ts)
self.session.commit()

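With the new optional argument, tests can place a timestep at an explicit time rather than relying on the default 0.9 Gyr spacing. Assuming a generator constructed as in the existing tests (construction details may differ), usage would look something like:

generator = SimulationGeneratorForTests()
generator.add_timestep()          # time_gyr defaults to 0.9 * timestep number
generator.add_timestep(time=2.5)  # time_gyr set explicitly to 2.5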
2 changes: 1 addition & 1 deletion tangos/tools/__init__.py
@@ -66,4 +66,4 @@ def add_tools(cls, subparse):
from . import (add_simulation, ahf_merger_tree_importer, changa_bh_importer,
consistent_trees_importer, crosslink, db_importer,
merger_tree_patcher, property_deleter, property_importer,
- property_writer, subfind_merger_tree_importer)
+ property_writer, subfind_merger_tree_importer, timestep_thinner)
35 changes: 21 additions & 14 deletions tangos/tools/property_writer.py
@@ -189,8 +189,6 @@ def _log_once_per_timestep(self, *args):
if parallel_tasks.backend is None or parallel_tasks.backend.rank()==1 or self.options.load_mode is None:
logger.info(*args)

- def _summarise_timing(self):
- self.timing_monitor.report_to_log_or_server(logger)


def _build_halo_list(self, db_timestep):
@@ -265,9 +263,9 @@ def _is_commit_needed(self, end_of_timestep, end_of_simulation):
return False
if end_of_simulation:
return True
- elif end_of_timestep and (time.time() - self._start_time > self._writer_minimum):
+ elif end_of_timestep and (time.time() - self._last_commit_time > self._writer_minimum):
return True
- elif time.time() - self._start_time > self._writer_timeout:
+ elif time.time() - self._last_commit_time > self._writer_timeout:
return True
else:
return False
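The renamed attribute changes what these elapsed-time checks measure: they now count from the most recent commit rather than from the start of the calculation loop. A hedged restatement of the decision rule (a sketch with illustrative thresholds, not the exact tangos method):

import time

def commit_needed(last_commit_time, end_of_timestep, end_of_simulation,
                  have_pending=True, minimum=300.0, timeout=600.0):
    if not have_pending:
        return False
    if end_of_simulation:
        return True
    elapsed = time.time() - last_commit_time
    if end_of_timestep and elapsed > minimum:
        return True
    return elapsed > timeout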
@@ -276,12 +274,15 @@ def _is_commit_needed(self, end_of_timestep, end_of_simulation):
def _commit_results_if_needed(self, end_of_timestep=False, end_of_simulation=False):

if self._is_commit_needed(end_of_timestep, end_of_simulation):
logger.info("Attempting to commit halo properties...")
num_properties = insert_list(self._pending_properties)
logger.info(f"...{num_properties} properties were committed")
self._pending_properties = []
self._start_time = time.time()
self._summarise_timing()
self._commit_results()

self.tracker.report_to_log_or_server(logger)
self.timing_monitor.report_to_log_or_server(logger)

def _commit_results(self):
insert_list(self._pending_properties)
self._pending_properties = []
self._last_commit_time = time.time()

def _queue_results_for_later_commit(self, db_halo, names, results, existing_properties_data):
for n, r in zip(names, results):
@@ -513,8 +514,6 @@ def run_timestep_calculation(self, db_timestep):
self._log_once_per_timestep("Done with %r", db_timestep)
self._unload_timestep()

- self.tracker.report_to_log_or_server(logger)

self._commit_results_if_needed(end_of_timestep=True)

def _add_prerequisites_to_calculator_instances(self, db_timestep):
Expand Down Expand Up @@ -545,7 +544,7 @@ def run_calculation_loop(self):

parallel_tasks.database.synchronize_creator_object()

- self._start_time = time.time()
+ self._last_commit_time = time.time()
self._pending_properties = []

for f_obj in self._get_parallel_timestep_iterator():
@@ -556,8 +555,8 @@

class CalculationSuccessTracker(accumulative_statistics.StatisticsAccumulatorBase):
def __init__(self, allow_parallel=False):
+ self.reset() # comes before __init__, since the latter stores a copy for use in report_to_log_if_needed
super().__init__(allow_parallel=allow_parallel)
- self.reset()

self._posted_errors = parallel_tasks.shared_set.SharedSet('posted_errors',allow_parallel)

@@ -607,3 +606,11 @@ def add(self, other):
self._skipped_loading_error += other._skipped_loading_error
self._skipped_existing += other._skipped_existing
self._skipped_missing_prerequisite += other._skipped_missing_prerequisite

def __eq__(self, other):
return type(other) == type(self) and \
self._succeeded == other._succeeded and \
self._skipped_error == other._skipped_error and \
self._skipped_loading_error == other._skipped_loading_error and \
self._skipped_existing == other._skipped_existing and \
self._skipped_missing_prerequisite == other._skipped_missing_prerequisite