-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmean-qual.py
executable file
·257 lines (197 loc) · 6.9 KB
/
mean-qual.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# ----------------------------------------------------------------------
# This script calculates total mean quality of reads in fasrq file(s).
# Script writes it's output to tab-separated file.
# ----------------------------------------------------------------------
__version__ = "1.0.a"
# Year, month, day
__last_update_date__ = "2020-04-13"
# Check python interpreter version
import sys
if sys.version_info.major < 3:
print( "\nYour python interpreter version is " + "%d.%d" % (sys.version_info.major,
sys.version_info.minor) )
print(" Please, use Python 3.\a")
# In python 2 'raw_input' does the same thing as 'input' in python 3.
# Neither does 'input' in python2.
if sys.platform.startswith("win"):
raw_input("Press ENTER to exit:")
# end if
sys.exit(1)
# end if
import os
from re import search as re_search
def platf_depend_exit(exit_code=0):
"""
Function asks to press ENTER press on Windows
and exits after that.
:type exit_code: int;
"""
if sys.platform.startswith("win"):
input("Press ENTER to exit:")
# end if
sys.exit(exit_code)
# end def platf_depend_exit
def print_help():
print("\nScript 'mean-qual.py' calculates mean quality of reads in fastq file(s).\n")
print("Version {}; {} edition.".format(__version__, __last_update_date__))
print("\nUsage:")
print(" python3 mean-qual.py first.fastq second.fastq.gz third.fq.gz")
print("Following command will process all *.fastq(.gz) and *.fq(.gz) files in the working directory:")
print(" python3 mean-qual.py")
print("\nOptions:")
print(" -h (--help): print help message.")
print(" -v (--version): print version.")
print(" -o (--outfile): output file. Default: './mean-qual_result.tsv'.")
print(" -p (--phred-offset): Phred offset (33 of 64). Default: 33.")
# end if
# Firstly check for information-providing flags
if "-h" in sys.argv[1:] or "--help" in sys.argv[1:]:
print_help()
platf_depend_exit()
# end if
if "-v" in sys.argv[1:] or "--version" in sys.argv[1:]:
print(__version__)
platf_depend_exit()
# end if
is_fastq = lambda f: False if re_search(r".*\.f(ast)?q(\.gz)?$", f) is None else True
# Handle command line arguments
import getopt
try:
opts, args = getopt.gnu_getopt(sys.argv[1:], "p:o:",
["phred-offset=", "outfile="])
except getopt.GetoptError as gerr:
print( str(gerr) )
platf_depend_exit(2)
# end try
fpaths = args
phred_offset = 33
outfpath = "mean-qual_result.tsv"
write_head = True
for opt, arg in opts:
if opt in ("-p", "--phred-offset"):
try:
phred_offset = int(arg)
if phred_offset != 33 and phred_offset != 64:
raise ValueError
# end if
except ValueError:
print("Invalid Phred offset specified: {}".format(arg))
print("33 and 64 are available")
platf_depend_exit(1)
# end try
elif opt in ("-o", "--outfile"):
outfpath = arg
# end if
# end for
# If no input files are specified -- process all fastq files in the working directory
if len(fpaths) == 0:
fpaths = tuple( filter(is_fastq, os.listdir('.')) )
if len(fpaths) == 0:
print_help()
platf_depend_exit(1)
# end if
# If we process files in working directory,
# their exstance and extention are already checked
else:
# Check if all files exist
if len(tuple(filter(os.path.exists, fpaths))) != len(fpaths):
print("Following files do not exist:")
for fpath in filter(lambda f: not os.path.exists(f), fpaths):
print(fpath)
# end for
platf_depend_exit(1)
# end if
# Check if all files specified in command line are fastq files (just check extention)
if len(tuple(filter(is_fastq, fpaths))) != len(fpaths):
print("Following files are probably not FASTQ files:")
for fpath in filter(lambda f: not os.path.exists(f), fpaths):
print(fpath)
# end for
print("If they are actually are -- change the extention to .fastq(.gz) or .fq(.gz), please.")
platf_depend_exit(1)
# end if
# end if
# If output file wxists -- ask for ovewriting permissins
if os.path.exists(outfpath):
error = True
while error:
reply = input("File '{}' exists. Overwrite? [y/n]:".format(outfpath))
if reply.lower() == 'y' or reply == '':
print("Data in file '{}' will be overwritten.\n".format(outfpath))
error = False
elif reply.lower() == 'n':
print("Appending current result to '{}'.\n".format(outfpath))
error = False
write_head = False
else:
print("Invalid reply: '{}'".format(reply))
# end if
# end while
# end if
# Write head of the table and (maybe) overwrite old data
if write_head:
with open(outfpath, 'w') as outfile:
outfile.write('\t'.join( ("File",
"Number of reads",
"Mean quality (Q)",
"Accuracy (%)") ) +
'\n')
# end with
# end if
from math import log
from gzip import open as open_as_gzip
# Function for getting Q value from Phred character:
substr_phred = lambda q_symb: ord(q_symb) - phred_offset
# List of propabilities corresponding to indices (index is Q, is the propability):
q2p_map = [10 ** (-q/10) for q in range(128)] # 127 -- max value of a signed byte
# Function for accessing propabilities by Q:
qual2prop = lambda q: q2p_map[q]
# Function for accessing Q by propability:
prop2qual = lambda p: round(-10 * log(p, 10), 2)
props = list()
for fpath in fpaths:
print("Processing file '{}'...".format(os.path.basename(fpath)))
if fpath.endswith(".gz"):
open_func = open_as_gzip
fmt_func = lambda l: l.decode("utf-8").strip()
else:
open_func = open
fmt_func = lambda l: l.strip()
# end if
read_counter = 0
with open_func(fpath) as fq_file:
i = 0
for line in fq_file:
if i == 3:
p = map(substr_phred, fmt_func(line))
p = map(qual2prop, p)
props.extend(tuple(p))
read_counter += 1
i = 0
else:
i += 1
# end if
# end for
# end with
lenpr = len(props)
sum_props = sum(props)
props = list()
avg_prop = sum_props / lenpr
accuracy = round((lenpr - sum_props) / lenpr * 100, 2)
mean_q = prop2qual(avg_prop)
print("\n{:,} reads".format(read_counter).replace(',', ' '))
print("Mean quality is {}".format(mean_q))
print("I.e. accuracy is {}%".format(accuracy))
print('=' * 20 + '\n')
with open(outfpath, 'a') as outfile:
outfile.write( '\t'.join( (os.path.basename(fpath),
str(read_counter),
str(mean_q),
str(accuracy)) ) +
'\n' )
# end with
# end for
print("Completed!")
platf_depend_exit()