-
Notifications
You must be signed in to change notification settings - Fork 250
/
Copy pathcreate_taxonomic_profile_from_blast.py
executable file
·312 lines (232 loc) · 11 KB
/
create_taxonomic_profile_from_blast.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
#!/usr/bin/env python3.2
import argparse
import os
import re
import sqlite3
from collections import OrderedDict
from biocode.utils import read_list_file
def main():
"""This is the second script I've written in Python. I'm sure it shows."""
parser = argparse.ArgumentParser( description='Reads a BLAST m8 file and taxonomy DB to produce a taxonomic profile at any user-specified ranking level.')
## input formats: btab, blast_m8
parser.add_argument('-f', '--input_format', type=str, required=True, help='Blast format: current options are btab or blast_m8' )
## The SQLite3 file that will be read for taxonomy information
parser.add_argument('-t', '--taxonomy_db', type=str, required=True, help='Path to a taxonomy.db file created by "create_taxonomy_db.py"' )
## BLAST list file
parser.add_argument('-b', '--blast_list_file', type=str, required=True, help='List of BLAST files (m8 format)' )
## output file to be written
parser.add_argument('-o', '--output_file', type=str, required=True, help='Path where the result file should written' )
## E-value cutoff to use
parser.add_argument('-e', '--eval_cutoff', type=float, required=False, help='Optional E-value cutoff to use.' )
## Top N hits per query to score. Only counts those where the taxon could be looked up in the indexes
parser.add_argument('-n', '--top_n', type=int, required=False, default=1, help=' Top N hits per query to score. Only counts unique taxon matches which could be looked up in the indexes' )
## rank on which matches will be grouped and reported. values like: species, genus, order, family, etc.
parser.add_argument('-r', '--rank', type=str, required=True, help='Taxonomy rank on which to group all matches, such as: species, genus, order, family, etc.' )
args = parser.parse_args()
conn = sqlite3.connect( args.taxonomy_db )
c = conn.cursor()
blast_files = read_list_file( args.blast_list_file )
taxon_counts = {}
processed_file_count = 0
stats = {}
stats['gi_lookup_success_count'] = 0
stats['gi_lookup_fail_count'] = 0
stats['taxon_lookup_success_count'] = 0
stats['taxon_lookup_failure_count'] = 0
for file in blast_files:
print("Processing file: ", file)
if args.input_format == 'blast_m8' or args.input_format == 'btab':
parse_blast_file( file, c, taxon_counts, args.eval_cutoff, args.input_format, stats, args.top_n )
else:
raise Exception("Unsupported input format passed: {0}".format(args.input_format) )
processed_file_count += 1
#if processed_file_count == 50:
#break
## process the taxon counts, conforming them to the user-specified rank
result_table = group_taxa_by_rank( args.rank, taxon_counts, c )
node_names = get_selected_node_names( result_table, c )
c.close()
fout = open(args.output_file, mode='w')
## write the results to the output file in order of most-found clade first
for tax_id in OrderedDict(sorted(result_table.items(), reverse=True, key=lambda t: t[1])):
sci_name = ''
if tax_id in node_names:
sci_name = node_names[tax_id]
fout.write( "{0}\t{1}\t{2}\n".format(tax_id, int(result_table[tax_id]), sci_name ) )
fout.close()
print("INFO: successful GI lookups: {0}/{1}".format(stats['gi_lookup_success_count'], \
(stats['gi_lookup_fail_count'] + stats['gi_lookup_success_count'])) )
print("INFO: successful taxon lookups: {0}/{1}".format( stats['taxon_lookup_success_count'], \
(stats['taxon_lookup_success_count'] + stats['taxon_lookup_failure_count']) ) )
def get_selected_node_names( res_table, cursor):
node_names = {}
for taxon_id in res_table:
cursor.execute('''SELECT scientific_name FROM orgs WHERE tax_id=?''', (taxon_id,) )
row = cursor.fetchone()
if row:
node_names[taxon_id] = row[0]
else:
print("WARN: failed to get scientific name for tax_id:", taxon_id)
return node_names
def get_ranked_taxon( tax_id, c, rank, rec_depth ):
c.execute("""SELECT parent_tax_id, rank FROM nodes WHERE tax_id = ?""", (tax_id,) )
row = c.fetchone()
if rec_depth > 20:
print("WARN: deep recursion detected for tax ID:", tax_id)
return None
if row:
if row[1] == rank:
return tax_id
else:
return get_ranked_taxon( row[0], c, rank, rec_depth + 1 )
else:
print("WARN: unable to find ranked taxon for tax_id:", tax_id)
return None
def group_taxa_by_rank(rank, counts, cursor):
"""Given a taxonomic rank, the input count table is regrouped by walking
up the taxonomy tree until all nodes are at the level of the passed
rank
"""
ranked_counts = dict()
unranked_taxon_count = 0
for taxon_id in counts:
ranked_taxon_id = get_ranked_taxon(taxon_id, cursor, rank, 0)
if ranked_taxon_id:
if ranked_taxon_id in ranked_counts:
#print("DEBUG: increasing count for taxon: {0} by: {1}".format(taxon_id, counts[taxon_id]['n']) )
ranked_counts[ranked_taxon_id] += counts[taxon_id]['n']
else:
#print("DEBUG: initializing a count for ranked_taxon_id {0} from taxon_id {1}".format(ranked_taxon_id, taxon_id) )
ranked_counts[ranked_taxon_id] = counts[taxon_id]['n']
else:
unranked_taxon_count += 1
return ranked_counts
def parse_blast_file( file, cursor, tax, eval_cutoff, format, stats, hits_per_query ):
""" For each query sequence find the top match above the E-val cutoff (if any)
which has an NCBI taxonomy assignment.
"""
if ( not os.path.isfile(file) ):
raise Exception("Couldn't find file: " + file)
## presets are for ncbi_m8, for which lines should have 12 columns. they are
# 1: Query - The query sequence id
# 2: Subject - The matching subject sequence id
# 3: Percent identity
# 4: alignment length
# 5: mismatches
# 6: gap openings
# 7: q.start
# 8: q.end
# 9: s.start
# 10: s.end
# 11: e-value
# 12: bit score
ID_COLUMN_NUM = 0
SUBJECT_LABEL_COLUMN_NUM = 1
EVAL_COLUMN_NUM = 10
ALIGN_LEN_COLUMN_NUM = 3
BIT_SCORE_COLUMN_NUM = 11
if format == 'btab':
ID_COLUMN_NUM = 0
SUBJECT_LABEL_COLUMN_NUM = 5
EVAL_COLUMN_NUM = 19
current_id = ""
current_id_classified = False
current_id_match_count = 0
current_match_ids = dict()
for line in open(file, "r"):
cols = line.split("\t")
if len(cols) >= 10:
this_id = cols[ID_COLUMN_NUM]
## this controls that we only look at the top hit for query
if this_id != current_id:
current_id_classified = False
current_id_match_ids = dict()
current_id_match_count = 0
current_id = this_id
if current_id_match_count < hits_per_query:
if eval_cutoff is None or eval_cutoff >= float(cols[EVAL_COLUMN_NUM]):
#print("DEBUG: attempting to parse a GI for header: ({0})".format(cols[SUBJECT_LABEL_COLUMN_NUM]) )
gi = parse_gi(cols[SUBJECT_LABEL_COLUMN_NUM], cursor)
if gi:
#print("DEBUG: Got a GI ({0}) for hit with id this_id".format(gi))
stats['gi_lookup_success_count'] += 1
taxon_id = get_taxon_id_by_gi(gi, cursor)
if taxon_id:
stats['taxon_lookup_success_count'] += 1
if taxon_id not in current_match_ids:
#print("DEBUG: adding match to taxon_id: {0}".format(taxon_id) )
match_score = int(cols[BIT_SCORE_COLUMN_NUM])/int(cols[ALIGN_LEN_COLUMN_NUM])
add_taxon_match( taxon_id, tax, cursor, match_score )
current_id_match_count += 1
current_match_ids[taxon_id] = True
else:
stats['taxon_lookup_failure_count'] += 1
print("WARN: failed to find a taxon_id for gi: {0}".format(gi))
else:
stats['gi_lookup_fail_count'] += 1
def add_taxon_match( id, tax, c, score ):
if id in tax:
tax[id]['n'] += score
#print("DEBUG: match count for taxon id {0} increased to {1}".format(id, tax[id]['n']) )
else:
tax[id] = {}
tax[id]['n'] = score
tax[id]['l'] = get_clade_name_by_taxon_id( c, id )
def get_clade_name_by_taxon_id( cursor, taxon_id ):
cursor.execute("""SELECT scientific_name FROM orgs WHERE tax_id = ?""", (taxon_id,) )
row = cursor.fetchone()
if row:
return row[0]
else:
return None
def get_taxon_id_by_gi( gi, cursor ):
"""Attempts to fetch a taxon ID using the GI from first the nucl_acc and then prot_acc tables"""
taxon_id = None
cursor.execute( 'SELECT tax_id FROM nucl_acc WHERE gi = ?', (gi,) )
row = cursor.fetchone()
if row:
taxon_id = row[0]
else:
cursor.execute( 'SELECT tax_id FROM prot_acc WHERE gi = ?', (gi,) )
row = cursor.fetchone()
if row:
taxon_id = row[0]
return taxon_id
def get_gi_by_accession( acc, cursor ):
## The table we query depends on the source
# Protein queries have a three-letter prefix, followed by five digits.
m = re.match( '^[A-Z]{3}[0-9]{5}\.\d', acc )
## CURRENT HACK, because of a db index bug the version number was indexed with
# the period character removed.
acc = acc.replace('.', '')
if m:
cursor.execute("""SELECT gi FROM prot_acc WHERE version = ?""", (acc,) )
row = cursor.fetchone()
else:
cursor.execute("""SELECT gi FROM nucl_acc WHERE version = ?""", (acc,) )
row = cursor.fetchone()
if row:
return row[0]
else:
return None
def parse_gi( match_header, cursor ):
"""Parses the GI number out of a NCBI-formatted BLAST match header string. If the
header has an accession instead of a GI, it performs a lookup
"""
## look for a GI first
#print("DEBUG: looking for a gi in header: ({0})".format(match_header))
m = re.match( 'gi\|(\d+)', match_header )
gi = None
accession = None
if m:
gi = m.group(1)
else:
## a direct GI pull failed, so look for something that resembles an accession instead
m = re.match( '[a-z]+\|([A-Z]{2,}\d{5,}\.\d)', match_header )
if m:
accession = m.group(1)
gi = get_gi_by_accession( accession, cursor )
#print("DEBUG:\treturning GI ({0}) for header ({1}), accession ({2})".format( gi, match_header, accession ) )
return gi
if __name__ == '__main__':
main()