diff --git a/.github/workflows/black.yaml b/.github/workflows/black.yaml
index 533fd7c80..a9ebfdec7 100644
--- a/.github/workflows/black.yaml
+++ b/.github/workflows/black.yaml
@@ -1,5 +1,7 @@
 name: black-action
+
 on: [push, pull_request]
+
 jobs:
   linter_name:
     name: runner / black
diff --git a/.github/workflows/docs.yaml b/.github/workflows/docs.yaml
index 5a84cc86b..d8d7b388d 100644
--- a/.github/workflows/docs.yaml
+++ b/.github/workflows/docs.yaml
@@ -1,8 +1,7 @@
-name: Pages
-on:
-  push:
-    branches:
-      - master
+name: Generate Pages
+
+on: [push, pull_request]
+
 jobs:
   docs:
     runs-on: ubuntu-latest
diff --git a/.github/workflows/publish.yaml b/.github/workflows/publish.yaml
deleted file mode 100644
index e8ea1d679..000000000
--- a/.github/workflows/publish.yaml
+++ /dev/null
@@ -1,38 +0,0 @@
-# This workflow will upload a Python Package using Twine when a release is created
-# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-python#publishing-to-package-registries
-
-# This workflow uses actions that are not certified by GitHub.
-# They are provided by a third-party and are governed by
-# separate terms of service, privacy policy, and support
-# documentation.
-
-name: Upload Python Package
-
-on:
-  push:
-    branches: [ "master" ]
-  pull_request:
-    branches: [ "master" ]
-
-jobs:
-  deploy:
-
-    runs-on: ubuntu-latest
-
-    steps:
-    - uses: actions/checkout@v3
-    - name: Set up Python
-      uses: actions/setup-python@v3
-      with:
-        python-version: '3.x'
-    - name: Install dependencies
-      run: |
-        python -m pip install --upgrade pip
-        pip install build
-    - name: Build package
-      run: python -m build
-    - name: Publish package
-      uses: pypa/gh-action-pypi-publish@27b31702a0e7fc50959f5ad993c78deac1bdfc29
-      with:
-        user: __token__
-        password: ${{ secrets.PIPY_PASSWORD }}
\ No newline at end of file
diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml
index 020ca3074..132ee4d28 100644
--- a/.github/workflows/tests.yaml
+++ b/.github/workflows/tests.yaml
@@ -1,7 +1,6 @@
 name: Test
 
-on:
-  push:
+on: [push, pull_request]
 
 jobs:
   test:
diff --git a/dacapo/apply.py b/dacapo/apply.py
index b33cffe46..434002ef6 100644
--- a/dacapo/apply.py
+++ b/dacapo/apply.py
@@ -1,200 +1,13 @@
 import logging
-from typing import Optional
-from funlib.geometry import Roi, Coordinate
-import numpy as np
-from dacapo.experiments.datasplits.datasets.arrays.array import Array
-from dacapo.experiments.datasplits.datasets.dataset import Dataset
-from dacapo.experiments.run import Run
-
-from dacapo.experiments.tasks.post_processors.post_processor_parameters import (
-    PostProcessorParameters,
-)
-import dacapo.experiments.tasks.post_processors as post_processors
-from dacapo.store.array_store import LocalArrayIdentifier
-from dacapo.predict import predict
-from dacapo.compute_context import LocalTorch, ComputeContext
-from dacapo.experiments.datasplits.datasets.arrays import ZarrArray
-from dacapo.store import (
-    create_config_store,
-    create_weights_store,
-)
-
-from pathlib import Path
 
 logger = logging.getLogger(__name__)
 
 
-def apply(
-    run_name: str,
-    input_container: Path or str,
-    input_dataset: str,
-    output_path: Path or str,
-    validation_dataset: Optional[Dataset or str] = None,
-    criterion: Optional[str] = "voi",
-    iteration: Optional[int] = None,
-    parameters: Optional[PostProcessorParameters or str] = None,
-    roi: Optional[Roi or str] = None,
-    num_cpu_workers: int = 30,
-    output_dtype: Optional[np.dtype or str] = np.uint8,
-    compute_context: ComputeContext = LocalTorch(),
-    overwrite: bool = True,
-    file_format: str = "zarr",
-):
-    """Load weights and apply a model to a dataset. If iteration is None, the best iteration based on the criterion is used. If roi is None, the whole input dataset is used."""
-    if isinstance(output_dtype, str):
-        output_dtype = np.dtype(output_dtype)
-
-    if isinstance(roi, str):
-        start, end = zip(
-            *[
-                tuple(int(coord) for coord in axis.split(":"))
-                for axis in roi.strip("[]").split(",")
-            ]
-        )
-        roi = Roi(
-            Coordinate(start),
-            Coordinate(end) - Coordinate(start),
-        )
-
-    assert (validation_dataset is not None and isinstance(criterion, str)) or (
-        isinstance(iteration, int)
-    ), "Either validation_dataset and criterion, or iteration must be provided."
-
-    # retrieving run
-    logger.info("Loading run %s", run_name)
-    config_store = create_config_store()
-    run_config = config_store.retrieve_run_config(run_name)
-    run = Run(run_config)
-
-    # create weights store
-    weights_store = create_weights_store()
-
-    # load weights
-    if iteration is None:
-        # weights_store._load_best(run, criterion)
-        iteration = weights_store.retrieve_best(run_name, validation_dataset, criterion)
-    logger.info("Loading weights for iteration %i", iteration)
-    weights_store.retrieve_weights(run, iteration)  # shouldn't this be load_weights?
-
-    # find the best parameters
-    if isinstance(validation_dataset, str):
-        val_ds_name = validation_dataset
-        validation_dataset = [
-            dataset for dataset in run.datasplit.validate if dataset.name == val_ds_name
-        ][0]
-    logger.info("Finding best parameters for validation dataset %s", validation_dataset)
-    if parameters is None:
-        parameters = run.task.evaluator.get_overall_best_parameters(
-            validation_dataset, criterion
-        )
-        assert (
-            parameters is not None
-        ), "Unable to retieve parameters. Parameters must be provided explicitly."
-
-    elif isinstance(parameters, str):
-        try:
-            post_processor_name = parameters.split("(")[0]
-            post_processor_kwargs = parameters.split("(")[1].strip(")").split(",")
-            post_processor_kwargs = {
-                key.strip(): value.strip()
-                for key, value in [arg.split("=") for arg in post_processor_kwargs]
-            }
-            for key, value in post_processor_kwargs.items():
-                if value.isdigit():
-                    post_processor_kwargs[key] = int(value)
-                elif value.replace(".", "", 1).isdigit():
-                    post_processor_kwargs[key] = float(value)
-        except:
-            raise ValueError(
-                f"Could not parse parameters string {parameters}. Must be of the form 'post_processor_name(arg1=val1, arg2=val2, ...)'"
-            )
-        try:
-            parameters = getattr(post_processors, post_processor_name)(
-                **post_processor_kwargs
-            )
-        except Exception as e:
-            logger.error(
-                f"Could not instantiate post-processor {post_processor_name} with arguments {post_processor_kwargs}.",
-                exc_info=True,
-            )
-            raise e
-
-    assert isinstance(
-        parameters, PostProcessorParameters
-    ), "Parameters must be parsable to a PostProcessorParameters object."
-
-    # make array identifiers for input, predictions and outputs
-    input_array_identifier = LocalArrayIdentifier(input_container, input_dataset)
-    input_array = ZarrArray.open_from_array_identifier(input_array_identifier)
-    roi = roi.snap_to_grid(input_array.voxel_size, mode="grow").intersect(
-        input_array.roi
-    )
-    output_container = Path(
-        output_path,
-        "".join(Path(input_container).name.split(".")[:-1]) + f".{file_format}",
-    )
-    prediction_array_identifier = LocalArrayIdentifier(
-        output_container, f"prediction_{run_name}_{iteration}"
-    )
-    output_array_identifier = LocalArrayIdentifier(
-        output_container, f"output_{run_name}_{iteration}_{parameters}"
-    )
-
+def apply(run_name: str, iteration: int, dataset_name: str):
     logger.info(
-        "Applying best results from run %s at iteration %i to dataset %s",
-        run.name,
+        "Applying results from run %s at iteration %d to dataset %s",
+        run_name,
         iteration,
-        Path(input_container, input_dataset),
-    )
-    return apply_run(
-        run,
-        parameters,
-        input_array,
-        prediction_array_identifier,
-        output_array_identifier,
-        roi,
-        num_cpu_workers,
-        output_dtype,
-        compute_context,
-        overwrite,
-    )
-
-
-def apply_run(
-    run: Run,
-    parameters: PostProcessorParameters,
-    input_array: Array,
-    prediction_array_identifier: LocalArrayIdentifier,
-    output_array_identifier: LocalArrayIdentifier,
-    roi: Optional[Roi] = None,
-    num_cpu_workers: int = 30,
-    output_dtype: Optional[np.dtype] = np.uint8,
-    compute_context: ComputeContext = LocalTorch(),
-    overwrite: bool = True,
-):
-    """Apply the model to a dataset. If roi is None, the whole input dataset is used. Assumes model is already loaded."""
-    run.model.eval()
-
-    # render prediction dataset
-    logger.info("Predicting on dataset %s", prediction_array_identifier)
-    predict(
-        run.model,
-        input_array,
-        prediction_array_identifier,
-        output_roi=roi,
-        num_cpu_workers=num_cpu_workers,
-        output_dtype=output_dtype,
-        compute_context=compute_context,
-        overwrite=overwrite,
+        dataset_name,
     )
-
-    # post-process the output
-    logger.info("Post-processing output to dataset %s", output_array_identifier)
-    post_processor = run.task.post_processor
-    post_processor.set_prediction(prediction_array_identifier)
-    post_processor.process(
-        parameters, output_array_identifier, overwrite=overwrite, blockwise=True
-    )
-
-    logger.info("Done")
-    return
+    raise NotImplementedError("This function is not yet implemented.")
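The removed `apply()` accepted the ROI as a string of the form `[lower:upper, lower:upper, ...]` and turned it into a funlib `Roi` (offset plus shape). For reference, here is that parsing step in isolation, a standalone sketch lifted from the removed code:

```python
from funlib.geometry import Coordinate, Roi


def parse_roi(roi_str: str) -> Roi:
    """Parse a string like "[0:100, 20:120, 0:100]" into a funlib Roi."""
    start, end = zip(
        *[
            tuple(int(coord) for coord in axis.split(":"))
            for axis in roi_str.strip("[]").split(",")
        ]
    )
    # A Roi is defined by an offset and a shape, so upper bounds become extents.
    return Roi(Coordinate(start), Coordinate(end) - Coordinate(start))


print(parse_roi("[0:100, 20:120, 0:100]"))  # offset (0, 20, 0), shape (100, 100, 100)
```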
diff --git a/dacapo/cli.py b/dacapo/cli.py
index f8f06db54..be59df0c0 100644
--- a/dacapo/cli.py
+++ b/dacapo/cli.py
@@ -1,5 +1,3 @@
-from typing import Optional
-
 import dacapo
 import click
 import logging
@@ -42,52 +40,21 @@ def validate(run_name, iteration):
 
 @cli.command()
 @click.option(
-    "-r", "--run_name", required=True, type=str, help="The name of the run to use."
+    "-r", "--run-name", required=True, type=str, help="The name of the run to use."
 )
 @click.option(
-    "-ic",
-    "--input_container",
+    "-i",
+    "--iteration",
     required=True,
-    type=click.Path(exists=True, file_okay=False),
+    type=int,
+    help="The iteration weights and parameters to use.",
 )
-@click.option("-id", "--input_dataset", required=True, type=str)
-@click.option("-op", "--output_path", required=True, type=click.Path(file_okay=False))
-@click.option("-vd", "--validation_dataset", type=str, default=None)
-@click.option("-c", "--criterion", default="voi")
-@click.option("-i", "--iteration", type=int, default=None)
-@click.option("-p", "--parameters", type=str, default=None)
 @click.option(
-    "-roi",
-    "--roi",
+    "-d",
+    "--dataset-name",
+    required=True,
     type=str,
-    required=False,
-    help="The roi to predict on. Passed in as [lower:upper, lower:upper, ... ]",
+    help="The name of the dataset to apply the run to.",
 )
-@click.option("-w", "--num_cpu_workers", type=int, default=30)
-@click.option("-dt", "--output_dtype", type=str, default="uint8")
-def apply(
-    run_name: str,
-    input_container: str,
-    input_dataset: str,
-    output_path: str,
-    validation_dataset: Optional[str] = None,
-    criterion: Optional[str] = "voi",
-    iteration: Optional[int] = None,
-    parameters: Optional[str] = None,
-    roi: Optional[str] = None,
-    num_cpu_workers: int = 30,
-    output_dtype: Optional[str] = "uint8",
-):
-    dacapo.apply(
-        run_name,
-        input_container,
-        input_dataset,
-        output_path,
-        validation_dataset,
-        criterion,
-        iteration,
-        parameters,
-        roi,
-        num_cpu_workers,
-        output_dtype,
-    )
+def apply(run_name, iteration, dataset_name):
+    dacapo.apply(run_name, iteration, dataset_name)
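With the CLI reduced to three required options, the command can be exercised without a shell via click's test runner. A minimal sketch, assuming the group object is `dacapo.cli.cli` and the option spellings above; since `dacapo.apply` currently raises `NotImplementedError`, this only checks that argument parsing works:

```python
from click.testing import CliRunner

from dacapo.cli import cli

runner = CliRunner()
result = runner.invoke(
    cli,
    ["apply", "--run-name", "my_run", "--iteration", "5000", "--dataset-name", "my_dataset"],
)
# Expect a non-zero exit code carrying a NotImplementedError until apply() is filled in.
print(result.exit_code, repr(result.exception))
```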
diff --git a/dacapo/experiments/datasplits/datasets/arrays/tiff_array.py b/dacapo/experiments/datasplits/datasets/arrays/tiff_array.py
index e16ef26e0..ccdf50376 100644
--- a/dacapo/experiments/datasplits/datasets/arrays/tiff_array.py
+++ b/dacapo/experiments/datasplits/datasets/arrays/tiff_array.py
@@ -56,7 +56,7 @@ def voxel_size(self) -> Coordinate:
 
     @lazy_property.LazyProperty
     def roi(self) -> Roi:
-        return Roi(self._offset * self.shape)
+        return Roi(self._offset, self.shape)
 
     @property
     def writable(self) -> bool:
diff --git a/dacapo/experiments/model.py b/dacapo/experiments/model.py
index fe1f8e7d5..8ca2b2b9e 100644
--- a/dacapo/experiments/model.py
+++ b/dacapo/experiments/model.py
@@ -46,7 +46,7 @@ def forward(self, x):
             result = self.eval_activation(result)
         return result
 
-    def compute_output_shape(self, input_shape: Coordinate) -> Coordinate:
+    def compute_output_shape(self, input_shape: Coordinate) -> Tuple[int, Coordinate]:
        """Compute the spatial shape (i.e., not accounting for channels and batch
        dimensions) of this model, when fed a tensor of the given spatial shape as
        input."""
diff --git a/dacapo/experiments/tasks/affinities_task.py b/dacapo/experiments/tasks/affinities_task.py
index 4a1b8cc4a..859494e7e 100644
--- a/dacapo/experiments/tasks/affinities_task.py
+++ b/dacapo/experiments/tasks/affinities_task.py
@@ -12,12 +12,10 @@ def __init__(self, task_config):
         """Create a `DummyTask` from a `DummyTaskConfig`."""
 
         self.predictor = AffinitiesPredictor(
-            neighborhood=task_config.neighborhood,
-            lsds=task_config.lsds,
-            num_voxels=task_config.num_voxels,
-            downsample_lsds=task_config.downsample_lsds,
-            grow_boundary_iterations=task_config.grow_boundary_iterations,
+            neighborhood=task_config.neighborhood, lsds=task_config.lsds
+        )
+        self.loss = AffinitiesLoss(
+            len(task_config.neighborhood), task_config.lsds_to_affs_weight_ratio
         )
-        self.loss = AffinitiesLoss(len(task_config.neighborhood))
         self.post_processor = WatershedPostProcessor(offsets=task_config.neighborhood)
         self.evaluator = InstanceEvaluator()
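The `tiff_array.py` change fixes a real bug: `funlib.geometry.Roi` takes an offset and a shape as two separate arguments, while the old code multiplied them together and passed a single coordinate. A small sketch of the corrected construction, assuming a recent `funlib.geometry`:

```python
from funlib.geometry import Coordinate, Roi

offset = Coordinate((0, 0, 0))       # corresponds to self._offset
shape = Coordinate((100, 200, 300))  # corresponds to self.shape

# Offset and shape are separate arguments; the ROI spans offset .. offset + shape.
roi = Roi(offset, shape)
print(roi.offset, roi.shape)  # (0, 0, 0) (100, 200, 300)
```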
diff --git a/dacapo/experiments/tasks/affinities_task_config.py b/dacapo/experiments/tasks/affinities_task_config.py
index 0a94db79d..a50c2141e 100644
--- a/dacapo/experiments/tasks/affinities_task_config.py
+++ b/dacapo/experiments/tasks/affinities_task_config.py
@@ -30,23 +30,9 @@ class AffinitiesTaskConfig(TaskConfig):
             "It has been shown that lsds as an auxiliary task can help affinity predictions."
         },
     )
-    num_voxels: int = attr.ib(
-        default=20,
-        metadata={
-            "help_text": "The number of voxels to use for the gaussian sigma when computing lsds."
-        },
-    )
-    downsample_lsds: int = attr.ib(
+    lsds_to_affs_weight_ratio: float = attr.ib(
         default=1,
         metadata={
-            "help_text": "The amount to downsample the lsds. "
-            "This is useful for speeding up training and inference."
-        },
-    )
-    grow_boundary_iterations: int = attr.ib(
-        default=0,
-        metadata={
-            "help_text": "The number of iterations to run the grow boundaries algorithm. "
-            "This is useful for refining the boundaries of the affinities, and reducing merging of adjacent objects."
+            "help_text": "If training with lsds, set how much they should be weighted compared to affs."
         },
     )
diff --git a/dacapo/experiments/tasks/losses/affinities_loss.py b/dacapo/experiments/tasks/losses/affinities_loss.py
index 65ada8843..74fc7fe67 100644
--- a/dacapo/experiments/tasks/losses/affinities_loss.py
+++ b/dacapo/experiments/tasks/losses/affinities_loss.py
@@ -3,8 +3,9 @@
 
 
 class AffinitiesLoss(Loss):
-    def __init__(self, num_affinities: int):
+    def __init__(self, num_affinities: int, lsds_to_affs_weight_ratio: float):
         self.num_affinities = num_affinities
+        self.lsds_to_affs_weight_ratio = lsds_to_affs_weight_ratio
 
     def compute(self, prediction, target, weight):
         affs, affs_target, affs_weight = (
@@ -21,7 +22,7 @@ def compute(self, prediction, target, weight):
         return (
             torch.nn.BCEWithLogitsLoss(reduction="none")(affs, affs_target)
             * affs_weight
-        ).mean() + (
+        ).mean() + self.lsds_to_affs_weight_ratio * (
             torch.nn.MSELoss(reduction="none")(torch.nn.Sigmoid()(aux), aux_target)
             * aux_weight
         ).mean()
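The new `lsds_to_affs_weight_ratio` scales the auxiliary LSD term (MSE on sigmoided logits) relative to the affinities term (weighted BCE) in `AffinitiesLoss.compute`. A self-contained sketch of that combination on dummy tensors, assuming channels are stacked affinities-first as in the loss above:

```python
import torch


def combined_loss(prediction, target, weight, num_affinities, ratio):
    # Split channels into affinities and auxiliary LSD descriptors.
    affs, aux = prediction[:, :num_affinities], prediction[:, num_affinities:]
    affs_t, aux_t = target[:, :num_affinities], target[:, num_affinities:]
    affs_w, aux_w = weight[:, :num_affinities], weight[:, num_affinities:]

    affs_term = (
        torch.nn.BCEWithLogitsLoss(reduction="none")(affs, affs_t) * affs_w
    ).mean()
    lsds_term = (
        torch.nn.MSELoss(reduction="none")(torch.sigmoid(aux), aux_t) * aux_w
    ).mean()
    # ratio == lsds_to_affs_weight_ratio; 1.0 weights both terms equally.
    return affs_term + ratio * lsds_term


# 3 affinity channels + 10 LSD channels on an 8^3 toy volume.
pred = torch.randn(1, 13, 8, 8, 8)
tgt = torch.rand(1, 13, 8, 8, 8)
wgt = torch.ones(1, 13, 8, 8, 8)
print(combined_loss(pred, tgt, wgt, num_affinities=3, ratio=0.5))
```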
diff --git a/dacapo/experiments/tasks/post_processors/watershed_post_processor.py b/dacapo/experiments/tasks/post_processors/watershed_post_processor.py
index 307806772..1a7c4627b 100644
--- a/dacapo/experiments/tasks/post_processors/watershed_post_processor.py
+++ b/dacapo/experiments/tasks/post_processors/watershed_post_processor.py
@@ -24,9 +24,7 @@ def enumerate_parameters(self):
         """Enumerate all possible parameters of this post-processor. Should
         return instances of ``PostProcessorParameters``."""
 
-        for i, bias in enumerate(
-            [0.1, 0.3, 0.5, 0.7, 0.9]
-        ):  # TODO: add this to the config
+        for i, bias in enumerate([0.1, 0.25, 0.5, 0.75, 0.9]):
             yield WatershedPostProcessorParameters(id=i, bias=bias)
 
     def set_prediction(self, prediction_array_identifier):
@@ -34,65 +32,45 @@ def set_prediction(self, prediction_array_identifier):
             prediction_array_identifier
         )
 
-    def process(
-        self,
-        parameters,
-        output_array_identifier,
-        overwrite: bool = False,
-        blockwise: bool = False,
-    ):  # TODO: will probably break with large arrays...
-        if not blockwise:
-            output_array = ZarrArray.create_from_array_identifier(
-                output_array_identifier,
-                [axis for axis in self.prediction_array.axes if axis != "c"],
-                self.prediction_array.roi,
-                None,
-                self.prediction_array.voxel_size,
-                np.uint64,
-                overwrite=overwrite,
-            )
-            # if a previous segmentation is provided, it must have a "grid graph"
-            # in its metadata.
-            # pred_data = self.prediction_array[self.prediction_array.roi]
-            # affs = pred_data[: len(self.offsets)].astype(
-            #     np.float64
-            # )  # TODO: shouldn't need to be float64
-            affs = self.prediction_array[self.prediction_array.roi][: len(self.offsets)]
-            if affs.dtype == np.uint8:
-                affs = affs.astype(np.float64) / 255.0
-            else:
-                affs = affs.astype(np.float64)
-            segmentation = mws.agglom(
-                affs - parameters.bias,
-                self.offsets,
-            )
-            # filter fragments
-            average_affs = np.mean(affs, axis=0)
-
-            filtered_fragments = []
-
-            fragment_ids = np.unique(segmentation)
-
-            for fragment, mean in zip(
-                fragment_ids,
-                measurements.mean(average_affs, segmentation, fragment_ids),
-            ):
-                if mean < parameters.bias:
-                    filtered_fragments.append(fragment)
-
-            filtered_fragments = np.array(filtered_fragments, dtype=segmentation.dtype)
-            replace = np.zeros_like(filtered_fragments)
-
-            # DGA: had to add in flatten and reshape since remap (in particular indices) didn't seem to work with ndarrays for the input
-            if filtered_fragments.size > 0:
-                segmentation = npi.remap(
-                    segmentation.flatten(), filtered_fragments, replace
-                ).reshape(segmentation.shape)
-
-            output_array[self.prediction_array.roi] = segmentation
-
-            return output_array
-        else:
-            raise NotImplementedError(
-                "Blockwise processing not yet implemented."
-            )  # TODO: add rusty mws
+    def process(self, parameters, output_array_identifier):
+        output_array = ZarrArray.create_from_array_identifier(
+            output_array_identifier,
+            [axis for axis in self.prediction_array.axes if axis != "c"],
+            self.prediction_array.roi,
+            None,
+            self.prediction_array.voxel_size,
+            np.uint64,
+        )
+        # if a previous segmentation is provided, it must have a "grid graph"
+        # in its metadata.
+        pred_data = self.prediction_array[self.prediction_array.roi]
+        affs = pred_data[: len(self.offsets)].astype(np.float64)
+        segmentation = mws.agglom(
+            affs - parameters.bias,
+            self.offsets,
+        )
+        # filter fragments
+        average_affs = np.mean(affs, axis=0)
+
+        filtered_fragments = []
+
+        fragment_ids = np.unique(segmentation)
+
+        for fragment, mean in zip(
+            fragment_ids, measurements.mean(average_affs, segmentation, fragment_ids)
+        ):
+            if mean < parameters.bias:
+                filtered_fragments.append(fragment)
+
+        filtered_fragments = np.array(filtered_fragments, dtype=segmentation.dtype)
+        replace = np.zeros_like(filtered_fragments)
+
+        # DGA: had to add in flatten and reshape since remap (in particular indices) didn't seem to work with ndarrays for the input
+        if filtered_fragments.size > 0:
+            segmentation = npi.remap(
+                segmentation.flatten(), filtered_fragments, replace
+            ).reshape(segmentation.shape)
+
+        output_array[self.prediction_array.roi] = segmentation
+
+        return output_array
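The fragment filter in `process` computes the mean affinity inside every fragment produced by `mws.agglom` and relabels fragments whose mean falls below the bias to background. The same idea in plain numpy/scipy, with `np.isin` standing in for `numpy_indexed.remap` purely for illustration:

```python
import numpy as np
from scipy import ndimage


def filter_fragments(affs: np.ndarray, segmentation: np.ndarray, bias: float) -> np.ndarray:
    # Mean affinity per voxel (over offsets), then per fragment.
    average_affs = np.mean(affs, axis=0)
    fragment_ids = np.unique(segmentation)
    means = np.asarray(ndimage.mean(average_affs, labels=segmentation, index=fragment_ids))
    # Fragments whose mean affinity is below the bias are relabelled to 0.
    filtered = fragment_ids[means < bias]
    out = segmentation.copy()
    out[np.isin(out, filtered)] = 0
    return out


# Toy example: two offsets, a 4x4 plane, two fragments; both fall below bias=0.6.
affs = np.stack([np.full((4, 4), 0.8), np.full((4, 4), 0.2)])
seg = np.array([[1, 1, 2, 2]] * 4)
print(filter_fragments(affs, seg, bias=0.6))
```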
diff --git a/dacapo/experiments/trainers/gunpowder_trainer.py b/dacapo/experiments/trainers/gunpowder_trainer.py
index 889ab292c..72427951e 100644
--- a/dacapo/experiments/trainers/gunpowder_trainer.py
+++ b/dacapo/experiments/trainers/gunpowder_trainer.py
@@ -44,6 +44,11 @@ def __init__(self, trainer_config):
         self.mask_integral_downsample_factor = 4
         self.clip_raw = trainer_config.clip_raw
 
+        # Testing out if calculating multiple times and multiplying is necessary
+        self.add_predictor_nodes_to_dataset = (
+            trainer_config.add_predictor_nodes_to_dataset
+        )
+
         self.scheduler = None
 
     def create_optimizer(self, model):
@@ -172,21 +177,32 @@ def build_batch_provider(self, datasets, model, task, snapshot_container=None):
                     reject_probability=self.reject_probability,
                 )
 
-            for augment in self.augments:
-                dataset_source += augment.node(raw_key, gt_key, mask_key)
-
+            if self.add_predictor_nodes_to_dataset:
                 # Add predictor nodes to dataset_source
                 dataset_source += DaCapoTargetFilter(
                     task.predictor,
                     gt_key=gt_key,
-                    weights_key=weight_key,
-                    target_key=target_key,
+                    weights_key=dataset_weight_key,
                     mask_key=mask_key,
                 )
 
             dataset_sources.append(dataset_source)
         pipeline = tuple(dataset_sources) + gp.RandomProvider(weights)
 
+        # Add predictor nodes to pipeline
+        pipeline += DaCapoTargetFilter(
+            task.predictor,
+            gt_key=gt_key,
+            target_key=target_key,
+            weights_key=datasets_weight_key
+            if self.add_predictor_nodes_to_dataset
+            else weight_key,
+            mask_key=mask_key,
+        )
+
+        if self.add_predictor_nodes_to_dataset:
+            pipeline += Product(dataset_weight_key, datasets_weight_key, weight_key)
+
         # Trainer attributes:
         if self.num_data_fetchers > 1:
             pipeline += gp.PreCache(num_workers=self.num_data_fetchers)
diff --git a/dacapo/experiments/trainers/gunpowder_trainer_config.py b/dacapo/experiments/trainers/gunpowder_trainer_config.py
index 032dba23d..255c73ad6 100644
--- a/dacapo/experiments/trainers/gunpowder_trainer_config.py
+++ b/dacapo/experiments/trainers/gunpowder_trainer_config.py
@@ -27,7 +27,12 @@ class GunpowderTrainerConfig(TrainerConfig):
         default=None,
         metadata={"help_text": "Number of iterations before saving a new snapshot."},
     )
-    min_masked: Optional[float] = attr.ib(default=1e-6)
-    reject_probability: Optional[float or None] = attr.ib(default=1)
-    weighted_reject: bool = attr.ib(default=False)
+    min_masked: Optional[float] = attr.ib(default=0.15)
     clip_raw: bool = attr.ib(default=False)
+
+    add_predictor_nodes_to_dataset: Optional[bool] = attr.ib(
+        default=True,
+        metadata={
+            "help_text": "Whether to add a predictor node to dataset_source and apply product of weights"
+        },
+    )
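When `add_predictor_nodes_to_dataset` is enabled, each dataset source now produces its own weights (`dataset_weight_key`) and the pipeline-level predictor writes `datasets_weight_key`; the `Product` node (not part of this diff) then combines the two into the final `weight_key`. As an assumption about what that node does, a minimal gunpowder `BatchFilter` that multiplies two arrays voxel-wise could look like this:

```python
import gunpowder as gp


class Product(gp.BatchFilter):
    """Sketch: y = x1 * x2, voxel-wise (assumed behaviour, not the actual node)."""

    def __init__(self, x1_key: gp.ArrayKey, x2_key: gp.ArrayKey, y_key: gp.ArrayKey):
        self.x1_key = x1_key
        self.x2_key = x2_key
        self.y_key = y_key

    def setup(self):
        # Advertise the output with the same spec as the first input.
        self.provides(self.y_key, self.spec[self.x1_key].copy())

    def prepare(self, request):
        # Request both inputs over the ROI asked of the output.
        deps = gp.BatchRequest()
        deps[self.x1_key] = request[self.y_key].copy()
        deps[self.x2_key] = request[self.y_key].copy()
        return deps

    def process(self, batch, request):
        outputs = gp.Batch()
        spec = batch[self.x1_key].spec.copy()
        outputs[self.y_key] = gp.Array(
            batch[self.x1_key].data * batch[self.x2_key].data, spec
        )
        return outputs
```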
diff --git a/dacapo/predict.py b/dacapo/predict.py
index 4fed2d484..491fd96fb 100644
--- a/dacapo/predict.py
+++ b/dacapo/predict.py
@@ -24,7 +24,7 @@ def predict(
     num_cpu_workers: int = 4,
     compute_context: ComputeContext = LocalTorch(),
     output_roi: Optional[Roi] = None,
-    output_dtype: Optional[np.dtype] = np.float32,  # add necessary type conversions
+    output_dtype: np.dtype = np.float32,  # type: ignore
     overwrite: bool = False,
 ):
     # get the model's input and output size
@@ -59,7 +59,6 @@ def predict(
         model.num_out_channels,
         output_voxel_size,
         output_dtype,
-        overwrite=overwrite,
     )
 
     # create gunpowder keys
diff --git a/mypy.ini b/mypy.ini
index 722c11df8..aadc732e4 100644
--- a/mypy.ini
+++ b/mypy.ini
@@ -68,4 +68,7 @@ ignore_missing_imports = True
 ignore_missing_imports = True
 
 [mypy-mwatershed.*]
-ignore_missing_imports = True
\ No newline at end of file
+ignore_missing_imports = True
+
+[mypy-numpy_indexed.*]
+ignore_missing_imports = True
diff --git a/requirements-dev.txt b/requirements-dev.txt
index 492c8e6f4..12afa83a4 100644
--- a/requirements-dev.txt
+++ b/requirements-dev.txt
@@ -1,5 +1,5 @@
 black
 mypy
-pytest
+pytest==7.4.4
 pytest-cov
 pytest-lazy-fixture
\ No newline at end of file
diff --git a/setup.py b/setup.py
index e0ac028a4..3e6f51064 100644
--- a/setup.py
+++ b/setup.py
@@ -32,10 +32,11 @@
         "funlib.math>=0.1",
         "funlib.geometry>=0.2",
         "mwatershed>=0.1",
-        "funlib.persistence>=0.1",
+        "funlib.persistence @ git+https://github.com/janelia-cellmap/funlib.persistence",
         "funlib.evaluate @ git+https://github.com/pattonw/funlib.evaluate",
         "gunpowder>=1.3",
-        "lsds>=0.1.3",
+        # "lsds>=0.1.3",
+        "lsds @ git+https://github.com/funkelab/lsd",
         "xarray",
         "cattrs",
         "numpy-indexed",
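`predict` still defaults `output_dtype` to `np.float32`, while the old `apply` requested `np.uint8`. If an integer output dtype is used, float predictions in [0, 1] need to be rescaled before casting; the watershed post-processor above divides uint8 affinities by 255, which suggests (this is an inference, not something stated in the diff) a convention like:

```python
import numpy as np


def to_output_dtype(prediction: np.ndarray, output_dtype) -> np.ndarray:
    output_dtype = np.dtype(output_dtype)
    if np.issubdtype(output_dtype, np.integer):
        # Stretch [0, 1] float predictions over the 0..255 range before casting.
        return (prediction * 255).astype(output_dtype)
    return prediction.astype(output_dtype)


print(to_output_dtype(np.array([0.0, 0.5, 1.0], dtype=np.float32), np.uint8))  # [  0 127 255]
```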