-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathN5Write.pyscro
73 lines (62 loc) · 2.99 KB
/
N5Write.pyscro
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import zarr
from pathlib import Path
_container_extensions = ('.zarr', '.n5')
def split_path_at_container(path):
# check whether a path contains a valid file path to a container file, and if so which container format it is
result = None
pathobj = Path(path)
if pathobj.suffix in _container_extensions:
result = [path, '']
else:
for parent in pathobj.parents:
if parent.suffix in _container_extensions:
result = path.split(parent.suffix)
result[0] += parent.suffix
return result
class N5Write(PyScriptObject):
def __init__(self):
self.data.valid_types = ['HxUniformScalarField3']
self.do_it = HxPortDoIt(self, 'write', 'Save N5')
self.output_dir = HxPortFilename(self, 'outputDir', 'Output Directory')
self.output_dir.mode = HxPortFilename.LOAD_DIRECTORY
self.container = None
self.dataset = None
self.container_path = None
self.dataset_path = None
self.slices = None
def update(self):
if self.output_dir.is_new and self.output_dir.filenames is not None:
self.container_path, self.dataset_path = split_path_at_container(self.output_dir.filenames)
self.container = self.access_container(mode='a')
self.dataset = self.container[self.dataset_path]
pass
def access_container(self, mode):
container_extension = Path(self.container_path).suffix
store_path = None
if container_extension == '.n5':
store_path = zarr.N5Store(self.container_path)
elif container_extension == '.zarr':
store_path = self.container_path
container = zarr.open(store=store_path, mode=mode)
return container
def compute(self):
if not self.do_it.was_hit:
return
if self.data.source() is None:
return
# Slices and array have to be transposed b/c amira uses x,y,z axis ordering but we use z,y,x for
# zarr
self.slices = self.bbox_to_slices()
output = self.data.source().get_array().T
print('Preparing to save {0} to {1} using slices {2}'.format(output, self.dataset, self.slices))
self.dataset[self.slices[::-1]] = output
print('Save complete')
def bbox_to_slices(self):
# convert the bounding box of the input data (real units) to a tuple of slices (indices)
shape = self.data.source().get_array().shape
bbox_starts, bbox_stops = self.data.source().bounding_box
# figure out the sampling resolution by dividing the extent of the data by its shape for each axis
rscale = tuple((bsto - bsta) / (s - 1) for bsto, bsta, s in zip(bbox_stops, bbox_starts, shape))
origins = tuple(int(bsta / r) for bsta, r in zip(bbox_starts, rscale))
slices = tuple(slice(o, o + s) for o,s in zip(origins, shape))
return slices