# -*- coding: utf-8 -*-
"""
data_stream
The following image shows the class hierarchy for data streams.
.. image:: ../docs/ahds_classes.png
"""
import re
import struct
from UserList import UserList
import numpy
from skimage.measure._find_contours import find_contours
from ahds import header
# regex character classes used to locate typed tokens in the stream
FIND = {
    'decimal': r'\d',  # [0-9]
    'alphanum_': r'\w',  # [a-zA-Z0-9_]
}
try:
    from ahds.decoders import byterle_decoder
except ImportError:
    def byterle_decoder(data, output_size):
        """Python drop-in replacement for the compiled C-extension decoder.

        Decodes an Amira HxByteRLE stream: a control byte with the high bit
        set (value > 127) introduces a literal run of ``value & 0x7f`` bytes
        copied verbatim; a control byte <= 127 means the single byte that
        follows is repeated that many times.

        :param str data: a raw stream of data to be unpacked
        :param int output_size: the number of items when ``data`` is uncompressed
        :return numpy.array output: an array of ``numpy.uint8``
        """
        from warnings import warn
        warn("using pure-Python (instead of Python C-extension) implementation of byterle_decoder")
        input_data = struct.unpack('<{}B'.format(len(data)), data)
        output = numpy.zeros(output_size, dtype=numpy.uint8)
        i = 0  # read position in input_data
        j = 0  # write position in output
        count = True    # True: the next byte read is a control byte
        repeat = False  # True: literal run follows; False: repeated single value
        no = None       # run length taken from the last control byte
        while i < len(input_data):
            if count:
                no = input_data[i]
                if no > 127:
                    no &= 0x7f  # clear the high bit: literal-run length
                    count = False
                    repeat = True
                    i += 1
                    continue
                else:
                    # run-length byte: the next byte is repeated 'no' times
                    i += 1
                    count = False
                    repeat = False
                    continue
            elif not count:
                if repeat:
                    # copy 'no' literal bytes straight into the output
                    value = input_data[i:i + no]
                    repeat = False
                    count = True
                    output[j:j + no] = numpy.array(value)
                    i += no
                    j += no
                    continue
                elif not repeat:
                    # broadcast a single value over the next 'no' outputs
                    value = input_data[i]
                    output[j:j + no] = value
                    i += 1
                    j += no
                    count = True
                    repeat = False
                    continue
        assert j == output_size
        return output
def hxbyterle_decode(output_size, data):
    """Decode an HxByteRLE data stream.

    If the C-extension is not compiled it will use the (slower) pure-Python
    equivalent :py:func:`byterle_decoder`.

    :param int output_size: the number of items when ``data`` is uncompressed
    :param str data: a raw stream of data to be unpacked
    :return numpy.array output: an array of ``numpy.uint8``
    """
    output = byterle_decoder(data, output_size)
    assert len(output) == output_size
    return output
def hxzip_decode(data_size, data):
    """Decode an HxZip (zlib-compressed) data stream.

    :param int data_size: the number of items when ``data`` is uncompressed
    :param str data: a raw stream of data to be unpacked
    :return numpy.array output: an array of ``numpy.uint8``
    """
    import zlib
    data_stream = zlib.decompress(data)
    output = numpy.array(struct.unpack('<{}B'.format(len(data_stream)), data_stream), dtype=numpy.uint8)
    assert len(output) == data_size
    return output
def unpack_binary(data_pointer, definitions, data):
    """Unpack binary data using ``struct.unpack``.

    :param data_pointer: metadata for the ``data_pointer`` attribute for this data stream
    :type data_pointer: :py:class:`ahds.header.Block`
    :param definitions: definitions specified in the header
    :type definitions: :py:class:`ahds.header.Block`
    :param bytes data: raw binary data to be unpacked
    :return numpy.array output: unpacked data of shape (data_length, data_dimension)
    :raises ValueError: if the data type is not one of float/int/byte
    """
    if data_pointer.data_dimension:
        data_dimension = data_pointer.data_dimension
    else:
        data_dimension = 1  # if data_dimension is None
    if data_pointer.data_type == "float":
        data_type = "f" * data_dimension
    elif data_pointer.data_type == "int":
        data_type = "i" * data_dimension  # assume signed int
    elif data_pointer.data_type == "byte":
        data_type = "b" * data_dimension  # assume signed char
    else:
        # previously fell through to a NameError on 'data_type'
        raise ValueError("unsupported data type: {}".format(data_pointer.data_type))
    # get this stream's size from the definitions
    try:
        data_length = int(getattr(definitions, data_pointer.data_name))
    except AttributeError:
        # quickfix
        # :TODO: nNodes definition fix
        try:
            data_length = int(getattr(definitions, 'Nodes'))
        except AttributeError:
            # last resort: derive the length from the lattice dimensions
            x, y, z = definitions.Lattice
            data_length = x * y * z
    output = numpy.array(struct.unpack('<' + '{}'.format(data_type) * data_length, data))  # assume little-endian
    output = output.reshape(data_length, data_dimension)
    return output
def unpack_ascii(data):
    """Unpack ASCII data using string methods.

    :param str data: newline-delimited numeric text to be unpacked
    :return list output: the numbers as ``int`` when every token is a plain
        digit string, otherwise all as ``float``
    """
    # split at newlines -> exclude last (empty) item -> strip space from each
    numstrings = [s.strip() for s in data.split('\n')[:-1]]
    # check if every string is a digit (integer); otherwise parse all as float
    # NOTE(review): str.isdigit() is False for negatives, so '-1' takes the
    # float branch — confirm that is intended for this format
    if all(n.isdigit() for n in numstrings):
        output = [int(n) for n in numstrings]
    else:
        output = [float(n) for n in numstrings]
    return output
class Image(object):
    """Encapsulates an individual 2D image (one z-slice of a volume)."""

    def __init__(self, z, array):
        self.z = z  # z-index of this slice
        self._array = array
        # the distinct byte values present in the image
        self._byte_values = set(self._array.flatten().tolist())

    @property
    def byte_values(self):
        """The set of distinct byte values present in the image"""
        return self._byte_values

    @property
    def array(self):
        """Accessor to underlying array data"""
        return self._array

    def equalise(self):
        """Increase the dynamic range of the image"""
        multiplier = 255 // len(self._byte_values)
        return self._array * multiplier

    @property
    def as_contours(self):
        """A dictionary of lists of contours keyed by byte_value"""
        contours = dict()
        for byte_value in self._byte_values:
            # the background value (0) has no contour
            if byte_value == 0:
                continue
            mask = (self._array == byte_value) * 255
            found_contours = find_contours(mask, 254, fully_connected='high')  # a list of arrays
            contours[byte_value] = ContourSet(found_contours)
        return contours

    @property
    def as_segments(self):
        """This image's contours keyed by its z-index"""
        return {self.z: self.as_contours}

    def show(self):
        """Display the image, overlaying contours when matplotlib is available"""
        with_matplotlib = True
        try:
            import matplotlib.pyplot as plt
        except RuntimeError:
            import skimage.io as io
            with_matplotlib = False
        if with_matplotlib:
            equalised_img = self.equalise()
            _, ax = plt.subplots()
            ax.imshow(equalised_img, cmap='gray')
            import random
            # draw each byte value's contours in a random colour
            for contour_set in self.as_contours.values():
                r, g, b = random.random(), random.random(), random.random()
                for contour in contour_set:
                    ax.plot(contour[:, 1], contour[:, 0], linewidth=2, color=(r, g, b, 1))
            ax.axis('image')
            ax.set_xticks([])
            ax.set_yticks([])
            plt.show()
        else:
            io.imshow(self.equalise())
            io.show()

    def __repr__(self):
        return "<Image with dimensions {}>".format(self.array.shape)

    def __str__(self):
        return "<Image with dimensions {}>".format(self.array.shape)
class ImageSet(UserList):
    """Encapsulation for a set of :py:class:`ahds.data_stream.Image` objects"""

    def __getitem__(self, index):
        # wrap the raw slice at 'index' in an Image keyed by its z-index
        return Image(index, self.data[index])

    @property
    def segments(self):
        """A dictionary of dictionaries of contours keyed by byte_value then z-index"""
        segments = dict()
        for i in range(len(self)):
            image = self[i]
            for z, contour in image.as_segments.items():
                for byte_value, contour_set in contour.items():
                    if byte_value not in segments:
                        segments[byte_value] = dict()
                    if z not in segments[byte_value]:
                        segments[byte_value][z] = contour_set
                    else:
                        segments[byte_value][z] += contour_set
        return segments

    def __repr__(self):
        return "<ImageSet with {} images>".format(len(self))
class ContourSet(UserList):
    """Encapsulation for a set of :py:class:`ahds.data_stream.Contour` objects"""

    def __getitem__(self, index):
        # wrap the raw contour array at 'index' in a Contour keyed by index
        return Contour(index, self.data[index])

    def __repr__(self):
        string = "{} with {} contours".format(self.__class__, len(self))
        return string
class Contour(object):
    """Encapsulates the array representing a single contour."""

    def __init__(self, z, array):
        self.z = z
        self._array = array

    def __len__(self):
        return len(self._array)

    def __iter__(self):
        return iter(self._array)

    # NOTE(review): a staticmethod taking 'self' is unusual but works because
    # callers pass the instance explicitly; kept for interface compatibility
    @staticmethod
    def string_repr(self):
        """Shared textual form used by both __repr__ and __str__"""
        return "<Contour at z={} with {} points>".format(self.z, len(self))

    def __repr__(self):
        return self.string_repr(self)

    def __str__(self):
        return self.string_repr(self)
class AmiraDataStream(object):
    """Base class for all Amira DataStreams"""
    # named-group label used in this stream's regex (set by subclasses)
    match = None
    # regex used to locate this stream's data (set by subclasses)
    regex = None
    # size in bytes of one value when unpacking binary payloads
    bytes_per_datatype = 4
    # number of values per datum, e.g. 3 for (x, y, z) points
    dimension = 1
    # struct format character for a single value (set by subclasses)
    datatype = None
    # regex character class used when searching for this stream's header line
    find_type = FIND['decimal']

    def __init__(self, amira_header, data_pointer, stream_data):
        """
        :param amira_header: the parsed header for the file
        :type amira_header: :py:class:`ahds.header.AmiraHeader`
        :param data_pointer: metadata locating this stream within the file
        :param stream_data: the full raw contents of the file
        """
        self._amira_header = amira_header
        self._data_pointer = data_pointer
        self._stream_data = stream_data
        self._decoded_length = 0

    @property
    def header(self):
        """An :py:class:`ahds.header.AmiraHeader` object"""
        return self._amira_header

    @property
    def data_pointer(self):
        """The data pointer for this data stream"""
        return self._data_pointer

    @property
    def stream_data(self):
        """All the raw data from the file"""
        return self._stream_data

    @property
    def encoded_data(self):
        """Encoded raw data in this stream (None in the base class)"""
        return None

    @property
    def decoded_data(self):
        """Decoded data for this stream (None in the base class)"""
        return None

    @property
    def decoded_length(self):
        """The length of the decoded stream data in relevant units e.g. tuples, integers (not bytes)"""
        return self._decoded_length

    @decoded_length.setter
    def decoded_length(self, value):
        self._decoded_length = value

    def __repr__(self):
        return "{} object of {:,} bytes".format(self.__class__, len(self.stream_data))
class AmiraMeshDataStream(AmiraDataStream):
    """Class encapsulating an AmiraMesh data stream"""
    # set to True only while instantiating the final stream in the file, whose
    # regex must not require a trailing '@<index>' delimiter
    last_stream = False
    match = 'stream'

    def __init__(self, *args, **kwargs):
        # '%s' fills the named-group label now; the '{}' placeholders are
        # filled with data indices later (see encoded_data)
        if self.last_stream:
            self.regex = r"\n@{}\n(?P<%s>.*)" % self.match
        else:
            self.regex = r"\n@{}\n(?P<%s>.*)\n@{}" % self.match
        super(AmiraMeshDataStream, self).__init__(*args, **kwargs)
        if hasattr(self.header.definitions, 'Lattice'):
            X, Y, Z = self.header.definitions.Lattice
            data_size = X * Y * Z
            self.decoded_length = data_size
        elif hasattr(self.header.definitions, 'Vertices'):
            # length cannot be determined up front for vertex data
            self.decoded_length = None
        elif self.header.parameters.ContentType == "\"HxSpreadSheet\"":
            pass
        elif self.header.parameters.ContentType == "\"SurfaceField\",":
            pass
        else:
            raise ValueError("Unable to determine data size")

    @property
    def encoded_data(self):
        """The raw text between this stream's '@<index>' marker and the next"""
        i = self.data_pointer.data_index
        m = re.search(self.regex.format(i, i + 1), self.stream_data, flags=re.S)
        return m.group(self.match)

    @property
    def decoded_data(self):
        """The encoded data decoded according to this stream's data format"""
        if self.data_pointer.data_format == "HxByteRLE":
            return hxbyterle_decode(self.decoded_length, self.encoded_data)
        elif self.data_pointer.data_format == "HxZip":
            return hxzip_decode(self.decoded_length, self.encoded_data)
        elif self.header.designation.format == "ASCII":
            return unpack_ascii(self.encoded_data)
        elif self.data_pointer.data_format is None:  # try to unpack data
            return unpack_binary(self.data_pointer, self.header.definitions, self.encoded_data)
        else:
            return None

    def to_images(self):
        """Return the decoded data as an :py:class:`ImageSet` of Z slices

        :raises ValueError: if the header has no Lattice definition
        """
        if hasattr(self.header.definitions, 'Lattice'):
            X, Y, Z = self.header.definitions.Lattice
        else:
            raise ValueError("Unable to determine data size")
        image_data = self.decoded_data.reshape(Z, Y, X)
        imgs = ImageSet(image_data[:])
        return imgs

    def to_volume(self):
        """Return a 3D volume of the data

        :raises ValueError: if the header has no Lattice definition
        """
        if hasattr(self.header.definitions, "Lattice"):
            X, Y, Z = self.header.definitions.Lattice
        else:
            raise ValueError("Unable to determine data size")
        volume = self.decoded_data.reshape(Z, Y, X)
        return volume
class AmiraHxSurfaceDataStream(AmiraDataStream):
    """Base class for all HyperSurface data streams that inherits from :py:class:`ahds.data_stream.AmiraDataStream`"""

    def __init__(self, *args, **kwargs):
        # e.g. for match='Vertices' this searches for "Vertices <digits>\n"
        self.regex = r"%s (?P<%s>%s+)\n" % (self.match, self.match.lower(), self.find_type)
        super(AmiraHxSurfaceDataStream, self).__init__(*args, **kwargs)
        self._match = re.search(self.regex, self.stream_data)
        self._name = None
        self._count = None
        self._start_offset = None
        self._end_offset = None

    @property
    def name(self):
        """The name captured for this stream (populated by subclasses)"""
        return self._name

    @name.setter
    def name(self, value):
        self._name = value

    @property
    def count(self):
        """The number of items in this stream (populated by subclasses)"""
        return self._count

    @count.setter
    def count(self, value):
        self._count = value

    @property
    def start_offset(self):
        """Offset into the stream data where this stream's payload begins"""
        return self._start_offset

    @start_offset.setter
    def start_offset(self, value):
        self._start_offset = value

    @property
    def end_offset(self):
        """Offset into the stream data where this stream's payload ends"""
        return self._end_offset

    @end_offset.setter
    def end_offset(self, value):
        self._end_offset = value

    @property
    def match_object(self):
        """The match object from searching for this stream's header line"""
        return self._match

    def __str__(self):
        return """\
\r{} object
\r\tname: {}
\r\tcount: {}
\r\tstart_offset: {}
\r\tend_offset: {}
\r\tmatch_object: {}""".format(
            self.__class__,
            self.name,
            self.count,
            self.start_offset,
            self.end_offset,
            self.match_object,
        )
class VoidDataStream(AmiraHxSurfaceDataStream):
    """A HyperSurface stream that carries no payload data"""

    def __init__(self, *args, **kwargs):
        super(VoidDataStream, self).__init__(*args, **kwargs)

    @property
    def encoded_data(self):
        """No payload: always an empty list"""
        return []

    @property
    def decoded_data(self):
        """No payload: always an empty list"""
        return []
class NamedDataStream(VoidDataStream):
    """Void stream whose header line captures an alphanumeric name"""
    find_type = FIND['alphanum_']

    def __init__(self, *args, **kwargs):
        super(NamedDataStream, self).__init__(*args, **kwargs)
        self.name = self.match_object.group(self.match.lower())
class ValuedDataStream(VoidDataStream):
    """Void stream whose header line captures an integer used as its count"""

    def __init__(self, *args, **kwargs):
        super(ValuedDataStream, self).__init__(*args, **kwargs)
        self.count = int(self.match_object.group(self.match.lower()))
class LoadedDataStream(AmiraHxSurfaceDataStream):
    """HyperSurface stream whose captured count is followed by a binary payload"""

    def __init__(self, *args, **kwargs):
        super(LoadedDataStream, self).__init__(*args, **kwargs)
        self.count = int(self.match_object.group(self.match.lower()))
        self.start_offset = self.match_object.end()
        # payload spans count * (bytes per value * values per datum) bytes;
        # max() keeps the span non-collapsing — presumably a guard against a
        # non-positive count (TODO confirm)
        self.end_offset = max(
            [self.start_offset, self.start_offset + self.count * (self.bytes_per_datatype * self.dimension)])

    @property
    def encoded_data(self):
        """The raw byte span holding this stream's payload"""
        return self.stream_data[self.start_offset:self.end_offset]

    @property
    def decoded_data(self):
        """The payload unpacked big-endian and grouped into (x, y, z) triples"""
        # NOTE(review): the [::3] grouping assumes dimension == 3 — true for the
        # current subclasses (Vertices, Triangles); confirm for any new subclass
        points = struct.unpack('>' + ((self.datatype * self.dimension) * self.count), self.encoded_data)
        x, y, z = (points[::3], points[1::3], points[2::3])
        return zip(x, y, z)
class VerticesDataStream(LoadedDataStream):
    """'Vertices' stream: a count followed by 3D float coordinates"""
    match = "Vertices"
    datatype = 'f'
    dimension = 3


class NBranchingPointsDataStream(ValuedDataStream):
    """'NBranchingPoints' stream: an integer count only"""
    match = "NBranchingPoints"


class NVerticesOnCurvesDataStream(ValuedDataStream):
    """'NVerticesOnCurves' stream: an integer count only"""
    match = "NVerticesOnCurves"


class BoundaryCurvesDataStream(ValuedDataStream):
    """'BoundaryCurves' stream: an integer count only"""
    match = "BoundaryCurves"


class PatchesInnerRegionDataStream(NamedDataStream):
    """'InnerRegion' entry of a patch: a name only"""
    match = "InnerRegion"


class PatchesOuterRegionDataStream(NamedDataStream):
    """'OuterRegion' entry of a patch: a name only"""
    match = "OuterRegion"


class PatchesBoundaryIDDataStream(ValuedDataStream):
    """'BoundaryID' entry of a patch: an integer value only"""
    match = "BoundaryID"


class PatchesBranchingPointsDataStream(ValuedDataStream):
    """'BranchingPoints' entry of a patch: an integer value only"""
    match = "BranchingPoints"


class PatchesTrianglesDataStream(LoadedDataStream):
    """'Triangles' entry of a patch: a count followed by 3D int vertex indices"""
    match = "Triangles"
    datatype = 'i'
    dimension = 3
class PatchesDataStream(LoadedDataStream):
    """'Patches' stream: a count followed by that many patch definitions"""
    match = "Patches"

    def __init__(self, *args, **kwargs):
        super(PatchesDataStream, self).__init__(*args, **kwargs)
        self._patches = dict()
        for _ in range(self.count):
            # the five sub-streams are parsed in order of appearance
            inner_region = PatchesInnerRegionDataStream(self.header, None, self.stream_data[self.start_offset:])
            outer_region = PatchesOuterRegionDataStream(self.header, None, self.stream_data[self.start_offset:])
            boundary_id = PatchesBoundaryIDDataStream(self.header, None, self.stream_data[self.start_offset:])
            branching_points = PatchesBranchingPointsDataStream(self.header, None, self.stream_data[self.start_offset:])
            triangles = PatchesTrianglesDataStream(self.header, None, self.stream_data[self.start_offset:])
            patch = {
                'InnerRegion': inner_region,
                'OuterRegion': outer_region,
                'BoundaryID': boundary_id,
                'BranchingPoints': branching_points,
                'Triangles': triangles,
            }
            # group patches by the name of their inner region
            if inner_region.name not in self._patches:
                self._patches[inner_region.name] = [patch]
            else:
                self._patches[inner_region.name] += [patch]
            # start searching from the end of the last search
            self.start_offset = self._patches[inner_region.name][-1]['Triangles'].end_offset
        self.end_offset = None

    def __iter__(self):
        return iter(self._patches.keys())

    def __getitem__(self, index):
        return self._patches[index]

    def __len__(self):
        return len(self._patches)

    @property
    def encoded_data(self):
        """Patches have no single contiguous payload"""
        return None

    @property
    def decoded_data(self):
        """Patches have no single contiguous payload"""
        return None
class DataStreams(object):
    """Class to encapsulate all the above functionality"""

    def __init__(self, fn, *args, **kwargs):
        """
        :param str fn: path to the Amira file to read
        """
        # private attrs
        self._fn = fn  # property
        self._amira_header = header.AmiraHeader.from_file(fn)  # property
        self._data_streams = dict()
        self._filetype = None
        self._stream_data = None
        self._data_streams = self._configure()

    def _configure(self):
        """Read the file and build the data stream objects for its filetype.

        :return dict: streams keyed by 1-based index (AmiraMesh) or by
            stream name (HyperSurface)
        """
        with open(self._fn, 'rb') as f:
            # NOTE(review): stripping '\n' from a binary read only works on
            # Python 2 str; Python 3 bytes would need b'\n' — confirm target
            self._stream_data = f.read().strip('\n')
        if self._amira_header.designation.filetype == "AmiraMesh":
            self._filetype = "AmiraMesh"
            i = 0
            # every stream but the last is delimited by the next '@<index>'
            while i < len(self._amira_header.data_pointers.attrs) - 1:  # refactor
                data_pointer = getattr(self._amira_header.data_pointers, 'data_pointer_{}'.format(i + 1))
                self._data_streams[i + 1] = AmiraMeshDataStream(self._amira_header, data_pointer,
                                                                self._stream_data)
                i += 1
            # the final stream runs to end-of-file: flip the class-level flag
            # so its regex omits the trailing delimiter
            AmiraMeshDataStream.last_stream = True
            data_pointer = getattr(self._amira_header.data_pointers, 'data_pointer_{}'.format(i + 1))
            self._data_streams[i + 1] = AmiraMeshDataStream(self._amira_header, data_pointer, self._stream_data)
            # reset AmiraMeshDataStream.last_stream
            AmiraMeshDataStream.last_stream = False
        elif self._amira_header.designation.filetype == "HyperSurface":
            self._filetype = "HyperSurface"
            if self._amira_header.designation.format == "BINARY":
                self._data_streams['Vertices'] = VerticesDataStream(self._amira_header, None, self._stream_data)
                self._data_streams['NBranchingPoints'] = NBranchingPointsDataStream(self._amira_header, None,
                                                                                    self._stream_data)
                self._data_streams['NVerticesOnCurves'] = NVerticesOnCurvesDataStream(self._amira_header, None,
                                                                                      self._stream_data)
                self._data_streams['BoundaryCurves'] = BoundaryCurvesDataStream(self._amira_header, None,
                                                                                self._stream_data)
                self._data_streams['Patches'] = PatchesDataStream(self._amira_header, None, self._stream_data)
            elif self._amira_header.designation.format == "ASCII":
                self._data_streams['Vertices'] = VerticesDataStream(self._amira_header, None, self._stream_data)
        return self._data_streams

    @property
    def file(self):
        """Path of the file this object was built from"""
        return self._fn

    @property
    def header(self):
        """The parsed :py:class:`ahds.header.AmiraHeader` for the file"""
        return self._amira_header

    @property
    def stream_data(self):
        """The raw file contents"""
        return self._stream_data

    @property
    def filetype(self):
        """Either 'AmiraMesh' or 'HyperSurface'"""
        return self._filetype

    def __iter__(self):
        return iter(self._data_streams.values())

    def __len__(self):
        return len(self._data_streams)

    def __getitem__(self, key):
        return self._data_streams[key]

    def __repr__(self):
        return "{} object with {} stream(s): {}".format(
            self.__class__,
            len(self),
            ", ".join(map(str, self._data_streams.keys())),
        )