Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: produce spatial index for images #98

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 57 additions & 2 deletions igneous/task_creation.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@

import cloudvolume
from cloudvolume import CloudVolume
from cloudvolume.lib import Vec, Bbox, max2, min2, xyzrange, find_closest_divisor, yellow, jsonify
from cloudvolume.lib import (
Vec, Bbox, max2, min2, xyzrange,
find_closest_divisor, yellow, jsonify,
getprecision
)
from cloudvolume.datasource.precomputed.sharding import ShardingSpecification
from cloudfiles import CloudFiles
from taskqueue import TaskQueue, MockTaskQueue
Expand All @@ -31,7 +35,7 @@
TransferTask, WatershedRemapTask, DeleteTask,
LuminanceLevelsTask, ContrastNormalizationTask,
SkeletonTask, UnshardedSkeletonMergeTask, ShardedSkeletonMergeTask,
MaskAffinitymapTask, InferenceTask
MaskAffinitymapTask, InferenceTask, ImageSpatialIndexTask
)
# from igneous.tasks import BigArrayTask

Expand Down Expand Up @@ -225,6 +229,51 @@ def on_finish(self):

return TouchTaskIterator(bounds, shape)

def create_segmentation_spatial_index_tasks(
cloudpath:str, mip=0, shape=(512, 512, 512),
fill_missing=False
):
"""
Generate a spatial index for a segmentation image layer.
"""
cv = CloudVolume(cloudpath, mip=mip)
shape = Vec(*shape, dtype=int)

if cv.layer_type != "segmentation":
raise ValueError(
f"Can only create the spatial index on segmentation. {cv.cloudpath} is an {cv.layer_type}."
)

cv.scale["spatial_index"] = {
'resolution': vol.resolution.tolist(),
'chunk_size': (shape.astype(cv.resolution) * cv.resolution).tolist(),
}
cv.commit_info()

class ImageSpatialIndexTaskIterator(FinelyDividedTaskIterator):
def task(self, shape, offset):
return partial(ImageSpatialIndexTask,
cloudpath=cloudpath,
shape=shape.clone(),
offset=offset.clone(),
mip=mip,
fill_missing=bool(fill_missing)
)
def on_finish(self):
cv.provenance.processing.append({
'method': {
'task': 'ImageSpatialIndexTask',
'mip': mip,
'shape': shape.tolist(),
'fill_missing': bool(fill_missing),
},
'by': OPERATOR_CONTACT,
'date': strftime('%Y-%m-%d %H:%M %Z'),
})
cv.commit_provenance()

return ImageSpatialIndexTaskIterator(cv.bounds.clone(), shape)

def create_downsampling_tasks(
layer_path, mip=0, fill_missing=False,
axis='z', num_mips=5, preserve_chunk_size=True,
Expand All @@ -243,6 +292,12 @@ def create_downsampling_tasks(
evenly divisible chunk size to 64,64,64 for this shape and use that. The latter can be
useful when mip 0 uses huge chunks and you want to simply visualize the upper mips.
chunk_size: (overrides preserve_chunk_size) force chunk size for new layers to be this.
encoding: "raw", "jpeg", "compressed_segmentation", "compresso", "fpzip", or "kempressed"
depending on which kind of data you're dealing with. raw works for everything but you
might get better compression with another encoding. You can think of encoding as the
image type-specific first stage of compression and the "compress" flag as the data
agnostic second stage compressor. For example, compressed_segmentation and gzip work
well together, but not jpeg and gzip.
sparse: When downsampling segmentation, if true, don't count black pixels when computing
the mode. Useful for e.g. synapses and point labels.
bounds: By default, downsample everything, but you can specify restricted bounding boxes
Expand Down
3 changes: 2 additions & 1 deletion igneous/tasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@
DownsampleTask, QuantizeTask,
TransferTask, WatershedRemapTask, DeleteTask,
LuminanceLevelsTask, ContrastNormalizationTask,
MaskAffinitymapTask, InferenceTask, BlackoutTask
MaskAffinitymapTask, InferenceTask, BlackoutTask,
ImageSpatialIndexTask
)
56 changes: 54 additions & 2 deletions igneous/tasks/tasks.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
from collections import defaultdict
from collections.abc import Sequence

from io import BytesIO

import json
import math
import os
import random
import re
from typing import Iterable, List, Tuple, Set

import numpy as np
import scipy.ndimage
from tqdm import tqdm

from cloudfiles import CloudFiles
Expand All @@ -30,6 +30,18 @@
MaskAffinitymapTask, InferenceTask
)

def find_objects(labels):
"""
scipy.ndimage.find_objects performs about 7-8x faster on C
ordered arrays, so we just do it that way and convert
the results if it's in F order.
"""
if labels.flags['C_CONTIGUOUS']:
return scipy.ndimage.find_objects(labels)
else:
all_slices = scipy.ndimage.find_objects(labels.T)
return [ (slcs and slcs[::-1]) for slcs in all_slices ]

def downsample_and_upload(
image, bounds, vol, ds_shape,
mip=0, axis='z', skip_first=False,
Expand Down Expand Up @@ -77,6 +89,46 @@ def downsample_and_upload(
new_bounds.maxpt = new_bounds.minpt + Vec(*mipped.shape[:3])
vol[new_bounds] = mipped

@queueable
def ImageSpatialIndexTask(
cloudpath:str,
shape:Iterable[int], offset:Iterable[int],
mip=0, fill_missing=False
):
shape = Vec(*shape, dtype=int)
offset = Vec(*offset, dtype=int)
bbox = Bbox(offset, offset + shape, dtype=int)

cv = CloudVolume(cloudpath, mip=mip, fill_missing=fill_missing)
bbox = Bbox.clamp(bbox, cv.bounds)
labels = cv[bbox]

labels, remap = fastremap.renumber(labels, in_place=True)
slcs = find_objects(labels)
del labels

spatial_index = {}
for orig_label, renum_label in remap.items():
if orig_label == 0:
continue
slc = slcs[renum_label - 1]
if slc is None:
continue

subbox = Bbox.from_slices(slc)
subbox += bbox.minpt
subbox = subbox.astype(cv.resolution.dtype) * cv.resolution
spatial_index[orig_label] = subbox.to_list()

path = cv.meta.join(cloudpath, cv.key, 'spatial_index')
precision = cv.image.spatial_index.precision
cf = CloudFiles(path)
cf.put_json(
path=f"{bbox.to_filename(precision)}.spatial",
content=spatial_index,
compress='br',
)

@queueable
def DeleteTask(layer_path:str, shape, offset, mip=0, num_mips=5):
"""Delete a block of images inside a layer on all mip levels."""
Expand Down
48 changes: 48 additions & 0 deletions igneous_cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import os

import click
from cloudvolume import CloudVolume
from taskqueue import TaskQueue
from taskqueue.lib import toabs
from taskqueue.paths import get_protocol
Expand Down Expand Up @@ -504,4 +505,51 @@ def delete_images(
tq = TaskQueue(normalize_path(queue))
tq.insert(tasks, parallel=parallel)

@main.group("index")
def indexgroup():
"""
Create and manage a spatial index for segmentation layers.

This allows you to more efficiently query the locations
of segment ids in space.
"""
pass

@indexgroup.command("create")
@click.argument("path")
@click.option('--queue', required=True, help="AWS SQS queue or directory to be used for a task queue. e.g. sqs://my-queue or ./my-queue. See https://github.com/seung-lab/python-task-queue", type=str)
@click.option('--mip', default=0, help="Which mip level to index. Default: 0")
@click.option('--shape', default=None, help="The grid size of the index as a comma delimited list. Should be a multiple of the chunk size.", type=Tuple3())
@click.option('--fill-missing', is_flag=True, default=False, help="Interpret missing image files as background instead of failing.")
@click.pass_context
def image_spatial_index(
ctx, path, queue,
mip, shape, fill_missing
):
"""
Create a spatial index for a segmentation image layer.
"""
tasks = tc.create_segmentation_spatial_index_tasks(
path, mip=mip,
shape=shape, fill_missing=fill_missing
)
parallel = int(ctx.obj.get("parallel", 1))
tq = TaskQueue(normalize_path(queue))
tq.insert(tasks, parallel=parallel)

@indexgroup.command("sqlite")
@click.argument("cloudpath")
@click.argument("dbpath")
@click.option('--mip', default=0, help="Which mip level. Default: 0")
@click.option('--progress', is_flag=True, default=True, help="Display progress bar. Default: True")
@click.pass_context
def image_spatial_index_to_sqlite(ctx, cloudpath, dbpath, mip, progress):
"""
Download and convert the index into an sqlite3 database.
"""
cv = CloudVolume(cloudpath, mip=mip)
cv.image.spatial_index[mip].to_sqlite(dbpath, progress=progress)