Coverage for PanACoTA/utils_argparse.py: 99%
155 statements
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-20 14:37 +0000
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-20 14:37 +0000
1#!/usr/bin/env python3
2# coding: utf-8
4# ###############################################################################
5# This file is part of PanACOTA. #
6# #
7# Authors: Amandine Perrin #
8# Copyright © 2018-2020 Institut Pasteur (Paris). #
9# See the COPYRIGHT file for details. #
10# #
11# PanACOTA is a software providing tools for large scale bacterial comparative #
12# genomics. From a set of complete and/or draft genomes, you can: #
13# - Do a quality control of your strains, to eliminate poor quality #
14# genomes, which would not give any information for the comparative study #
15# - Uniformly annotate all genomes #
16# - Do a Pan-genome #
17# - Do a Core or Persistent genome #
18# - Align all Core/Persistent families #
19# - Infer a phylogenetic tree from the Core/Persistent families #
20# #
21# PanACOTA is free software: you can redistribute it and/or modify it under the #
22# terms of the Affero GNU General Public License as published by the Free #
23# Software Foundation, either version 3 of the License, or (at your option) #
24# any later version. #
25# #
26# PanACOTA is distributed in the hope that it will be useful, but WITHOUT ANY #
27# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS #
28# FOR A PARTICULAR PURPOSE. See the Affero GNU General Public License #
29# for more details. #
30# #
31# You should have received a copy of the Affero GNU General Public License #
32# along with PanACOTA (COPYING file). #
33# If not, see <https://www.gnu.org/licenses/>. #
34# ###############################################################################
36"""
37Functions to check argparse arguments given by user
40@author gem
41April 2017
42"""
43from PanACoTA import utils
44import argparse
45import configparser
46import sys
47import os
def gen_name(param):
    """
    Validate the name given for the genome set.

    The check itself is delegated to utils.check_format (expected to accept
    only 4 alphanumeric characters). When the name is rejected, the error
    message suggests a naming scheme (2 first letters of genus followed by
    2 first letters of species).

    Parameters
    ----------
    param : str
        name given by the user

    Returns
    -------
    str
        `param`, unchanged, when utils.check_format accepts it

    Raises
    ------
    argparse.ArgumentTypeError
        when utils.check_format rejects `param`
    """
    if utils.check_format(param):
        return param
    raise argparse.ArgumentTypeError(
        "The genome name must contain 4 characters. For example, this name can "
        "correspond to the 2 first letters of genus, and 2 first letters of "
        "species, e.g. ESCO for Escherichia Coli."
    )
def date_name(param):
    """
    Validate the date given by the user.

    The check itself is delegated to utils.check_format (expected to accept
    only 4 alphanumeric characters). When the date is rejected, the error
    message suggests the usual MMYY layout.

    Parameters
    ----------
    param : str
        date given by the user

    Returns
    -------
    str
        `param`, unchanged, when utils.check_format accepts it

    Raises
    ------
    argparse.ArgumentTypeError
        when utils.check_format rejects `param`
    """
    if utils.check_format(param):
        return param
    raise argparse.ArgumentTypeError(
        "The date must contain 4 characters. Usually, it contains 4 digits, "
        "corresponding to the month (2 digits) and year (2 digits)."
    )
def get_date():
    """
    Build the current date in MMYY format.

    Returns
    -------
    str
        2-digit month followed by 2-digit year, as given by time.strftime
    """
    from time import strftime

    mmyy = strftime("%m%y")
    return mmyy
def cont_num(param):
    """
    Validate the maximum number of contigs given to --nbcont.

    Parameters
    ----------
    param : str
        value given by the user

    Returns
    -------
    int
        `param` converted to int, between 0 and 9999 included

    Raises
    ------
    argparse.ArgumentTypeError
        if `param` cannot be converted to int, is negative, or is 10000 or more
    """
    try:
        nb_contigs = int(param)
    except Exception:
        raise argparse.ArgumentTypeError(
            f"argument --nbcont: invalid int value: {param}"
        )
    if nb_contigs < 0:
        raise argparse.ArgumentTypeError(
            "The maximum number of contigs allowed must be a positive number."
        )
    if nb_contigs >= 10000:
        raise argparse.ArgumentTypeError(
            "We do not support genomes with more than 9999 contigs."
        )
    return nb_contigs
def thread_num(param):
    """
    Validate the number of threads given to --threads.

    Parameters
    ----------
    param : str
        value given by the user

    Returns
    -------
    int
        the requested number of threads, or the number of available threads
        when '0' is given

    Raises
    ------
    argparse.ArgumentTypeError
        if `param` cannot be converted to int, is negative, or is higher than
        the number of threads available
    """
    import multiprocessing

    try:
        requested = int(param)
    except Exception:
        raise argparse.ArgumentTypeError(
            f"argument --threads threads: invalid int value: {param}"
        )

    # Prefer the CPUs available to this process (what matters on clusters).
    # sched_getaffinity does not exist on Mac OS: fall back to the total number
    # of CPUs of the machine there.
    try:
        available = len(os.sched_getaffinity(0))
    except AttributeError:
        available = multiprocessing.cpu_count()

    if requested > available:
        raise argparse.ArgumentTypeError(
            "You have {} threads on your computer, you cannot ask for more: "
            "invalid value: {}".format(available, requested)
        )
    if requested < 0:
        raise argparse.ArgumentTypeError(
            "Please provide a positive number of threads (or 0 for all threads): "
            "Invalid value: {}".format(requested)
        )
    # 0 means "use everything available"
    if requested == 0:
        return available
    return requested
def positive_int(param):
    """
    Validate the value given to --cutn.

    Parameters
    ----------
    param : str
        value given by the user

    Returns
    -------
    int
        `param` converted to int (0 or more)

    Raises
    ------
    argparse.ArgumentTypeError
        if int() raises ValueError on `param`, or if the converted value is
        negative
    """
    try:
        number = int(param)
    except ValueError:
        raise argparse.ArgumentTypeError(
            f"error: argument --cutn: invalid int value: '{param}'"
        )
    if number >= 0:
        return number
    raise argparse.ArgumentTypeError(
        f"error: argument --cutn must be a positive integer: invalid int value: '{number}'"
    )
def mash_dist(param):
    """
    Check mash distance given. Must be a float between 0 and 1 included.

    Parameters
    ----------
    param : str
        value given by the user

    Returns
    -------
    float
        `param` converted to float, in [0, 1]

    Raises
    ------
    argparse.ArgumentTypeError
        if float() raises ValueError on `param`, or if the converted value is
        not in [0, 1] (this includes NaN)
    """
    try:
        dist = float(param)
    except ValueError:
        raise argparse.ArgumentTypeError(
            f"error: mash distance: invalid float value: '{param}'"
        )
    # Chained comparison on purpose: every comparison with NaN is False, so
    # 'dist < 0 or dist > 1' would let "nan" through as a valid distance.
    if not 0 <= dist <= 1:
        raise argparse.ArgumentTypeError(
            f"error: mash distance must be between 0 and 1: invalid value: '{dist}'"
        )
    return dist
def percentage(param):
    """
    Check argument given to parameter '-t tol'.

    Parameters
    ----------
    param : str
        value given by the user

    Returns
    -------
    float
        `param` converted to float, in [0, 1]

    Raises
    ------
    argparse.ArgumentTypeError
        if `param` cannot be converted to float, or if the converted value is
        not in [0, 1] (this includes NaN)
    """
    try:
        tol = float(param)
    except (TypeError, ValueError):
        raise argparse.ArgumentTypeError(
            "argument -t tol: invalid float value: {}".format(param)
        )
    # Chained comparison on purpose: every comparison with NaN is False, so
    # 'tol < 0 or tol > 1' would let "nan" through as a valid value.
    if not 0 <= tol <= 1:
        # Single '%': str.format does not unescape '%%', so the doubled sign
        # would be shown as-is to the user.
        raise argparse.ArgumentTypeError(
            "The minimum % of genomes required in a family to be persistent must "
            "be in [0, 1]. Invalid value: {}".format(tol)
        )
    return tol
def perc_id(param):
    """
    Check argument given to parameter -i percentage_id.

    Parameters
    ----------
    param : str
        value given by the user

    Returns
    -------
    float
        `param` converted to float, in [0, 1]

    Raises
    ------
    argparse.ArgumentTypeError
        if `param` cannot be converted to float, or if the converted value is
        not in [0, 1] (this includes NaN)
    """
    try:
        identity = float(param)
    except (TypeError, ValueError):
        raise argparse.ArgumentTypeError(
            "argument -i percentage_id: invalid float value: {}".format(param)
        )
    # Chained comparison on purpose: every comparison with NaN is False, so
    # 'identity < 0 or identity > 1' would let "nan" through as a valid value.
    if not 0 <= identity <= 1:
        # Single '%': str.format does not unescape '%%', so the doubled sign
        # would be shown as-is to the user.
        raise argparse.ArgumentTypeError(
            "The minimum % of identity must be in [0, 1]. "
            "Invalid value: {}".format(identity)
        )
    return identity
class Conf_all_parser(configparser.ConfigParser):
    """
    Read configfile and return arguments found, according to required type

    Parameters
    ----------
    conffile : str
        path to configuration file. An empty string skips the existence check
    readsec : list or None
        list of sections of the config file to read. None (default) reads no section
    clean_str : boolean
        by default, remove " surrounding strings. If no need to do it, set this parameter to False

    Attributes
    ----------
    conffile : str
        Path to configfile
    sec_dicts : dict
        {section1: {param: value}, {section2: {param:value}}}

    """
    def __init__(self, conffile, readsec=None, clean_str=True):
        super().__init__()
        # If there is a config file specified, but it does not exist -> exit with error message
        if conffile != "" and not os.path.isfile(conffile):
            print(f"Error: config file {conffile} not found.")
            sys.exit(1)
        self.conffile = conffile
        # Read the config file
        try:
            self.read(conffile)
        except configparser.DuplicateOptionError as err:
            print(err)
            sys.exit(1)
        self.sec_dicts = {}
        # None instead of a mutable [] default, which would be shared by all calls
        sections = [] if readsec is None else readsec
        # Convert configfile sections to dicts
        for sec in sections:
            if sec in self:
                # Section found in configfile: put its arguments and values to a dict
                self.sec_dicts[sec] = dict(self[sec])
                if clean_str:
                    self.clean_strings(sec)
            else:
                # Section missing: create it empty, and associate it with an empty dict
                self.sec_dicts[sec] = {}
                self.add_section(sec)

    def clean_strings(self, section):
        """
        Remove quote marks surrounding strings, both in sec_dicts and in the
        parser itself.
        """
        for param, initial in self.sec_dicts[section].items():
            cleaned = initial.strip('"')
            self.sec_dicts[section][param] = cleaned
            self[section][param] = cleaned

    def get_section_dict(self, section):
        """
        get dictionary of values for 'section' section.
        Exit with status 1 if this section was not read.
        """
        if section in self.sec_dicts:
            return self.sec_dicts[section]
        print(f"No section {section} in {self.conffile}")
        sys.exit(1)

    def add_default(self, defargs, section):
        """
        Complete 'section' dict with default parameters.
        If key already defined, keep current value.
        """
        for key, val in defargs.items():
            if key not in self.sec_dicts[section]:
                # The parser only stores strings; sec_dicts keeps the original type
                self[section][key] = str(val)
                self.sec_dicts[section][key] = val

    def update(self, args, section):
        """
        Add all arguments from args. If key already exists in self, overwrite it.
        Otherwise, create it.

        NOTE: this overrides the mapping-style 'update' inherited from
        ConfigParser with a different signature.
        """
        self.sec_dicts[section].update(args)
        for key, val in self.sec_dicts[section].items():
            self[section][key] = str(val)

    def _set_typed(self, section, param, getter, expected):
        """
        Store in sec_dicts the value of 'param' converted by 'getter' (one of
        the get<type> methods of the parser). If the conversion raises
        ValueError, print an error naming the 'expected' type and exit with
        status 1.
        """
        try:
            self.sec_dicts[section][param] = getter(section, param)
        except ValueError:
            val = self[section][param]
            print(f"ERROR: {param} must be {expected}. Wrong value: {val}.")
            sys.exit(1)

    def set_boolean(self, section, param):
        """
        Change param of section to boolean
        exit with status 1 if problem
        """
        self._set_typed(section, param, self.getboolean, "a boolean")

    def set_int(self, section, param):
        """
        Change param of section to int
        exit with status 1 if problem
        """
        self._set_typed(section, param, self.getint, "an int")

    def set_float(self, section, param):
        """
        Change param of section to float
        exit with status 1 if problem
        """
        self._set_typed(section, param, self.getfloat, "a float")