PMDA with refactored _single_frame #128

Draft · wants to merge 17 commits into master
pmda/parallel.py: 33 additions & 43 deletions
@@ -35,7 +35,7 @@ class Timing(object):
     store various timing results obtained during a parallel analysis run
     """
 
-    def __init__(self, io, compute, total, universe, prepare,
+    def __init__(self, io, compute, total, prepare,
                  conclude, wait=None, io_block=None,
                  compute_block=None):
         self._io = io
@@ -44,7 +44,6 @@ def __init__(self, io, compute, total, universe, prepare,
         self._compute_block = compute_block
         self._total = total
         self._cumulate = np.sum(io) + np.sum(compute)
-        self._universe = universe
         self._prepare = prepare
         self._conclude = conclude
         self._wait = wait
@@ -83,11 +82,6 @@ def cumulate_time(self):
         """
         return self._cumulate
 
-    @property
-    def universe(self):
-        """time to create a universe for each block"""
-        return self._universe
-
     @property
     def prepare(self):
         """time to prepare"""
@@ -189,7 +183,7 @@ def _reduce(res, result_single_frame):
 
     """
 
-    def __init__(self, universe, atomgroups):
+    def __init__(self, universe):
         """Parameters
         ----------
         Universe : :class:`~MDAnalysis.core.groups.Universe`
@@ -207,10 +201,8 @@ def __init__(self, universe, atomgroups):
            :meth:`pmda.parallel.ParallelAnalysisBase._single_frame`.
 
         """
+        self._universe = universe
         self._trajectory = universe.trajectory
-        self._top = universe.filename
-        self._traj = universe.trajectory.filename
-        self._indices = [ag.indices for ag in atomgroups]
 
     @contextmanager
     def readonly_attributes(self):
@@ -232,7 +224,10 @@ def __setattr__(self, key, val):
         # guards to stop people assigning to self when they shouldn't
         # if locked, the only attribute you can modify is _attr_lock
         # if self._attr_lock isn't set, default to unlocked
-        if key == '_attr_lock' or not getattr(self, '_attr_lock', False):
+
+        # The current version depends on modifying attributes,
+        # but adding key == '_frame_index' below does not work somehow.
+        if key == '_attr_lock' or not getattr(self, '_attr_lock', False) or True:
             super(ParallelAnalysisBase, self).__setattr__(key, val)
         else:
             # raise HalError("I'm sorry Dave, I'm afraid I can't do that")
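
For context, the whitelist that the added comment alludes to would presumably look something like the sketch below (hypothetical and untested; the PR comment itself reports that this approach "does not work somehow"):

        # hypothetical whitelist variant of the guard, not part of this PR:
        # let the per-frame bookkeeping attributes through the lock
        _MUTABLE = ('_attr_lock', '_frame_index', '_ts', '_results')
        if key in _MUTABLE or not getattr(self, '_attr_lock', False):
            super(ParallelAnalysisBase, self).__setattr__(key, val)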
@@ -253,7 +248,7 @@ def _prepare(self):
         """additional preparation to run"""
         pass  # pylint: disable=unnecessary-pass
 
-    def _single_frame(self, ts, atomgroups):
+    def _single_frame(self):
         """Perform computation on a single trajectory frame.
 
         Must return computed values as a list. You can only **read**
@@ -348,14 +343,17 @@ def run(self,
         if scheduler == 'processes':
             scheduler_kwargs['num_workers'] = n_jobs
 
-        start, stop, step = self._trajectory.check_slice_indices(start,
-                                                                 stop, step)
+        start, stop, step = self._universe.trajectory.check_slice_indices(start,
+                                                                          stop, step)
         n_frames = len(range(start, stop, step))
 
         self.start, self.stop, self.step = start, stop, step
 
         self.n_frames = n_frames
 
+        # in case _prepare has not set an array.
+        self._results = np.zeros(n_frames)
+
Member: Not sure this is always something we want to do. Check the other analysis classes in mda to see if such a default would make sense.

Contributor Author: This should normally be overridden by the definition inside _prepare to suit what _single_frame saves.

Member: I do not like this reasoning. It means I always have to know about the default and that I can overwrite it in one place. This just looks weird to me. I would rather set this up in one single place.
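
For illustration, the override pattern being discussed might look like the following minimal sketch (a hypothetical subclass, not part of this PR; _single_frame would fill the rows):

    import numpy as np
    from pmda.parallel import ParallelAnalysisBase

    class NewAnalysis(ParallelAnalysisBase):
        def _prepare(self):
            # replace the scalar-per-frame default with the shape that
            # _single_frame actually writes (here: three values per frame)
            self._results = np.zeros((self.n_frames, 3))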


         if n_frames == 0:
             warnings.warn("run() analyses no frames: check start/stop/step")
         if n_frames < n_blocks:
@@ -378,9 +376,8 @@
             task = delayed(
                 self._dask_helper, pure=False)(
                     bslice,
-                    self._indices,
-                    self._top,
-                    self._traj, )
+                    self._universe,
+                )
             blocks.append(task)
             # save the frame numbers for each block
             _blocks.append(range(bslice.start,
@@ -408,43 +405,36 @@
             conclude.elapsed,
             # waiting time = wait_end - wait_start
             np.array([el[4]-wait_start for el in res]),
-            np.array([el[5] for el in res]),
-            np.array([el[6] for el in res]))
+            np.array([el[5] for el in res]))
         return self
 
-    def _dask_helper(self, bslice, indices, top, traj):
+    def _dask_helper(self, bslice, u):
         """helper function to actually setup dask graph"""
         # wait_end needs to be first line for accurate timing
         wait_end = time.time()
-        # record time to generate universe and atom groups
-        with timeit() as b_universe:
-            u = mda.Universe(top, traj)
-            agroups = [u.atoms[idx] for idx in indices]
-
-        res = []
+
Comment on lines -419 to -424

Member: Getting rid of this kludge is great.
         times_io = []
         times_compute = []
-        # NOTE: bslice.stop cannot be None! Always make sure
-        # that it comes from _trajectory.check_slice_indices()!
-        for i in range(bslice.start, bslice.stop, bslice.step):
-            # record io time per frame
-            with timeit() as b_io:
-                # explicit instead of 'for ts in u.trajectory[bslice]'
-                # so that we can get accurate timing.
-                ts = u.trajectory[i]
+        block_ind = []
+        for i, ts in enumerate(self._universe._trajectory[bslice]):
+            self._frame_index = i
             # record compute time per frame
Member: Where did the per-frame timing information go?
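
If the per-frame IO timing is to be kept, one option (a sketch, assuming explicit indexing still triggers the frame read as in the pre-refactor code; not part of this PR) is to time an explicit Timestep read inside the loop instead of iterating the sliced trajectory:

        for j, frame in enumerate(range(bslice.start, bslice.stop, bslice.step)):
            self._frame_index = j
            with timeit() as b_io:
                # reading the Timestep is the I/O step
                self._ts = self._universe.trajectory[frame]
            times_io.append(b_io.elapsed)
            with timeit() as b_compute:
                self._single_frame()
            times_compute.append(b_compute.elapsed)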

+            self._ts = ts
             with timeit() as b_compute:
-                res = self._reduce(res, self._single_frame(ts, agroups))
-            times_io.append(b_io.elapsed)
+                self._single_frame()
 
+            block_ind.append(i)
             times_compute.append(b_compute.elapsed)
 
+        # as opposed to
+        #     res = []
+        #     for i, ts in traj:
+        #         res.append(self._reduce(...))
+        #     return res
+        # it does not return the right values except for the first block;
+        # not totally sure why.
 
         # calculate io and compute time per block
-        return np.asarray(res), np.asarray(times_io), np.asarray(
-            times_compute), b_universe.elapsed, wait_end, np.sum(
+        return np.asarray(self._results[block_ind]), np.asarray(times_io), np.asarray(
+            times_compute), wait_end, np.sum(
             times_io), np.sum(times_compute)
 
-    @staticmethod
-    def _reduce(res, result_single_frame):
-        """ 'append' action for a time series"""
-        res.append(result_single_frame)
-        return res
Comment on lines -445 to -450

Member: How are you going to deal with aggregations as in DensityAnalysis or complicated reductions as in RMSF?

Contributor Author: It's still doable, with some workaround: we can do aggregations and further calculations inside _conclude, but it also means we have to transfer more data back to the main process. (New examples are in the gist Jupyter notebook.)
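
A rough sketch of that workaround (hypothetical, loosely modeled on a density-style analysis; _nbins and _zmax are made-up attributes assumed to be set in __init__): each frame stores a partial histogram in _results, and the aggregation happens once in _conclude after the blocks are gathered.

    def _prepare(self):
        # one partial histogram per frame
        self._results = np.zeros((self.n_frames, self._nbins))

    def _single_frame(self):
        zpos = self._atomgroups[0].positions[:, 2]
        hist, _ = np.histogram(zpos, bins=self._nbins,
                               range=(0.0, self._zmax))
        self._results[self._frame_index] = hist

    def _conclude(self):
        # aggregate across all frames in the main process
        self.density = self._results.mean(axis=0)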

pmda/rms/rmsd.py: 11 additions & 5 deletions
@@ -127,17 +127,23 @@ class RMSD(ParallelAnalysisBase):
     """
     def __init__(self, mobile, ref, superposition=True):
         universe = mobile.universe
-        super(RMSD, self).__init__(universe, (mobile, ))
+        super(RMSD, self).__init__(universe)
+        self._atomgroups = (mobile, )
         self._ref_pos = ref.positions.copy()
         self.superposition = superposition
 
     def _prepare(self):
+        self._results = np.zeros((self.n_frames, 3))
         self.rmsd = None
 
     def _conclude(self):
         self.rmsd = np.vstack(self._results)
 
-    def _single_frame(self, ts, atomgroups):
-        return (ts.frame, ts.time,
-                rms.rmsd(atomgroups[0].positions, self._ref_pos,
-                         superposition=self.superposition))
+    def _single_frame(self):
+        # the current timestep of the trajectory is self._ts
+        self._results[self._frame_index, 0] = self._ts.frame
+        # the actual trajectory is at self._trajectory
+        self._results[self._frame_index, 1] = self._trajectory.time
+        self._results[self._frame_index, 2] = rms.rmsd(self._atomgroups[0].positions,
+                                                       self._ref_pos,
+                                                       superposition=self.superposition)
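
A minimal usage sketch of the refactored class (assuming the standard MDAnalysis test datafiles; not part of the diff):

    import MDAnalysis as mda
    from MDAnalysis.tests.datafiles import PSF, DCD
    from pmda.rms import RMSD

    u = mda.Universe(PSF, DCD)
    ca = u.select_atoms('name CA')
    analysis = RMSD(ca, ca, superposition=True).run(n_jobs=2, n_blocks=2)
    # analysis.rmsd has shape (n_frames, 3): frame index, time, RMSD value
    print(analysis.rmsd[:3])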