Coverage for PanACoTA/pangenome_module/mmseqs_functions.py: 100%

153 statements  

« prev     ^ index     » next       coverage.py v7.3.1, created at 2023-09-20 14:37 +0000

1#!/usr/bin/env python3 

2# coding: utf-8 

3 

4# ############################################################################### 

5# This file is part of PanACOTA. # 

6# # 

7# Authors: Amandine Perrin # 

8# Copyright © 2018-2020 Institut Pasteur (Paris). # 

9# See the COPYRIGHT file for details. # 

10# # 

11# PanACOTA is a software providing tools for large scale bacterial comparative # 

12# genomics. From a set of complete and/or draft genomes, you can: # 

13# - Do a quality control of your strains, to eliminate poor quality # 

14# genomes, which would not give any information for the comparative study # 

15# - Uniformly annotate all genomes # 

16# - Do a Pan-genome # 

17# - Do a Core or Persistent genome # 

18# - Align all Core/Persistent families # 

19# - Infer a phylogenetic tree from the Core/Persistent families # 

20# # 

21# PanACOTA is free software: you can redistribute it and/or modify it under the # 

22# terms of the Affero GNU General Public License as published by the Free # 

23# Software Foundation, either version 3 of the License, or (at your option) # 

24# any later version. # 

25# # 

26# PanACOTA is distributed in the hope that it will be useful, but WITHOUT ANY # 

27# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS # 

28# FOR A PARTICULAR PURPOSE. See the Affero GNU General Public License # 

29# for more details. # 

30# # 

31# You should have received a copy of the Affero GNU General Public License # 

32# along with PanACOTA (COPYING file). # 

33# If not, see <https://www.gnu.org/licenses/>. # 

34# ############################################################################### 

35 

36""" 

37Functions to use mmseqs to create a pangenome 

38 

39@author gem 

40April 2017 

41""" 

42import logging 

43import os 

44import sys 

45import time 

46import threading 

47import progressbar 

48import copy 

49 

50from PanACoTA import utils 

51from PanACoTA import utils_pangenome as utils_pan 

52 

53logger = logging.getLogger("pangenome.mmseqs") 

54 

55 

56def run_all_pangenome(min_id, clust_mode, outdir, prt_path, threads, panfile=None, quiet=False): 

57 """ 

58 Run all steps to build a pangenome: 

59 

60 - create mmseqs database from protein bank 

61 - cluster proteins 

62 - convert to pangenome 

63 

64 

65 Parameters 

66 ---------- 

67 min_id : float 

68 minimum percentage of identity to be in the same family 

69 clust_mode : [0, 1, 2] 

70 0 for 'set cover', 1 for 'single-linkage', 2 for 'CD-Hit' 

71 outdir : str 

72 directory where output cluster file must be saved 

73 prt_path : str 

74 path to file containing all proteins to cluster. 

75 threads : int 

76 number of threads which can be used 

77 panfile : str or None 

78 name for output pangenome file. Otherwise, will use default name 

79 quiet : bool 

80 True if nothing must be written on stdout, False otherwise. 

81 

82 Returns 

83 ------- 

84 (families, outfile) : tuple 

85 

86 - families : {fam_num: [all members]} 

87 - outfile : pangenome filename 

88 """ 

89 # Get general information and file/directory names 

90 prt_bank = os.path.basename(prt_path) 

91 # start = time.strftime('%Y-%m-%d_%H-%M-%S') 

92 information = ("Will run MMseqs2 with:\n" 

93 f"\t- minimum sequence identity = {min_id*100}%\n" 

94 f"\t- cluster mode {clust_mode}") 

95 if threads > 1: 

96 information += f"\n\t- {threads} threads" 

97 logger.info(information) 

98 infoname = get_info(threads, min_id, clust_mode) 

99 logmmseq = get_logmmseq(outdir, prt_bank, infoname) 

100 

101 tmpdir = os.path.join(outdir, "tmp_" + prt_bank + "_" + infoname) 

102 mmseqdb = os.path.join(tmpdir, prt_bank + "-msDB") 

103 mmseqclust = os.path.join(tmpdir, prt_bank + "-clust-" + infoname) 

104 mmseqstsv = mmseqclust + ".tsv" 

105 # Get pangenome filename 

106 if not panfile: 

107 base = os.path.basename(mmseqstsv) 

108 panfile = os.path.join(outdir, f"PanGenome-{prt_bank}-clust-{infoname}.lst") 

109 else: 

110 panfile = os.path.join(outdir, panfile) 

111 # If pangenome file already exists, read it to get families 

112 if os.path.isfile(panfile): 

113 logger.warning(f"Pangenome file {panfile} already exists. PanACoTA will read it to get families.") 

114 _, families, _ = utils_pan.read_pan_file(panfile, logger) 

115 else: 

116 os.makedirs(tmpdir, exist_ok=True) 

117 # Create ffindex of DB if not already done 

118 status = do_mmseqs_db(mmseqdb, prt_path, logmmseq, quiet) 

119 # status = create_mmseqs_db(mmseqdb, prt_path, logmmseq) 

120 # Status = ok means that mmseqs_db files already existed and were not re-done 

121 # If they were redone (or just done), remove any existing following file (mmseqs clust, tsv, csv) 

122 # Cluster with mmseqs 

123 families, panfile = do_pangenome(outdir, prt_bank, mmseqdb, mmseqclust, tmpdir, logmmseq, min_id, 

124 clust_mode, status, threads, panfile, quiet) 

125 return families, panfile 

126 

127 

128def get_info(threads, min_id, clust_mode): 

129 """ 

130 Get string containing all information on future run 

131 

132 Parameters 

133 ---------- 

134 threads : int 

135 max number of threads to use 

136 min_id : float 

137 min percentage of identity to consider 2 proteins in hte same family 

138 clust_mode : [0, 1, 2] 

139 0 for 'set cover', 1 for 'single-linkage', 2 for 'CD-Hit' 

140 

141 Returns 

142 ------- 

143 str 

144 string containing info on current run, to put in filenames 

145 """ 

146 if threads != 1: 

147 threadinfo = "-th" + str(threads) 

148 else: 

149 threadinfo = "" 

150 infoname = str(min_id) + "-mode" + str(clust_mode) + threadinfo 

151 return infoname 

152 

153 

154def get_logmmseq(outdir, prt_bank, infoname): 

155 """ 

156 Get filename of logfile, given information 

157 

158 Parameters 

159 ---------- 

160 outdir : str 

161 output directory 

162 prt_bank : str 

163 name of file (without path) containing all proteins to cluster 

164 infoname : str 

165 string containing information on the current run 

166 

167 Returns 

168 ------- 

169 str 

170 path to mmseq logfile 

171 """ 

172 return os.path.join(outdir, "mmseq_" + prt_bank + "_" + infoname + ".log") 

173 

174 

175def do_mmseqs_db(mmseqdb, prt_path, logmmseq, quiet): 

176 """ 

177 Runs create_mmseqs_db with an "infinite progress bar" in the background. 

178  

179 create_mmseqs_db does : 

180 Create ffindex of protein bank (prt_path) if not already done. If done, just write a message 

181 to tell the user that the current existing file will be used. 

182 

183 Parameters 

184 ---------- 

185 mmseqdb : str 

186 path to base filename for output of mmseqs createdb 

187 prt_path : str 

188 path to the file containing all proteins to cluster 

189 logmmseq : str 

190 path to file where logs must be written 

191 quiet : bool 

192 True if no output in stderr/stdout, False otherwise 

193 

194 Returns 

195 ------- 

196 bool 

197 True if mmseqs db just created, False if already existed 

198 """ 

199 logger.info("Creating database") 

200 try: 

201 stop_bar = False 

202 if quiet: 

203 widgets = [] 

204 # If not quiet, start a progress bar while clustering proteins. We cannot guess 

205 # how many time it will take, so we start an "infinite" bar, and send it a signal 

206 # when it has to stop. If quiet, we start a thread that will immediatly stop 

207 else: 

208 widgets = [progressbar.BouncingBar(marker=progressbar.RotatingMarker(markers="◐◓◑◒")), 

209 " - ", progressbar.Timer()] 

210 x = threading.Thread(target=utils.thread_progressbar, args=(widgets, lambda : stop_bar,)) 

211 x.start() 

212 res = create_mmseqs_db(mmseqdb, prt_path, logmmseq) 

213 # except KeyboardInterrupt: # pragma: no cover 

214 except: # pragma: no cover 

215 stop_bar = True 

216 x.join() 

217 sys.exit(1) 

218 # Clustering done, stop bar and join (if quiet, it was already finished, so we just join it) 

219 stop_bar = True 

220 x.join() 

221 return res 

222 

223 

224def do_pangenome(outdir, prt_bank, mmseqdb, mmseqclust, tmpdir, logmmseq, min_id, clust_mode, 

225 just_done, threads, panfile, quiet=False): 

226 """ 

227 Use mmseqs to cluster proteins 

228 

229 Parameters 

230 ---------- 

231 outdir : str 

232 directory where output files are saved 

233 prt_bank : str 

234 name of the file containing all proteins to cluster, without path 

235 mmseqdb : str 

236 path to base filename of output of mmseqs createdb 

237 mmseqclust : str 

238 mmseqs clust 

239 tmp_dir : str 

240 path to tmp directory 

241 logmmseq : str 

242 path to file for mmseqs logs 

243 min_id : float 

244 min percentage of identity to be considered in the same family (between 0 and 1) 

245 clust_mode : [0, 1, 2] 

246 0 for 'set cover', 1 for 'single-linkage', 2 for 'CD-Hit' 

247 just_done : str 

248 True if mmseqs db was just (re)created -> remove mmseqs clust.  

249 False if mmseqs db was kept from previous run -> no need to rerun mmseqs clust if already exists 

250 threads : int 

251 max number of threads to use 

252 panfile : str 

253 if a pangenome file is specified. Otherwise, default pangenome name will be used 

254 quiet : bool 

255 true if nothing must be print on stdout/stderr, false otherwise (show progress bar) 

256 

257 Returns 

258 ------- 

259 (families, outfile) : tuple 

260 

261 - families : {fam_num: [all members]} 

262 - outfile : pangenome filename 

263 """ 

264 mmseqstsv = mmseqclust + ".tsv" 

265 # If we just made the database, we must redo all next steps 

266 # -> if existing, remove 

267 # mmseqsclust (created by run_mmseqs_clust) 

268 # mmseqstsv (created by mmseqs_to_pangenome) 

269 # pangenome file 

270 if just_done and os.path.isfile(mmseqclust) or os.path.isfile(mmseqstsv) or os.path.isfile(panfile): 

271 logger.details("Removing existing clustering and/or pangenome files.") 

272 utils.remove(mmseqclust) 

273 utils.remove(mmseqstsv) 

274 utils.remove(panfile) 

275 bar = None 

276 logger.debug(mmseqclust) 

277 if os.path.isfile(mmseqclust): 

278 logger.warning((f"mmseqs clustering {mmseqclust} already exists. The program will now convert " 

279 "it to a pangenome file.")) 

280 else: 

281 logger.info("Clustering proteins...") 

282 try: 

283 stop_bar = False 

284 if quiet: 

285 widgets = [] 

286 # If not quiet, start a progress bar while clustering proteins. We cannot guess 

287 # how many time it will take, so we start an "infinite" bar, and send it a signal 

288 # when it has to stop. If quiet, we start a thread that will immediatly stop 

289 else: 

290 widgets = [progressbar.BouncingBar(marker=progressbar.RotatingMarker(markers="◐◓◑◒")), 

291 " - ", progressbar.Timer()] 

292 x = threading.Thread(target=utils.thread_progressbar, args=(widgets, lambda : stop_bar,)) 

293 x.start() 

294 args = (mmseqdb, mmseqclust, tmpdir, logmmseq, min_id, threads, clust_mode) 

295 run_mmseqs_clust(args) 

296 # except KeyboardInterrupt: # pragma: no cover 

297 except: # pragma: no cover 

298 stop_bar = True 

299 x.join() 

300 sys.exit(1) 

301 # Clustering done, stop bar and join (if quiet, it was already finished, so we just join it) 

302 stop_bar = True 

303 x.join() 

304 # Convert output to tsv file (one line per comparison done) 

305 # # Convert output to tsv file (one line per comparison done) 

306 # -> returns (families, outfile) 

307 families = mmseqs_to_pangenome(mmseqdb, mmseqclust, logmmseq, panfile) 

308 return families, panfile 

309 

310 

311def run_mmseqs_clust(args): 

312 """ 

313 Run mmseqs clustering 

314 

315 Parameters 

316 ---------- 

317 args : tuple 

318 (mmseqdb, mmseqclust, tmpdir, logmmseq, min_id, threads, clust_mode), with: 

319 

320 * mmseqdb: path to base filename (output created by mmseq db) 

321 * mmseqclust: path to base filename for output of mmseq clustering 

322 * tmpdir : path to folder which will contain mmseq temporary files 

323 * logmmseq : path to file where logs must be written 

324 * min_id : min percentage of identity to be considered in the same family 

325 * (between 0 and 1) 

326 * threads : max number of threads to use 

327 * clust_mode : [0, 1, 2], 0 for 'set cover', 1 for 'single-linkage', 2 for 'CD-Hit' 

328 

329 """ 

330 mmseqdb, mmseqclust, tmpdir, logmmseq, min_id, threads, clust_mode = args 

331 cmd = (f"mmseqs cluster {mmseqdb} {mmseqclust} {tmpdir} --min-seq-id {min_id} --threads {threads} --cluster-mode " 

332 f"{clust_mode}") 

333 logger.details(f"MMseqs command: {cmd}") 

334 msg = f"Problem while clustering proteins with mmseqs. See log in {logmmseq}" 

335 with open(logmmseq, "a") as logm: 

336 utils.run_cmd(cmd, msg, eof=False, stdout=logm, stderr=logm) 

337 

338 

339def mmseqs_to_pangenome(mmseqdb, mmseqclust, logmmseq, outfile): 

340 """ 

341 Convert mmseqs clustering to a pangenome file: 

342 

343 - convert mmseqs results to tsv file 

344 - convert tsv file to pangenome 

345 

346 Parameters 

347 ---------- 

348 mmseqdb : str 

349 path to base filename of output of mmseqs createdb 

350 mmseqclust : str 

351 path to base filename of output of mmseqs cluster 

352 logmmseq : str 

353 path to file where logs must be written 

354 outfile : str 

355 pangenome filename 

356 

357 Returns 

358 ------- 

359 dict 

360 - families : {fam_num: [all members]} 

361 """ 

362 cmd = f"mmseqs createtsv {mmseqdb} {mmseqdb} {mmseqclust} {mmseqclust}.tsv" 

363 msg = "Problem while trying to convert mmseq result file to tsv file" 

364 logger.details(f"MMseqs command: {cmd}") 

365 with open(logmmseq, "a") as logf: 

366 utils.run_cmd(cmd, msg, eof=True, stdout=logf, stderr=logf) 

367 # Convert the tsv file to a 'pangenome' file: one line per family 

368 families = mmseqs_tsv_to_pangenome(mmseqclust, logmmseq, outfile) 

369 return families 

370 

371 

372def mmseqs_tsv_to_pangenome(mmseqclust, logmmseq, outfile): 

373 """ 

374 Convert the tsv output file of mmseqs to the pangenome file 

375 

376 Parameters 

377 ---------- 

378 mmseqclust : str 

379 path to base filename for output of mmseq clustering 

380 logmmseq : str 

381 path to file where logs must be written 

382 outfile : str 

383 pangenome filename, or None if default one must be used 

384 

385 Returns 

386 ------- 

387 dict 

388 

389 - families : {fam_num: [all members]} 

390 """ 

391 logger.info("Converting mmseqs results to pangenome file") 

392 tsvfile = mmseqclust + ".tsv" 

393 clusters = mmseq_tsv_to_clusters(tsvfile) 

394 families = clusters_to_file(clusters, outfile) 

395 end = time.strftime('%Y-%m-%d_%H-%M-%S') 

396 with open(logmmseq, "a") as logm: 

397 logm.write(f"End: {end}") 

398 return families 

399 

400 

401def mmseq_tsv_to_clusters(mmseq): 

402 """ 

403 Reads the output of mmseq as a tsv file, and converts it to a python dict 

404 

405 Parameters 

406 ---------- 

407 mmseq : str 

408 filename of mmseq clustering output in tsv format 

409 

410 Returns 

411 ------- 

412 dict 

413 {representative_of_cluster: [list of members]} 

414 

415 """ 

416 clusters = {} # {representative: [all members]} 

417 with open(mmseq) as mmsf: 

418 for line in mmsf: 

419 repres, other = line.strip().split() 

420 if repres in clusters: 

421 clusters[repres].append(other) 

422 else: 

423 clusters[repres] = [repres] 

424 logger.info("Pangenome has {} families.".format(len(clusters))) 

425 return clusters 

426 

427 

428def clusters_to_file(clust, fileout): 

429 """ 

430 Write all clusters to a file 

431 

432 Parameters 

433 ---------- 

434 clust : {first_member: [all members = protein names]} 

435 fileout : filename of pangenome where families must be written 

436 

437 Returns 

438 ------- 

439 dict 

440 families : {famnum: [members]} 

441 """ 

442 families = {} # {famnum: [members]} 

443 with open(fileout, "w") as fout: 

444 num = 1 

445 for _, fam in clust.items(): 

446 families[num] = [] 

447 fout.write(str(num)) 

448 for mem in sorted(fam, key=utils.sort_proteins): 

449 families[num].append(mem) 

450 fout.write(" " + mem) 

451 fout.write("\n") 

452 num += 1 

453 return families 

454 

455 

456def create_mmseqs_db(mmseqdb, prt_path, logmmseq): 

457 """ 

458 Create ffindex of protein bank (prt_path) if not already done. If done, just write a message 

459 to tell the user that the current existing file will be used. 

460 

461 Parameters 

462 ---------- 

463 mmseqdb : str 

464 path to base filename for output of mmseqs createdb 

465 prt_path : str 

466 path to the file containing all proteins to cluster 

467 logmmseq : str 

468 path to file where logs must be written 

469 

470 

471 Returns 

472 ------- 

473 bool 

474 True if mmseqs db just created, False if already existed 

475 """ 

476 outext = ["", ".index", ".dbtype", ".lookup", "_h", "_h.index", "_h.dbtype"] 

477 files_existing = [] 

478 if os.path.isfile(mmseqdb): 

479 for file in [mmseqdb + ext for ext in outext]: 

480 if not os.path.isfile(file): 

481 continue 

482 files_existing.append(file) 

483 if len(files_existing) != len(outext): 

484 logger.warning(f"mmseqs database {mmseqdb} already exists, but at least 1 associated " 

485 "file (.dbtype, .index etc). is missing. The program will " 

486 "remove existing files and recreate the database.") 

487 files_remaining = copy.deepcopy(files_existing) 

488 for file in files_existing: 

489 os.remove(file) # Delete file 

490 files_remaining.remove(file) # Remove file from list of existing files 

491 logger.details(f"Removing '{file}'.") 

492 files_existing = copy.deepcopy(files_remaining) 

493 else: 

494 logger.warning(f"mmseqs database {mmseqdb} already exists. The program will " 

495 "use it.") 

496 return False 

497 logger.debug("Existing files: {}".format(len(files_existing))) 

498 logger.debug("Expected extensions: {}".format(len(outext))) 

499 cmd = f"mmseqs createdb {prt_path} {mmseqdb}" 

500 msg = (f"Problem while trying to convert database {prt_path} to mmseqs " 

501 "database format.") 

502 logger.details(f"MMseqs command: {cmd}") 

503 with open(logmmseq, "w") as logf: 

504 utils.run_cmd(cmd, msg, eof=True, stdout=logf, stderr=logf) 

505 return True