diff --git a/related-packages/fileformats-extras/fileformats/extras/medimage_fsl/__init__.py b/related-packages/fileformats-extras/fileformats/extras/medimage_fsl/__init__.py index 5ee470b..d96e129 100644 --- a/related-packages/fileformats-extras/fileformats/extras/medimage_fsl/__init__.py +++ b/related-packages/fileformats-extras/fileformats/extras/medimage_fsl/__init__.py @@ -9,5 +9,5 @@ @FileSet.generate_sample_data.register -def gen_sample_con_data(con: Con, dest_dir: Path, seed: ty.Union[int, Random], stem: ty.Optional[str]): +def gen_sample_con_data(con: Con, dest_dir: Path, seed: ty.Union[int, Random] = 0, stem: ty.Optional[str] = None) -> ty.Iterable[Path]: raise NotImplementedError diff --git a/tools/__init__.py b/tools/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/tools/converter.py b/tools/converter.py deleted file mode 100644 index eca5cc1..0000000 --- a/tools/converter.py +++ /dev/null @@ -1,607 +0,0 @@ -from attr import has -from ast import literal_eval -from nipype.interfaces import fsl -from nipype.interfaces.base import traits_extension -from pydra.engine import specs -from pydra.engine.helpers import ensure_list - -import os, sys, yaml, black, imp -import traits -from pathlib import Path -import typing as ty -import inspect -import click -import warnings -import functools - -sys.path.append(str(Path(__file__).resolve().parent.parent / 'specs')) -import callables - - -class FSLConverter: - INPUT_KEYS = [ - "allowed_values", - "argstr", - "container_path", - "copyfile", - "desc", - "mandatory", - "position", - "requires", - "sep", - "xor", - ] - OUTPUT_KEYS = ["desc"] - NAME_MAPPING = {"desc": "help_string"} - - TRAITS_IRREL = [ - 'output_type', - 'args', - 'environ', - 'environ_items', - '__all__', - 'trait_added', - 'trait_modified', - ] - - TYPE_REPLACE = [ - ("\'File\'", "specs.File"), - ("\'bool\'", "bool"), - ("\'str\'", "str"), - ("\'Any\'", "ty.Any"), - ("\'int\'", "int"), - ("\'float\'", "float"), - ("\'list\'", "list"), - ("\'dict\'", "dict"), - ("\'MultiInputObj\'", "specs.MultiInputObj"), - ("\'MultiOutputObj\'", "specs.MultiOutputObj"), - ("\'MultiInputFile\'", "specs.MultiInputFile"), - ("\'MultiOutputFile\'", "specs.MultiOutputFile"), - ] - - def __init__(self, interface_name, interface_spec_file): - self.interface_name = interface_name - with interface_spec_file.open() as f: - self.interface_spec = yaml.safe_load(f)[self.interface_name] - if self.interface_spec.get("output_requirements") is None: - self.interface_spec["output_requirements"] = [] - if self.interface_spec.get("inputs_metadata") is None: - self.interface_spec["inputs_metadata"] = {} - if self.interface_spec.get("inputs_drop") is None: - self.interface_spec["inputs_drop"] = [] - if self.interface_spec.get("output_templates") is None: - self.interface_spec["output_templates"] = {} - if self.interface_spec.get("output_callables") is None: - self.interface_spec["output_callables"] = {} - if ( - not self.interface_spec["output_callables"] - .keys() - .isdisjoint(self.interface_spec["output_templates"].keys()) - ): - raise Exception("output_callables and output_templates have the same keys") - if self.interface_spec.get("doctest") is None: - self.interface_spec["doctest"] = {} - - # getting input/output spec from nipype - nipype_interface = getattr(fsl, self.interface_name) - self.cmd = nipype_interface._cmd - self.nipype_input_spec = nipype_interface.input_spec() - self.nipype_output_spec = nipype_interface.output_spec() - - def pydra_specs(self, write=False, dirname=None): - """creating pydra input/output spec from nipype specs - if write is True, a pydra Task class will be written to the file together with tests - """ - input_fields_pdr, inp_templates = self.convert_input_fields() - output_fields_pdr = self.convert_output_spec(fields_from_template=inp_templates) - - input_spec_pydra = specs.SpecInfo( - name="Input", fields=input_fields_pdr, bases=(specs.ShellSpec,) - ) - output_spec_pydra = specs.SpecInfo( - name="Output", fields=output_fields_pdr, bases=(specs.ShellOutSpec,) - ) - - if write: - if dirname is None: - raise Exception("dirname has to be provided if write is True") - self.write_pydra_files( - dirname=dirname, - pydra_input_spec=input_fields_pdr, - pydra_output_spec=output_fields_pdr, - ) - return input_spec_pydra, output_spec_pydra - - def write_pydra_files(self, dirname, pydra_input_spec, pydra_output_spec): - """writing pydra task and tests to the files""" - testdir = dirname / "tests" - testdir.mkdir(parents=True, exist_ok=True) - Path.touch(dirname / "__init__.py") - Path.touch(testdir / "__init__.py") - filename = dirname / f"{self.interface_name.lower()}.py" - filename_test = testdir / f"test_spec_{filename.name}" - filename_test_run = testdir / f"test_run_{filename.name}" - - print("\n FILENAME", filename) - self.write_task(filename, pydra_input_spec, pydra_output_spec) - - self.write_test(filename_test=filename_test) - self.write_test(filename_test=filename_test_run, run=True) - - def write_task(self, filename, input_fields, output_fields): - """writing pydra task to the dile based on the input and output spec""" - - def types_to_names(spec_fields): - spec_fields_str = [] - for el in spec_fields: - el = list(el) - try: - el[1] = el[1].__name__ - except AttributeError: - el[1] = el[1]._name - spec_fields_str.append(tuple(el)) - return spec_fields_str - - input_fields_str = types_to_names(spec_fields=input_fields) - output_fields_str = types_to_names(spec_fields=output_fields) - functions_str = self.function_callables() - spec_str = "from pydra.engine import specs \nfrom pydra import ShellCommandTask \n" - spec_str += f"import typing as ty\n" - spec_str += functions_str - spec_str += f"input_fields = {input_fields_str}\n" - spec_str += f"{self.interface_name}_input_spec = specs.SpecInfo(name='Input', fields=input_fields, bases=(specs.ShellSpec,))\n\n" - spec_str += f"output_fields = {output_fields_str}\n" - spec_str += f"{self.interface_name}_output_spec = specs.SpecInfo(name='Output', fields=output_fields, bases=(specs.ShellOutSpec,))\n\n" - - spec_str += f"class {self.interface_name}(ShellCommandTask):\n" - if self.interface_spec["doctest"]: - spec_str += self.create_doctest() - spec_str += f" input_spec = {self.interface_name}_input_spec\n" - spec_str += f" output_spec = {self.interface_name}_output_spec\n" - spec_str += f" executable='{self.cmd}'\n" - - for tp_repl in self.TYPE_REPLACE: - spec_str = spec_str.replace(*tp_repl) - - spec_str_black = black.format_file_contents(spec_str, fast=False, mode=black.FileMode()) - - with open(filename, "w") as f: - f.write(spec_str_black) - - def write_test(self, filename_test, run=False): - """writing tests for the specific interface based on the test spec (from interface_spec) - if run is True the test contains task run, - if run is False only the spec is check by the test - """ - tests_inputs = self.interface_spec["tests_inputs"] - tests_outputs = self.interface_spec["tests_outputs"] - if len(tests_inputs) != len(tests_outputs): - raise Exception("tests and tests_outputs should have the same length") - - tests_inp_outp = [] - tests_inp_error = [] - for i, out in enumerate(tests_outputs): - if isinstance(out, list): - tests_inp_outp.append((tests_inputs[i], out)) - elif out is None: - tests_inp_outp.append((tests_inputs[i], [])) - # allowing for incomplete or incorrect inputs that should raise an exception - elif out not in ["AttributeError", "Exception"]: - tests_inp_outp.append((tests_inputs[i], [out])) - else: - tests_inp_error.append((tests_inputs[i], out)) - - spec_str = f"import re, os, shutil, pytest \nfrom pathlib import Path\n" - spec_str += f"from ..{self.interface_name.lower()} import {self.interface_name} \n\n" - if run: - spec_str += ( - "@pytest.mark.xfail('FSLDIR' not in os.environ, reason='no FSL found', " - "raises=FileNotFoundError)\n" - ) - spec_str += f"@pytest.mark.parametrize('inputs, outputs', {tests_inp_outp})\n" - spec_str += f"def test_{self.interface_name}(test_data, inputs, outputs):\n" - spec_str += f" if inputs is None:\n" - spec_str += f" in_file = Path(test_data) / 'test.nii.gz'\n" - spec_str += f" task = {self.interface_name}(in_file=in_file)\n" - spec_str += f" else:\n" - spec_str += f" for key, val in inputs.items():\n" - spec_str += f" try: \n" - spec_str += f" pattern = r'\.[a-zA-Z]*'\n" - spec_str += f" if isinstance(val, str):\n" - spec_str += f" if re.findall(pattern, val) != []:\n" - spec_str += f" inputs[key] = Path(test_data) / val\n" - spec_str += f" elif '_dir' in key:\n" - spec_str += f" dirpath = Path(test_data) / val\n" - spec_str += f" if dirpath.exists() and dirpath.is_dir():\n" - spec_str += f" shutil.rmtree(dirpath)\n" - spec_str += f" inputs[key] = Path(test_data) / val\n" - spec_str += f" else: inputs[key] = eval(val)\n" - spec_str += f" elif isinstance(val, list):\n" - spec_str += f" if all (re.findall(pattern, _) != [] for _ in val):\n" - spec_str += f" inputs[key] = [Path(test_data)/_ for _ in val] \n" - spec_str += f" else: inputs[key] = eval(val)\n" - spec_str += f" except: pass\n" - spec_str += f" task = {self.interface_name}(**inputs)\n" - spec_str += ( - f" assert set(task.generated_output_names) == " - f"set(['return_code', 'stdout', 'stderr'] + outputs)\n" - ) - - if run: - spec_str += f" res = task()\n" - spec_str += f" print('RESULT: ', res)\n" - spec_str += f" for out_nm in outputs:\n" - spec_str += f" if isinstance(getattr(res.output, out_nm), list): assert [os.path.exists(x) for x in getattr(res.output, out_nm)]\n" - spec_str += f" else: assert os.path.exists(getattr(res.output, out_nm))\n" - - # if test_inp_error is not empty, than additional test function will be created - if tests_inp_error: - spec_str += self.write_test_error(input_error=tests_inp_error) - - spec_str_black = black.format_file_contents(spec_str, fast=False, mode=black.FileMode()) - - with open(filename_test, "w") as f: - f.write(spec_str_black) - - def write_test_error(self, input_error): - """creating a tests for incorrect or incomplete inputs - checking if the exceptions are raised - """ - spec_str = "\n\n" - spec_str += f"@pytest.mark.parametrize('inputs, error', {input_error})\n" - spec_str += f"def test_{self.interface_name}_exception(test_data, inputs, error):\n" - spec_str += f" if inputs is None:\n" - spec_str += f" in_file = Path(test_data) / 'test.nii.gz'\n" - spec_str += f" task = {self.interface_name}(in_file=in_file)\n" - spec_str += f" else:\n" - spec_str += f" for key, val in inputs.items():\n" - spec_str += f" try: \n" - spec_str += f" pattern = r'\.[a-zA-Z]*'\n" - spec_str += f" if isinstance(val, str):\n" - spec_str += f" if re.findall(pattern, val) != []:\n" - spec_str += f" inputs[key] = Path(test_data) / val\n" - spec_str += f" elif '_dir' in key:\n" - spec_str += f" dirpath = Path(test_data) / val\n" - spec_str += f" if dirpath.exists() and dirpath.is_dir():\n" - spec_str += f" shutil.rmtree(dirpath)\n" - spec_str += f" inputs[key] = Path(test_data) / val\n" - spec_str += f" else: inputs[key] = eval(val)\n" - spec_str += f" elif isinstance(val, list):\n" - spec_str += f" if all (re.findall(pattern, _) != [] for _ in val):\n" - spec_str += f" inputs[key] = [Path(test_data)/_ for _ in val] \n" - spec_str += f" else: inputs[key] = eval(val)\n" - spec_str += f" except: pass\n" - spec_str += f" task = {self.interface_name}(**inputs)\n" - spec_str += f" with pytest.raises(eval(error)):\n" - spec_str += f" task.generated_output_names\n" - - return spec_str - - def create_doctest(self): - """adding doctests to the interfaces""" - cmdline = self.interface_spec["doctest"].pop("cmdline") - doctest = ' """\n Example\n -------\n' - doctest += f' >>> task = {self.interface_name}()\n' - for key, val in self.interface_spec["doctest"].items(): - if type(val) is str: - doctest += f' >>> task.inputs.{key} = "{val}"\n' - else: - doctest += f' >>> task.inputs.{key} = {val}\n' - doctest += ' >>> task.cmdline\n' - doctest += f" '{cmdline}'" - doctest += '\n """\n' - return doctest - - def convert_input_fields(self): - """creating fields list for pydra input spec""" - fields_pdr_dict = {} - position_dict = {} - has_template = [] - for name, fld in self.nipype_input_spec.traits().items(): - if name in self.TRAITS_IRREL: - continue - if name in self.interface_spec["inputs_drop"]: - continue - fld_pdr, pos = self.pydra_fld_input(fld, name) - meta_pdr = fld_pdr[-1] - if "output_file_template" in meta_pdr: - has_template.append(name) - fields_pdr_dict[name] = (name,) + fld_pdr - if pos is not None: - position_dict[name] = pos - - fields_pdr_l = list(fields_pdr_dict.values()) - return fields_pdr_l, has_template - - def pydra_fld_input(self, field, nm): - """converting a single nipype field to one element of fields for pydra input_spec""" - tp_pdr = self.pydra_type_converter(field, spec_type="input", name=nm) - if nm in self.interface_spec["inputs_metadata"]: - metadata_extra_spec = self.interface_spec["inputs_metadata"][nm] - else: - metadata_extra_spec = {} - - if "default" in metadata_extra_spec: - default_pdr = metadata_extra_spec.pop("default") - elif getattr(field, "usedefault") and field.default is not traits.ctrait.Undefined: - default_pdr = field.default - else: - default_pdr = None - - metadata_pdr = {"help_string": ""} - for key in self.INPUT_KEYS: - key_nm_pdr = self.NAME_MAPPING.get(key, key) - val = getattr(field, key) - if val is not None: - if key == "argstr" and "%" in val: - val = self.string_formats(argstr=val, name=nm) - metadata_pdr[key_nm_pdr] = val - - if getattr(field, "name_template"): - template = getattr(field, "name_template") - name_source = ensure_list(getattr(field, "name_source")) - - metadata_pdr["output_file_template"] = self.string_formats( - argstr=template, name=name_source[0] - ) - if tp_pdr in [specs.File, specs.Directory]: - tp_pdr = str - elif getattr(field, "genfile"): - if nm in self.interface_spec["output_templates"]: - if isinstance(self.interface_spec["output_templates"][nm], list): - metadata_pdr["output_file_template"] = self.interface_spec["output_templates"][ - nm - ][0] - else: - metadata_pdr["output_file_template"] = self.interface_spec["output_templates"][ - nm - ] - if tp_pdr in [ - specs.File, - specs.Directory, - ]: # since this is a template, the file doesn't exist - tp_pdr = str - elif nm not in self.interface_spec["output_callables"]: - raise Exception( - f"the filed {nm} has genfile=True, but no output template or callables provided" - ) - - metadata_pdr.update(metadata_extra_spec) - - pos = metadata_pdr.get("position", None) - - if default_pdr is not None and not metadata_pdr.get("mandatory", None): - return (tp_pdr, default_pdr, metadata_pdr), pos - else: - return (tp_pdr, metadata_pdr), pos - - def convert_output_spec(self, fields_from_template): - """creating fields list for pydra output spec""" - fields_pdr_l = [] - for name, fld in self.nipype_output_spec.traits().items(): - if ( - name in self.interface_spec["output_requirements"] - and name not in fields_from_template - ): - fld_pdr = self.pydra_fld_output(fld, name) - fields_pdr_l.append((name,) + fld_pdr) - return fields_pdr_l - - def pydra_fld_output(self, field, nm): - """converting a single nipype field to one element of fields for pydra output_spec""" - tp_pdr = self.pydra_type_converter(field, spec_type="output", name=nm) - - metadata_pdr = {} - for key in self.OUTPUT_KEYS: - key_nm_pdr = self.NAME_MAPPING.get(key, key) - val = getattr(field, key) - if val: - metadata_pdr[key_nm_pdr] = val - - if self.interface_spec["output_requirements"][nm]: - if all( - [isinstance(el, list) for el in self.interface_spec["output_requirements"][nm]] - ): - requires_l = self.interface_spec["output_requirements"][nm] - nested_flag = True - elif all( - [ - isinstance(el, (str, dict)) - for el in self.interface_spec["output_requirements"][nm] - ] - ): - requires_l = [self.interface_spec["output_requirements"][nm]] - nested_flag = False - else: - Exception("has to be either list of list or list of str/dict") - - metadata_pdr["requires"] = [] - for requires in requires_l: - requires_mod = [] - for el in requires: - if isinstance(el, str): - requires_mod.append(el) - elif isinstance(el, dict): - requires_mod += list(el.items()) - metadata_pdr["requires"].append(requires_mod) - if nested_flag is False: - metadata_pdr["requires"] = metadata_pdr["requires"][0] - - if nm in self.interface_spec["output_templates"]: - if isinstance(self.interface_spec["output_templates"][nm], list): - metadata_pdr["output_file_template"] = self.interface_spec["output_templates"][nm][ - 0 - ] - else: - metadata_pdr["output_file_template"] = self.interface_spec["output_templates"][nm] - elif nm in self.interface_spec["output_callables"]: - metadata_pdr["callable"] = self.interface_spec["output_callables"][nm] - return (tp_pdr, metadata_pdr) - - def function_callables(self): - fun_names = [] - if not self.interface_spec["output_callables"]: - if self.interface_spec["output_templates"]: - tmpls = list(self.interface_spec["output_templates"].values()) - for tmpl in tmpls: - if isinstance(tmpl, list): - fun_names.append(tmpl[0]) - if len(fun_names) < 1: - pass - python_functions_spec = Path(os.path.dirname(__file__)) / "../specs/callables.py" - if not python_functions_spec.exists(): - raise Exception( - "specs/callables.py file is needed if functions are used in the spec files" - ) - - fun_names.extend(list(set(self.interface_spec["output_callables"].values()))) - fun_names.sort() - fun_str = "" - for fun_nm in fun_names: - fun = getattr(callables, fun_nm) - fun_str += inspect.getsource(fun) + "\n" - return fun_str - - def pydra_type_converter(self, field, spec_type, name): - """converting types to types used in pydra""" - if spec_type not in ["input", "output"]: - raise Exception(f"spec_type has to be input or output, but {spec_type} provided") - tp = field.trait_type - if isinstance(tp, traits.trait_types.Int): - tp_pdr = int - elif isinstance(tp, traits.trait_types.Float): - tp_pdr = float - elif isinstance(tp, traits.trait_types.Str): - tp_pdr = str - elif isinstance(tp, traits.trait_types.Bool): - tp_pdr = bool - elif isinstance(tp, traits.trait_types.Dict): - tp_pdr = dict - elif isinstance(tp, traits_extension.InputMultiObject): - if isinstance(field.inner_traits[0].trait_type, traits_extension.File): - tp_pdr = specs.MultiInputFile - else: - tp_pdr = specs.MultiInputObj - elif isinstance(tp, traits_extension.OutputMultiObject): - if isinstance(field.inner_traits[0].trait_type, traits_extension.File): - tp_pdr = specs.MultiOutputFile - else: - tp_pdr = specs.MultiOutputObj - elif isinstance(tp, traits_extension.InputMultiPath): - if isinstance(field.inner_traits[0].trait_type, traits_extension.File): - tp_pdr = specs.MultiInputFile - else: - tp_pdr = specs.MultiInputObj - elif isinstance(tp, traits_extension.OutputMultiPath): - if isinstance(field.inner_traits[0].trait_type, traits_extension.File): - tp_pdr = specs.MultiOutputFile - else: - tp_pdr = specs.MultiOutputObj - elif isinstance(tp, traits.trait_types.List): - if isinstance(field.inner_traits[0].trait_type, traits_extension.File): - if spec_type == "input": - tp_pdr = specs.MultiInputFile - else: - tp_pdr = specs.MultiOutputFile - else: - tp_pdr = list - elif isinstance(tp, traits_extension.File): - if ( - spec_type == "output" or tp.exists is True - ): # TODO check the hash_file metadata in nipype - tp_pdr = specs.File - else: - tp_pdr = str - else: - tp_pdr = ty.Any - return tp_pdr - - def string_formats(self, argstr, name): - import re - - if "%s" in argstr: - argstr_new = argstr.replace("%s", f"{{{name}}}") - elif "%d" in argstr: - argstr_new = argstr.replace("%d", f"{{{name}}}") - elif "%f" in argstr: - argstr_new = argstr.replace("%f", f"{{{name}}}") - elif "%g" in argstr: - argstr_new = argstr.replace("%g", f"{{{name}}}") - elif len(re.findall("%[0-9.]+f", argstr)) == 1: - old_format = re.findall("%[0-9.]+f", argstr)[0] - argstr_new = argstr.replace(old_format, f"{{{name}:{old_format[1:]}}}") - else: - raise Exception(f"format from {argstr} is not supported TODO") - return argstr_new - - -FSL_MODULES = ['aroma', 'dti', 'epi', 'fix', 'maths', 'model', 'possum', 'preprocess', 'utils'] - - -@click.command() -@click.option( - "-i", - "--interface_name", - required=True, - default="all", - help="name of the interface (name used in Nipype, e.g. BET) or all (default)" - "if all is used all interfaces from the spec file will be created", -) -@click.option( - "-m", "--module_name", required=True, help=f"name of the module from the list {FSL_MODULES}" -) -def create_pydra_spec(interface_name, module_name): - if module_name not in FSL_MODULES: - raise Exception( - f"module name {module_name} not available;" f"should be from the list {FSL_MODULES}" - ) - - spec_file = Path(os.path.dirname(__file__)) / f"../specs/fsl_{module_name}_param.yml" - if not spec_file.exists(): - raise Exception( - f"the specification file doesn't exist for the module {module_name}," - f"create the specification file in {spec_file.parent}" - ) - - @functools.lru_cache() - def all_interfaces(module): - nipype_module = getattr(fsl, module) - all_specs = [el for el in dir(nipype_module) if "InputSpec" in el] - all_interf = [el.replace("InputSpec", "") for el in all_specs] - - # interfaces in the spec file - with open(spec_file) as f: - spec_interf = yaml.safe_load(f).keys() - - if set(all_interf) - set(spec_interf): - warnings.warn( - f"some interfaces are not in the spec file: " - f"{set(all_interf) - set(spec_interf)}, " - f"and pydra interfaces will not be created for them" - ) - return spec_interf - - if interface_name == "all": - interface_list = all_interfaces(module_name) - elif interface_name in all_interfaces(module_name): - interface_list = [interface_name] - else: - raise Exception( - f"interface_name has to be 'all' " - f"or a name from the list {all_interfaces(module_name)}" - ) - - dirname_interf = Path(__file__).parent.parent / f"pydra/tasks/fsl/{module_name}" - dirname_interf.mkdir(exist_ok=True) - - for interface_el in interface_list: - converter = FSLConverter( - interface_name=interface_el, - interface_spec_file=Path(__file__).parent.parent - / f"specs/fsl_{module_name}_param.yml", - ) - converter.pydra_specs(write=True, dirname=dirname_interf) - - -if __name__ == '__main__': - create_pydra_spec() diff --git a/tools/data_tests/test.nii.gz b/tools/data_tests/test.nii.gz deleted file mode 100644 index 3424c44..0000000 --- a/tools/data_tests/test.nii.gz +++ /dev/null @@ -1,3 +0,0 @@ -version https://git-lfs.github.com/spec/v1 -oid sha256:ffd0a1a4abf91c63edb64a7add1978e1d990a8bf2fbc057fd93d2367f335c9d4 -size 10379696 diff --git a/tools/tests/test_converter.py b/tools/tests/test_converter.py deleted file mode 100644 index 0444726..0000000 --- a/tools/tests/test_converter.py +++ /dev/null @@ -1,92 +0,0 @@ -import pytest -import pydra -import os, imp -from pathlib import Path - -from ..converter import FSLConverter - -# TODO: rethink teh tests - - -@pytest.mark.skip() -def test_spec(tmpdir): - interface_name = "BET" - converter = FSLConverter(interface_name=interface_name) - input_spec_pydra, output_spec_pydra = converter.pydra_specs() - - in_file = Path(os.path.dirname(__file__)) / "data_tests/test.nii.gz" - out_file = Path(os.path.dirname(__file__)) / "data_tests/test_brain.nii.gz" - cmd = "bet" - - shelly = pydra.ShellCommandTask( - name="bet_task", executable=cmd, input_spec=input_spec_pydra, output_spec=output_spec_pydra - ) - shelly.inputs.in_file = in_file - assert shelly.inputs.executable == "bet" - assert shelly.cmdline == f"bet {in_file} {str(shelly.output_dir / 'test_brain.nii.gz')}" - res = shelly() - assert res.output.out_file.exists() - print("\n Result: ", res) - - shelly_mask = pydra.ShellCommandTask( - name="bet_task", executable=cmd, input_spec=input_spec_pydra, output_spec=output_spec_pydra - ) - shelly_mask.inputs.in_file = in_file - shelly_mask.inputs.mask = True - assert ( - shelly_mask.cmdline - == f"bet {in_file} {str(shelly_mask.output_dir / 'test_brain.nii.gz')} -m" - ) - res = shelly_mask() - assert res.output.out_file.exists() - assert res.output.mask_file.exists() - print("\n Result: ", res) - - -@pytest.mark.skip() -def test_spec_from_file(tmpdir): - interface_name = "BET" - converter = FSLConverter(interface_name=interface_name) - - dirname_spec = Path(tmpdir) - (dirname_spec / "tests").mkdir() - - _, _ = converter.pydra_specs(write=True, dirname=dirname_spec) - - imp.load_source("bet_module", str(dirname_spec / "bet.py")) - import bet_module as bm - - in_file = Path(os.path.dirname(__file__)) / "data_tests/test.nii.gz" - - shelly = bm.BET(name="my_bet") - shelly.inputs.in_file = in_file - assert shelly.inputs.executable == "bet" - assert shelly.cmdline == f"bet {in_file} {str(shelly.output_dir / 'test_brain.nii.gz')}" - res = shelly() - assert res.output.out_file.exists() - print("\n Result: ", res) - - shelly_mask = bm.BET(name="my_bet") - shelly_mask.inputs.in_file = in_file - shelly_mask.inputs.mask = True - assert ( - shelly_mask.cmdline - == f"bet {in_file} {str(shelly_mask.output_dir / 'test_brain.nii.gz')} -m" - ) - res = shelly_mask() - assert res.output.out_file.exists() - assert res.output.mask_file.exists() - print("\n Result: ", res) - - shelly_surf = bm.BET(name="my_bet") - shelly_surf.inputs.in_file = in_file - shelly_surf.inputs.surfaces = True - assert ( - shelly_surf.cmdline - == f"bet {in_file} {str(shelly_surf.output_dir / 'test_brain.nii.gz')} -A" - ) - res = shelly_surf() - assert res.output.out_file.exists() - assert res.output.inskull_mask_file.exists() - assert res.output.skull_mask_file.exists() - print("\n Result: ", res)