diff --git a/src/dynapyt/run_analysis.py b/src/dynapyt/run_analysis.py index e63db94..9e596a3 100644 --- a/src/dynapyt/run_analysis.py +++ b/src/dynapyt/run_analysis.py @@ -3,20 +3,31 @@ import importlib from os.path import abspath from tempfile import gettempdir -from shutil import rmtree import sys from pathlib import Path from . import runtime as _rt def run_analysis( - entry: str, analyses: List[str], name: str = None, coverage: bool = False + entry: str, + analyses: List[str], + name: str = None, + coverage: bool = False, + coverage_dir: str = None, ): - coverage_dir = Path(gettempdir()) / "dynapyt_coverage" + global _rt + _rt = importlib.reload(_rt) + if coverage: - coverage_dir.mkdir(exist_ok=True) + if coverage_dir is None: + coverage_path = Path(gettempdir()) / "dynapyt_coverage" + coverage_path.mkdir(exist_ok=True) + else: + coverage_path = Path(coverage) + coverage_path.mkdir(exist_ok=True) + _rt.set_coverage(coverage_path) else: - rmtree(str(coverage_dir), ignore_errors=True) + _rt.set_coverage(None) analyses_file = Path(gettempdir()) / "dynapyt_analyses.txt" if analyses_file.exists(): @@ -38,6 +49,8 @@ def run_analysis( globals_dict["__file__"] = entry_full_path exec(open(entry_full_path).read(), globals_dict) else: + if importlib.util.find_spec(entry) is None: + raise ValueError(f"Could not find entry {entry}") importlib.import_module(entry) _rt.end_execution() diff --git a/src/dynapyt/runtime.py b/src/dynapyt/runtime.py index 89d87c9..844391d 100644 --- a/src/dynapyt/runtime.py +++ b/src/dynapyt/runtime.py @@ -15,20 +15,19 @@ analyses = None covered = None +coverage_path = None current_file = None end_execution_called = False def end_execution(): - global covered, end_execution_called + global covered, coverage_path, end_execution_called if end_execution_called: return end_execution_called = True call_if_exists("end_execution") if covered is not None: - coverage_file = ( - Path(tempfile.gettempdir()) / "dynapyt_coverage" / "covered.jsonl" - ) + coverage_file = coverage_path / "covered.jsonl" with FileLock(f"{str(coverage_file)}.lock"): if coverage_file.exists(): existing_coverage = {} @@ -59,15 +58,25 @@ def end_execution(): def set_analysis(new_analyses: List[Any]): global analyses, covered - if analyses is None: - analyses = [] - coverage_dir = Path(tempfile.gettempdir()) / "dynapyt_coverage" - if coverage_dir.exists(): - covered = {} - signal.signal(signal.SIGINT, end_execution) - signal.signal(signal.SIGTERM, end_execution) - atexit.register(end_execution) - analyses = load_analyses(new_analyses) + analyses = [] + signal.signal(signal.SIGINT, end_execution) + signal.signal(signal.SIGTERM, end_execution) + atexit.register(end_execution) + analyses = load_analyses(new_analyses) + + +def set_coverage(coverage_path: Path): + if coverage_path is not None: + global covered + covered = {} + coverage_path.mkdir(exist_ok=True) + coverage_file = coverage_path / "covered.jsonl" + if coverage_file.exists(): + with open(str(coverage_file), "r") as f: + content = f.read().splitlines() + for c in content: + tmp = json.loads(c) + covered.update(tmp) def filtered(func, f, args): diff --git a/tests/run_single_test.py b/tests/run_single_test.py index 1e3ed08..4baceee 100644 --- a/tests/run_single_test.py +++ b/tests/run_single_test.py @@ -68,22 +68,19 @@ def test_runner(directory_pair: Tuple[str, str], capsys): instrument_file(join(abs_dir, "__init__.py"), selected_hooks) # analyze - # class_ = getattr(module, "TestAnalysis") - analysis_instances = [class_[1]() for class_ in analysis_classes] - rt.analyses = None - rt.set_analysis(analysis_instances) captured = capsys.readouterr() # clear stdout # print(f"Before analysis: {captured.out}") # for debugging purposes - for analysis_instance in analysis_instances: - if hasattr(analysis_instance, "begin_execution"): - analysis_instance.begin_execution() if run_as_file: run_analysis(program_file, [f"{module_prefix}.analysis.TestAnalysis"]) else: - import_module(f"{module_prefix}.program") - for analysis_instance in analysis_instances: - if hasattr(analysis_instance, "end_execution"): - analysis_instance.end_execution() + run_analysis( + f"{module_prefix}.program", + [ + f"{module_prefix}.analysis.{ac[0]}" + for ac in analysis_classes + if not ac[0].endswith("BaseAnalysis") + ], + ) # check output expected_file = join(abs_dir, "expected.txt")