Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix greedy cpu allocation #213

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion carveme/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@
#set_default_solver(config.get('solver', 'default_solver'))
#default_parameters[Parameter.FEASIBILITY_TOL] = config.getfloat('solver', 'feas_tol')
#default_parameters[Parameter.OPTIMALITY_TOL] = config.getfloat('solver', 'opt_tol')
#default_parameters[Parameter.INT_FEASIBILITY_TOL] = config.getfloat('solver', 'int_feas_tol')
#default_parameters[Parameter.INT_FEASIBILITY_TOL] = config.getfloat('solver', 'int_feas_tol')
39 changes: 29 additions & 10 deletions carveme/cli/carve.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,17 @@
import os
import os.path
import pandas as pd
from multiprocessing import Pool
from multiprocessing import Pool, cpu_count
from glob import glob
import subprocess


def first_run_check():
def first_run_check(ncores):
diamond_db = project_dir + config.get('generated', 'diamond_db')
if not os.path.exists(diamond_db):
print("Running diamond for the first time, please wait while we build the internal database...")
fasta_file = project_dir + config.get('generated', 'fasta_file')
cmd = ['diamond', 'makedb', '--in', fasta_file, '-d', diamond_db[:-5]]
cmd = ['diamond', 'makedb', '--threads', str(ncores), '--in', fasta_file, '-d', diamond_db[:-5]]
try:
exit_code = subprocess.call(cmd)
except OSError:
Expand All @@ -45,7 +45,7 @@ def build_model_id(name):
def maincall(inputfile, input_type='protein', outputfile=None, diamond_args=None, universe=None, universe_file=None,
ensemble_size=None, verbose=False, debug=False, flavor=None, gapfill=None, blind_gapfill=False, init=None,
mediadb=None, default_score=None, uptake_score=None, soft_score=None, soft=None, hard=None, reference=None,
ref_score=None, recursive_mode=False):
ref_score=None, recursive_mode=False, ncores=None):

if recursive_mode:
model_id = os.path.splitext(os.path.basename(inputfile))[0]
Expand Down Expand Up @@ -108,7 +108,7 @@ def maincall(inputfile, input_type='protein', outputfile=None, diamond_args=None
print('Running diamond...')
diamond_db = project_dir + config.get('generated', 'diamond_db')
blast_output = os.path.splitext(inputfile)[0] + '.tsv'
exit_code = run_blast(inputfile, input_type, blast_output, diamond_db, diamond_args, verbose)
exit_code = run_blast(inputfile, input_type, blast_output, diamond_db, diamond_args, ncores, verbose)

if exit_code is None:
print('Unable to run diamond (make sure diamond is available in your PATH).')
Expand Down Expand Up @@ -313,6 +313,16 @@ def main():

parser.add_argument('--blind-gapfill', action='store_true', help=argparse.SUPPRESS)

parser.add_argument('--njobs', type=int, default=cpu_count(),
help="number of concurrent tasks to run via "
"multiprocessing.Pool; defaults to " +
"multiprocessing.cpu_count() " +
" (%(default)s)")
parser.add_argument('--ncores', type=int, default=1,
help="number of cores to pass to "
"each multiprocessing.Pool job (eg diamond); " +
"jobs. default: %(default)s")

args = parser.parse_args()

if args.gapfill and args.ensemble:
Expand Down Expand Up @@ -345,12 +355,19 @@ def main():
else:
flavor = config.get('sbml', 'default_flavor')

if (args.ncores * args.njobs) > cpu_count():
parser.error(f'--ncores ({args.ncores}) multiplied by --njobs ({args.njobs}) cannot exceed {cpu_count()}')

if args.solver:
set_default_solver(args.solver)
# else:
# set_default_solver(config.get('solver', 'default_solver'))

first_run_check()
# give the initial diamond run all available resources
first_run_check(args.ncores * args.njobs)

if args.gapfill and args.ensemble:
parser.error('Gap fill and ensemble generation cannot currently be combined (not implemented yet).')

if not args.recursive:
if len(args.input) > 1:
Expand All @@ -377,7 +394,8 @@ def main():
soft=args.soft,
hard=args.hard,
reference=args.reference,
ref_score=args.reference_score
ref_score=args.reference_score,
ncores=args.ncores,
)

else:
Expand All @@ -404,10 +422,11 @@ def f(x):
hard=args.hard,
reference=args.reference,
ref_score=args.reference_score,
recursive_mode=True
)
recursive_mode=True,
ncores=args.ncores,
)

p = Pool()
p = Pool(args.njobs)
p.map(f, args.input)


Expand Down
3 changes: 0 additions & 3 deletions carveme/config.cfg
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
[input]
biomass_library = data/input/biomass_db.tsv
media_library = data/input/media_db.tsv
metabolomics = data/input/metabolomics_park2016.csv
refseq = data/input/refseq_release_201.tsv.gz
mnx_compounds = data/input/mnx_compounds.tsv
bigg_models = data/input/bigg_models.tsv
Expand All @@ -14,7 +13,6 @@ bigg_gprs = data/generated/bigg_gprs.csv.gz
model_specific_data = data/generated/model_specific_data.csv.gz
gene_annotations = data/generated/gene_annotations.tsv.gz
bigg_universe = data/generated/bigg_universe.xml.gz
bigg_annotated = data/generated/bigg_annotated.xml.gz
default_universe = data/generated/universe_bacteria.xml.gz
fasta_file = data/generated/bigg_proteins.faa
diamond_db = data/generated/bigg_proteins.dmnd
Expand All @@ -35,4 +33,3 @@ default_flavor = bigg

[gapfill]
max_uptake = 100

4 changes: 2 additions & 2 deletions carveme/reconstruction/diamond.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def load_diamond_results(filename, drop_unused_cols=True):
return data


def run_blast(inputfile, input_type, outputfile, database, args=None, verbose=True):
def run_blast(inputfile, input_type, outputfile, database, args=None, ncores=None, verbose=True):
""" Run blast aligment with Diamond.

Args:
Expand Down Expand Up @@ -58,7 +58,7 @@ def run_blast(inputfile, input_type, outputfile, database, args=None, verbose=Tr
cmd += ['-o', outputfile]

if not args:
args = "--more-sensitive --top 10 --quiet"
args = f"--more-sensitive --top 10 --quiet --threads {ncores}"

cmd += args.split()

Expand Down
11 changes: 9 additions & 2 deletions docs/usage.rst
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,15 @@ This can be combined with *-o* to change the output folder:

$ carve -r myfolder/*.faa -o mymodels/

To balance the number of concurrent samples run against the number of cores allocated to each,
specify the `--njobs` and `--ncores` arguments. `--njobs` is passed to `multiprocessing.Pool()`,
while `--ncores` is passed to Diamond. These default to running every job single-threaded, with as many concurrent jobs as there are available CPUs.
To instead run Diamond with 4 threads on a maximum of 2 concurrent samples, this can be adjusted to:

.. code-block:: console

$ carve --ncores 4 --njobs 2 -r myfolder/*.faa -o mymodels/


Gap Filling
-----------
Expand Down Expand Up @@ -152,5 +161,3 @@ You can initialize the community with a pre-defined medium (just like during sin
.. code-block:: console

$ merge_community [input files] -i M9


45 changes: 36 additions & 9 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
# -*- coding: utf-8 -*-

"""The setup script."""

import os
from configparser import ConfigParser
from setuptools import setup, find_packages

with open('README.rst') as readme_file:
Expand All @@ -16,19 +17,20 @@
included_files = {
'carveme': [
'config.cfg',
'data/input/bigg_models.csv',
'data/input/bigg_models.tsv',
'data/input/biomass_db.tsv',
'data/input/manually_curated.csv',
'data/input/manually_curated.tsv',
'data/input/media_db.tsv',
'data/input/metabolomics_park2016.csv',
# 'data/input/metabolomics_park2016.csv', deleted 5cbc611af5aa265c39882f7a88bf357f3261b170
'data/input/unbalanced_metabolites.csv',
'data/input/bigg_proteins.faa',
'data/input/equilibrator_compounds.tsv.gz',
'data/generated/bigg_proteins.faa',
'data/input/mnx_compounds.tsv',
'data/input/refseq_release_201.tsv.gz',
'data/generated/bigg_gibbs.csv',
'data/generated/gene_annotations.tsv.gz',
# 'data/generated/bigg_gibbs.csv', # deleted c897f41d7d03c27ca12ecd9ee97337355338c378
'data/generated/bigg_gprs.csv.gz',
'data/generated/model_specific_data.csv.gz',
'data/generated/universe_draft.xml.gz',
'data/generated/bigg_universe.xml.gz',
'data/generated/universe_bacteria.xml.gz',
'data/generated/universe_grampos.xml.gz',
'data/generated/universe_gramneg.xml.gz',
Expand Down Expand Up @@ -70,6 +72,31 @@
'data/benchmark/results/essentiality.tsv',
]
}
# Sanity-check that every data file declared for packaging actually exists on
# disk, so a broken source tree fails fast instead of producing an incomplete
# distribution.
missing_files = [
    os.path.join("carveme", path)
    for path in included_files["carveme"]
    if not os.path.exists(os.path.join("carveme", path))
]
if missing_files:
    print("files required for install are not found:\n")
    print("\n".join(missing_files))
    raise ValueError("missing files; exiting")

# Cross-check config.cfg against the packaged file list: every data file the
# config references must both exist and be listed in included_files, so that
# an installed package cannot be missing a file the runtime will look up.
config = ConfigParser()
project_dir = "carveme"
config.read(os.path.join(project_dir, 'config.cfg'))

config_files = []
for section in ["input", "generated"]:
    for key, relpath in config[section].items():
        # "folder" is not a file entry, and the diamond DB is generated on
        # first run rather than shipped — skip both.
        if key in ["folder", "diamond_db"]:
            continue
        vpath = os.path.join(project_dir, relpath)
        if not os.path.exists(vpath):
            raise ValueError(f'file {vpath} not found')
        if relpath not in included_files["carveme"]:
            raise ValueError(f'config file {vpath} not included in setup.py')
        config_files.append(relpath)



setup(
Expand Down Expand Up @@ -100,7 +127,7 @@
keywords='carveme',
classifiers=[
'Development Status :: 5 - Production/Stable',
'Environment :: Console',
'Environment :: Console',
'Intended Audience :: Science/Research',
'Topic :: Scientific/Engineering :: Bio-Informatics',
'Programming Language :: Python :: 3.6',
Expand Down