Skip to content

Commit

Permalink
Merge pull request #640 from macs3-project/feat/macs3/memory_consumption
Browse files Browse the repository at this point in the history
[Feat/macs3/memory consumption] To address the memory usage issue of `hmmratac`
  • Loading branch information
taoliu authored Apr 26, 2024
2 parents 880ab61 + 38dc371 commit 1d6c38e
Show file tree
Hide file tree
Showing 12 changed files with 204,377 additions and 204,286 deletions.
188 changes: 118 additions & 70 deletions MACS3/Commands/hmmratac_cmd.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Time-stamp: <2023-08-02 17:32:47 Tao Liu>
# Time-stamp: <2024-04-26 11:47:02 Tao Liu>

"""Description: Main HMMR command
Expand All @@ -16,6 +16,8 @@
import gc
import numpy as np
import json
import csv
import tempfile
from hmmlearn import hmm
#from typing import Sized

Expand Down Expand Up @@ -176,7 +178,7 @@ def run( args ):
options.info( f"# Pile up ONLY short fragments" )
fc_bdg = digested_atac_signals[ 0 ]
(sum_v, n_v, max_v, min_v, mean_v, std_v) = fc_bdg.summary()
print(sum_v, n_v, max_v, min_v, mean_v, std_v)
#print(sum_v, n_v, max_v, min_v, mean_v, std_v)
options.info( f"# Convert pileup to fold-change over average signal" )
fc_bdg.apply_func(lambda x: x/mean_v)
else:
Expand All @@ -195,7 +197,7 @@ def run( args ):

# Let MACS3 do the cutoff analysis to help decide the lower and upper cutoffs
with open(cutoffanalysis_file, "w") as ofhd_cutoff:
ofhd_cutoff.write( fc_bdg.cutoff_analysis( min_length=minlen, max_gap=options.hmm_training_flanking, max_score = 1000 ) )
ofhd_cutoff.write( fc_bdg.cutoff_analysis( min_length=minlen, max_gap=options.hmm_training_flanking, max_score = 100 ) )
#raise Exception("Cutoff analysis only.")
sys.exit(1)

Expand Down Expand Up @@ -242,7 +244,7 @@ def run( args ):

# Let MACS3 do the cutoff analysis to help decide the lower and upper cutoffs
with open(cutoffanalysis_file, "w") as ofhd_cutoff:
ofhd_cutoff.write( fc_bdg.cutoff_analysis( min_length=minlen, max_gap=options.hmm_training_flanking, max_score = 1000 ) )
ofhd_cutoff.write( fc_bdg.cutoff_analysis( min_length=minlen, max_gap=options.hmm_training_flanking, max_score = 100 ) )

# we will check if anything left after filtering
if peaks.total > options.hmm_maxTrain:
Expand Down Expand Up @@ -377,11 +379,12 @@ def run( args ):
options.info( f"# We expand the candidate regions with {options.hmm_training_flanking} and merge overlap" )
candidate_regions.expand( options.hmm_training_flanking )
candidate_regions.merge_overlap()
options.info( f"# after expanding and merging, we have {candidate_regions.total} candidate regions" )

# remove peaks overlapping with blacklisted regions
if options.blacklist:
candidate_regions.exclude( blacklist_regions )
options.info( f"# after removing those overlapping with provided blacklisted regions, we have {candidate_regions.total} left" )
options.info( f"# after removing those overlapping with provided blacklisted regions, we have {candidate_regions.total} left" )

# extract signals
options.info( f"# Extract signals in candidate regions and decode with HMM")
Expand All @@ -392,21 +395,39 @@ def run( args ):
# Note: we implement in a way that we will decode the candidate regions 10000 regions at a time so 1. we can make it running in parallel in the future; 2. we can reduce the memory usage.
options.info( f"# Use HMM to predict states")
n = 0
predicted_proba = []
candidate_bins = []
#predicted_proba_file_name = "predicted_proba.csv"
#predicted_proba_file = open(predicted_proba_file_name, "wb")
predicted_proba_file = tempfile.TemporaryFile(mode="w+b")
# predicted_proba_file = open("predicted_proba.csv", "w+b")

while candidate_regions.total != 0:
n += 1
cr = candidate_regions.pop( options.decoding_steps )
options.info( "# decoding %d..." % ( n*options.decoding_steps ) )
[ cr_bins, cr_data, cr_data_lengths ] = extract_signals_from_regions( digested_atac_signals, cr, binsize = options.hmm_binsize, hmm_type = options.hmm_type )
#options.info( "# extracted signals in the candidate regions")
candidate_bins.extend( cr_bins )
#options.info( "# saving information regarding the candidate regions")
predicted_proba.extend( hmm_predict( cr_data, cr_data_lengths, hmm_model ) )
#options.info( "# prediction done")
# we get DECODING_STEPS number of candidate regions first
cr = candidate_regions.pop(options.decoding_steps)
options.info("# decoding %d..." % (n * options.decoding_steps))

# then extract data from digested signals, create cr_bins, cr_data, and cr_data_lengths
[cr_bins, cr_data, cr_data_lengths] = extract_signals_from_regions( digested_atac_signals, cr, binsize = options.hmm_binsize, hmm_type = options.hmm_type )
#options.debug( "# extract_signals_from_regions complete")

prob_data = hmm_predict(cr_data, cr_data_lengths, hmm_model)
assert len(prob_data) == len(cr_bins)
for i in range(len(prob_data)):
predicted_proba_file.write(b"%s,%d" % cr_bins[i])
predicted_proba_file.write(b",%f,%f,%f\n" % tuple(prob_data[i]))

cr_data = []
cr_data_lengths = []
cr_bins = []
prob_data = []

#options.debug( "# clean up complete")
gc.collect()


#predicted_proba_file.close()
predicted_proba_file.seek(0) # reset
options.info( f"# predicted_proba files written...")

#############################################
# 6. Output - add to OutputWriter
#############################################
Expand All @@ -424,62 +445,68 @@ def run( args ):
open_state_bdg_fhd = open( open_state_bdgfile, "w" )
nuc_state_bdg_fhd = open( nuc_state_bdgfile, "w" )
bg_state_bdg_fhd = open( bg_state_bdgfile, "w" )
save_proba_to_bedGraph( candidate_bins, predicted_proba, options.hmm_binsize, open_state_bdg_fhd, nuc_state_bdg_fhd, bg_state_bdg_fhd, i_open_region, i_nucleosomal_region, i_background_region )
save_proba_to_bedGraph( predicted_proba_file, options.hmm_binsize, open_state_bdg_fhd, nuc_state_bdg_fhd, bg_state_bdg_fhd, i_open_region, i_nucleosomal_region, i_background_region )
predicted_proba_file.seek(0) # reset
open_state_bdg_fhd.close()
nuc_state_bdg_fhd.close()
bg_state_bdg_fhd.close()
options.info( f"# finished writing proba_to_bedgraph")

# Generate states path:
states_path = generate_states_path( candidate_bins, predicted_proba, options.hmm_binsize, i_open_region, i_nucleosomal_region, i_background_region )

# # Generate states path:
states_path = generate_states_path( predicted_proba_file, options.hmm_binsize, i_open_region, i_nucleosomal_region, i_background_region )
options.info( f"# finished generating states path")
predicted_proba_file.close() #kill the tempfile
# Save states path if needed
# PS: we need to implement extra feature to include those regions NOT in candidate_bins and assign them as 'background state'.
if options.save_states:
options.info( f"# Write states assignments in a BED file: {options.name}_states.bed" )
f = open( states_file, "w" )
save_states_bed( states_path, f )
f.close()
with open( states_file, "w" ) as f:
save_states_bed( states_path, f )

options.info( f"# Write accessible regions in a gappedPeak file: {options.name}_accessible_regions.gappedPeak")
ofhd = open( accessible_file, "w" )
save_accessible_regions( states_path, ofhd, options.openregion_minlen )
ofhd.close()
with open( accessible_file, "w" ) as ofhd:
save_accessible_regions( states_path, ofhd, options.openregion_minlen )

options.info( f"# Finished")

def save_proba_to_bedGraph( candidate_bins, predicted_proba, binsize, open_state_bdg_file, nuc_state_bdg_file, bg_state_bdg_file, i_open, i_nuc, i_bg ):
def save_proba_to_bedGraph( predicted_proba_file, binsize, open_state_bdg_file, nuc_state_bdg_file, bg_state_bdg_file, i_open, i_nuc, i_bg ):
open_state_bdg = bedGraphTrackI( baseline_value = 0 )
nuc_state_bdg = bedGraphTrackI( baseline_value = 0 )
bg_state_bdg = bedGraphTrackI( baseline_value = 0 )

prev_chrom_name = None
prev_bin_end = None
for l in range(len(predicted_proba)):
# note that any region not in the candidate bins will be
# treated as absolutely the background
chrname = candidate_bins[l][0]
end_pos = candidate_bins[l][1]

for pp_line in predicted_proba_file:
pp_data = pp_line.strip().split(b',')

chrname = pp_data[0]
end_pos = int(pp_data[1])
start_pos = end_pos - binsize
pp_open = float(pp_data[i_open+2])
pp_nuc = float(pp_data[i_nuc+2])
pp_bg = float(pp_data[i_bg+2])

if chrname != prev_chrom_name:
prev_chrom_name = chrname
# add the first region as background
# we start a new chromosome
if start_pos > 0:
# add the first unannotated region as background
open_state_bdg.add_loc( chrname, 0, start_pos, 0.0 )
nuc_state_bdg.add_loc( chrname, 0, start_pos, 0.0 )
bg_state_bdg.add_loc( chrname, 0, start_pos, 1.0 )
prev_bin_end = start_pos
elif start_pos == 0:
# if start_pos == 0, then the first bin has to be assigned, we set prev_bin_end as 0
prev_bin_end = 0
# now check if the prev_bin_end is start_pos, if not, add a gap of background
if prev_bin_end < start_pos:
open_state_bdg.add_loc( chrname, prev_bin_end, start_pos, 0.0 )
nuc_state_bdg.add_loc( chrname, prev_bin_end, start_pos, 0.0 )
bg_state_bdg.add_loc( chrname, prev_bin_end, start_pos, 1.0 )

open_state_bdg.add_loc( chrname, start_pos, end_pos, predicted_proba[l][ i_open ] )
nuc_state_bdg.add_loc( chrname, start_pos, end_pos, predicted_proba[l][ i_nuc ] )
bg_state_bdg.add_loc( chrname, start_pos, end_pos, predicted_proba[l][ i_bg ] )
prev_bin_end = start_pos

prev_chrom_name = chrname
else:
# now check if the prev_bin_end is start_pos, if not, add a gap of background
if prev_bin_end < start_pos:
open_state_bdg.add_loc( chrname, prev_bin_end, start_pos, 0.0 )
nuc_state_bdg.add_loc( chrname, prev_bin_end, start_pos, 0.0 )
bg_state_bdg.add_loc( chrname, prev_bin_end, start_pos, 1.0 )

open_state_bdg.add_loc( chrname, start_pos, end_pos, pp_open )
nuc_state_bdg.add_loc( chrname, start_pos, end_pos, pp_nuc )
bg_state_bdg.add_loc( chrname, start_pos, end_pos, pp_bg )
prev_bin_end = end_pos

open_state_bdg.write_bedGraph( open_state_bdg_file, "Open States", "Likelihoods of being Open States" )
nuc_state_bdg.write_bedGraph( nuc_state_bdg_file, "Nucleosomal States", "Likelihoods of being Nucleosomal States" )
bg_state_bdg.write_bedGraph( bg_state_bdg_file, "Background States", "Likelihoods of being Background States" )
Expand All @@ -489,31 +516,52 @@ def save_states_bed( states_path, states_bedfile ):
# we do not need to output background state.
for l in range( len( states_path ) ):
if states_path[l][3] != "bg":
states_bedfile.write( "%s\t%s\t%s\t%s\n" % states_path[l] )
states_bedfile.write( "%s\t" % states_path[l][0].decode() )
states_bedfile.write( "%d\t%d\t%s\n" % states_path[l][1:] )
return

def generate_states_path(candidate_bins, predicted_proba, binsize, i_open_region, i_nucleosomal_region, i_background_region):
def generate_states_path(predicted_proba_file, binsize, i_open, i_nuc, i_bg):
ret_states_path = []
labels_list = ["open", "nuc", "bg"]
start_pos = candidate_bins[0][1] - binsize
for l in range(1, len(predicted_proba)):
chromosome = candidate_bins[l][0].decode()
prev_open, prev_nuc, prev_bg = predicted_proba[l-1][i_open_region], predicted_proba[l-1][i_nucleosomal_region], predicted_proba[l-1][i_background_region]
curr_open, curr_nuc, curr_bg = predicted_proba[l][i_open_region], predicted_proba[l][i_nucleosomal_region], predicted_proba[l][i_background_region]

prev_chrom_name = None
prev_bin_end = None
prev_label = None

for pp_line in predicted_proba_file:
pp_data = pp_line.strip().split(b',')

chrname = pp_data[0]
end_pos = int(pp_data[1])
start_pos = end_pos - binsize
pp_open = float(pp_data[i_open+2])
pp_nuc = float(pp_data[i_nuc+2])
pp_bg = float(pp_data[i_bg+2])

# find the best state as label
label = labels_list[max((pp_open, 0), (pp_nuc, 1), (pp_bg, 2), key=lambda x: x[0])[1]]

label_prev = labels_list[max((prev_open, 0), (prev_nuc, 1), (prev_bg, 2), key=lambda x: x[0])[1]]
label_curr = labels_list[max((curr_open, 0), (curr_nuc, 1), (curr_bg, 2), key=lambda x: x[0])[1]]

if candidate_bins[l-1][0] == candidate_bins[l][0]: # if we are looking at the same chromosome ...
if label_prev != label_curr:
end_pos = candidate_bins[l-1][1]
ret_states_path.append((chromosome, start_pos, end_pos, label_prev))
start_pos = candidate_bins[l][1] - binsize
elif l == len(predicted_proba) - 1:
end_pos = candidate_bins[l][1]
ret_states_path.append((chromosome, start_pos, end_pos, label_prev))
if chrname != prev_chrom_name:
# we start a new chromosome
if start_pos > 0:
# add the first unannotated region as background
ret_states_path.append((chrname, 0, start_pos, "bg"))
ret_states_path.append((chrname, start_pos, end_pos, label))
prev_label = label
prev_chrom_name = chrname
else:
start_pos = candidate_bins[l][1] - binsize
# now check if the prev_bin_end is start_pos, if not, add a gap of background
if prev_bin_end < start_pos:
ret_states_path.append((chrname, prev_bin_end, start_pos, "bg"))
prev_label = "bg"
# same label, just extend
if label == prev_label:
ret_states_path[-1] = (ret_states_path[-1][0], ret_states_path[-1][1], end_pos, label)
else:
ret_states_path.append((chrname, start_pos, end_pos, label))
prev_label = label

prev_bin_end = end_pos
return ret_states_path

def save_accessible_regions(states_path, accessible_region_file, openregion_minlen):
Expand All @@ -532,7 +580,6 @@ def add_regions(i, regions):
states_path[i][2] == states_path[i+1][1] and states_path[i+1][2] == states_path[i+2][1] and
states_path[i+1][2] - states_path[i+1][1] > openregion_minlen):
accessible_regions = add_regions(i, accessible_regions)

# Group states by region
list_of_groups = []
one_group = [accessible_regions[0]]
Expand All @@ -550,7 +597,8 @@ def add_regions(i, regions):
block_num = sum('open' in tup for tup in region)
block_sizes = ','.join(str(region[j][2] - region[j][1]) for j in range(1, len(region) - 1, 2))
block_starts = ','.join(str(region[j][1] - region[0][1]) for j in range(1, len(region) - 1, 2))
broadpeak.add(bytes(region[1][0], encoding="raw_unicode_escape"), region[0][1], region[-1][2],
broadpeak.add(region[1][0], region[0][1], region[-1][2],
#bytes(region[1][0], encoding="raw_unicode_escape"), region[0][1], region[-1][2],
thickStart=bytes(str(region[1][1]), encoding="raw_unicode_escape"),
thickEnd=bytes(str(region[-2][2]), encoding="raw_unicode_escape"),
blockNum=block_num,
Expand Down
11 changes: 9 additions & 2 deletions MACS3/Signal/HMMR_Signal_Processing.pyx
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# cython: language_level=3
# cython: profile=True
# Time-stamp: <2023-07-28 12:14:57 Tao Liu>
# Time-stamp: <2024-04-26 10:40:41 Tao Liu>

"""Module description:
Expand Down Expand Up @@ -146,6 +146,7 @@ cpdef list extract_signals_from_regions( list signals, object regions, int binsi
list ret_training_data, ret_training_lengths, ret_training_bins

regionsbdg = _make_bdg_of_bins_from_regions( regions, binsize )
debug('# extract_signals_from_regions: regionsbdg completed')
# now, let's overlap
extracted_positions = []
extracted_data = []
Expand All @@ -156,10 +157,14 @@ cpdef list extract_signals_from_regions( list signals, object regions, int binsi
extracted_positions.append( positions )
extracted_data.append( values )
extracted_len.append( lengths )
positions = []
values = []
lengths = []
debug('# extract_signals_from_regions: extracted positions, data, len')
ret_training_bins = []
ret_training_data = []
ret_training_lengths = []
c = 0

nn = len( extracted_data[0] )
assert nn > 0
assert nn == len( extracted_data[1] )
Expand All @@ -182,6 +187,7 @@ cpdef list extract_signals_from_regions( list signals, object regions, int binsi
counter = 0
prev_c = c
counter += 1
debug('# extract_signals_from_regions: ret_training bins, data, lengths - gaussian')
#poisson can only take int values as input
if hmm_type == "poisson":
for i in range( nn ):
Expand All @@ -197,6 +203,7 @@ cpdef list extract_signals_from_regions( list signals, object regions, int binsi
counter = 0
prev_c = c
counter += 1
debug('# extract_signals_from_regions: ret_training bins, data, lengths - poisson')
# last region
ret_training_lengths.append( counter )
assert sum(ret_training_lengths) == len(ret_training_data)
Expand Down
Loading

0 comments on commit 1d6c38e

Please sign in to comment.