Add some performance tracking to pynbody_server.py #233

Merged: 14 commits, Dec 13, 2023
2 changes: 1 addition & 1 deletion tangos/__init__.py
@@ -20,4 +20,4 @@
from .core import *
from .query import *

__version__ = '1.9.0'
__version__ = '1.9.1'
3 changes: 3 additions & 0 deletions tangos/config.py
@@ -97,6 +97,9 @@
# Property writer: don't bother committing even if a timestep is finished if this time hasn't elapsed:
PROPERTY_WRITER_MINIMUM_TIME_BETWEEN_COMMITS = 300 # seconds

# Minimum time between providing updates to the user during tangos write, when running in parallel
PROPERTY_WRITER_PARALLEL_STATISTICS_TIME_BETWEEN_UPDATES = 600 # seconds

try:
from .config_local import *
except:
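Since config.py ends by falling back to "from .config_local import *" (the try/except above), the new reporting interval can be overridden per installation. A hypothetical config_local.py sketch:

# config_local.py -- hypothetical local override, picked up by the
# "from .config_local import *" at the end of tangos/config.py.
# Report parallel writer statistics every minute instead of every ten minutes:
PROPERTY_WRITER_PARALLEL_STATISTICS_TIME_BETWEEN_UPDATES = 60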
1 change: 0 additions & 1 deletion tangos/input_handlers/output_testing.py
@@ -31,7 +31,6 @@ def enumerate_timestep_extensions(self, parallel=False):
pre_extension_length = len(os.path.join(config.base, self.basename))
steps = glob.glob(os.path.join(config.base, self.basename, "step.*"))
for i in steps:
print(i, i[pre_extension_length:], self.strip_slashes(i[pre_extension_length:]))
yield self.strip_slashes(i[pre_extension_length:])

def get_timestep_properties(self, ts_extension):
5 changes: 4 additions & 1 deletion tangos/input_handlers/pynbody.py
@@ -88,9 +88,12 @@ def load_timestep_without_caching(self, ts_extension, mode=None):
raise NotImplementedError("Load mode %r is not implemented"%mode)

def load_region(self, ts_extension, region_specification, mode=None):
if mode is None or mode=='server':
if mode is None:
timestep = self.load_timestep(ts_extension, mode)
return timestep[region_specification]
elif mode=='server':
timestep = self.load_timestep(ts_extension, mode)
return timestep.get_view(region_specification)
elif mode=='server-shared-mem':
from ..parallel_tasks import pynbody_server as ps
timestep = self.load_timestep(ts_extension, mode)
15 changes: 14 additions & 1 deletion tangos/log.py
@@ -17,13 +17,15 @@ def __init__(self):
self.buffer = StringIO()
self.handler_buffer = logging.StreamHandler(self.buffer)
self.handler_buffer.setLevel(logging.INFO)
self.handler_buffer.setFormatter(formatter)
self._suspended_handlers = []

def __enter__(self):
self._suspended_handlers = copy.copy(logger.handlers)
for x_handler in self._suspended_handlers:
logger.removeHandler(x_handler)
logger.addHandler(self.handler_buffer)
return self

def __exit__(self, *exc_info):
for x_handler in self._suspended_handlers:
@@ -34,8 +36,19 @@ def __exit__(self, *exc_info):
def get_output(self):
return self.buffer.getvalue()

def get_output_without_timestamps(self):
lines = self.get_output().split("\n")
result = ""
for l in lines:
try:
result += l.split(" : ", 1)[1]+"\n"
except IndexError:
result += l+"\n"
return result


def set_identity_string(identifier):
global handler_stderr
formatter = logging.Formatter(identifier+"%(asctime)s : %(message)s")
handler_stderr.setFormatter(formatter)
for handler in logger.handlers:
handler.setFormatter(formatter)
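The new get_output_without_timestamps() relies on the "%(asctime)s : %(message)s" layout that set_identity_string() installs. A self-contained sketch of the same parsing logic, with illustrative input:

# Each captured line looks like "<asctime> : <message>"; splitting once on
# " : " and keeping the right-hand side strips the timestamp prefix, while
# lines without the separator fall through unchanged.
captured = "2023-12-13 09:00:00,123 : processing step.1\nno separator here"
result = ""
for line in captured.split("\n"):
    try:
        result += line.split(" : ", 1)[1] + "\n"
    except IndexError:
        result += line + "\n"
print(result)  # -> "processing step.1" then "no separator here"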
37 changes: 23 additions & 14 deletions tangos/parallel_tasks/__init__.py
@@ -1,21 +1,18 @@
import importlib
import re
import sys
import time
import traceback
import warnings

import tangos.core.creator

from .. import config, core

backend = None
_backend_name = config.default_backend
_num_procs = None # only for multiprocessing backend

_on_exit = [] # list of functions to call when parallelism is shutting down

from .. import log
from ..log import logger
from . import backends, jobs, message
from . import accumulative_statistics, jobs, message


def use(name):
@@ -51,10 +48,15 @@ def parallelism_is_active():
global _backend_name
return _backend_name != 'null' and backend is not None

def launch(function, args=None):
def launch(function, args=None, backend_kwargs=None):
if args is None:
args = []

if backend_kwargs is None:
backend_kwargs = {}

result = None

# we need to close any existing connections because we may fork, which leads to
# buggy/unreliable behaviour. This should invalidate the session attached to
# any existing objects, which is intended behaviour. If you are using parallel
@@ -71,15 +73,17 @@ def launch(function, args=None):
try:
core.close_db()
if _backend_name != 'null':
backend.launch(_exec_function_or_server, [function, connection_info, args])
result = backend.launch(_exec_function_or_server, [function, connection_info, args], **backend_kwargs)
else:
function(*args)
result = function(*args)
finally:
if connection_info is not None:
core.init_db(*connection_info)
finally:
deinit_backend()

return result

def distributed(file_list, proc=None, of=None):
"""Distribute a list of tasks between all nodes"""

@@ -138,13 +142,18 @@ def _server_thread():
else:
obj.process()

def on_exit_parallelism(function):
global _on_exit
_on_exit.append(function)

log.logger.info("Terminating manager process")
def _shutdown_parallelism():
global backend, _on_exit
log.logger.debug("Clearing up process")

for fn in _on_exit:
fn()
_on_exit = []

def _shutdown_parallelism():
global backend
log.logger.info("Terminating worker process")
backend.barrier()
backend.finalize()
backend = None
@@ -153,6 +162,6 @@ def _shutdown_parallelism():



from . import remote_import
from . import remote_import, shared_set
from .barrier import barrier
from .lock import ExclusiveLock
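Two behavioural changes in this file benefit from a usage sketch: launch() now returns whatever the launched function (or backend) returns, and on_exit_parallelism() registers callbacks that _shutdown_parallelism() runs before finalizing the backend. A minimal, hypothetical example (the backend specifier and task are illustrative, not from this PR):

import tangos.parallel_tasks as pt

def my_task():
    # hypothetical cleanup hook; runs when parallelism shuts down
    pt.on_exit_parallelism(lambda: print("flushing statistics"))
    return 42

pt.use('multiprocessing-2')  # assumed backend specifier
result = pt.launch(my_task)  # with this PR, the task's return value propagates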
96 changes: 96 additions & 0 deletions tangos/parallel_tasks/accumulative_statistics.py
@@ -0,0 +1,96 @@
import logging
import time

from ..config import PROPERTY_WRITER_PARALLEL_STATISTICS_TIME_BETWEEN_UPDATES
from ..log import logger
from .message import Message

_new_accumulator_requested_for_ranks = []
_new_accumulator = None
_existing_accumulators = []

class CreateNewAccumulatorMessage(Message):

def process(self):
from . import backend
global _new_accumulator, _new_accumulator_requested_for_ranks, _existing_accumulators
assert issubclass(self.contents, StatisticsAccumulatorBase)
if _new_accumulator is None:
_new_accumulator = self.contents()
_new_accumulator_requested_for_ranks = [self.source]
else:
assert self.source not in _new_accumulator_requested_for_ranks
assert isinstance(_new_accumulator, self.contents)
_new_accumulator_requested_for_ranks.append(self.source)

from . import backend

if len(_new_accumulator_requested_for_ranks) == backend.size()-1:
self._confirm_new_accumulator()

def _confirm_new_accumulator(self):
global _new_accumulator, _new_accumulator_requested_for_ranks, _existing_accumulators
from . import backend, on_exit_parallelism
accumulator_id = len(_existing_accumulators)
_existing_accumulators.append(_new_accumulator)

locally_bound_accumulator = _new_accumulator
logger.debug("Created new accumulator of type %s with id %d" % (locally_bound_accumulator.__class__.__name__, accumulator_id))
on_exit_parallelism(lambda: locally_bound_accumulator.report_to_log_if_needed(logger, 0.05))

_new_accumulator = None
_new_accumulator_requested_for_ranks = []

for destination in range(1, backend.size()):
            AccumulatorIdMessage(accumulator_id).send(destination)

class AccumulatorIdMessage(Message):
    pass

class AccumulateStatisticsMessage(Message):
def process(self):
global _existing_accumulators
_existing_accumulators[self.contents.id].add(self.contents)

class StatisticsAccumulatorBase:
REPORT_AFTER = PROPERTY_WRITER_PARALLEL_STATISTICS_TIME_BETWEEN_UPDATES
def __init__(self, allow_parallel=False):
"""This is a base class for accumulating statistics, possibly in parallel across multiple processes.

        Note that if allow_parallel is True, then all processes must create an instance of this class
        (in effect, instantiation acts as a barrier). If only some processes create an instance,
        they will block, possibly creating a deadlock. This is why allow_parallel defaults to False.
"""
from . import backend, parallelism_is_active
self._last_reported = time.time()
self._parallel = allow_parallel and parallelism_is_active() and backend.rank() != 0
if self._parallel:
logger.debug(f"Registering {self.__class__}")
CreateNewAccumulatorMessage(self.__class__).send(0)
logger.debug(f"Awaiting accumulator id for {self.__class__}")
self.id = AccumulatorIdMessage.receive(0).contents
logger.debug(f"Received accumulator id={ self.id}")

def report_to_server(self):
if self._parallel:
AccumulateStatisticsMessage(self).send(0)
self.reset()

def reset(self):
raise NotImplementedError("This method should be overriden to reset the statistics")

def add(self, other):
raise NotImplementedError("This method should be overriden to add two accumulations together")

def report_to_log(self, logger):
raise NotImplementedError("This method should be overriden to log a statistics report")

def report_to_log_or_server(self, logger):
if self._parallel:
self.report_to_server()
else:
self.report_to_log(logger)

def report_to_log_if_needed(self, logger, after_time=None):
if after_time is None:
after_time = self.REPORT_AFTER
if time.time() - self._last_reported > after_time:
self.report_to_log(logger)
self._last_reported = time.time()
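StatisticsAccumulatorBase leaves reset(), add() and report_to_log() to subclasses. A hypothetical subclass sketching the intended pattern (this PR's real consumers live in pynbody_server.py; the names below are illustrative):

from tangos.log import logger
from tangos.parallel_tasks.accumulative_statistics import StatisticsAccumulatorBase

class TimingStatistics(StatisticsAccumulatorBase):
    """Hypothetical accumulator counting operations and total wall-clock time."""
    def __init__(self, allow_parallel=False):
        super().__init__(allow_parallel=allow_parallel)
        self.reset()

    def reset(self):
        self.n_ops = 0
        self.total_time = 0.0

    def add(self, other):
        # invoked on rank 0 when an AccumulateStatisticsMessage arrives
        self.n_ops += other.n_ops
        self.total_time += other.total_time

    def report_to_log(self, logger):
        logger.info("Performed %d operations in %.2fs", self.n_ops, self.total_time)

# On a worker rank, results are periodically shipped back to rank 0:
#   stats = TimingStatistics(allow_parallel=True)
#   ... accumulate into stats.n_ops / stats.total_time ...
#   stats.report_to_log_or_server(logger)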