PMDA with refactored _single_frame #128

Status: Draft. Wants to merge 17 commits into base: master.
41 changes: 22 additions & 19 deletions pmda/density.py
@@ -240,8 +240,8 @@ def __init__(self, atomgroup, delta=1.0, atomselection=None,
metadata=None, padding=2.0, updating=False,
parameters=None, gridcenter=None, xdim=None, ydim=None,
zdim=None):
u = atomgroup.universe
super(DensityAnalysis, self).__init__(u, (atomgroup, ))
universe = atomgroup.universe
super().__init__(universe)
self._atomgroup = atomgroup
self._delta = delta
self._atomselection = atomselection
@@ -253,7 +253,7 @@ def __init__(self, atomgroup, delta=1.0, atomselection=None,
self._xdim = xdim
self._ydim = ydim
self._zdim = zdim
self._trajectory = u.trajectory
self._trajectory = universe.trajectory
if updating and atomselection is None:
raise ValueError("updating=True requires a atomselection string")
elif not updating and atomselection is not None:
@@ -289,20 +289,31 @@ def _prepare(self):
grid, edges = np.histogramdd(np.zeros((1, 3)), bins=bins,
range=arange, normed=False)
grid *= 0.0
self._grid = grid

self._results = [grid] * self.n_frames
self._edges = edges
self._arange = arange
self._bins = bins

def _single_frame(self, ts, atomgroups):
coord = self.current_coordinates(atomgroups[0], self._atomselection,
self._updating)
result = np.histogramdd(coord, bins=self._bins, range=self._arange,
normed=False)
return result[0]
def _single_frame(self):
h, _ = np.histogramdd(self._atomgroup.positions,
bins=self._bins, range=self._arange,
normed=False)
# reduce (proposed change #2542 to match the parallel version in pmda.density)
# return self._reduce(self._grid, h)
#
# serial code can simply do

# the current timestep of the trajectory is self._ts
# self._results[self._frame_index][0] = self._ts.frame
# the actual trajectory is at self._trajectory
# self._results[self._frame_index][1] = self._trajectory.time
self._results[self._frame_index] = h
Member:

Storing a full histogram for each frame is bad – you can easily run out of memory. I think it is important that the aggregation is done every step and not just in _conclude.

Contributor Author:

But isn't that what also happens with _reduce? It won't pass the full histogram back to the main process, but only the calculated frames in _dask_helper.

Member (@orbeckst, Jul 15, 2020):

No, the density reduce

pmda/pmda/density.py, lines 326 to 332 in 13fa3b5:

def _reduce(res, result_single_frame):
""" 'accumulate' action for a time series"""
if isinstance(res, list) and len(res) == 0:
res = result_single_frame
else:
res += result_single_frame
return res
does an in-place addition (not a list append) in line 331. In _conclude

pmda/pmda/density.py, lines 305 to 306 in 13fa3b5:

self._grid = self._results[:].sum(axis=0)
self._grid /= float(self.n_frames)
we only sum over the summed histograms of the blocks and then divide by all frames to get the average.

Member:

Btw, the PMDA paper has a discussion on that topic.
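
To make the memory point concrete, here is a rough, hypothetical sketch (grid size and frame count are invented for illustration, not taken from this PR); the second half mirrors the per-block in-place accumulation that _reduce performs:

import numpy as np

# a 1 Angstrom grid over a roughly 200 Angstrom box gives 200**3 float64 bins
nbins = 200 ** 3
per_frame_mb = nbins * 8 / 1e6               # about 64 MB for a single histogram
print(f"one histogram: {per_frame_mb:.0f} MB")
print(f"10,000 stored histograms: {10_000 * per_frame_mb / 1e3:.0f} GB")

# the _reduce pattern instead keeps a single accumulator per block
grid = np.zeros((10, 10, 10))                # tiny grid so the demo runs instantly
for _ in range(100):                         # stands in for the frames of one block
    h = np.random.random((10, 10, 10))       # per-frame histogram
    grid += h                                # in-place addition; memory stays at one grid
grid /= 100                                  # divide by the frame count, as _conclude does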


def _conclude(self):
self._grid = self._results[:].sum(axis=0)

# sum both inside and among blocks.
self._grid = self._results[:].sum(axis=(0, 1))
self._grid /= float(self.n_frames)
metadata = self._metadata if self._metadata is not None else {}
metadata['psf'] = self._atomgroup.universe.filename
@@ -322,14 +333,6 @@ def _conclude(self):
density.make_density()
self.density = density

@staticmethod
def _reduce(res, result_single_frame):
""" 'accumulate' action for a time series"""
if isinstance(res, list) and len(res) == 0:
res = result_single_frame
else:
res += result_single_frame
return res
Comment on lines -325 to -332
Member:

I don't like the design that gets rid of reduce.


@staticmethod
def current_coordinates(atomgroup, atomselection, updating):
111 changes: 59 additions & 52 deletions pmda/parallel.py
@@ -35,7 +35,7 @@ class Timing(object):
store various timeing results of obtained during a parallel analysis run
"""

def __init__(self, io, compute, total, universe, prepare,
def __init__(self, io, compute, total, prepare, prepare_dask,
conclude, wait=None, io_block=None,
compute_block=None):
self._io = io
@@ -44,8 +44,8 @@ def __init__(self, io, compute, total, universe, prepare,
self._compute_block = compute_block
self._total = total
self._cumulate = np.sum(io) + np.sum(compute)
self._universe = universe
self._prepare = prepare
self._prepare_dask = prepare_dask
self._conclude = conclude
self._wait = wait

@@ -83,16 +83,16 @@ def cumulate_time(self):
"""
return self._cumulate

@property
def universe(self):
"""time to create a universe for each block"""
return self._universe

@property
def prepare(self):
"""time to prepare"""
return self._prepare

@property
def prepare_dask(self):
"""time for blocks to start working"""
return self._prepare_dask

@property
def conclude(self):
"""time to conclude"""
@@ -189,7 +189,7 @@ def _reduce(res, result_single_frame):

"""

def __init__(self, universe, atomgroups):
def __init__(self, universe):
"""Parameters
----------
Universe : :class:`~MDAnalysis.core.groups.Universe`
@@ -207,10 +207,8 @@ def __init__(self, universe, atomgroups):
:meth:`pmda.parallel.ParallelAnalysisBase._single_frame`.

"""
self._universe = universe
self._trajectory = universe.trajectory
self._top = universe.filename
self._traj = universe.trajectory.filename
self._indices = [ag.indices for ag in atomgroups]

@contextmanager
def readonly_attributes(self):
@@ -232,7 +230,10 @@ def __setattr__(self, key, val):
# guards to stop people assigning to self when they shouldn't
# if locked, the only attribute you can modify is _attr_lock
# if self._attr_lock isn't set, default to unlocked
if key == '_attr_lock' or not getattr(self, '_attr_lock', False):

# Current version depends on modifying the attributes.
# but adding key == '_frame_index' below does not work somehow.
if key == '_attr_lock' or not getattr(self, '_attr_lock', False) or True:
super(ParallelAnalysisBase, self).__setattr__(key, val)
else:
# raise HalError("I'm sorry Dave, I'm afraid I can't do that")
@@ -253,7 +254,7 @@ def _prepare(self):
"""additional preparation to run"""
pass # pylint: disable=unnecessary-pass

def _single_frame(self, ts, atomgroups):
def _single_frame(self):
"""Perform computation on a single trajectory frame.

Must return computed values as a list. You can only **read**
@@ -348,14 +349,17 @@ def run(self,
if scheduler == 'processes':
scheduler_kwargs['num_workers'] = n_jobs

start, stop, step = self._trajectory.check_slice_indices(start,
stop, step)
start, stop, step = self._universe.trajectory.check_slice_indices(start,
stop, step)
n_frames = len(range(start, stop, step))

self.start, self.stop, self.step = start, stop, step

self.n_frames = n_frames

# in case _prepare has not set an array.
self._results = np.zeros(n_frames)
Member:

Not sure this is always something we want to do. Check the other analysis classes in mda to see if such a default would make sense.

Contributor Author:

This should normally be overridden by the definition inside _prepare to suit what _single_frame saves.

Member:

I do not like this reasoning. It means I always have to know about the default and that I can overwrite it in one place. This just looks weird to me. I would rather just set this up in one single place.
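
A minimal, hypothetical subclass (class name and the recorded quantities are invented) showing the pattern under discussion, mirroring the rmsd.py changes further down: _prepare allocates self._results in the shape that _single_frame then fills for each frame.

import numpy as np
from pmda.parallel import ParallelAnalysisBase


class FrameRecorder(ParallelAnalysisBase):
    """Hypothetical example of the refactored per-frame API."""

    def _prepare(self):
        # override the np.zeros(n_frames) default with the shape that
        # _single_frame is going to fill: one row of two values per frame
        self._results = np.zeros((self.n_frames, 2))

    def _single_frame(self):
        # the current timestep is self._ts, the trajectory is self._trajectory;
        # write into the slot for the current frame instead of returning a value
        self._results[self._frame_index, 0] = self._ts.frame
        self._results[self._frame_index, 1] = self._trajectory.time

    def _conclude(self):
        # stack the per-block arrays back into an (n_frames, 2) array
        self.frames = np.vstack(self._results)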


if n_frames == 0:
warnings.warn("run() analyses no frames: check start/stop/step")
if n_frames < n_blocks:
@@ -374,22 +378,24 @@ blocks = []
blocks = []
_blocks = []
with self.readonly_attributes():
for bslice in slices:
task = delayed(
self._dask_helper, pure=False)(
bslice,
self._indices,
self._top,
self._traj, )
blocks.append(task)
# save the frame numbers for each block
_blocks.append(range(bslice.start,
bslice.stop, bslice.step))
blocks = delayed(blocks)
with timeit() as prepare_dask:
for bslice in slices:
task = delayed(
self._dask_helper, pure=False)(
bslice,
self._universe,
)
blocks.append(task)
# save the frame numbers for each block
_blocks.append(range(bslice.start,
bslice.stop, bslice.step))
blocks = delayed(blocks)
time_prepare_dask = prepare_dask.elapsed

# record the time when scheduler starts working
wait_start = time.time()
res = blocks.compute(**scheduler_kwargs)
self._res_dask = res
# hack to handle n_frames == 0 in this framework
if len(res) == 0:
# everything else wants list of block tuples
@@ -404,47 +410,48 @@ def run(self,
self.timing = Timing(
np.hstack([el[1] for el in res]),
np.hstack([el[2] for el in res]), total.elapsed,
np.array([el[3] for el in res]), time_prepare,
time_prepare,
time_prepare_dask,
conclude.elapsed,
# waiting time = wait_end - wait_start
np.array([el[4]-wait_start for el in res]),
np.array([el[5] for el in res]),
np.array([el[6] for el in res]))
np.array([el[3]-wait_start for el in res]),
np.array([el[4] for el in res]),
np.array([el[5] for el in res]))
return self

def _dask_helper(self, bslice, indices, top, traj):
def _dask_helper(self, bslice, u):
"""helper function to actually setup dask graph"""
# wait_end needs to be first line for accurate timing
wait_end = time.time()
# record time to generate universe and atom groups
with timeit() as b_universe:
u = mda.Universe(top, traj)
agroups = [u.atoms[idx] for idx in indices]

res = []
Comment on lines -419 to -424
Member:

Getting rid of this kludge is great.

times_io = []
times_compute = []
# NOTE: bslice.stop cannot be None! Always make sure
# that it comes from _trajectory.check_slice_indices()!
for i in range(bslice.start, bslice.stop, bslice.step):
block_ind = []

for block_i, i in enumerate(range(bslice.start, bslice.stop, bslice.step)):
self._block_i = block_i
self._frame_index = i
# record io time per frame
# explicit instead of 'for ts in u.trajectory[bslice]'
# so that we can get accurate timing.
with timeit() as b_io:
# explicit instead of 'for ts in u.trajectory[bslice]'
# so that we can get accurate timing.
ts = u.trajectory[i]
# record compute time per frame
self._ts = u.trajectory[i]
with timeit() as b_compute:
res = self._reduce(res, self._single_frame(ts, agroups))
self._single_frame()
times_io.append(b_io.elapsed)

block_ind.append(i)
times_compute.append(b_compute.elapsed)

# as opposed to
# res = []
# for i,ts in traj:
# res.append(self._reduce(...)
# return res
# It does not return the right value except the first block, not totally sure why.

# calculate io and compute time per block
return np.asarray(res), np.asarray(times_io), np.asarray(
times_compute), b_universe.elapsed, wait_end, np.sum(
return np.asarray(self._results)[block_ind], np.asarray(times_io), np.asarray(
times_compute), wait_end, np.sum(
times_io), np.sum(times_compute)

@staticmethod
def _reduce(res, result_single_frame):
""" 'append' action for a time series"""
res.append(result_single_frame)
return res
Comment on lines -445 to -450
Member:

How are you going to deal with aggregations as in DensityAnalysis or complicated reductions as in RMSF?

Contributor Author:

It's still doable, with some workaround: we can do aggregations and further calculations inside _conclude, but it also means we have to transfer more data back to the main process. (New examples are in the gist Jupyter notebook.)
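
As a concrete illustration of that trade-off, here is a hypothetical RMSF-style sketch (names and shapes are illustrative, not part of this PR): every frame's coordinates are stored in self._results and shipped back to the main process, and the actual reduction only happens in _conclude.

import numpy as np
from pmda.parallel import ParallelAnalysisBase


class ParallelRMSF(ParallelAnalysisBase):
    """Hypothetical RMSF-style analysis with the aggregation deferred to _conclude."""

    def __init__(self, atomgroup):
        super().__init__(atomgroup.universe)
        self._atomgroup = atomgroup

    def _prepare(self):
        # n_frames * n_atoms * 3 floats are kept and transferred, instead of
        # the running sums that a per-block _reduce would maintain
        self._results = np.zeros((self.n_frames, self._atomgroup.n_atoms, 3))

    def _single_frame(self):
        self._results[self._frame_index] = self._atomgroup.positions

    def _conclude(self):
        # join the per-block chunks back into (n_frames, n_atoms, 3)
        positions = np.concatenate(self._results)
        mean = positions.mean(axis=0)
        # RMSF_i = sqrt(<|x_i(t) - <x_i>|^2>), averaged over frames
        self.rmsf = np.sqrt(((positions - mean) ** 2).sum(axis=-1).mean(axis=0))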

16 changes: 11 additions & 5 deletions pmda/rms/rmsd.py
@@ -127,17 +127,23 @@ class RMSD(ParallelAnalysisBase):
"""
def __init__(self, mobile, ref, superposition=True):
universe = mobile.universe
super(RMSD, self).__init__(universe, (mobile, ))
super(RMSD, self).__init__(universe)
self._atomgroups = (mobile, )
self._ref_pos = ref.positions.copy()
self.superposition = superposition

def _prepare(self):
self._results = np.zeros((self.n_frames, 3))
self.rmsd = None

def _conclude(self):
self.rmsd = np.vstack(self._results)

def _single_frame(self, ts, atomgroups):
return (ts.frame, ts.time,
rms.rmsd(atomgroups[0].positions, self._ref_pos,
superposition=self.superposition))
def _single_frame(self):
# the current timestep of the trajectory is self._ts
self._results[self._frame_index, 0] = self._ts.frame
# the actual trajectory is at self._trajectory
self._results[self._frame_index, 1] = self._trajectory.time
self._results[self._frame_index, 2] = rms.rmsd(self._atomgroups[0].positions,
self._ref_pos,
superposition=self.superposition)
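
For completeness, a usage sketch of the refactored class (file names and selections are placeholders; the run(n_jobs=..., n_blocks=...) call follows the existing pmda API, and the rmsd array keeps its frame, time, RMSD columns):

import MDAnalysis as mda
from pmda.rms import RMSD

u = mda.Universe("topol.psf", "traj.dcd")      # placeholder topology/trajectory
ref = mda.Universe("topol.psf", "traj.dcd")

rmsd = RMSD(u.select_atoms("name CA"), ref.select_atoms("name CA"))
rmsd.run(n_jobs=2, n_blocks=2)
print(rmsd.rmsd.shape)                         # (n_frames, 3): frame, time, RMSD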