Skip to content

Commit

Permalink
counter and control_window_length are now computed in advance of _run…
Browse files Browse the repository at this point in the history
…_one_window()
  • Loading branch information
LaraFuhrmann committed Sep 8, 2023
1 parent 76cfd15 commit b2f8e52
Showing 1 changed file with 56 additions and 11 deletions.
67 changes: 56 additions & 11 deletions shorah/b2w.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ def _build_one_full_read(full_read: list[str], full_qualities: list[int]|list[st
return full_read, full_qualities # TODO return same data type twice


def _run_one_window(samfile, window_start, reference_name, window_length,
def _run_one_window(samfile, window_start, reference_name, window_length,control_window_length,
minimum_overlap, permitted_reads_per_location, counter,
exact_conformance_fix_0_1_basing_in_reads, indel_map, max_ins_at_pos,
extended_window_mode, exclude_non_var_pos_threshold):
Expand All @@ -144,11 +144,16 @@ def _run_one_window(samfile, window_start, reference_name, window_length,
window_start + window_length # arg exclusive as per pysam convention
)

# TODO: minimum_overlap
# TODO: original_window_length
# TODO: window_length
original_window_length = window_length
window_length = control_window_length
if extended_window_mode:
for pos, val in max_ins_at_pos.items():
if window_start <= pos < window_start + original_window_length:
window_length += val
# this is now done intilaly for all windows
#for pos, val in max_ins_at_pos.items():
# if window_start <= pos < window_start + original_window_length:
# window_length += val
minimum_overlap *= window_length/original_window_length

if exclude_non_var_pos_threshold > 0:
Expand Down Expand Up @@ -277,7 +282,8 @@ def _run_one_window(samfile, window_start, reference_name, window_length,
# TODO move out of this function
convert_to_printed_fmt = lambda x: [f'>{k[0]} {k[1]}\n{"".join(k[2])}' for k in x]

return convert_to_printed_fmt(arr), arr_read_qualities_summary, arr_read_summary, counter, window_length, pos_filter
# return convert_to_printed_fmt(arr), arr_read_qualities_summary, arr_read_summary, counter, window_length, pos_filter
return convert_to_printed_fmt(arr), arr_read_qualities_summary, arr_read_summary, pos_filter


def parallel_run_one_window(
Expand All @@ -288,6 +294,7 @@ def parallel_run_one_window(
idx,
window_start,
window_length,
control_window_length,
alignment_file,
reference_name,
win_min_ext,
Expand Down Expand Up @@ -315,12 +322,15 @@ def parallel_run_one_window(

logging.info(f"Working on window (1-based) @ {window_start+1}")

# (arr, arr_read_qualities_summary, arr_read_summary,
# counter, control_window_length, pos_filter) = _run_one_window(
(arr, arr_read_qualities_summary, arr_read_summary,
counter, control_window_length, pos_filter) = _run_one_window(
pos_filter) = _run_one_window(
samfile,
window_start - 1, # make 0 based
reference_name,
window_length,
control_window_length,
math.floor(win_min_ext * window_length),
dict(permitted_reads_per_location), # copys dict ("pass by value")
counter,
Expand Down Expand Up @@ -456,6 +466,32 @@ def parallel_run_one_window(
)
_write_to_file([line], f"coverage_{idx}.txt")



def update_tiling(tiling, extended_window_mode, max_ins_at_pos):
"""
input tiling:
return: tiling = [
(window_start, original_window_length, control_window_length, counter)
for each window
]
"""
update_tiling = []

for idx, (window_start, window_length) in enumerate(tiling):
original_window_length = window_length
if extended_window_mode:
for pos, val in max_ins_at_pos.items():
if window_start <= pos < window_start + original_window_length:
window_length += val
update_tiling.append((window_start,original_window_length, window_length))
else:
update_tiling.append((window_start,original_window_length, window_length))

return update_tiling


def build_windows(alignment_file: str, tiling_strategy: TilingStrategy,
win_min_ext: float, maximum_reads: int, minimum_reads: int,
reference_filename: str,
Expand Down Expand Up @@ -508,7 +544,7 @@ def build_windows(alignment_file: str, tiling_strategy: TilingStrategy,
threads=max_proc #1
)
#reffile = pysam.FastaFile(reference_filename) --> we need to read it in each child process
counter = 0
#counter = 0 #--> counter is now coputed initially for all windows
reference_name = tiling_strategy.get_reference_name()
tiling = tiling_strategy.get_window_tilings()
region_end = tiling_strategy.get_region_end()
Expand All @@ -519,8 +555,15 @@ def build_windows(alignment_file: str, tiling_strategy: TilingStrategy,
maximum_reads
)

tiling = update_tiling(tiling, extended_window_mode, max_ins_at_pos)

# generate counter for each window
# counter = window_start - 1 + control_window_length, # make 0 based
counter_list = [0] + [window_start- 1 + control_window_length for (window_start, window_length, control_window_length) in tiling]

all_processes = []
for idx, (window_start, window_length) in enumerate(tiling):
for idx, (window_start, window_length, control_window_length) in enumerate(tiling):
counter = counter_list[idx]
p = Process(
target=parallel_run_one_window,
args=(
Expand All @@ -531,6 +574,7 @@ def build_windows(alignment_file: str, tiling_strategy: TilingStrategy,
idx,
window_start,
window_length,
control_window_length,
alignment_file,
reference_name,
win_min_ext,
Expand All @@ -543,12 +587,13 @@ def build_windows(alignment_file: str, tiling_strategy: TilingStrategy,
exclude_non_var_pos_threshold,
)
)
p.start()
all_processes.append(p)

for p in all_processes:
p.join()
print("here after join")
p.run()

#for p in all_processes:
# p.join()

samfile.close()

Expand Down

0 comments on commit b2f8e52

Please sign in to comment.