def create_segmentation_spatial_index_tasks(
  cloudpath:str, mip=0, shape=(512, 512, 512),
  fill_missing=False
):
  """
  Generate tasks that create a spatial index for a
  segmentation image layer.

  cloudpath: cloud storage path to the segmentation layer
  mip: which mip level to index (default: 0)
  shape: per-task grid size in voxels; should be a multiple
    of the layer's chunk size
  fill_missing: interpret missing image files as background
    instead of raising an error

  Returns: iterator yielding ImageSpatialIndexTask partials
    covering the full layer bounds.
  """
  cv = CloudVolume(cloudpath, mip=mip)
  shape = Vec(*shape, dtype=int)

  if cv.layer_type != "segmentation":
    raise ValueError(
      f"Can only create the spatial index on segmentation. {cv.cloudpath} is an {cv.layer_type}." 
    )

  # Record the index parameters in the layer's info file so that
  # readers know the grid and units of the index files.
  cv.scale["spatial_index"] = {
    # fix: original referenced undefined name `vol`; the volume
    # handle in this function is `cv`.
    'resolution': cv.resolution.tolist(),
    # fix: astype() takes a dtype, not a Vec. chunk_size is the
    # task grid expressed in physical units (voxels * resolution).
    'chunk_size': (shape.astype(cv.resolution.dtype) * cv.resolution).tolist(),
  }
  cv.commit_info()

  class ImageSpatialIndexTaskIterator(FinelyDividedTaskIterator):
    def task(self, shape, offset):
      return partial(ImageSpatialIndexTask,
        cloudpath=cloudpath,
        shape=shape.clone(),
        offset=offset.clone(),
        mip=mip,
        fill_missing=bool(fill_missing)
      )
    def on_finish(self):
      # Record provenance of this indexing run after all
      # tasks have been generated.
      cv.provenance.processing.append({
        'method': {
          'task': 'ImageSpatialIndexTask',
          'mip': mip,
          'shape': shape.tolist(),
          'fill_missing': bool(fill_missing),
        },
        'by': OPERATOR_CONTACT,
        'date': strftime('%Y-%m-%d %H:%M %Z'),
      })
      cv.commit_provenance()

  return ImageSpatialIndexTaskIterator(cv.bounds.clone(), shape)
def find_objects(labels):
  """
  scipy.ndimage.find_objects performs about 7-8x faster on C
  ordered arrays, so we just do it that way and convert
  the results if it's in F order.

  Returns: one slice tuple per label (None for labels that do
    not appear in the array), as scipy.ndimage.find_objects does.
  """
  if labels.flags['C_CONTIGUOUS']:
    return scipy.ndimage.find_objects(labels)
  else:
    # Transposing a Fortran-ordered array yields a C-ordered view;
    # the resulting slices are axis-reversed, so flip them back.
    # (`slcs and slcs[::-1]` preserves None entries.)
    all_slices = scipy.ndimage.find_objects(labels.T)
    return [ (slcs and slcs[::-1]) for slcs in all_slices ]

@queueable
def ImageSpatialIndexTask(
  cloudpath:str,
  shape:Iterable[int], offset:Iterable[int],
  mip=0, fill_missing=False
):
  """
  Compute the physical-unit bounding box of every segment id
  within one grid cell of a segmentation layer and write the
  result as a JSON ".spatial" index file.

  cloudpath: cloud storage path to the segmentation layer
  shape, offset: voxel extent and origin of this task's cell
  mip: which mip level to read
  fill_missing: treat missing image files as background
  """
  shape = Vec(*shape, dtype=int)
  offset = Vec(*offset, dtype=int)
  bbox = Bbox(offset, offset + shape, dtype=int)

  cv = CloudVolume(cloudpath, mip=mip, fill_missing=fill_missing)
  bbox = Bbox.clamp(bbox, cv.bounds)
  # fix: downloads are 4D (x,y,z,channel). Drop the channel axis
  # so find_objects yields 3D slice tuples compatible with
  # Bbox.from_slices and the 3D bbox arithmetic below.
  labels = cv[bbox][..., 0]

  # Renumber to a dense label range so find_objects' result list
  # stays small, then map entries back to the original ids.
  labels, remap = fastremap.renumber(labels, in_place=True)
  slcs = find_objects(labels)
  del labels

  spatial_index = {}
  for orig_label, renum_label in remap.items():
    if orig_label == 0:
      continue  # background is not indexed
    slc = slcs[renum_label - 1]
    if slc is None:
      continue  # label absent from this cutout

    subbox = Bbox.from_slices(slc)
    subbox += bbox.minpt  # cutout-relative -> layer voxel coords
    # Convert to physical units to match the index's declared resolution.
    subbox = subbox.astype(cv.resolution.dtype) * cv.resolution
    spatial_index[orig_label] = subbox.to_list()

  path = cv.meta.join(cloudpath, cv.key, 'spatial_index')
  precision = cv.image.spatial_index.precision
  cf = CloudFiles(path)
  cf.put_json(
    path=f"{bbox.to_filename(precision)}.spatial",
    content=spatial_index,
    compress='br',
  )
@main.group("index")
def indexgroup():
  """
  Create and manage a spatial index for segmentation layers.

  This allows you to more efficiently query the locations
  of segment ids in space.
  """
  pass

@indexgroup.command("create")
@click.argument("path")
@click.option('--queue', required=True, help="AWS SQS queue or directory to be used for a task queue. e.g. sqs://my-queue or ./my-queue. See https://github.com/seung-lab/python-task-queue", type=str)
@click.option('--mip', default=0, help="Which mip level to index. Default: 0")
@click.option('--shape', default=None, help="The grid size of the index as a comma delimited list. Should be a multiple of the chunk size. Default: 512,512,512", type=Tuple3())
@click.option('--fill-missing', is_flag=True, default=False, help="Interpret missing image files as background instead of failing.")
@click.pass_context
def image_spatial_index(
  ctx, path, queue,
  mip, shape, fill_missing
):
  """
  Create a spatial index for a segmentation image layer.
  """
  # fix: --shape defaults to None, which previously overrode the
  # task creator's (512,512,512) default and crashed at Vec(*None).
  if shape is None:
    shape = (512, 512, 512)

  tasks = tc.create_segmentation_spatial_index_tasks(
    path, mip=mip,
    shape=shape, fill_missing=fill_missing
  )
  parallel = int(ctx.obj.get("parallel", 1))
  tq = TaskQueue(normalize_path(queue))
  tq.insert(tasks, parallel=parallel)

@indexgroup.command("sqlite")
@click.argument("cloudpath")
@click.argument("dbpath")
@click.option('--mip', default=0, help="Which mip level. Default: 0")
# fix: an is_flag option with default=True could never be disabled;
# the --flag/--no-flag form keeps --progress working and adds an off switch.
@click.option('--progress/--no-progress', default=True, help="Display progress bar. Default: True")
@click.pass_context
def image_spatial_index_to_sqlite(ctx, cloudpath, dbpath, mip, progress):
  """
  Download and convert the index into an sqlite3 database.
  """
  cv = CloudVolume(cloudpath, mip=mip)
  # NOTE(review): earlier code accesses cv.image.spatial_index.precision
  # without a [mip] subscript — confirm the spatial_index accessor's
  # indexing convention against cloud-volume.
  cv.image.spatial_index[mip].to_sqlite(dbpath, progress=progress)