-
Notifications
You must be signed in to change notification settings - Fork 1
/
pyBAPS.py
118 lines (97 loc) · 3.44 KB
/
pyBAPS.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
from itertools import groupby
from numpy import zeros,argsort,array,unique
from collections import Counter
def count_fasta(fasta_name):
"""
count number of sequences, and length of sequence
:return n_seq, len_seq
"""
n_seq = 0
with open(fasta_name, "r") as fh:
for k, x in groupby(fh, lambda line: line[0] == ">"):
# header
if k:
n_seq += 1
else:
if n_seq==1:
len_seq = sum([len(s.strip()) for s in x])
else:
tmplen = sum([len(s.strip()) for s in x])
assert len_seq==tmplen, print("Length of %dth input sequences are not the same as previous.\n" % n_seq)
return n_seq, len_seq
def fasta_iter(fasta_name):
"""
read fasta file, one entry at a time
:return hed, seq_int, an iterator over fasta entries,
"""
hed = ''
with open(fasta_name, "r") as fh:
for k, x in groupby(fh, lambda line: line[0] == ">"):
# header
if k:
hed = list(x)[0][1:].strip()
else:
seq = "".join([s.strip() for s in x])
yield (hed, seq2int(seq))
def seq2int(seq):
"""
transform DNA sequences to int array
"""
base = {'A': 1, 'C': 2, 'G': 4, 'T': 8, 'a': 1, 'c': 2, 'g': 4, 't': 8}
arr = zeros(len(seq), dtype='uint8')
for i, tb in enumerate(seq):
if tb in base:
arr[i] = base[tb]
return arr
def read_fasta(fasta_name):
"""
:param fasta_name: name of input fasta file
:return headers, seq_aln: headers of sequences, sequence alignment in numpy array
"""
nseq, len_seq = count_fasta(fasta_name)
seq_aln = zeros((nseq,len_seq), dtype='uint8')
headers = list()
for (i, x) in enumerate(fasta_iter(filename)):
hed, seq_int = x
headers.append(hed)
seq_aln[i:]=seq_int
return headers,seq_aln
def group_aln(seq_aln, partition):
"""
group alignment matrix into count matrix according to partition
:param seq_aln, alignment matrix, nseq x nloci
partition, nseq x 1
:return: count matrix, ngroup x nloci x 4
"""
base_key = [1,2,4,8]
partition = array(partition)
# assert that the partition is from 0 to n-1
unipart = unique(partition)
assert unipart[0]==0, "group partition should be from 0 to %d, unipart[0]=%d" % (len(unipart)-1, unipart[0])
assert unipart[-1]==len(unipart)-1, "group partition should be from 0 to %d, unipart[-1]=%d" % (len(unipart)-1, unipart[-1])
inds = argsort(partition)
n_group = len(set(partition))
nseq, nloci = seq_aln.shape
cnt_aln = zeros((n_group, nloci, 4), dtype='uint8')
offset=0
for k,g in groupby(partition[inds]):
tmplen = sum([1 for _ in g])
tmpinds = inds[offset:offset+tmplen]
# count seq_aln into cnt_aln
for j in range(nloci):
tmpc = Counter(seq_aln[tmpinds,j])
for bi,bk in enumerate(base_key):
cnt_aln[k,j,bi] = tmpc[bk]
offset += tmplen
return cnt_aln
if __name__ == "__main__":
# execute only if run as a script
filename = 'sample.fa'
heds,aln_mat = read_fasta(filename)
grp_aln = group_aln(aln_mat,[0,1,1,1])
print(heds)
print(aln_mat)
print(grp_aln)
#print(count_fasta(filename))
#for hed, seq in fasta_iter(filename):
# print(hed,seq)