# -*- coding: utf-8 -*-
"""
data_stream
===========

Classes that define data streams (DataList) in Amira (R) files

There are two main types of data streams:

* `AmiraMeshDataStream` is for `AmiraMesh` files
* `AmiraHxSurfaceDataStream` is for `HyperSurface` files

Both classes inherit from the `AmiraDataStream` class, which handles common functionality such as:

* initialisation with the header metadata
* the `get_data` method calls each subclass's `_decode` method

"""
from __future__ import print_function

import re
import sys
# todo: remove as soon as DataStreams class is removed
import warnings
import zlib

import numpy as np


from .core import _dict_iter_keys, _dict_iter_values, ListBlock, deprecated
from .grammar import _hyper_surface_file

# definitions of numpy data types with explicit endianness and fixed bit width;
# they are used by the lookup table below
_np_ubytebig = np.dtype(np.uint8).newbyteorder('>')
_np_ubytelittle = np.dtype(np.uint8).newbyteorder('<')
_np_bytebig = np.dtype(np.int8).newbyteorder('>')
_np_bytelittle = np.dtype(np.int8).newbyteorder('<')
_np_shortlittle = np.dtype(np.int16).newbyteorder('<')
_np_shortbig = np.dtype(np.int16).newbyteorder('>')
_np_ushortlittle = np.dtype(np.uint16).newbyteorder('<')
_np_ushortbig = np.dtype(np.uint16).newbyteorder('>')
_np_intlittle = np.dtype(np.int32).newbyteorder('<')
_np_intbig = np.dtype(np.int32).newbyteorder('>')
_np_uintlittle = np.dtype(np.uint32).newbyteorder('<')
_np_uintbig = np.dtype(np.uint32).newbyteorder('>')
_np_longlittle = np.dtype(np.int64).newbyteorder('<')
_np_longbig = np.dtype(np.int64).newbyteorder('>')
_np_ulonglittle = np.dtype(np.uint64).newbyteorder('<')
_np_ulongbig = np.dtype(np.uint64).newbyteorder('>')
_np_floatbig = np.dtype(np.float32).newbyteorder('>')
_np_floatlittle = np.dtype(np.float32).newbyteorder('<')
_np_doublebig = np.dtype(np.float64).newbyteorder('>')
_np_doublelittle = np.dtype(np.float64).newbyteorder('<')
_np_complexbig = np.dtype(np.complex64).newbyteorder('>')
_np_complexlittle = np.dtype(np.complex64).newbyteorder('<')
_np_char = np.dtype(np.string_)

"""
lookuptable of the different data types occurring within
AmiraMesh and HyperSurface files grouped by their endianess
which is defined according to Amira Referenceguide [1] as follows

 * BINARY:               for bigendian encoded streams,

 * BINARY-LITTLE-ENDIAN: for little endian encoded streams and

 * ASCII:                for human readable encoded data

The lookup table is split into three sections:

 * True:  for all littleendian data types

 * False: for all bigendian data types and

 * the direct section mapping to the default numpy types for
   reading ASCII encoded data which have no specific endianness besides the
   bigendian characteristic intrinsic to decimal numbers.

[1] pp 519-525 # downloaded Dezember 2018 from 
   http://www1.udel.edu/ctcr/sites/udel.edu.ctcr/files/Amira%20Reference%20Guide.pdf
"""
_type_map = {
    True: {
        'byte': _np_ubytelittle,
        'ubyte': _np_ubytelittle,
        'short': _np_shortlittle,
        'ushort': _np_ushortlittle,
        'int': _np_intlittle,
        'uint': _np_uintlittle,
        'long': _np_longlittle,
        'ulong': _np_ulonglittle,
        'uint64': _np_ulonglittle,
        'float': _np_floatlittle,
        'double': _np_doublelittle,
        'complex': _np_complexlittle,
        'char': _np_char,
        'string': _np_char,
        'ascii': _np_char
    },
    False: {
        'byte': _np_ubytebig,
        'ubyte': _np_ubytebig,
        'short': _np_shortbig,
        'ushort': _np_ushortbig,
        'int': _np_intbig,
        'uint': _np_uintbig,
        'long': _np_longbig,
        'ulong': _np_ulongbig,
        'uint64': _np_ulongbig,
        'float': _np_floatbig,
        'double': _np_doublebig,
        'complex': _np_complexbig,
        'char': _np_char,
        'string': _np_char,
        'ascii': _np_char
    },
    'byte': np.dtype(np.int8),
    'ubyte': np.dtype(np.uint8),
    'short': np.dtype(np.int16),
    'ushort': np.dtype(np.uint16),
    'int': np.dtype(np.int32),
    'uint': np.dtype(np.uint32),
    'long': np.dtype(np.int64),
    'ulong': np.dtype(np.uint64),
    'uint64': np.dtype(np.uint64),
    'float': np.dtype(np.float32),
    'double': np.dtype(np.float64),
    'complex': np.dtype(np.complex64),
    'char': _np_char,
    'string': _np_char,
    'ascii': _np_char
}
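
# Example lookups (illustrative): a BINARY-LITTLE-ENDIAN 'float' stream decodes
# with the little-endian dtype, while ASCII streams use the endianness-agnostic
# defaults in the third section.
#
#     _type_map[True]['float']    # little-endian float32 ('<f4')
#     _type_map[False]['float']   # big-endian float32 ('>f4')
#     _type_map['float']          # native float32, used for ASCII streams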

# try to import the native byterle_decoder extension and fall back to the
# pure-Python implementation if the import fails for whatever reason
try:
    if sys.version_info[0] > 2:
        from ahds.decoders import byterle_decoder
    else:
        from .decoders import byterle_decoder
except ImportError:
    def byterle_decoder(data, output_size):
        """If the C-extension failed to compile or is unimportable use this slower Python equivalent

        :param str data: a raw stream of data to be unpacked
        :param int output_size: the number of items when ``data`` is uncompressed
        :return np.array output: an array of ``np.uint8``
        """
        from warnings import warn
        warn("using pure-Python (instead of Python C-extension) implementation of byterle_decoder")

        input_data = np.frombuffer(data, dtype=_np_ubytelittle, count=len(data))
        output = np.zeros(output_size, dtype=np.uint8)
        i = 0
        count = True
        repeat = False
        no = None
        j = 0
        while i < len(input_data):
            if count:
                no = input_data[i]
                if no > 127:
                    no &= 0x7f  # 2's complement
                    count = False
                    repeat = True
                    i += 1
                    continue
                else:
                    i += 1
                    count = False
                    repeat = False
                    continue
            elif not count:
                if repeat:
                    repeat = False
                    count = True
                    output[j:j + no] = input_data[i:i + no]
                    i += no
                    j += no
                    continue
                elif not repeat:
                    output[j:j + no] = input_data[i]
                    i += 1
                    j += no
                    count = True
                    repeat = False
                    continue

        assert j == output_size
        return output

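# Worked example of the RLE scheme decoded above (illustrative): a control byte
# <= 127 means "repeat the next byte that many times"; a control byte > 127
# means "copy (byte & 0x7f) literal bytes verbatim".
#
#     byterle_decoder(b'\x03\x05\x83\x01\x02\x03', 6)
#     # -> array([5, 5, 5, 1, 2, 3], dtype=uint8)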
# define a common alias for the selected byterle_decoder implementation
hxbyterle_decode = byterle_decoder

def hxzip_decode(data, output_size):
    """Decode HxZip data stream

    :param str data: a raw stream of data to be unpacked
    :param int output_size: the number of items when ``data`` is uncompressed
    :return np.array output: an array of ``np.uint8``
    """
    return np.frombuffer(zlib.decompress(data), dtype=_np_ubytelittle, count=output_size)

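# Round-trip sanity check for hxzip_decode (illustrative sketch):
#
#     raw = np.arange(10, dtype=np.uint8).tobytes()
#     decoded = hxzip_decode(zlib.compress(raw), 10)
#     assert decoded.tobytes() == raw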
def set_data_stream(name, header):
    """Factory function used by AmiraHeader to determine the type of data stream present"""
    if header.filetype == 'AmiraMesh':
        return AmiraMeshDataStream(name, header)
    elif header.filetype == 'HyperSurface':
        return AmiraHxSurfaceDataStream(name, header)
    raise ValueError("unknown file type: {}".format(header.filetype))

class AmiraDataStream(ListBlock):
    """Base class for an Amira (R) data stream"""
    __slots__ = ('_stream_data', '_header')

    def __init__(self, name, header):
        self._header = header  # contains metadata for extracting streams
        self._stream_data = None
        super(AmiraDataStream, self).__init__(name)

    @property
    def load_stream(self):
        """Reports whether data streams are loaded or not"""
        return self._header.load_streams

    def get_data(self):
        """Decode and return the stream data in this stream"""
        if not self._stream_data:
            raise ValueError('empty stream found')
        return self._decode(self._stream_data)

class AmiraMeshDataStream(AmiraDataStream):
    """Class that defines an AmiraMesh data stream"""
    def read(self):
        """Extract the data streams from the AmiraMesh file"""
        with open(self._header.filename, 'rb') as f:
            # rewind the file pointer to the end of the header
            f.seek(len(self._header))
            start = int(self.data_index)  # this data stream's index
            end = start + 1
            if self._header.data_stream_count == start:  # this is the last stream
                data = f.read()
                _regex = ".*?\n@{}\n(?P<stream>.*)\n".format(start).encode('ASCII')
                regex = re.compile(_regex, re.S)
                match = regex.match(data)
                _stream_data = match.group('stream')
                # strip a trailing newline (byte value 10) if present
                self._stream_data = _stream_data[:-1] if _stream_data[-1] == 10 else _stream_data
            else:
                data = f.read()
                _regex = ".*?\n@{}\n(?P<stream>.*)\n@{}\n".format(start, end).encode('ASCII')
                regex = re.compile(_regex, re.S)
                match = regex.match(data)
                self._stream_data = match.group('stream')
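    # Sketch of the on-disk layout the regexes in ``read`` target (reconstructed
    # from the patterns above; the stream contents are illustrative):
    #
    #     ...header...
    #     @1
    #     <raw bytes of stream 1>
    #     @2
    #     <raw bytes of stream 2>
    #
    # Each stream is introduced by a line '@<data_index>' and runs until the
    # next marker, or until the end of the file for the last stream.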
    def _decode(self, data):
        """Performs data stream decoding by introspecting the header information"""
        # determine the new output shape, taking into account shape and dimension
        if isinstance(self.shape, tuple):
            if self.dimension > 1:
                new_shape = tuple(list(self.shape) + [self.dimension])
            else:
                new_shape = (self.shape,)
        elif isinstance(self.shape, int):
            if self.dimension > 1:
                new_shape = (self.shape, self.dimension)
            else:
                new_shape = (self.shape,)
        # first we handle binary files
        # NOTE ON HOW LATTICES ARE STORED
        # AmiraMesh files state the dimensions of the lattice as nx, ny, nz.
        # The data is stored such that the first value has index (0, 0, 0), then
        # (1, 0, 0), ..., (nx-1, 0, 0), (0, 1, 0), (1, 1, 0), ..., (nx-1, 1, 0).
        # In other words, the first index changes fastest.
        # The last index is thus the stack index.
        # (see pg. 720 of https://assets.thermofisher.com/TFS-Assets/MSD/Product-Guides/user-guide-amira-software.pdf)
        if self._header.format == 'BINARY':
            # _type_map[True] holds the little-endian dtypes
            is_little_endian = self._header.endian == 'LITTLE'
            if self.format is None:
                return np.frombuffer(
                    data,
                    dtype=_type_map[is_little_endian][self.type]
                ).reshape(*new_shape)
            elif self.format == 'HxZip':
                return np.frombuffer(
                    zlib.decompress(data),
                    dtype=_type_map[is_little_endian][self.type]
                ).reshape(*new_shape)
            elif self.format == 'HxByteRLE':
                size = int(np.prod(np.array(self.shape)))
                return hxbyterle_decode(
                    data,
                    size
                ).reshape(*new_shape)
            else:
                raise ValueError('unknown data stream format: \'{}\''.format(self.format))
        # explicit instead of assumption
        elif self._header.format == 'ASCII':
            return np.fromstring(
                data,
                dtype=_type_map[self.type],
                sep="\n \t"
            ).reshape(*new_shape)
        else:
            raise ValueError("unknown file format: {}".format(self._header.format))

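# Index-order sketch for the lattice NOTE in AmiraMeshDataStream._decode
# (illustrative, assuming a C-ordered reshape with the slowest axis first):
# for an nx=2, ny=3, nz=4 lattice the value at (x, y, z) sits at flat offset
# x + nx*y + nx*ny*z, so vol = flat.reshape(nz, ny, nx) indexes as vol[z, y, x].
#
#     flat = np.arange(24)
#     vol = flat.reshape(4, 3, 2)
#     assert vol[1, 2, 0] == 0 + 2*2 + 2*3*1   # x=0, y=2, z=1 -> offset 10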
class AmiraHxSurfaceDataStream(AmiraDataStream):
    """Class that defines an Amira HxSurface data stream"""
    def read(self):
        """Extract the data streams from the HxSurface file"""
        with open(self._header.filename, 'rb') as f:
            # rewind the file pointer to the end of the header
            f.seek(len(self._header))
            data = f.read()
            # get the vertex count and streams
            _vertices_regex = r".*?\n" \
                              r"Vertices (?P<vertex_count>\d+)\n" \
                              r"(?P<streams>.*)".encode('ASCII')
            vertices_regex = re.compile(_vertices_regex, re.S)
            match_vertices = vertices_regex.match(data)
            # todo: fix for full.surf and simple.surf
            vertex_count = int(match_vertices.group('vertex_count'))
            # get the patches
            # fixme: general case for NBranchingPoints, NVerticesOnCurves, BoundaryCurves being non-zero
            stream_regex = r"(?P<vertices>.*?)\n" \
                           r"NBranchingPoints (?P<branching_point_count>\d+)\n" \
                           r"NVerticesOnCurves (?P<vertices_on_curves_count>\d+)\n" \
                           r"BoundaryCurves (?P<boundary_curve_count>\d+)\n" \
                           r"Patches (?P<patch_count>\d+)\n" \
                           r"(?P<patches>.*)".encode('ASCII')
            match_streams = re.match(stream_regex, match_vertices.group('streams'), re.S)
            # instantiate the vertices block
            vertices_block = AmiraHxSurfaceDataStream('Vertices', self._header)
            # set the data for this stream
            vertices_block._stream_data = match_streams.group('vertices')
            # length, type and dimension are needed for decoding
            vertices_block.add_attr('length', vertex_count)
            vertices_block.add_attr('type', 'float')
            vertices_block.add_attr('dimension', 3)
            vertices_block.add_attr('data', vertices_block.get_data())
            vertices_block.add_attr('NBranchingPoints', 0)
            vertices_block.add_attr('NVerticesOnCurves', 0)
            vertices_block.add_attr('BoundaryCurves', 0)
            # instantiate the patches block
            patches_block = AmiraHxSurfaceDataStream('Patches', self._header)
            patch_count = int(match_streams.group('patch_count'))
            patches_block.add_attr('length', patch_count)
            # get the triangles and contents of each patch
            # fixme: general case for BoundaryID, BranchingPoints being non-zero
            # I've not seen an example with loaded fields
            # todo: consider compiling regular expressions
            # NOTE:
            # There is a subtlety with this regex:
            # it might be the case that the last part matches bytes that end in '\n[}]\n'
            # but that are not the end of the stream. The only way to remedy this is to include the
            # extra brace [{] so that it now matches '\n[}]\n[{]', which is more likely to
            # correspond to the end of a patch. However, this introduces a problem:
            # we will not be able to match the last patch unless we also add [{] to the stream to match.
            # This also means that the start_from argument will be wrong, given that it will have passed
            # the starting point of the next patch. This is trivial to solve: we simply
            # backtrack start_from by 1.
            # These are noted in NOTE A and NOTE B below.
            _patch_regex = r"[{]\n" \
                           r"InnerRegion (?P<patch_inner_region>.*?)\n" \
                           r"OuterRegion (?P<patch_outer_region>.*?)\n" \
                           r"BoundaryID (?P<patch_boundary_id>\d+)\n" \
                           r"BranchingPoints (?P<patch_branching_points>\d+)\n" \
                           r"\s+\n" \
                           r"Triangles (?P<triangle_count>.*?)\n" \
                           r"(?P<triangles>.*?)\n" \
                           r"[}]\n[{]".encode('ASCII')
            patch_regex = re.compile(_patch_regex, re.S)
            # start from the beginning
            start_from = 0
            for p_id in range(patch_count):
                # NOTE A: append b'{' so the final patch can also match '\n}\n{'
                match_patch = patch_regex.match(match_streams.group('patches') + b'{', start_from)
                patch_block = AmiraHxSurfaceDataStream('Patch', self._header)
                patch_block.add_attr('InnerRegion', match_patch.group('patch_inner_region').decode('utf-8'))
                patch_block.add_attr('OuterRegion', match_patch.group('patch_outer_region').decode('utf-8'))
                patch_block.add_attr('BoundaryID', int(match_patch.group('patch_boundary_id')))
                patch_block.add_attr('BranchingPoints', int(match_patch.group('patch_branching_points')))
                # let's now add the triangles from the patch
                triangles_block = AmiraHxSurfaceDataStream('Triangles', self._header)
                # set the raw data stream
                triangles_block._stream_data = match_patch.group('triangles')
                # decoding needs the length, type, and dimension
                triangles_block.add_attr('length', int(match_patch.group('triangle_count')))
                triangles_block.add_attr('type', 'int')
                triangles_block.add_attr('dimension', 3)
                triangles_block.add_attr('data', triangles_block.get_data())
                # now we can add the triangles block to the patch...
                patch_block.add_attr(triangles_block)
                # then we collate the patches
                patches_block.append(patch_block)
                # the next patch begins where the last patch ended
                # NOTE B: backtrack by 1 to land on the '{' opening the next patch
                start_from = match_patch.end() - 1
            # add the patches to the vertices
            vertices_block.add_attr(patches_block)
            # add the vertices to the data stream
            self.add_attr(vertices_block)
    def _decode(self, data):
        is_little_endian = self._header.endian == 'LITTLE'
        if self._header.format == 'BINARY':
            return np.frombuffer(
                data,
                dtype=_type_map[is_little_endian][self.type]
            ).reshape(self.length, self.dimension)
        elif self._header.format == 'ASCII':
            return np.fromstring(
                data,
                dtype=_type_map[self.type],
                sep="\n \t"
            ).reshape(self.length, self.dimension)
        else:
            raise ValueError("unknown file format: {}".format(self._header.format))

@deprecated(
    "DataStreams class is obsolete, access data using stream_data and data attributes of corresponding metadata block attributes of AmiraHeader instance")
class DataStreams(object):
    __slots__ = ("_header", "__stream_data")

    def __init__(self, header):
        self._header = header
        with warnings.catch_warnings():
            warnings.simplefilter('ignore')
            if self._header.filetype == "AmiraMesh":
                self.__stream_data = self._header.data_pointers
            else:
                self.__stream_data = dict()
                for _streamblock in _dict_iter_keys(_hyper_surface_file):
                    _streamlist = getattr(self._header, _streamblock, None)
                    if _streamlist is None:
                        continue
                    self.__stream_data[_streamblock] = _streamlist

    def __getattribute__(self, attr):
        if attr in ("file", "header", "stream_data", "filetype"):
            return super(DataStreams, self).__getattribute__(attr)()
        return super(DataStreams, self).__getattribute__(attr)

    @deprecated("use <AmiraHeader>.filename instead")
    def file(self):
        return self._header.filename

    @deprecated("use AmiraHeader instance directly")
    def header(self):
        return self._header

    @deprecated(
        "access data of individual streams through corresponding attributes and dedicated stream_data and data attributes of meta data blocks")
    def stream_data(self):
        return self.__stream_data

    @deprecated("use <AmiraHeader>.filetype attribute instead")
    def filetype(self):
        return self._header.filetype

    @deprecated
    def __len__(self):
        return len(self.__stream_data)

    @deprecated
    def __iter__(self):
        return iter(_dict_iter_values(self.__stream_data))

    @deprecated
    def __getitem__(self, key):
        return self.__stream_data[key]

    @deprecated
    def __repr__(self):
        return "{} object with {} stream(s): {} ".format(
            self.__class__, len(self), ", ".join(_dict_iter_keys(self.__stream_data))
        )