Skip to content

Commit

Permalink
Merge pull request #242 from pynbody/server-shmem
Browse files Browse the repository at this point in the history
New shared memory server mode
  • Loading branch information
apontzen authored Nov 29, 2023
2 parents 9a595a1 + 6ffabb3 commit a3ebd96
Show file tree
Hide file tree
Showing 18 changed files with 757 additions and 452 deletions.
5 changes: 3 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,15 @@
'scipy >= 0.14.0',
'more_itertools >= 8.0.0',
'matplotlib >= 3.0.0', # for web interface
'tqdm >= 4.59.0'
'tqdm >= 4.59.0',
'tblib >= 3.0.0',
]

tests_require = [
'pytest >= 5.0.0',
'webtest >= 2.0',
'pyquery >= 1.3.0',
'pynbody >= 1.3.2',
'pynbody >= 1.5.0',
'yt>=3.4.0',
'PyMySQL>=1.0.2',
]
Expand Down
2 changes: 1 addition & 1 deletion tangos/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,4 @@
from .core import *
from .query import *

__version__ = '1.8.1'
__version__ = '1.9.0'
9 changes: 7 additions & 2 deletions tangos/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,14 @@
diff_default_rtol = 1e-3


# Database import: how many rows to copy at a time, and when to issue a commit
# Database import: how many rows to copy at a time
DB_IMPORT_CHUNK_SIZE = 10
DB_IMPORT_COMMIT_AFTER_CHUNKS = 500

# Property writer: longest to wait before trying to commit properties (even if in middle of timestep)
PROPERTY_WRITER_MAXIMUM_TIME_BETWEEN_COMMITS = 600 # seconds

# Property writer: don't bother committing even if a timestep is finished if this time hasn't elapsed:
PROPERTY_WRITER_MINIMUM_TIME_BETWEEN_COMMITS = 300 # seconds

try:
from .config_local import *
Expand Down
35 changes: 23 additions & 12 deletions tangos/input_handlers/pynbody.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,8 @@
import time
import weakref
from collections import defaultdict
from itertools import chain

import numpy as np
from more_itertools import always_iterable

from ..util import proxy_object

Expand Down Expand Up @@ -36,8 +34,8 @@ class PynbodyInputHandler(finding.PatternBasedFileDiscovery, HandlerBase):
def __new__(cls, *args, **kwargs):
import pynbody as pynbody_local

if pynbody_local.__version__<"1.2.2":
raise ImportError("Using tangos with pynbody requires pynbody 1.2.2 or later")
if pynbody_local.__version__<"1.5.0":
raise ImportError("Using tangos with pynbody requires pynbody 1.5.0 or later")

global pynbody
pynbody = pynbody_local
Expand Down Expand Up @@ -82,19 +80,22 @@ def load_timestep_without_caching(self, ts_extension, mode=None):
f = pynbody.load(self._extension_to_filename(ts_extension))
f.physical_units()
return f
elif mode=='server' or mode=='server-partial':
elif mode in ('server', 'server-partial', 'server-shared-mem'):
from ..parallel_tasks import pynbody_server as ps
return ps.RemoteSnapshotConnection(self,ts_extension)
return ps.RemoteSnapshotConnection(self, ts_extension,
shared_mem = (mode == 'server-shared-mem'))
else:
raise NotImplementedError("Load mode %r is not implemented"%mode)

def load_region(self, ts_extension, region_specification, mode=None):
if mode is None:
if mode is None or mode=='server':
timestep = self.load_timestep(ts_extension, mode)
return timestep[region_specification]
elif mode=='server':
elif mode=='server-shared-mem':
from ..parallel_tasks import pynbody_server as ps
timestep = self.load_timestep(ts_extension, mode)
return timestep.get_view(region_specification)
simsnap = timestep.shared_mem_view
return simsnap[region_specification].get_copy_on_access_simsnap()
elif mode=='server-partial':
timestep = self.load_timestep(ts_extension, mode)
view = timestep.get_view(region_specification)
Expand All @@ -114,19 +115,29 @@ def load_object(self, ts_extension, finder_id, finder_offset, object_typetag='ha
h_file = h.load_copy(finder_offset)
h_file.physical_units()
return h_file
elif mode=='server':
elif mode=='server' :
timestep = self.load_timestep(ts_extension, mode)
from ..parallel_tasks import pynbody_server as ps
return timestep.get_view(ps.ObjectSpecification(finder_id, finder_offset, object_typetag))
return timestep.get_view(
ps.snapshot_queue.ObjectSpecification(finder_id, finder_offset, object_typetag))
elif mode=='server-partial':
timestep = self.load_timestep(ts_extension, mode)
from ..parallel_tasks import pynbody_server as ps
view = timestep.get_view(ps.ObjectSpecification(finder_id, finder_offset, object_typetag))
view = timestep.get_view(
ps.snapshot_queue.ObjectSpecification(finder_id, finder_offset, object_typetag))
load_index = view['remote-index-list']
logger.info("Partial load %r, taking %d particles", ts_extension, len(load_index))
f = pynbody.load(self._extension_to_filename(ts_extension), take=load_index)
f.physical_units()
return f
elif mode=='server-shared-mem':
timestep = self.load_timestep(ts_extension, mode)
from ..parallel_tasks import pynbody_server as ps
view = timestep.get_view(
ps.snapshot_queue.ObjectSpecification(finder_id, finder_offset, object_typetag))
view_index = view['remote-index-list']
return timestep.shared_mem_view[view_index].get_copy_on_access_simsnap()

elif mode is None:
h = self._construct_halo_cat(ts_extension, object_typetag)
return h[finder_offset]
Expand Down
4 changes: 2 additions & 2 deletions tangos/parallel_tasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,12 +139,12 @@ def _server_thread():
obj.process()


log.logger.info("Terminating manager")
log.logger.info("Terminating manager process")


def _shutdown_parallelism():
global backend
log.logger.info("Shutting down parallel_tasks")
log.logger.info("Terminating worker process")
backend.barrier()
backend.finalize()
backend = None
Expand Down
16 changes: 12 additions & 4 deletions tangos/parallel_tasks/backends/multiprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
import signal
import sys
import threading
from typing import Optional

import tblib.pickling_support

_slave = False
_rank = None
Expand Down Expand Up @@ -86,6 +89,8 @@ def finalize():
_pipe.send("finalize")

def launch_wrapper(target_fn, rank_in, size_in, pipe_in, args_in):
tblib.pickling_support.install()

global _slave, _rank, _size, _pipe, _recv_lock
_rank = rank_in
_size = size_in
Expand All @@ -103,10 +108,12 @@ def launch_wrapper(target_fn, rank_in, size_in, pipe_in, args_in):
print("Error on a sub-process:", file=sys.stderr)
traceback.print_exception(exc_type, exc_value, exc_traceback,
file=sys.stderr)
_pipe.send(("error", e))
_pipe.send(("error", exc_value, exc_traceback))

_pipe.close()

class RemoteException(Exception):
pass

def launch_functions(functions, args):
global _slave
Expand All @@ -125,7 +132,7 @@ def launch_functions(functions, args):
proc_i.start()

running = [True for rank in range(num_procs)]
error = False
error: Optional[Exception] = None

while any(running):
for i, pipe_i in enumerate(parent_connections):
Expand All @@ -136,6 +143,7 @@ def launch_functions(functions, args):
running[i]=False
elif isinstance(message[0], str) and message[0]=='error':
error = message[1]
traceback = message[2]
running = [False]
break
else:
Expand All @@ -153,8 +161,8 @@ def launch_functions(functions, args):
os.kill(proc_i.pid, signal.SIGTERM)
proc_i.join()

if error:
raise error
if error is not None:
raise error.with_traceback(traceback)



Expand Down
4 changes: 2 additions & 2 deletions tangos/parallel_tasks/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ def process(self):
global j, num_jobs, current_job
source = self.source
if current_job is not None and num_jobs>0:
log.logger.info("Send job %d of %d to node %d", current_job, num_jobs, source)
log.logger.debug("Send job %d of %d to node %d", current_job, num_jobs, source)
else:
num_jobs = None
current_job = None # in case num_jobs=0, still want to send 'end of loop' signal to client
log.logger.info("Finished jobs; notify node %d", source)
log.logger.debug("Finished jobs; notify node %d", source)

MessageDeliverJob(current_job).send(source)

Expand Down
Loading

0 comments on commit a3ebd96

Please sign in to comment.