Skip to content

Commit

Permalink
Keep track of what processes are waiting for when they are communicat…
Browse files Browse the repository at this point in the history
…ion-blocked
  • Loading branch information
apontzen committed Jan 11, 2024
1 parent d231830 commit 14e5bc7
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 14 deletions.
3 changes: 3 additions & 0 deletions tangos/parallel_tasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,9 @@ def _exec_function_or_server(function, connection_info, args):
log.logger.debug("Reinitialising database, args "+str(connection_info))
core.init_db(*connection_info)

from .message import _setup_message_reception_timing_monitor
_setup_message_reception_timing_monitor()

if backend.rank()==0:
_server_thread()
else:
Expand Down
11 changes: 8 additions & 3 deletions tangos/parallel_tasks/accumulative_statistics.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,12 @@ def process_global(self):

from . import on_exit_parallelism

new_accumulator = self.contents()
if self.contents[1]:
# has kwargs
new_accumulator = self.contents[0](**self.contents[1])
else:
new_accumulator = self.contents[0]()

accumulator_id = len(_existing_accumulators)
_existing_accumulators.append(new_accumulator)

Expand All @@ -64,7 +69,7 @@ def process(self):

class StatisticsAccumulatorBase:
REPORT_AFTER = PROPERTY_WRITER_PARALLEL_STATISTICS_TIME_BETWEEN_UPDATES
def __init__(self, allow_parallel=False):
def __init__(self, allow_parallel=False, accumulator_init_kwargs=None):
"""This is a base class for accumulating statistics, possibly in parallel across multiple processes.
Note that if allow_parallel is True, then all processes must create an instance of this class
Expand All @@ -76,7 +81,7 @@ def __init__(self, allow_parallel=False):
self._parallel = allow_parallel and parallelism_is_active() and backend.rank() != 0
if self._parallel:
logger.debug(f"Registering {self.__class__}")
self.id = CreateNewAccumulatorMessage(self.__class__).send_and_get_response(0)
self.id = CreateNewAccumulatorMessage((self.__class__, accumulator_init_kwargs)).send_and_get_response(0)
logger.debug(f"Received accumulator id={ self.id}")
self._state_at_last_report = copy.deepcopy(self)

Expand Down
47 changes: 42 additions & 5 deletions tangos/parallel_tasks/message.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,30 @@
reception_timing_monitor = None

def _setup_message_reception_timing_monitor():
global reception_timing_monitor

from ..util import timing_monitor
from . import backend
if backend is None or backend.rank() == 0:
# server can't gather its own timing information
reception_timing_monitor = timing_monitor.TimingMonitor(allow_parallel=False, label='idle')
else:
reception_timing_monitor = timing_monitor.TimingMonitor(allow_parallel=True, label='response wait',
num_chars=40, show_percentages=False)
return reception_timing_monitor

def update_performance_stats():
from . import backend
if reception_timing_monitor is not None and backend is not None:
assert backend.rank() != 0
reception_timing_monitor.report_to_log_or_server(None)

class MessageMetaClass(type):
_message_classes = {}
_next_tag = 100

timing_monitor = None

def __new__(meta, name, bases, dct):
return super().__new__(meta, name, bases, dct)

Expand All @@ -28,6 +51,14 @@ def register_class(cls, tag):
MessageMetaClass._message_classes[tag] = cls
cls._tag = tag

class MessageWithResponseMetaClass(MessageMetaClass):
def __init__(cls, name, bases, dct):
super().__init__(name, bases, dct)
response_class = type(cls.__name__+"Response", (ServerResponseMessage,), {})
assert response_class.__name__ not in globals()
globals()[response_class.__name__] = response_class # for Pickle
cls._response_class = response_class


class Message(metaclass=MessageMetaClass):
_handler = None
Expand Down Expand Up @@ -62,8 +93,14 @@ def send(self, destination):
@classmethod
def receive(cls, source=None):
from . import backend
global reception_timing_monitor

if reception_timing_monitor is not None:
with reception_timing_monitor(cls):
msg, source, tag = backend.receive_any(source=None)
else:
msg, source, tag = backend.receive_any(source=None)

msg, source, tag = backend.receive_any(source=None)
obj = Message.interpret_and_deserialize(tag, source, msg)

if not isinstance(obj, cls):
Expand All @@ -79,17 +116,17 @@ def process(self):
class ServerResponseMessage(Message):
pass

class MessageWithResponse(Message):
class MessageWithResponse(Message, metaclass=MessageWithResponseMetaClass):
"""An extension of the message class where the server can return a response to each process"""
def respond(self, response):
return ServerResponseMessage(response).send(self.source)
return self._response_class(response).send(self.source)

def send_and_get_response(self, destination):
self.send(destination)
return self.get_response(destination)

def get_response(self, receiving_from):
return ServerResponseMessage.receive(receiving_from).contents
return self._response_class.receive(receiving_from).contents

class BarrierMessageWithResponse(MessageWithResponse):
"""An extension of the message class where the client blocks until all processes have made the request, and then the server responds"""
Expand Down Expand Up @@ -117,7 +154,7 @@ def assert_consistent(self, original_message):

def respond(self, response):
from . import backend
response = ServerResponseMessage(response)
response = self._response_class(response)
for i in range(1, backend.size()):
response.send(i)

Expand Down
3 changes: 3 additions & 0 deletions tangos/tools/property_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,9 @@ def _commit_results_if_needed(self, end_of_timestep=False, end_of_simulation=Fal
self.tracker.report_to_log_or_server(logger)
self.timing_monitor.report_to_log_or_server(logger)

from ..parallel_tasks import message
message.update_performance_stats()

def _commit_results(self):
insert_list(self._pending_properties)
self._pending_properties = []
Expand Down
23 changes: 17 additions & 6 deletions tangos/util/timing_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,14 @@ class TimingMonitor(accumulative_statistics.StatisticsAccumulatorBase):
"""This class keeps track of how long a Property is taking to evaluate, and (if the Property is implemented
to take advantage of this), the time spent on sub-tasks. It provides formatting to place this information
into the log."""
def __init__(self, allow_parallel=False):
def __init__(self, allow_parallel=False, label="running times", num_chars=20, show_percentages=True):
self._label = label
self._num_chars = num_chars
self._show_percentages = show_percentages
self.reset()
super().__init__(allow_parallel=allow_parallel)
super().__init__(allow_parallel=allow_parallel,
accumulator_init_kwargs={'label': label, 'num_chars': num_chars,
'show_percentages': show_percentages})
self._monitoring = None

@contextlib.contextmanager
Expand Down Expand Up @@ -54,7 +59,10 @@ def _start(self, object):

def _end(self):
"""End a timer for the specified object."""
cl = type(self._monitoring)
if isinstance(self._monitoring, type):
cl = self._monitoring
else:
cl = type(self._monitoring)
self._unset_as_monitor_for(self._monitoring)
self._time_marks_info.append("end")
self._time_marks.append(time.time())
Expand Down Expand Up @@ -100,16 +108,19 @@ def report_to_log(self, logger):
if len(self.timings_by_class) == 0:
return
logger.info("")
logger.info("CUMULATIVE RUNNING TIMES, summed over all processes, if applicable")
logger.info(f"CUMULATIVE {self._label.upper()}, summed over all processes, if applicable")
v_tot = 1e-10
for k, v in self.timings_by_class.items():
v_tot += sum(v)

for k, v in self.timings_by_class.items():
name = str(k)[:-2]
name = name.split(".")[-1]
name = "%20s " % (name[-20:])
logger.info(" " + name + f"{self.format_time(sum(v)):>12} | {100 * sum(v) / v_tot:4.1f}%")
name = ("%"+str(self._num_chars)+"s ") % (name[-self._num_chars:])
if self._show_percentages:
logger.info(" " + name + f"{self.format_time(sum(v)):>12} | {100 * sum(v) / v_tot:4.1f}%")
else:
logger.info(" " + name + f"{self.format_time(sum(v)):>12}")
if len(v)>1:
marks_info = self.labels_by_class[k]
logger.info(" ------------ INTERNAL BREAKDOWN ------------" )
Expand Down
3 changes: 3 additions & 0 deletions tests/test_db_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,9 @@ def test_writer_reports_aggregates(fresh_database):
assert "Succeeded: 15 property calculations" in res
assert "myPropertyTakingTime 1.5s" in res

assert "CUMULATIVE RESPONSE WAIT" in res
assert "MessageRequestJobResponse" in res # checking the server response time stats are being printed


class MoreThanOneDummyProperty(properties.PropertyCalculation):
names = "dummy_property_t1", "dummy_property_t2"
Expand Down

0 comments on commit 14e5bc7

Please sign in to comment.