Add feature to delete XML files #53

Status: Open · wants to merge 1 commit into `master`
README.md: 5 changes (3 additions, 2 deletions)
@@ -64,14 +64,15 @@ More advanced usage allows additional arguments to be specified:
* `--maxits <MAXITS>` will cause the munging pipeline to run for the specified number of iterations and then exit. This can be useful for debugging. Without specifying this option, munging will run indefinitely.
* `--sleeptime <SLEEPTIME>` will cause munging to sleep for the specified number of seconds if no work was done in the current iteration (default: 3600).
* `--validate` will check that the `topology_selection` MDTraj DSL queries are valid; note that this may take a significant amount of time, so validation is optional
* `--compress-xml` will compress `.xml` files after unpacking them from old-WS-style result packages to save space
* `--xml compress` will compress `.xml` files after unpacking them from old-WS-style result packages to save space; `--xml delete` will delete `.xml` files after unpacking
* `--delete-tarballs` will delete the old `.tar.bz2` WUs after unpacking them from old-WS-style result packages to save space (see the example below)
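
For example, a full invocation that unpacks old-WS-style result packages, deletes the unpacked `.xml` files, and removes the original tarballs might look like this (the project file and output path are illustrative):

```bash
munge-fah-data --projects projects.csv --outpath munged \
    --nprocesses 4 --xml delete --delete-tarballs --sleeptime 600
```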

#### Usage on `choderalab` Folding@home servers

1. Log in to the work server using the usual FAH login
2. Check whether the script is already running (`screen -r -d`). If it is, stop here.
3. Start a screen session (see the example below)
4. Run with: `munge-fah-data --projects /data/choderalab/fah/projects.csv --outpath /data/choderalab/fah/munged-data --time 600 --nprocesses 16`
4. Run with: `munge-fah-data --projects /data/choderalab/fah/projects.csv --outpath /data/choderalab/fah/munged-data --time 600 --nprocesses 16 --delete-tarballs --xml compress`
5. To stop, press Ctrl-C while the script is in the "sleep" phase
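
For reference, steps 3-5 with GNU screen might look like this (the session name is arbitrary):

```bash
screen -S fahmunge    # step 3: start a named screen session
munge-fah-data --projects /data/choderalab/fah/projects.csv --outpath /data/choderalab/fah/munged-data --time 600 --nprocesses 16 --delete-tarballs --xml compress
# Detach with Ctrl-A D; later, reattach with `screen -r fahmunge`.
```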

#### How it works
devtools/conda-recipe/meta.yaml: 10 changes (5 additions, 5 deletions)
@@ -15,7 +15,7 @@ requirements:
build:
- setuptools
- python

run:
- python
- numpy
@@ -41,10 +41,10 @@ test:
# Test parallel
- echo "Testing parallel processing"
- rm -rf munged
- munge-fah-data --projects projects.csv --outpath munged --maxits 1 --validate --nprocesses 2 --compress-xml
- munge-fah-data --projects projects.csv --outpath munged --maxits 1 --validate --nprocesses 2 --xml compress
# Test restart after unpacking (shouldn't actually do anything)
- echo "Testing additional processing iterations"
- munge-fah-data --projects projects.csv --outpath munged --maxits 1 --validate --nprocesses 2
- munge-fah-data --projects projects.csv --outpath munged --maxits 1 --validate --nprocesses 2 --xml compress
- munge-fah-data --projects projects.csv --outpath munged --maxits 1
- munge-fah-data --projects projects.csv --outpath munged --maxits 2 --sleeptime 1
# TODO: Test to make sure expected output is found
@@ -79,8 +79,8 @@ test:
- rm -rf PROJ11507/RUN?/CLONE?/results?
- rm -rf PROJ11419/RUN?/CLONE?/results?
# Test parallel with permanent deletion
- echo "Testing irreversible unpacking"
- munge-fah-data --projects projects.csv --outpath munged --maxits 1 --nprocesses 2 --unpack
- echo "Testing irreversible unpacking with deletion of unpacked XML files"
- munge-fah-data --projects projects.csv --outpath munged --maxits 1 --nprocesses 2 --xml delete --delete-tarballs

about:
home: https://github.com/choderalab/fahmunge
fahmunge/cli.py: 16 changes (8 additions, 8 deletions)
@@ -12,13 +12,13 @@

# Reads in a list of project details from a CSV file with Core17/18 FAH projects and munges them.

def setup_worker(terminate_event, delete_on_unpack, compress_xml):
def setup_worker(terminate_event, delete_on_unpack, xml_handling):
global global_terminate_event
global_terminate_event = terminate_event
global global_delete_on_unpack
global_delete_on_unpack = delete_on_unpack
global global_compress_xml
global_compress_xml = compress_xml
global global_xml_handling
global_xml_handling = xml_handling

def worker(args):
return fahmunge.core21.process_core21_clone(*args, terminate_event=global_terminate_event, delete_on_unpack=global_delete_on_unpack, xml_handling=global_xml_handling)
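
The `setup_worker`/`worker` pair uses `multiprocessing.Pool`'s initializer hook: shared state is stashed in module-level globals once per worker process, so the per-task function only needs picklable task arguments. A minimal self-contained sketch of the same pattern (all names here are illustrative, not fahmunge's):

```python
from multiprocessing import Pool, Event

def init_worker(stop_event, xml_mode):
    # Runs once in each worker process; stash shared state in module globals
    # so the per-task function only needs picklable task arguments.
    global _stop_event, _xml_mode
    _stop_event = stop_event
    _xml_mode = xml_mode

def handle(item):
    # Per-task function: reads the globals set up by init_worker.
    if _stop_event.is_set():
        return None  # cooperative early exit, like terminate_event above
    return (item, _xml_mode)

if __name__ == '__main__':
    stop = Event()
    pool = Pool(2, init_worker, (stop, 'compress'))
    print(pool.map(handle, range(4)))
    pool.close()
    pool.join()
```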
@@ -36,7 +36,7 @@ def main():
help='Number of threads to use (default: 1)')
parser.add_argument('-d', '--debug', dest='debug', action='store_true', default=False,
help='Run in serial mode and turn on debug output')
parser.add_argument('-u', '--unpack', dest='delete_on_unpack', action='store_true', default=False,
parser.add_argument('-z', '--delete-tarballs', dest='delete_on_unpack', action='store_true', default=False,
help='Delete original results-###.tar.bz2 after unpacking; WARNING: THIS IS DANGEROUS AND COULD DELETE YOUR PRIMARY DATA.')
parser.add_argument('-t', '--time', metavar='TIME', dest='time_limit', action='store', type=int, default=None,
help='Process each project for no more than specified time (in seconds) before moving on to next project')
@@ -46,8 +46,8 @@
help='Sleep for specified time (in seconds) between iterations (default: 0)')
parser.add_argument('-v', '--version', action='store_true', default=False,
help='Print version information and exit')
parser.add_argument('-c', '--compress-xml', dest='compress_xml', action='store_true', default=False,
help='If specified, will compress XML data')
parser.add_argument('-x', '--xml', dest='xml_handling', action='store', default='compress',
help='How to handle old WS style .xml files: "compress" (default) or "delete"')
args = parser.parse_args()

if args.version:
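
Because `--xml` is accepted as a free-form string here, a typo like `--xml comress` would only surface as an exception deep inside `core21`. One possible hardening (a suggestion, not part of this PR) is to let argparse reject bad values at parse time via `choices`:

```python
# Hypothetical variant of the --xml option: invalid values fail fast at parse time.
parser.add_argument('-x', '--xml', dest='xml_handling',
                    choices=['compress', 'delete'], default='compress',
                    help='How to handle old WS-style .xml files: "compress" (default) or "delete"')
```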
@@ -175,7 +175,7 @@ def main():
print('Using serial debug mode')
print('----------' * 8)
for packed_args in clones_to_process:
fahmunge.core21.process_core21_clone(*packed_args, delete_on_unpack=args.delete_on_unpack, compress_xml=args.compress_xml, signal_handler=signal_handler)
fahmunge.core21.process_core21_clone(*packed_args, delete_on_unpack=args.delete_on_unpack, xml_handling=args.xml_handling, signal_handler=signal_handler)
# Terminate if instructed
if signal_handler.terminate:
print('Signal caught; terminating.')
@@ -187,7 +187,7 @@ def main():
from multiprocessing import Pool, Event
print("Creating thread pool of %d threads..." % args.nprocesses)
terminate_event = Event()
pool = Pool(args.nprocesses, setup_worker, (terminate_event, args.delete_on_unpack, args.compress_xml))
pool = Pool(args.nprocesses, setup_worker, (terminate_event, args.delete_on_unpack, args.xml_handling))


try:
fahmunge/core21.py: 28 changes (18 additions, 10 deletions)
@@ -85,7 +85,7 @@ def list_core21_result_packets(clone_path):

return result_packets

def ensure_result_packet_is_decompressed(result_packet, topology, atom_indices=None, chunksize=10, delete_on_unpack=False, compress_xml=False):
def ensure_result_packet_is_decompressed(result_packet, topology, atom_indices=None, chunksize=10, delete_on_unpack=False, xml_handling='compress'):
"""
Ensure that the specified result packet is decompressed.

@@ -112,8 +112,9 @@ def ensure_result_packet_is_decompressed(result_packet, topology, atom_indices=N
delete_on_unpack : bool, optional, default=False
If True, will delete old ws8-style .tar.bz2 files after they have been unpacked.
WARNING: THIS COULD BE DANGEROUS
compress_xml : bool, optional, default=False
If True, will compress XML files after unpacking them.
xml_handling : str, optional, default='compress'
If 'compress', will compress XML files after unpacking them;
if 'delete', will delete XML files after unpacking them.
chunksize : int, optional, default=10
Number of frames to read each call to mdtraj.iterload for verifying trajectory integrity

@@ -146,12 +146,18 @@ def ensure_result_packet_is_decompressed(result_packet, topology, atom_indices=N
archive = tarfile.open(absfilename, mode='r:bz2')
archive.extractall(path=extracted_archive_directory)

# Compress XML files
if compress_xml:
xml_filenames = glob.glob('%s/*.xml' % extracted_archive_directory)
# Compress or delete XML files
xml_filenames = glob.glob('%s/*.xml' % extracted_archive_directory)
if xml_handling == 'compress':
for filename in xml_filenames:
print(" Compressing %s" % os.path.basename(filename))
subprocess.call(['gzip', filename])
elif xml_handling == 'delete':
for filename in xml_filenames:
print(" Deleting %s" % os.path.basename(filename))
os.remove(filename)
else:
raise ValueError('Unknown xml_handling option "%s": must be "compress" or "delete"' % xml_handling)

# Create new result packet name
new_result_packet = os.path.join(basepath, 'results%d' % frame_number)
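
As a side note, `subprocess.call(['gzip', filename])` shells out and assumes a `gzip` binary on `PATH`. A self-contained alternative using only the standard library (a sketch, not part of this PR) would be:

```python
import gzip
import os
import shutil

def gzip_file(filename):
    """Compress `filename` to `filename.gz` and remove the original,
    approximating the behavior of `gzip <filename>`."""
    with open(filename, 'rb') as f_in, gzip.open(filename + '.gz', 'wb') as f_out:
        shutil.copyfileobj(f_in, f_out)
    os.remove(filename)
```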
@@ -182,7 +189,7 @@ def ensure_result_packet_is_decompressed(result_packet, topology, atom_indices=N
# Return updated result packet directory name
return new_result_packet

def process_core21_clone(clone_path, topology_filename, processed_trajectory_filename, atom_selection_string, terminate_event=None, delete_on_unpack=False, compress_xml=False, chunksize=10, signal_handler=None):
def process_core21_clone(clone_path, topology_filename, processed_trajectory_filename, atom_selection_string, terminate_event=None, delete_on_unpack=False, xml_handling='compress', chunksize=10, signal_handler=None):
"""
Process core21 result packets in a CLONE, concatenating to a specified trajectory.
This will append to the specified trajectory if it already exists.
@@ -208,8 +215,9 @@ def process_core21_clone(clone_path, topology_filename, processed_trajectory_fil
delete_on_unpack : bool, optional, default=False
If True, will delete old ws8-style .tar.bz2 files after they have been unpacked.
WARNING: THIS COULD BE DANGEROUS
compress_xml : bool, optional, default=False
If True, will compress XML files after unpacking them.
xml_handling : str, optional, default='compress'
If 'compress', will compress XML files after unpacking them;
if 'delete', will delete XML files after unpacking them.
chunksize : int, optional, default=10
Chunksize (in number of frames) to use for mdtraj.iterload reading of trajectory
signal_handler : SignalHandler, optional, default=None
@@ -278,7 +286,7 @@ def process_core21_clone(clone_path, topology_filename, processed_trajectory_fil
continue

# If the result packet is compressed, decompress it and return the new directory name
result_packet = ensure_result_packet_is_decompressed(result_packet, work_unit_topology, delete_on_unpack=delete_on_unpack, compress_xml=compress_xml)
result_packet = ensure_result_packet_is_decompressed(result_packet, work_unit_topology, delete_on_unpack=delete_on_unpack, xml_handling=xml_handling)

# Check that we haven't violated our filename length assumption
if len(result_packet) > MAX_FILEPATH_LENGTH:
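
Putting the pieces together, a single-clone invocation of the updated API might look like the following (all paths and the selection string are hypothetical; only `xml_handling` and `delete_on_unpack` come from this PR):

```python
import fahmunge.core21

fahmunge.core21.process_core21_clone(
    'PROJ11419/RUN0/CLONE0',            # clone_path (hypothetical)
    'PROJ11419/RUN0/system.pdb',        # topology_filename (hypothetical)
    'munged/PROJ11419/RUN0/CLONE0.h5',  # processed_trajectory_filename (hypothetical)
    'protein',                          # atom_selection_string (MDTraj DSL)
    delete_on_unpack=False,             # keep the original .tar.bz2 archives
    xml_handling='delete',              # new in this PR: delete unpacked .xml files
)
```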