Coverage for PanACoTA/utils_argparse.py: 99%

155 statements  

« prev     ^ index     » next       coverage.py v7.3.1, created at 2023-09-20 14:37 +0000

1#!/usr/bin/env python3 

2# coding: utf-8 

3 

4# ############################################################################### 

5# This file is part of PanACOTA. # 

6# # 

7# Authors: Amandine Perrin # 

8# Copyright © 2018-2020 Institut Pasteur (Paris). # 

9# See the COPYRIGHT file for details. # 

10# # 

11# PanACOTA is a software providing tools for large scale bacterial comparative # 

12# genomics. From a set of complete and/or draft genomes, you can: # 

13# - Do a quality control of your strains, to eliminate poor quality # 

14# genomes, which would not give any information for the comparative study # 

15# - Uniformly annotate all genomes # 

16# - Do a Pan-genome # 

17# - Do a Core or Persistent genome # 

18# - Align all Core/Persistent families # 

19# - Infer a phylogenetic tree from the Core/Persistent families # 

20# # 

21# PanACOTA is free software: you can redistribute it and/or modify it under the # 

22# terms of the Affero GNU General Public License as published by the Free # 

23# Software Foundation, either version 3 of the License, or (at your option) # 

24# any later version. # 

25# # 

26# PanACOTA is distributed in the hope that it will be useful, but WITHOUT ANY # 

27# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS # 

28# FOR A PARTICULAR PURPOSE. See the Affero GNU General Public License # 

29# for more details. # 

30# # 

31# You should have received a copy of the Affero GNU General Public License # 

32# along with PanACOTA (COPYING file). # 

33# If not, see <https://www.gnu.org/licenses/>. # 

34# ############################################################################### 

35 

36""" 

37Functions to check argparse arguments given by user

38 

39 

40@author gem 

41April 2017 

42""" 

43from PanACoTA import utils 

44import argparse 

45import configparser 

46import sys 

47import os 

48 

49 

def gen_name(param):
    """
    Validate the name given for the genome set.

    The format check is delegated to ``utils.check_format`` (presumably
    4 alphanumeric characters -- confirm against that helper).

    Parameters
    ----------
    param : str
        name given by the user

    Returns
    -------
    str
        ``param`` unchanged when the format is accepted

    Raises
    ------
    argparse.ArgumentTypeError
        when ``utils.check_format`` rejects the name; the message advises
        on the expected format (2 letters of genus + 2 letters of species)
    """
    # Guard clause: accepted names are returned untouched.
    if utils.check_format(param):
        return param
    advice = ("The genome name must contain 4 characters. For example, this name can "
              "correspond to the 2 first letters of genus, and 2 first letters of "
              "species, e.g. ESCO for Escherichia Coli.")
    raise argparse.ArgumentTypeError(advice)

62 

63 

def date_name(param):
    """
    Validate the date given by the user.

    The format check is delegated to ``utils.check_format`` (presumably
    4 alphanumeric characters -- confirm against that helper).

    Parameters
    ----------
    param : str
        date given by the user

    Returns
    -------
    str
        ``param`` unchanged when the format is accepted

    Raises
    ------
    argparse.ArgumentTypeError
        when ``utils.check_format`` rejects the date; the message advises
        on the usual MMYY format
    """
    # Guard clause: accepted dates are returned untouched.
    if utils.check_format(param):
        return param
    advice = ("The date must contain 4 characters. Usually, it contains 4 digits, "
              "corresponding to the month (2 digits) and year (2 digits).")
    raise argparse.ArgumentTypeError(advice)

75 

76 

def get_date():
    """
    Return the current local date as a 4-character string: month (2 digits)
    followed by year without century (2 digits), i.e. MMYY.
    """
    from time import strftime
    mmyy_format = "%m%y"
    return strftime(mmyy_format)

83 

84 

def cont_num(param):
    """
    Validate the maximum number of contigs given to --nbcont.

    Parameters
    ----------
    param : str or int
        value given by the user

    Returns
    -------
    int
        the value converted to int, guaranteed to be in [0, 9999]

    Raises
    ------
    argparse.ArgumentTypeError
        if the value cannot be converted to int, is negative, or is
        10000 or more
    """
    try:
        nb_contigs = int(param)
    except Exception:
        raise argparse.ArgumentTypeError(
            "argument --nbcont: invalid int value: {}".format(param)
        )
    if nb_contigs < 0:
        raise argparse.ArgumentTypeError(
            "The maximum number of contigs allowed must be a positive number."
        )
    # Upper bound: anything above 9999 contigs is refused.
    if nb_contigs > 9999:
        raise argparse.ArgumentTypeError(
            "We do not support genomes with more than 9999 contigs."
        )
    return nb_contigs

103 

104 

def thread_num(param):
    """
    Validate the number of threads requested by the user.

    Parameters
    ----------
    param : str or int
        value given by the user

    Returns
    -------
    int
        the requested number of threads, or the number of available CPUs
        when 0 is given

    Raises
    ------
    argparse.ArgumentTypeError
        if the value is not an int, is negative, or exceeds the number of
        available CPUs
    """
    import multiprocessing
    try:
        requested = int(param)
    except Exception:
        raise argparse.ArgumentTypeError(
            "argument --threads threads: invalid int value: {}".format(param)
        )
    # Prefer the CPUs available to this process (relevant on clusters).
    # os.sched_getaffinity does not exist on every platform (e.g. Mac OS):
    # fall back to the total number of CPUs of the machine.
    try:
        available = len(os.sched_getaffinity(0))
    except AttributeError:
        available = multiprocessing.cpu_count()

    # The three cases below are mutually exclusive, so their order is free.
    if requested == 0:
        return available
    if requested < 0:
        raise argparse.ArgumentTypeError(
            ("Please provide a positive number of threads (or 0 for all threads): "
             "Invalid value: {}").format(requested)
        )
    if requested > available:
        raise argparse.ArgumentTypeError(
            ("You have {} threads on your computer, you cannot ask for more: "
             "invalid value: {}").format(available, requested)
        )
    return requested

136 

137 

def positive_int(param):
    """
    Convert the value given to --cutn into a non-negative int.

    Parameters
    ----------
    param : str or int
        value given by the user

    Returns
    -------
    int
        the converted value (0 is accepted)

    Raises
    ------
    argparse.ArgumentTypeError
        if the value cannot be parsed as an int, or is negative
    """
    try:
        value = int(param)
    except ValueError:
        raise argparse.ArgumentTypeError(
            f"error: argument --cutn: invalid int value: '{param}'"
        )
    if value >= 0:
        return value
    raise argparse.ArgumentTypeError(
        f"error: argument --cutn must be a positive integer: invalid int value: '{value}'"
    )

151 

152 

def mash_dist(param):
    """
    Check the mash distance given. Must be a float between 0 and 1 included.

    Parameters
    ----------
    param : str or float
        value given by the user

    Returns
    -------
    float
        the converted value, guaranteed to be in [0, 1]

    Raises
    ------
    argparse.ArgumentTypeError
        if the value cannot be parsed as a float, or is outside [0, 1]
        (this includes NaN)
    """
    try:
        param = float(param)
    except ValueError:
        msg = f"error: mash distance: invalid float value: '{param}'"
        raise argparse.ArgumentTypeError(msg)
    # Written as a positive range test on purpose: every comparison with NaN
    # is False, so "param < 0 or param > 1" would let float("nan") through.
    if not 0 <= param <= 1:
        msg = f"error: mash distance must be between 0 and 1: invalid value: '{param}'"
        raise argparse.ArgumentTypeError(msg)
    return param

166 

167 

def percentage(param):
    """
    Check the argument given to parameter '-t tol'.

    Parameters
    ----------
    param : str or float
        value given by the user

    Returns
    -------
    float
        the converted value, guaranteed to be in [0, 1]

    Raises
    ------
    argparse.ArgumentTypeError
        if the value cannot be converted to float, or is outside [0, 1]
        (this includes NaN)
    """
    try:
        param = float(param)
    except (TypeError, ValueError):
        msg = "argument -t tol: invalid float value: {}".format(param)
        raise argparse.ArgumentTypeError(msg)
    # Written as a positive range test on purpose: every comparison with NaN
    # is False, so "param < 0 or param > 1" would let float("nan") through.
    if not 0 <= param <= 1:
        # NOTE: str.format does not unescape '%%', so the message is emitted
        # with a literal double percent sign. Text kept unchanged on purpose.
        msg = ("The minimum %% of genomes required in a family to be persistent must "
               "be in [0, 1]. Invalid value: {}".format(param))
        raise argparse.ArgumentTypeError(msg)
    return param

182 

183 

def perc_id(param):
    """
    Check the argument given to parameter '-i percentage_id'.

    Parameters
    ----------
    param : str or float
        value given by the user

    Returns
    -------
    float
        the converted value, guaranteed to be in [0, 1]

    Raises
    ------
    argparse.ArgumentTypeError
        if the value cannot be converted to float, or is outside [0, 1]
        (this includes NaN)
    """
    try:
        param = float(param)
    except (TypeError, ValueError):
        msg = "argument -i percentage_id: invalid float value: {}".format(param)
        raise argparse.ArgumentTypeError(msg)
    # Written as a positive range test on purpose: every comparison with NaN
    # is False, so "param < 0 or param > 1" would let float("nan") through.
    if not 0 <= param <= 1:
        # NOTE: str.format does not unescape '%%', so the message is emitted
        # with a literal double percent sign. Text kept unchanged on purpose.
        msg = ("The minimum %% of identity must be in [0, 1]. Invalid value: {}".format(param))
        raise argparse.ArgumentTypeError(msg)
    return param

197 

198 

class Conf_all_parser(configparser.ConfigParser):
    """
    Read configfile and return arguments found, according to required type

    Every value is kept twice: as a string in the underlying ConfigParser,
    and (possibly converted to bool/int/float) in ``sec_dicts``.

    Parameters
    ----------
    conffile : str
        path to configuration file. An empty string means "no config file".
    readsec : list or None
        list of sections of the config file to read. None (default) means
        no section.
    clean_str : boolean
        by default, remove " surrounding strings. If no need to do it, set this
        parameter to False

    Attributes
    ----------
    conffile : str
        Path to configfile
    sec_dicts : dict
        {section1: {param: value}, section2: {param: value}}

    """
    def __init__(self, conffile, readsec=None, clean_str=True):
        super().__init__()
        # If there is a config file specified, but it does not exist
        # -> exit with error message
        if conffile != "" and not os.path.isfile(conffile):
            print(f"Error: config file {conffile} not found.")
            sys.exit(1)
        self.conffile = conffile
        # Read the config file
        try:
            self.read(conffile)
        except configparser.DuplicateOptionError as err:
            print(err)
            sys.exit(1)
        self.sec_dicts = {}
        # None replaces the former mutable default argument ([])
        if readsec is None:
            readsec = []
        # Convert configfile sections to dicts
        for sec in readsec:
            # If section in configfile, put its arguments and values to a dict
            # If not, create empty section, and associate with empty dict
            if sec in self:
                self.sec_dicts[sec] = dict(self[sec])
                if clean_str:
                    self.clean_strings(sec)
            else:
                self.sec_dicts[sec] = {}
                self.add_section(sec)

    def clean_strings(self, section):
        """
        Remove quote marks surrounding strings, both in ``sec_dicts`` and in
        the underlying parser.
        """
        sec_values = self.sec_dicts[section]
        # Only values of existing keys are replaced, so iterating on the dict
        # while updating it is safe.
        for param, initial in sec_values.items():
            cleaned = initial.strip('"')
            sec_values[param] = cleaned
            self[section][param] = cleaned

    def get_section_dict(self, section):
        """
        get dictionary of values for 'section' section

        Print a message and exit with status 1 if the section is not known.
        """
        if section in self.sec_dicts:
            return self.sec_dicts[section]
        print(f"No section {section} in {self.conffile}")
        sys.exit(1)

    def add_default(self, defargs, section):
        """
        Complete 'section' dict with default parameters.
        If key already defined, keep current value.
        """
        sec_values = self.sec_dicts[section]
        for key, val in defargs.items():
            if key not in sec_values:
                # The parser only stores strings; sec_dicts keeps the
                # original object.
                self[section][key] = str(val)
                sec_values[key] = val

    def update(self, args, section):
        """
        Add all arguments from args. If key already exists in self, overwrite it.
        Otherwise, create it.

        Note: this replaces the mapping-style ``update`` inherited from
        ConfigParser with a different signature.
        """
        sec_values = self.sec_dicts[section]
        sec_values.update(args)
        for key, val in sec_values.items():
            self[section][key] = str(val)

    def _store_converted(self, section, param, getter, expected):
        """
        Convert 'param' of 'section' with 'getter' (one of the parser get*
        methods) and store the result in sec_dicts. If the value cannot be
        converted, print an error naming the 'expected' type and exit with
        status 1.
        """
        try:
            self.sec_dicts[section][param] = getter(section, param)
        except ValueError:
            val = self[section][param]
            print(f"ERROR: {param} must be {expected}. Wrong value: {val}.")
            sys.exit(1)

    def set_boolean(self, section, param):
        """
        Change param of section to boolean
        Print an error and exit with status 1 if the value is not a boolean
        """
        self._store_converted(section, param, self.getboolean, "a boolean")

    def set_int(self, section, param):
        """
        Change param of section to int
        Print an error and exit with status 1 if the value is not an int
        """
        self._store_converted(section, param, self.getint, "an int")

    def set_float(self, section, param):
        """
        Change param of section to float
        Print an error and exit with status 1 if the value is not a float
        """
        self._store_converted(section, param, self.getfloat, "a float")