Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make_prg update PR series: 5. CLI changes #37

Merged
merged 5 commits into from
Nov 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions make_prg/subcommands/from_msa.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,17 @@ def register_parser(subparsers):
"Multiple sequence alignment file or a directory containing such files"
),
)
subparser_msa.add_argument(
leoisl marked this conversation as resolved.
Show resolved Hide resolved
"-s",
"--suffix",
action="store",
type=str,
default="",
help=(
"If the input parameter (-i, --input) is a directory, then filter for files with this suffix. "
"If this parameter is not given, all files in the input directory is considered."
),
)
subparser_msa.add_argument(
"-o",
"--output-prefix",
Expand Down Expand Up @@ -75,7 +86,7 @@ def register_parser(subparsers):
return subparser_msa


def get_all_input_files(input_path: str) -> List[Path]:
def get_all_input_files(input_path: str, suffix: str) -> List[Path]:
input_path = Path(input_path)
if not input_path.exists():
raise FileNotFoundError(f"{input_path} does not exist")
Expand All @@ -84,7 +95,7 @@ def get_all_input_files(input_path: str) -> List[Path]:
all_files = [input_path]
else:
all_files = [
path.resolve() for path in input_path.iterdir() if path.is_file()
path.resolve() for path in input_path.iterdir() if path.is_file() and path.name.endswith(suffix)
]
return all_files

Expand Down Expand Up @@ -132,7 +143,7 @@ def run(cl_options):
options = cl_options

logger.info("Getting input files...")
input_files = get_all_input_files(options.input)
input_files = get_all_input_files(options.input, options.suffix)

there_is_no_input_files = len(input_files) == 0
if there_is_no_input_files:
Expand Down
14 changes: 13 additions & 1 deletion make_prg/subcommands/update.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,18 @@ def check_if_is_update_file(argument: str):
"Filepath containing denovo sequences. Should point to a denovo_paths.txt file"
),
)
subparser_update_prg.add_argument(
"-D",
"--deletion-threshold",
dest="long_deletion_threshold",
action="store",
type=int,
default=10,
mbhall88 marked this conversation as resolved.
Show resolved Hide resolved
help=(
"Ignores long deletions of the given size or longer. If long deletions should not be ignored, "
"put a large value. Default: %(default)d"
),
)
subparser_update_prg.add_argument(
"-m",
"--mafft",
Expand Down Expand Up @@ -163,7 +175,7 @@ def run(cl_options):
prg_builder_zip_db = PrgBuilderZipDatabase(options.update_DS)
prg_builder_zip_db.load()
logger.info(f"Reading {options.denovo_paths}...")
denovo_variants_db = DenovoVariantsDB(options.denovo_paths)
denovo_variants_db = DenovoVariantsDB(options.denovo_paths, options.long_deletion_threshold)
update_shared_data = UpdateSharedData(denovo_variants_db, mafft_aligner)

output_dir = Path(options.output_prefix).parent
Expand Down
54 changes: 38 additions & 16 deletions make_prg/update/denovo_variants.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,27 +12,34 @@
from pathlib import Path
from make_prg.utils.misc import remove_duplicated_consecutive_elems_from_list
from dataclasses import dataclass
import sys


class DenovoError(Exception):
    """
    Signals an invalid denovo variant.

    In the visible code it is raised by DenovoVariant._param_checking when the
    variant's start index in the linear path is negative.
    NOTE(review): other raise sites may exist in parts of the file not shown
    here - confirm before relying on this list.
    """
    pass


class TooLongDeletion(Exception):
    """
    Signals that a variant deletes too many bases and should be ignored.

    Raised by DenovoVariant._param_checking when len(ref) - len(alt) is greater
    than or equal to the configured long_deletion_threshold. The visible callers
    that build variants catch it and log a warning instead of keeping the variant.
    """
    pass


class DenovoVariant:
"""
Represents a denovo variant in a denovo_paths.txt file, e.g.: "44 C T"
"""
def __init__(self, start_index_in_linear_path: int, ref: str, alt: str,
ml_path_nodes_it_goes_through: Optional[List[MLPathNode]] = None):
DenovoVariant._param_checking(start_index_in_linear_path, ref, alt)
ml_path_nodes_it_goes_through: Optional[List[MLPathNode]] = None,
long_deletion_threshold: int = sys.maxsize):
DenovoVariant._param_checking(start_index_in_linear_path, ref, alt, long_deletion_threshold)
self.start_index_in_linear_path: int = start_index_in_linear_path
self.end_index_in_linear_path: int = start_index_in_linear_path + len(ref)
self.ref: str = ref
self.alt: str = alt
self.set_ml_path_nodes_it_goes_through(ml_path_nodes_it_goes_through)
self.long_deletion_threshold = long_deletion_threshold

@staticmethod
def _param_checking(start_index_in_linear_path: int, ref: str, alt: str):
def _param_checking(start_index_in_linear_path: int, ref: str, alt: str, long_deletion_threshold: int):
DenovoVariant._check_sequence_is_composed_of_ACGT_only(ref)
DenovoVariant._check_sequence_is_composed_of_ACGT_only(alt)
not_a_variant = ref == alt
Expand All @@ -43,6 +50,11 @@ def _param_checking(start_index_in_linear_path: int, ref: str, alt: str):
if negative_index_for_variant_pos:
raise DenovoError(f"Found a negative index for variant pos ({start_index_in_linear_path})")

deletion_size = len(ref) - len(alt)
mbhall88 marked this conversation as resolved.
Show resolved Hide resolved
is_a_too_long_deletion = deletion_size >= long_deletion_threshold
if is_a_too_long_deletion:
raise TooLongDeletion(f"Variant has a too long deletion (delta = {deletion_size}) that should be ignored")

@staticmethod
def _check_sequence_is_composed_of_ACGT_only(seq: str):
sequence_is_composed_of_ACGT_only = all([base in "ACGT" for base in seq])
Expand Down Expand Up @@ -141,12 +153,17 @@ def _split_variant_at_boundary_alignment(self, ref_alignment: Deque[str], alt_al
sub_alt_seq = "".join(sub_alt)
sub_ref_and_alt_are_different = sub_ref_seq != sub_alt_seq
if sub_ref_and_alt_are_different:
split_variant = DenovoVariant(
current_start_in_linear_path, sub_ref_seq, sub_alt_seq
)
ml_path_nodes_the_split_variant_goes_through = [ml_path_node] * len(sub_ref_seq)
split_variant.set_ml_path_nodes_it_goes_through(ml_path_nodes_the_split_variant_goes_through)
split_variants.append(split_variant)
try:
split_variant = DenovoVariant(
current_start_in_linear_path, sub_ref_seq, sub_alt_seq,
long_deletion_threshold=self.long_deletion_threshold
)
ml_path_nodes_the_split_variant_goes_through = [ml_path_node] * len(sub_ref_seq)
split_variant.set_ml_path_nodes_it_goes_through(ml_path_nodes_the_split_variant_goes_through)
split_variants.append(split_variant)
logger.debug(f"Split variant to be applied: {split_variant}")
except TooLongDeletion as error:
logger.warning(f"Ignoring split variant: {error}")

return split_variants

Expand Down Expand Up @@ -347,7 +364,7 @@ def _read_nb_of_variants(filehandler: TextIO) -> int:
return nb_of_variants

@staticmethod
def _read_DenovoVariant(filehandler: TextIO) -> DenovoVariant:
def _read_DenovoVariant(filehandler: TextIO, long_deletion_threshold: int = sys.maxsize) -> DenovoVariant:
line = filehandler.readline().strip("\n")
line_split = line.split("\t")

Expand All @@ -358,18 +375,22 @@ def _read_DenovoVariant(filehandler: TextIO) -> DenovoVariant:
start_index_in_linear_path=start_index_in_linear_path,
ref=ref,
alt=alt,
long_deletion_threshold=long_deletion_threshold
)

logger.trace(f"Read variant: {denovo_variant}")
logger.debug(f"Read variant: {denovo_variant}")
return denovo_variant

@classmethod
def _read_variants(cls, filehandler) -> List[DenovoVariant]:
def _read_variants(cls, filehandler: TextIO, long_deletion_threshold: int = sys.maxsize) -> List[DenovoVariant]:
nb_of_variants = cls._read_nb_of_variants(filehandler)
variants = []
for _ in range(nb_of_variants):
denovo_variant = cls._read_DenovoVariant(filehandler)
variants.append(denovo_variant)
try:
denovo_variant = cls._read_DenovoVariant(filehandler, long_deletion_threshold)
variants.append(denovo_variant)
except TooLongDeletion as error:
logger.warning(f"Ignoring variant: {error}")
return variants

def _get_locus_name_to_denovo_loci_core(self, filehandler: TextIO) -> Dict[str, List[DenovoLocusInfo]]:
Expand All @@ -388,7 +409,7 @@ def _get_locus_name_to_denovo_loci_core(self, filehandler: TextIO) -> Dict[str,
for locus_index in range(nb_of_loci_in_sample):
locus = self._read_locus(filehandler)
ml_path = self._read_ml_path(filehandler)
variants = self._read_variants(filehandler)
variants = self._read_variants(filehandler, self.long_deletion_threshold)
denovo_locus = DenovoLocusInfo(sample, locus, ml_path, variants)
locus_name_to_denovo_loci[locus].append(denovo_locus)

Expand All @@ -409,8 +430,9 @@ def _get_locus_name_to_update_data(
locus_name_to_update_data[locus_name].extend(update_data)
return locus_name_to_update_data

def __init__(self, filepath: str):
def __init__(self, filepath: str, long_deletion_threshold: int = sys.maxsize):
self.filepath: Path = Path(filepath)
self.long_deletion_threshold = long_deletion_threshold
locus_name_to_denovo_loci = self._get_locus_name_to_denovo_loci()
self.locus_name_to_update_data = self._get_locus_name_to_update_data(
locus_name_to_denovo_loci
Expand Down