Skip to content

Commit

Permalink
Merge branch 'release/6.7.1'
Browse files Browse the repository at this point in the history
  • Loading branch information
mtzl committed Jun 8, 2017
2 parents 19c0703 + cc3f4f6 commit 2bf7afd
Show file tree
Hide file tree
Showing 17 changed files with 102 additions and 56 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
Unreleased changes
------------------

6.7.1 / 2017-06-08
------------------
* ControlHost improvements
* Change ``every`` behavior in pipeline
* h5chain multifile fix

6.7.0 / 2017-05-08
------------------
Expand Down
2 changes: 1 addition & 1 deletion docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@
# -- Options for Example Gallery ------------------------------------------
sphinx_gallery_conf = {
# path to store the module using example template
'mod_example_dir': 'modules/generated',
'backreferences_dir': 'modules/generated',
'doc_module': ('km3pipe', ),
'default_thumb_file': '_static/default_gallery_thumbnail.png',
'examples_dirs': ['../examples'],
Expand Down
2 changes: 1 addition & 1 deletion docs/version.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
KM3Pipe 6.7.0
KM3Pipe 6.7.1
=============
37 changes: 11 additions & 26 deletions km3modules/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,15 +138,12 @@ def process(self, blob):

class StatusBar(Module):
"""Displays the current blob number."""
def __init__(self, **context):
super(self.__class__, self).__init__(**context)
self.every = self.get('every') or 100
self.blob_index = 0
def configure(self):
self.iteration = 1

def process(self, blob):
if self.blob_index % self.every == 0:
prettyln("Blob {0:>7}".format(self.blob_index))
self.blob_index += 1
prettyln("Blob {0:>7}".format(self.every * self.iteration))
self.iteration += 1
return blob

def finish(self):
Expand All @@ -158,40 +155,28 @@ class TickTock(Module):
Parameters
----------
every: int, optional [default=100]
every: int, optional [default=1]
Number of iterations between printout.
"""
def __init__(self, **context):
super(self.__class__, self).__init__(**context)
self.every = self.get('every') or 100
self.blob_index = 0
def configure(self):
self.t0 = time()

def process(self, blob):
if self.blob_index % self.every == 0:
t1 = (time() - self.t0)/60
prettyln("Time/min: {0:.3f}".format(t1))
self.blob_index += 1
t1 = (time() - self.t0)/60
prettyln("Time/min: {0:.3f}".format(t1))
return blob


class MemoryObserver(Module):
"""Shows the maximum memory usage
Parameters
----------
every: int, optional [default=100]
every: int, optional [default=1]
Number of iterations between printout.
"""
def __init__(self, **context):
super(self.__class__, self).__init__(**context)
self.every = self.get('every') or 100
self.blob_index = 0

def process(self, blob):
if self.blob_index % self.every == 0:
memory = peak_memory_usage()
prettyln("Memory peak: {0:.3f} MB".format(memory))
self.blob_index += 1
memory = peak_memory_usage()
prettyln("Memory peak: {0:.3f} MB".format(memory))
return blob


Expand Down
2 changes: 1 addition & 1 deletion km3pipe/__version__.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

log = logging.getLogger(__name__) # pylint: disable=C0103

version_info = (6, 7, 0, 'final', 0)
version_info = (6, 7, 1, 'final', 0)


def _get_version(version_info):
Expand Down
19 changes: 12 additions & 7 deletions km3pipe/controlhost.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,15 +69,20 @@ def get_message(self):
while True:
log.info(" Waiting for control host Prefix")
try:
prefix = Prefix(data=self.socket.recv(Prefix.SIZE))
data = self.socket.recv(Prefix.SIZE)
log.info(" raw prefix data received: '{0}'".format(data))
if data == b'':
raise EOFError
prefix = Prefix(data=data)
except (UnicodeDecodeError, OSError, struct.error):
log.error("Failed to construct Prefix.")
log.error("Failed to construct Prefix, reconnecting.")
self._reconnect()
raise ValueError
continue
if str(prefix.tag) not in self.valid_tags:
log.error("Invalid tag '{0}' received, ignoring the message."
.format(prefix.tag))
print("Valid tags are: {0}".format(self.valid_tags))
log.error("Invalid tag '{0}' received, ignoring the message \n"
"and reconnecting.\n"
" -> valid tags are: {0}"
.format(prefix.tag, self.valid_tags))
self._reconnect()
continue
else:
Expand All @@ -93,7 +98,7 @@ def get_message(self):
message += self.socket.recv(buffer_size)
except OSError:
log.error("Failed to construct message.")
raise ValueError
raise BufferError
log.info(" ------ returning message with {0} bytes"
.format(len(message)))
return prefix, message
Expand Down
11 changes: 11 additions & 0 deletions km3pipe/core.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,11 @@ class Pipeline(object):
else:
module.only_if = None

if 'every' in kwargs:
module.every = kwargs['every']
else:
module.every = 1

self._timeit[module] = {'process': deque(maxlen=STAT_LIMIT),
'process_cpu': deque(maxlen=STAT_LIMIT),
'finish': 0,
Expand Down Expand Up @@ -156,6 +161,11 @@ class Pipeline(object):
"'{1}'.".format(module.name, module.only_if))
continue

if (self._cycle_count + 1) % module.every != 0:
log.debug("Skipping {0} (every {1} iterations)."
.format(module.name, module.every))
continue

log.debug("Processing {0} ".format(module.name))
start = timer()
start_cpu = time.clock()
Expand Down Expand Up @@ -282,6 +292,7 @@ class Module(object):
self._name = name
self.parameters = parameters
self.only_if = None
self.every = 1
self.detector = None
self.timeit = self.get('timeit') or False
self._timeit = {'process': deque(maxlen=STAT_LIMIT),
Expand Down
10 changes: 7 additions & 3 deletions km3pipe/io/ch.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,12 @@ def _run(self):
log.debug("Waiting for data from network...")
prefix, data = self.client.get_message()
log.debug("{0} bytes received from network.".format(len(data)))
except ValueError:
log.error("Corrupt data recieved, skipping...")
continue
except EOFError:
log.warn("EOF from Ligier, aborting...")
break
except BufferError:
log.error("Buffer error in Ligier stream, aborting...")
break
if not data:
log.critical("No data received, connection died.\n" +
"Trying to reconnect in 30 seconds.")
Expand Down Expand Up @@ -125,4 +128,5 @@ def process(self, blob):
def finish(self):
"""Clean up the JLigier controlhost connection"""
log.debug("Disconnecting from JLigier.")
self.client.socket.shutdown(socket.SHUT_RDWR)
self.client._disconnect()
2 changes: 1 addition & 1 deletion km3pipe/io/hdf5.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ def _write_array(self, where, arr, title=''):
filters=self.filters, createparents=True,
expectedrows=self.n_rows_expected,
)
if(level < 4):
if(level < 5):
self._tables[where] = tab
else:
tab = self._tables[where]
Expand Down
18 changes: 10 additions & 8 deletions km3pipe/io/pandas.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import numpy as np
import pandas as pd
import tables as tb
import h5py

from km3pipe.logger import logging
from km3pipe.tools import insert_prefix_to_dtype
Expand Down Expand Up @@ -52,14 +53,10 @@ class H5Chain(object):
"""
def __init__(self, filenames):
self.h5files = {}
for fn in filenames:
h5 = tb.File(fn, 'r')
self.h5files[fn] = h5
self.filenames = filenames

def close(self):
for h5 in self.h5files.values():
h5.close()
pass

def __exit__(self, exc_type, exc_value, traceback):
self.close()
Expand All @@ -69,8 +66,13 @@ def __enter__(self):

def __getitem__(self, key):
dfs = []
for fname, h5 in self.h5files.items():
tab = h5.get_node(key)[:]
for fname in self.filenames:
with h5py.File(fname, 'r') as h5:
try:
tab = h5[key][:]
except KeyError as ke:
log.error('{} does not exist in {}!'.format(key, fname))
raise ke
df = pd.DataFrame(tab)
dfs.append(df)
dfs = pd.concat(dfs, axis=0, ignore_index=True)
Expand Down
1 change: 1 addition & 0 deletions km3pipe/kp-data/stylelib/moritz.mplstyle
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
axes.linewidth: 2
axes.prop_cycle: cycler('color', ['008fd5', 'fc4f30', 'e5ae38', '6d904f', '8b8b8b', '810f7c'])

image.cmap: viridis

Expand Down
2 changes: 1 addition & 1 deletion km3pipe/style/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
try:
import matplotlib.pyplot as plt
except ImportError:
print("Please install matplotlib: `pip install matplotlib`")
raise ImportError("Please install matplotlib: `pip install matplotlib`")
import km3pipe as kp


Expand Down
25 changes: 25 additions & 0 deletions km3pipe/tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,31 @@ def test_conditional_module_called_if_key_in_blob(self):
self.assertEqual(1, pl.modules[1].process.call_count)
self.assertEqual(1, pl.modules[2].process.call_count)

def test_condition_every(self):
pl = Pipeline(blob=1)

pl.attach(Module, 'module1')
pl.attach(Module, 'module2', every=3)
pl.attach(Module, 'module3', every=9)
pl.attach(Module, 'module4', every=10)
pl.attach(Module, 'module5')

func_module = MagicMock()
func_module.__name__ = "MagicMock"
pl.attach(func_module, 'funcmodule', every=4)

for module in pl.modules:
module.process = MagicMock(return_value={})

pl.drain(9)

self.assertEqual(9, pl.modules[0].process.call_count)
self.assertEqual(3, pl.modules[1].process.call_count)
self.assertEqual(1, pl.modules[2].process.call_count)
self.assertEqual(0, pl.modules[3].process.call_count)
self.assertEqual(9, pl.modules[4].process.call_count)
self.assertEqual(2, func_module.call_count)

def test_drain_calls_function_modules(self):
pl = Pipeline(blob=1)

Expand Down
16 changes: 12 additions & 4 deletions km3pipe/utils/hdf2root.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@
Convert HDF5 to vanilla ROOT.
Usage:
hdf2root FILES...
hdf2root [--verbose] FILES...
hdf2root (-h | --help)
hdf2root --version
Options:
-h --help Show this screen.
--verbose Print more info [default: False]
"""

from __future__ import division, absolute_import, print_function
Expand All @@ -18,6 +19,8 @@
import os
from datetime import datetime

import numpy as np

from km3pipe import version
from km3modules.common import StatusBar

Expand All @@ -30,7 +33,7 @@
__status__ = "Development"


def hdf2root(infile, outfile):
def hdf2root(infile, outfile, verbose=False):
from rootpy.io import root_open
from rootpy import asrootpy
from root_numpy import array2tree
Expand All @@ -45,7 +48,11 @@ def hdf2root(infile, outfile):
# => this moronic nested loop instead of simple `walk`
for group in h5.walk_groups():
for leafname, leaf in group._v_leaves.items():
tree = asrootpy(array2tree(leaf[:], name=leaf._v_pathname))
arr = leaf[:]
if arr.dtype.names is None:
dt = np.dtype((arr.dtype, [(leafname, arr.dtype)]))
arr = arr.view(dt)
tree = asrootpy(array2tree(arr, name=leaf._v_pathname))
tree.write()
rf.close()
h5.close()
Expand All @@ -56,5 +63,6 @@ def main():
args = docopt(__doc__, version=version)

files = args['FILES']
verbose = bool(args['--verbose'])
for fn in files:
hdf2root(fn, fn+'.root')
hdf2root(fn, fn+'.root', verbose=verbose)
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ scipy>=0.19
tornado
urwid
websocket-client
sphinx>=1.5.1
sphinx==1.5.6
numpydoc
sphinx_gallery
sklearn
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def finalize_options(self):
require_groups = {
'docs': ['numpydoc', 'pillow',
'scikit-learn', 'sphinx-gallery',
'sphinx>=1.5.1', 'sphinxcontrib-napoleon', ],
'sphinx==1.5.6', 'sphinxcontrib-napoleon', ],
'base': ['cython', 'docopt', 'numpy>=1.12', 'pandas', 'pytz',
'six', ],
'analysis': ['matplotlib>=2.0.0', 'sklearn', 'statsmodels>=0.8',
Expand Down
2 changes: 1 addition & 1 deletion sphinx_requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ pillow
numpydoc
ipython
sphinx-gallery
sphinx>=1.5.1
sphinx==1.5.6
sphinxcontrib-napoleon
scikit-learn
seaborn
Expand Down

0 comments on commit 2bf7afd

Please sign in to comment.